From 174c64fc8c40331fee571bd4b568a72e0f67c610 Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Fri, 5 Feb 2021 18:21:47 +0530
Subject: [PATCH 02/12] Built-in compression method

Add syntax allowing a compression method to be specified.
As of now there is only 2 option for build-in compression
method (pglz, lz4) which can be set while creating a table
or adding a new column.  No option for altering the
compression method for an existing column.

Dilip Kumar based on the patches from Ildus Kurbangaliev.
Design input from Robert Haas and Tomas Vondra.
Reviewed by Robert Haas, Tomas Vondra, Alexander Korotkov and Justin Pryzby

Discussions:
https://www.postgresql.org/message-id/20171213151818.75a20259@postgrespro.ru
https://www.postgresql.org/message-id/CA%2BTgmoaKDW1Oi9V%3Djc9hOGyf77NbkNEABuqgHD1Cq%3D%3D1QsOcxg%40mail.gmail.com
https://www.postgresql.org/message-id/CA%2BTgmobSDVgUage9qQ5P_%3DF_9jaMkCgyKxUQGtFQU7oN4kX-AA%40mail.gmail.com
https://www.postgresql.org/message-id/20201005160355.byp74sh3ejsv7wrj%40development
https://www.postgresql.org/message-id/CAFiTN-tzTTT2oqWdRGLv1dvvS5MC1W%2BLE%2B3bqWPJUZj4GnHOJg%40mail.gmail.com
---
 configure                                     | 116 ++++++++++
 configure.ac                                  |  17 ++
 doc/src/sgml/ddl.sgml                         |   3 +
 doc/src/sgml/ref/create_table.sgml            |  25 +++
 src/backend/access/Makefile                   |   2 +-
 src/backend/access/brin/brin_tuple.c          |   5 +-
 src/backend/access/common/detoast.c           | 103 ++++++---
 src/backend/access/common/indextuple.c        |   4 +-
 src/backend/access/common/toast_internals.c   |  63 +++---
 src/backend/access/common/tupdesc.c           |   8 +
 src/backend/access/compression/Makefile       |  17 ++
 src/backend/access/compression/compress_lz4.c | 164 ++++++++++++++
 .../access/compression/compress_pglz.c        | 138 ++++++++++++
 src/backend/access/table/toast_helper.c       |   5 +-
 src/backend/bootstrap/bootstrap.c             |   5 +
 src/backend/catalog/genbki.pl                 |   7 +-
 src/backend/catalog/heap.c                    |   4 +
 src/backend/catalog/index.c                   |   2 +
 src/backend/catalog/toasting.c                |   5 +
 src/backend/commands/amcmds.c                 |  10 +
 src/backend/commands/createas.c               |  14 ++
 src/backend/commands/matview.c                |  14 ++
 src/backend/commands/tablecmds.c              | 111 +++++++++-
 src/backend/executor/nodeModifyTable.c        | 122 +++++++++++
 src/backend/nodes/copyfuncs.c                 |   1 +
 src/backend/nodes/equalfuncs.c                |   1 +
 src/backend/nodes/nodeFuncs.c                 |   2 +
 src/backend/nodes/outfuncs.c                  |   1 +
 src/backend/parser/gram.y                     |  26 ++-
 src/backend/parser/parse_utilcmd.c            |   8 +
 src/backend/utils/adt/pseudotypes.c           |   1 +
 src/backend/utils/adt/varlena.c               |  42 ++++
 src/bin/pg_dump/pg_backup.h                   |   1 +
 src/bin/pg_dump/pg_dump.c                     |  36 +++-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/pg_dump/t/002_pg_dump.pl              |  12 +-
 src/bin/psql/describe.c                       |  28 ++-
 src/include/access/compressamapi.h            |  99 +++++++++
 src/include/access/detoast.h                  |   8 +
 src/include/access/toast_helper.h             |   1 +
 src/include/access/toast_internals.h          |  19 +-
 src/include/catalog/pg_am.dat                 |   6 +
 src/include/catalog/pg_am.h                   |   1 +
 src/include/catalog/pg_attribute.h            |   8 +-
 src/include/catalog/pg_proc.dat               |  20 ++
 src/include/catalog/pg_type.dat               |   5 +
 src/include/commands/defrem.h                 |   1 +
 src/include/executor/executor.h               |   4 +-
 src/include/nodes/execnodes.h                 |   6 +
 src/include/nodes/nodes.h                     |   1 +
 src/include/nodes/parsenodes.h                |   2 +
 src/include/parser/kwlist.h                   |   1 +
 src/include/pg_config.h.in                    |   3 +
 src/include/postgres.h                        |  12 +-
 src/test/regress/expected/compression.out     | 201 ++++++++++++++++++
 src/test/regress/expected/compression_1.out   | 189 ++++++++++++++++
 src/test/regress/expected/psql.out            |  68 +++---
 src/test/regress/parallel_schedule            |   2 +-
 src/test/regress/serial_schedule              |   1 +
 src/test/regress/sql/compression.sql          |  85 ++++++++
 src/tools/msvc/Solution.pm                    |   1 +
 src/tools/pgindent/typedefs.list              |   1 +
 62 files changed, 1746 insertions(+), 123 deletions(-)
 create mode 100644 src/backend/access/compression/Makefile
 create mode 100644 src/backend/access/compression/compress_lz4.c
 create mode 100644 src/backend/access/compression/compress_pglz.c
 create mode 100644 src/include/access/compressamapi.h
 create mode 100644 src/test/regress/expected/compression.out
 create mode 100644 src/test/regress/expected/compression_1.out
 create mode 100644 src/test/regress/sql/compression.sql

diff --git a/configure b/configure
index ce9ea36999..0895b2f326 100755
--- a/configure
+++ b/configure
@@ -699,6 +699,7 @@ with_gnu_ld
 LD
 LDFLAGS_SL
 LDFLAGS_EX
+with_lz4
 with_zlib
 with_system_tzdata
 with_libxslt
@@ -864,6 +865,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_lz4
 with_gnu_ld
 with_ssl
 with_openssl
@@ -1569,6 +1571,7 @@ Optional Packages:
   --with-system-tzdata=DIR
                           use system time zone data in DIR
   --without-zlib          do not use Zlib
+  --with-lz4              build with LZ4 support
   --with-gnu-ld           assume the C compiler uses GNU ld [default=no]
   --with-ssl=LIB          use LIB for SSL/TLS support (openssl)
   --with-openssl          obsolete spelling of --with-ssl=openssl
@@ -8563,6 +8566,39 @@ fi
 
 
 
+#
+# LZ4
+#
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with LZ4 support" >&5
+$as_echo_n "checking whether to build with LZ4 support... " >&6; }
+
+
+
+# Check whether --with-lz4 was given.
+if test "${with_lz4+set}" = set; then :
+  withval=$with_lz4;
+  case $withval in
+    yes)
+
+$as_echo "#define USE_LZ4 1" >>confdefs.h
+
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-lz4 option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_lz4=no
+
+fi
+
+
+
+
 #
 # Assignments
 #
@@ -12054,6 +12090,56 @@ fi
 
 fi
 
+if test "$with_lz4" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for LZ4_compress in -llz4" >&5
+$as_echo_n "checking for LZ4_compress in -llz4... " >&6; }
+if ${ac_cv_lib_lz4_LZ4_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-llz4  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char LZ4_compress ();
+int
+main ()
+{
+return LZ4_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_lz4_LZ4_compress=yes
+else
+  ac_cv_lib_lz4_LZ4_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_lz4_LZ4_compress" >&5
+$as_echo "$ac_cv_lib_lz4_LZ4_compress" >&6; }
+if test "x$ac_cv_lib_lz4_LZ4_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBLZ4 1
+_ACEOF
+
+  LIBS="-llz4 $LIBS"
+
+else
+  as_fn_error $? "library 'lz4' is required for LZ4 support" "$LINENO" 5
+fi
+
+fi
+
 if test "$enable_spinlocks" = yes; then
 
 $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h
@@ -13320,6 +13406,36 @@ Use --without-zlib to disable zlib support." "$LINENO" 5
 fi
 
 
+fi
+
+if test "$with_lz4" = yes ; then
+  for ac_header in lz4/lz4.h
+do :
+  ac_fn_c_check_header_mongrel "$LINENO" "lz4/lz4.h" "ac_cv_header_lz4_lz4_h" "$ac_includes_default"
+if test "x$ac_cv_header_lz4_lz4_h" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LZ4_LZ4_H 1
+_ACEOF
+
+else
+  for ac_header in lz4.h
+do :
+  ac_fn_c_check_header_mongrel "$LINENO" "lz4.h" "ac_cv_header_lz4_h" "$ac_includes_default"
+if test "x$ac_cv_header_lz4_h" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LZ4_H 1
+_ACEOF
+
+else
+  as_fn_error $? "lz4.h header file is required for LZ4" "$LINENO" 5
+fi
+
+done
+
+fi
+
+done
+
 fi
 
 if test "$with_gssapi" = yes ; then
diff --git a/configure.ac b/configure.ac
index 07da84d401..63940b78a0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -986,6 +986,14 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# LZ4
+#
+AC_MSG_CHECKING([whether to build with LZ4 support])
+PGAC_ARG_BOOL(with, lz4, no, [build with LZ4 support],
+              [AC_DEFINE([USE_LZ4], 1, [Define to 1 to build with LZ4 support. (--with-lz4)])])
+AC_SUBST(with_lz4)
+
 #
 # Assignments
 #
@@ -1173,6 +1181,10 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_lz4" = yes ; then
+  AC_CHECK_LIB(lz4, LZ4_compress, [], [AC_MSG_ERROR([library 'lz4' is required for LZ4 support])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1406,6 +1418,11 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_lz4" = yes ; then
+  AC_CHECK_HEADERS(lz4/lz4.h, [],
+	[AC_CHECK_HEADERS(lz4.h, [], [AC_MSG_ERROR([lz4.h header file is required for LZ4])])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/doc/src/sgml/ddl.sgml b/doc/src/sgml/ddl.sgml
index 1e9a4625cc..4d7ed698b9 100644
--- a/doc/src/sgml/ddl.sgml
+++ b/doc/src/sgml/ddl.sgml
@@ -3762,6 +3762,9 @@ CREATE TABLE measurement (
        <productname>PostgreSQL</productname>
        tables (or, possibly, foreign tables).  It is possible to specify a
        tablespace and storage parameters for each partition separately.
+       By default, each column in a partition inherits the compression method
+       from parent table's column, however a different compression method can be
+       set for each partition.
       </para>
 
       <para>
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 3b2b227683..3374012940 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -69,6 +69,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
   GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( <replaceable>sequence_options</replaceable> ) ] |
   UNIQUE <replaceable class="parameter">index_parameters</replaceable> |
   PRIMARY KEY <replaceable class="parameter">index_parameters</replaceable> |
+  COMPRESSION <replaceable class="parameter">compression_method</replaceable> |
   REFERENCES <replaceable class="parameter">reftable</replaceable> [ ( <replaceable class="parameter">refcolumn</replaceable> ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]
     [ ON DELETE <replaceable class="parameter">referential_action</replaceable> ] [ ON UPDATE <replaceable class="parameter">referential_action</replaceable> ] }
 [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
@@ -605,6 +606,17 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>INCLUDING COMPRESSION</literal></term>
+        <listitem>
+         <para>
+          Compression method of the columns will be copied.  The default
+          behavior is to exclude compression methods, resulting in the columns
+          having the default compression method.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>INCLUDING CONSTRAINTS</literal></term>
         <listitem>
@@ -981,6 +993,19 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>COMPRESSION <replaceable class="parameter">compression_method</replaceable></literal></term>
+    <listitem>
+     <para>
+      This sets the compression method for a column.  The supported compression
+      methods are <literal>pglz</literal> and <literal>lz4</literal>.
+      <literal>lz4</literal> is available only if <literal>--with-lz4</literal>
+      was used when building <productname>PostgreSQL</productname>. The default
+      is <literal>pglz</literal>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createtable-exclude">
     <term><literal>EXCLUDE [ USING <replaceable class="parameter">index_method</replaceable> ] ( <replaceable class="parameter">exclude_element</replaceable> WITH <replaceable class="parameter">operator</replaceable> [, ... ] ) <replaceable class="parameter">index_parameters</replaceable> [ WHERE ( <replaceable class="parameter">predicate</replaceable> ) ]</literal></term>
     <listitem>
diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile
index 0880e0a8bb..ba08bdd63e 100644
--- a/src/backend/access/Makefile
+++ b/src/backend/access/Makefile
@@ -8,7 +8,7 @@ subdir = src/backend/access
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS	    = brin common gin gist hash heap index nbtree rmgrdesc spgist \
+SUBDIRS	    = brin common compression gin gist hash heap index nbtree rmgrdesc spgist \
 			  table tablesample transam
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
index a7eb1c9473..0ab5712c71 100644
--- a/src/backend/access/brin/brin_tuple.c
+++ b/src/backend/access/brin/brin_tuple.c
@@ -213,7 +213,10 @@ brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
 				(atttype->typstorage == TYPSTORAGE_EXTENDED ||
 				 atttype->typstorage == TYPSTORAGE_MAIN))
 			{
-				Datum		cvalue = toast_compress_datum(value);
+				Form_pg_attribute att = TupleDescAttr(brdesc->bd_tupdesc,
+													  keyno);
+				Datum		cvalue = toast_compress_datum(value,
+														  att->attcompression);
 
 				if (DatumGetPointer(cvalue) != NULL)
 				{
diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c
index d1cdbaf648..b78d49167b 100644
--- a/src/backend/access/common/detoast.c
+++ b/src/backend/access/common/detoast.c
@@ -13,6 +13,7 @@
 
 #include "postgres.h"
 
+#include "access/compressamapi.h"
 #include "access/detoast.h"
 #include "access/table.h"
 #include "access/tableam.h"
@@ -457,28 +458,78 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset,
 }
 
 /* ----------
- * toast_decompress_datum -
+ * toast_get_compression_oid -
  *
- * Decompress a compressed version of a varlena datum
+ * Returns the Oid of the compression method stored in the compressed data.  If
+ * the varlena is not compressed then returns InvalidOid.
  */
-static struct varlena *
-toast_decompress_datum(struct varlena *attr)
+Oid
+toast_get_compression_oid(struct varlena *attr)
 {
-	struct varlena *result;
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		struct varatt_external toast_pointer;
+
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
+		/* fast path for non-compressed external datums */
+		if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
+			return InvalidOid;
+
+		/*
+		 * Just fetch the toast compress header to know the compression method
+		 * in the compressed data.
+		 */
+		attr = toast_fetch_datum_slice(attr, 0, VARHDRSZ_COMPRESS);
+	}
+	else if (!VARATT_IS_COMPRESSED(attr))
+		return InvalidOid;
+
+	return CompressionIdToOid(TOAST_COMPRESS_METHOD(attr));
+}
+
+/* ----------
+ * toast_get_compression_handler - get the compression handler routines
+ *
+ * helper function for toast_decompress_datum and toast_decompress_datum_slice
+ */
+static inline const CompressionAmRoutine *
+toast_get_compression_handler(struct varlena *attr)
+{
+	const CompressionAmRoutine *cmroutine;
+	CompressionId cmid;
 
 	Assert(VARATT_IS_COMPRESSED(attr));
 
-	result = (struct varlena *)
-		palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
-	SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
+	cmid = TOAST_COMPRESS_METHOD(attr);
 
-	if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
-						TOAST_COMPRESS_SIZE(attr),
-						VARDATA(result),
-						TOAST_COMPRESS_RAWSIZE(attr), true) < 0)
-		elog(ERROR, "compressed data is corrupted");
+	/* Get the handler routines for the compression method */
+	switch (cmid)
+	{
+		case PGLZ_COMPRESSION_ID:
+			cmroutine = &pglz_compress_methods;
+			break;
+		case LZ4_COMPRESSION_ID:
+			cmroutine = &lz4_compress_methods;
+			break;
+		default:
+			elog(ERROR, "invalid compression method id %d", cmid);
+	}
 
-	return result;
+	return cmroutine;
+}
+
+/* ----------
+ * toast_decompress_datum -
+ *
+ * Decompress a compressed version of a varlena datum
+ */
+static struct varlena *
+toast_decompress_datum(struct varlena *attr)
+{
+	const CompressionAmRoutine *cmroutine = toast_get_compression_handler(attr);
+
+	return cmroutine->datum_decompress(attr);
 }
 
 
@@ -492,22 +543,16 @@ toast_decompress_datum(struct varlena *attr)
 static struct varlena *
 toast_decompress_datum_slice(struct varlena *attr, int32 slicelength)
 {
-	struct varlena *result;
-	int32		rawsize;
-
-	Assert(VARATT_IS_COMPRESSED(attr));
-
-	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
-
-	rawsize = pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
-							  VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
-							  VARDATA(result),
-							  slicelength, false);
-	if (rawsize < 0)
-		elog(ERROR, "compressed data is corrupted");
+	const CompressionAmRoutine *cmroutine = toast_get_compression_handler(attr);
 
-	SET_VARSIZE(result, rawsize + VARHDRSZ);
-	return result;
+	/*
+	 * If the handler supports the slice decompression then decompress the
+	 * slice otherwise decompress complete data.
+	 */
+	if (cmroutine->datum_decompress_slice)
+		return cmroutine->datum_decompress_slice(attr, slicelength);
+	else
+		return cmroutine->datum_decompress(attr);
 }
 
 /* ----------
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index b72a138497..1d43d5d2ff 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -16,6 +16,7 @@
 
 #include "postgres.h"
 
+#include "access/compressamapi.h"
 #include "access/detoast.h"
 #include "access/heaptoast.h"
 #include "access/htup_details.h"
@@ -103,7 +104,8 @@ index_form_tuple(TupleDesc tupleDescriptor,
 			(att->attstorage == TYPSTORAGE_EXTENDED ||
 			 att->attstorage == TYPSTORAGE_MAIN))
 		{
-			Datum		cvalue = toast_compress_datum(untoasted_values[i]);
+			Datum		cvalue = toast_compress_datum(untoasted_values[i],
+													  att->attcompression);
 
 			if (DatumGetPointer(cvalue) != NULL)
 			{
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index 9b9da0f41b..b04c5a5eb8 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -44,46 +44,51 @@ static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
  * ----------
  */
 Datum
-toast_compress_datum(Datum value)
+toast_compress_datum(Datum value, Oid cmoid)
 {
-	struct varlena *tmp;
-	int32		valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
-	int32		len;
+	struct varlena *tmp = NULL;
+	int32		valsize;
+	const CompressionAmRoutine *cmroutine = NULL;
 
 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
 	Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
 
-	/*
-	 * No point in wasting a palloc cycle if value size is out of the allowed
-	 * range for compression
-	 */
-	if (valsize < PGLZ_strategy_default->min_input_size ||
-		valsize > PGLZ_strategy_default->max_input_size)
-		return PointerGetDatum(NULL);
+	Assert(OidIsValid(cmoid));
+
+	/* Get the handler routines for the compression method */
+	switch (cmoid)
+	{
+		case PGLZ_COMPRESSION_AM_OID:
+			cmroutine = &pglz_compress_methods;
+			break;
+		case LZ4_COMPRESSION_AM_OID:
+			cmroutine = &lz4_compress_methods;
+			break;
+		default:
+			elog(ERROR, "Invalid compression method oid %u", cmoid);
+	}
 
-	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
-									TOAST_COMPRESS_HDRSZ);
+	/* Call the actual compression function */
+	tmp = cmroutine->datum_compress((const struct varlena *) value);
+	if (!tmp)
+		return PointerGetDatum(NULL);
 
 	/*
-	 * We recheck the actual size even if pglz_compress() reports success,
-	 * because it might be satisfied with having saved as little as one byte
-	 * in the compressed data --- which could turn into a net loss once you
-	 * consider header and alignment padding.  Worst case, the compressed
-	 * format might require three padding bytes (plus header, which is
-	 * included in VARSIZE(tmp)), whereas the uncompressed format would take
-	 * only one header byte and no padding if the value is short enough.  So
-	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
+	 * We recheck the actual size even if compression reports success, because
+	 * it might be satisfied with having saved as little as one byte in the
+	 * compressed data --- which could turn into a net loss once you consider
+	 * header and alignment padding.  Worst case, the compressed format might
+	 * require three padding bytes (plus header, which is included in
+	 * VARSIZE(tmp)), whereas the uncompressed format would take only one
+	 * header byte and no padding if the value is short enough.  So we insist
+	 * on a savings of more than 2 bytes to ensure we have a gain.
 	 */
-	len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)),
-						valsize,
-						TOAST_COMPRESS_RAWDATA(tmp),
-						PGLZ_strategy_default);
-	if (len >= 0 &&
-		len + TOAST_COMPRESS_HDRSZ < valsize - 2)
+	valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
+
+	if (VARSIZE(tmp) < valsize - 2)
 	{
-		TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize);
-		SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ);
 		/* successful compression */
+		TOAST_COMPRESS_SET_SIZE_AND_METHOD(tmp, valsize, CompressionOidToId(cmoid));
 		return PointerGetDatum(tmp);
 	}
 	else
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 902f59440c..ca26fab487 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/compressamapi.h"
 #include "access/htup_details.h"
 #include "access/tupdesc_details.h"
 #include "catalog/pg_collation.h"
@@ -470,6 +471,8 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
 			return false;
 		if (attr1->attcollation != attr2->attcollation)
 			return false;
+		if (attr1->attcompression != attr2->attcompression)
+			return false;
 		/* attacl, attoptions and attfdwoptions are not even present... */
 	}
 
@@ -664,6 +667,11 @@ TupleDescInitEntry(TupleDesc desc,
 	att->attstorage = typeForm->typstorage;
 	att->attcollation = typeForm->typcollation;
 
+	if (IsStorageCompressible(typeForm->typstorage))
+		att->attcompression = DefaultCompressionOid;
+	else
+		att->attcompression = InvalidOid;
+
 	ReleaseSysCache(tuple);
 }
 
diff --git a/src/backend/access/compression/Makefile b/src/backend/access/compression/Makefile
new file mode 100644
index 0000000000..779f54e785
--- /dev/null
+++ b/src/backend/access/compression/Makefile
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for access/compression
+#
+# IDENTIFICATION
+#    src/backend/access/compression/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/compression
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = compress_pglz.o compress_lz4.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/compression/compress_lz4.c b/src/backend/access/compression/compress_lz4.c
new file mode 100644
index 0000000000..1856cf7df7
--- /dev/null
+++ b/src/backend/access/compression/compress_lz4.c
@@ -0,0 +1,164 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_lz4.c
+ *		lz4 compression method.
+ *
+ * Portions Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/compression/compress_lz4.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#ifdef HAVE_LIBLZ4
+#include <lz4.h>
+#endif
+
+#include "access/compressamapi.h"
+#include "fmgr.h"
+#include "utils/builtins.h"
+
+/*
+ * lz4_cmcompress - compression routine for lz4 compression method
+ *
+ * Compresses source into dest using the default strategy. Returns the
+ * compressed varlena, or NULL if compression fails.
+ */
+static struct varlena *
+lz4_cmcompress(const struct varlena *value)
+{
+#ifndef HAVE_LIBLZ4
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("not built with lz4 support")));
+#else
+	int32		valsize;
+	int32		len;
+	int32		max_size;
+	struct varlena *tmp = NULL;
+
+	valsize = VARSIZE_ANY_EXHDR(value);
+
+	/*
+	 * Get maximum size of the compressed data that lz4 compression may output
+	 * and allocate the memory for holding the compressed data and the header.
+	 */
+	max_size = LZ4_compressBound(valsize);
+	tmp = (struct varlena *) palloc(max_size + VARHDRSZ_COMPRESS);
+
+	len = LZ4_compress_default(VARDATA_ANY(value),
+							   (char *) tmp + VARHDRSZ_COMPRESS,
+							   valsize, max_size);
+	if (len <= 0)
+		elog(ERROR, "could not compress data with lz4");
+
+	/* data is incompressible so just free the memory and return NULL */
+	if (len > valsize)
+	{
+		pfree(tmp);
+		return NULL;
+	}
+
+	SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_COMPRESS);
+
+	return tmp;
+#endif
+}
+
+/*
+ * lz4_cmdecompress - decompression routine for lz4 compression method
+ *
+ * Returns the decompressed varlena.
+ */
+static struct varlena *
+lz4_cmdecompress(const struct varlena *value)
+{
+#ifndef HAVE_LIBLZ4
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("not built with lz4 support")));
+#else
+	int32		rawsize;
+	struct varlena *result;
+
+	/* allocate memory for holding the uncompressed data */
+	result = (struct varlena *) palloc(VARRAWSIZE_4B_C(value) + VARHDRSZ);
+
+	/* decompress data using lz4 routine */
+	rawsize = LZ4_decompress_safe((char *) value + VARHDRSZ_COMPRESS,
+								  VARDATA(result),
+								  VARSIZE(value) - VARHDRSZ_COMPRESS,
+								  VARRAWSIZE_4B_C(value));
+	if (rawsize < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed lz4 data is corrupt")));
+
+
+	SET_VARSIZE(result, rawsize + VARHDRSZ);
+
+	return result;
+#endif
+}
+
+/*
+ * lz4_cmdecompress_slice - slice decompression routine for lz4 compression
+ *
+ * Decompresses part of the data. Returns the decompressed varlena.
+ */
+static struct varlena *
+lz4_cmdecompress_slice(const struct varlena *value, int32 slicelength)
+{
+#ifndef HAVE_LIBLZ4
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("not built with lz4 support")));
+#else
+	int32		rawsize;
+	struct varlena *result;
+
+	/* allocate memory for holding the uncompressed data */
+	result = (struct varlena *) palloc(VARRAWSIZE_4B_C(value) + VARHDRSZ);
+
+	/* decompress partial data using lz4 routine */
+	rawsize = LZ4_decompress_safe_partial((char *) value + VARHDRSZ_COMPRESS,
+										  VARDATA(result),
+										  VARSIZE(value) - VARHDRSZ_COMPRESS,
+										  slicelength,
+										  VARRAWSIZE_4B_C(value));
+	if (rawsize < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed lz4 data is corrupt")));
+
+	SET_VARSIZE(result, rawsize + VARHDRSZ);
+
+	return result;
+#endif
+}
+
+/* ------------------------------------------------------------------------
+ * Definition of the lz4 compression access method.
+ * ------------------------------------------------------------------------
+ */
+const CompressionAmRoutine lz4_compress_methods = {
+	.type = T_CompressionAmRoutine,
+	.datum_compress = lz4_cmcompress,
+	.datum_decompress = lz4_cmdecompress,
+	.datum_decompress_slice = lz4_cmdecompress_slice
+};
+
+/* lz4 compression handler function */
+Datum
+lz4handler(PG_FUNCTION_ARGS)
+{
+#ifndef HAVE_LIBLZ4
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("not built with lz4 support")));
+#else
+	PG_RETURN_POINTER(&lz4_compress_methods);
+#endif
+}
diff --git a/src/backend/access/compression/compress_pglz.c b/src/backend/access/compression/compress_pglz.c
new file mode 100644
index 0000000000..8a4bf427cf
--- /dev/null
+++ b/src/backend/access/compression/compress_pglz.c
@@ -0,0 +1,138 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_pglz.c
+ *	  pglz compression method
+ *
+ * Portions Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/compression/compress_pglz.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/compressamapi.h"
+#include "common/pg_lzcompress.h"
+
+#include "fmgr.h"
+#include "utils/builtins.h"
+
+/*
+ * pglz_cmcompress - compression routine for pglz compression method
+ *
+ * Compresses source into dest using the default strategy. Returns the
+ * compressed varlena, or NULL if compression fails.
+ */
+static struct varlena *
+pglz_cmcompress(const struct varlena *value)
+{
+	int32		valsize,
+				len;
+	struct varlena *tmp = NULL;
+
+	valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
+
+	/*
+	 * No point in wasting a palloc cycle if value size is outside the allowed
+	 * range for compression.
+	 */
+	if (valsize < PGLZ_strategy_default->min_input_size ||
+		valsize > PGLZ_strategy_default->max_input_size)
+		return NULL;
+
+	/*
+	 * Get maximum size of the compressed data that pglz compression may output
+	 * and allocate the memory for holding the compressed data and the header.
+	 */
+	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
+									VARHDRSZ_COMPRESS);
+
+	len = pglz_compress(VARDATA_ANY(value),
+						valsize,
+						(char *) tmp + VARHDRSZ_COMPRESS,
+						NULL);
+	if (len < 0)
+	{
+		pfree(tmp);
+		return NULL;
+	}
+
+	SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_COMPRESS);
+
+	return tmp;
+}
+
+/*
+ * pglz_cmdecompress - decompression routine for pglz compression method
+ *
+ * Returns the decompressed varlena.
+ */
+static struct varlena *
+pglz_cmdecompress(const struct varlena *value)
+{
+	struct varlena *result;
+	int32		rawsize;
+
+	result = (struct varlena *) palloc(VARRAWSIZE_4B_C(value) + VARHDRSZ);
+
+	rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESS,
+							  VARSIZE(value) - VARHDRSZ_COMPRESS,
+							  VARDATA(result),
+							  VARRAWSIZE_4B_C(value), true);
+	if (rawsize < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed pglz data is corrupt")));
+
+	SET_VARSIZE(result, rawsize + VARHDRSZ);
+
+	return result;
+}
+
+/*
+ * pglz_decompress - slice decompression routine for pglz compression method
+ *
+ * Decompresses part of the data. Returns the decompressed varlena.
+ */
+static struct varlena *
+pglz_cmdecompress_slice(const struct varlena *value,
+						int32 slicelength)
+{
+	struct varlena *result;
+	int32		rawsize;
+
+	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
+
+	rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESS,
+							  VARSIZE(value) - VARHDRSZ_COMPRESS,
+							  VARDATA(result),
+							  slicelength, false);
+	if (rawsize < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed pglz data is corrupt")));
+
+	SET_VARSIZE(result, rawsize + VARHDRSZ);
+
+	return result;
+}
+
+/* ------------------------------------------------------------------------
+ * Definition of the pglz compression access method.
+ * ------------------------------------------------------------------------
+ */
+const CompressionAmRoutine pglz_compress_methods = {
+	.type = T_CompressionAmRoutine,
+	.datum_compress = pglz_cmcompress,
+	.datum_decompress = pglz_cmdecompress,
+	.datum_decompress_slice = pglz_cmdecompress_slice
+};
+
+/* pglz compression handler function */
+Datum
+pglzhandler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&pglz_compress_methods);
+}
diff --git a/src/backend/access/table/toast_helper.c b/src/backend/access/table/toast_helper.c
index fb36151ce5..53f78f9c3e 100644
--- a/src/backend/access/table/toast_helper.c
+++ b/src/backend/access/table/toast_helper.c
@@ -54,6 +54,7 @@ toast_tuple_init(ToastTupleContext *ttc)
 
 		ttc->ttc_attr[i].tai_colflags = 0;
 		ttc->ttc_attr[i].tai_oldexternal = NULL;
+		ttc->ttc_attr[i].tai_compression = att->attcompression;
 
 		if (ttc->ttc_oldvalues != NULL)
 		{
@@ -226,9 +227,11 @@ void
 toast_tuple_try_compression(ToastTupleContext *ttc, int attribute)
 {
 	Datum	   *value = &ttc->ttc_values[attribute];
-	Datum		new_value = toast_compress_datum(*value);
+	Datum		new_value;
 	ToastAttrInfo *attr = &ttc->ttc_attr[attribute];
 
+	new_value = toast_compress_datum(*value, attr->tai_compression);
+
 	if (DatumGetPointer(new_value) != NULL)
 	{
 		/* successful compression */
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 6f615e6622..9b451eaa71 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -17,6 +17,7 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include "access/compressamapi.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
@@ -731,6 +732,10 @@ DefineAttr(char *name, char *type, int attnum, int nullness)
 	attrtypes[attnum]->attcacheoff = -1;
 	attrtypes[attnum]->atttypmod = -1;
 	attrtypes[attnum]->attislocal = true;
+	if (IsStorageCompressible(attrtypes[attnum]->attstorage))
+		attrtypes[attnum]->attcompression = DefaultCompressionOid;
+	else
+		attrtypes[attnum]->attcompression = InvalidOid;
 
 	if (nullness == BOOTCOL_NULL_FORCE_NOT_NULL)
 	{
diff --git a/src/backend/catalog/genbki.pl b/src/backend/catalog/genbki.pl
index b159958112..f5bb37b972 100644
--- a/src/backend/catalog/genbki.pl
+++ b/src/backend/catalog/genbki.pl
@@ -187,7 +187,9 @@ my $GenbkiNextOid = $FirstGenbkiObjectId;
 my $C_COLLATION_OID =
   Catalog::FindDefinedSymbolFromData($catalog_data{pg_collation},
 	'C_COLLATION_OID');
-
+my $PGLZ_COMPRESSION_AM_OID =
+  Catalog::FindDefinedSymbolFromData($catalog_data{pg_am},
+	'PGLZ_COMPRESSION_AM_OID');
 
 # Fill in pg_class.relnatts by looking at the referenced catalog's schema.
 # This is ugly but there's no better place; Catalog::AddDefaultValues
@@ -906,6 +908,9 @@ sub morph_row_for_pgattr
 	$row->{attcollation} =
 	  $type->{typcollation} ne '0' ? $C_COLLATION_OID : 0;
 
+	$row->{attcompression} =
+	  $type->{typstorage} ne 'p' && $type->{typstorage} ne 'e' ? $PGLZ_COMPRESSION_AM_OID : 0;
+
 	if (defined $attr->{forcenotnull})
 	{
 		$row->{attnotnull} = 't';
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 9abc4a1f55..b53b6b50e6 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -29,6 +29,7 @@
  */
 #include "postgres.h"
 
+#include "access/compressamapi.h"
 #include "access/genam.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
@@ -789,6 +790,7 @@ InsertPgAttributeTuples(Relation pg_attribute_rel,
 		slot[slotCount]->tts_values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(attrs->attislocal);
 		slot[slotCount]->tts_values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(attrs->attinhcount);
 		slot[slotCount]->tts_values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(attrs->attcollation);
+		slot[slotCount]->tts_values[Anum_pg_attribute_attcompression - 1] = CharGetDatum(attrs->attcompression);
 		if (attoptions && attoptions[natts] != (Datum) 0)
 			slot[slotCount]->tts_values[Anum_pg_attribute_attoptions - 1] = attoptions[natts];
 		else
@@ -1715,6 +1717,8 @@ RemoveAttributeById(Oid relid, AttrNumber attnum)
 		/* Unset this so no one tries to look up the generation expression */
 		attStruct->attgenerated = '\0';
 
+		attStruct->attcompression = InvalidOid;
+
 		/*
 		 * Change the column name to something that isn't likely to conflict
 		 */
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index b4ab0b88ad..0db2a7430f 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -24,6 +24,7 @@
 #include <unistd.h>
 
 #include "access/amapi.h"
+#include "access/compressamapi.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/reloptions.h"
@@ -348,6 +349,7 @@ ConstructTupleDescriptor(Relation heapRelation,
 			to->attbyval = from->attbyval;
 			to->attstorage = from->attstorage;
 			to->attalign = from->attalign;
+			to->attcompression = from->attcompression;
 		}
 		else
 		{
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index d7b806020d..a549481557 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -220,6 +220,11 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 	TupleDescAttr(tupdesc, 1)->attstorage = TYPSTORAGE_PLAIN;
 	TupleDescAttr(tupdesc, 2)->attstorage = TYPSTORAGE_PLAIN;
 
+	/* Toast field should not be compressed */
+	TupleDescAttr(tupdesc, 0)->attcompression = InvalidOid;
+	TupleDescAttr(tupdesc, 1)->attcompression = InvalidOid;
+	TupleDescAttr(tupdesc, 2)->attcompression = InvalidOid;
+
 	/*
 	 * Toast tables for regular relations go in pg_toast; those for temp
 	 * relations go into the per-backend temp-toast-table namespace.
diff --git a/src/backend/commands/amcmds.c b/src/backend/commands/amcmds.c
index eff9535ed0..3ad4a61739 100644
--- a/src/backend/commands/amcmds.c
+++ b/src/backend/commands/amcmds.c
@@ -175,6 +175,16 @@ get_table_am_oid(const char *amname, bool missing_ok)
 	return get_am_type_oid(amname, AMTYPE_TABLE, missing_ok);
 }
 
+/*
+ * get_compression_am_oid - given an access method name, look up its OID
+ *		and verify it corresponds to an compression AM.
+ */
+Oid
+get_compression_am_oid(const char *amname, bool missing_ok)
+{
+	return get_am_type_oid(amname, AMTYPE_COMPRESSION, missing_ok);
+}
+
 /*
  * get_am_oid - given an access method name, look up its OID.
  *		The type is not checked.
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index dce882012e..1d17dc0d6b 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,7 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+	TupleTableSlot *decompress_tuple_slot;	/* to hold the decompress tuple */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -581,6 +582,15 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 	/* Nothing to insert if WITH NO DATA is specified. */
 	if (!myState->into->skipData)
 	{
+		/*
+		 * Compare the compression method of the compressed attribute in the
+		 * source tuple with target attribute and if those are different then
+		 * decompress those attributes.
+		 */
+		slot = CompareCompressionMethodAndDecompress(slot,
+													 &myState->decompress_tuple_slot,
+													 myState->rel->rd_att);
+
 		/*
 		 * Note that the input slot might not be of the type of the target
 		 * relation. That's supported by table_tuple_insert(), but slightly
@@ -619,6 +629,10 @@ intorel_shutdown(DestReceiver *self)
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
 	myState->rel = NULL;
+
+	/* release the slot used for decompressing the tuple */
+	if (myState->decompress_tuple_slot)
+		ExecDropSingleTupleTableSlot(myState->decompress_tuple_slot);
 }
 
 /*
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index c5c25ce11d..713fc3fceb 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -56,6 +56,7 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+	TupleTableSlot *decompress_tuple_slot;	/* to hold the decompress tuple */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -486,6 +487,15 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
+	/*
+	 * Compare the compression method of the compressed attribute in the
+	 * source tuple with target attribute and if those are different then
+	 * decompress those attributes.
+	 */
+	slot = CompareCompressionMethodAndDecompress(slot,
+												 &myState->decompress_tuple_slot,
+												 myState->transientrel->rd_att);
+
 	/*
 	 * Note that the input slot might not be of the type of the target
 	 * relation. That's supported by table_tuple_insert(), but slightly less
@@ -521,6 +531,10 @@ transientrel_shutdown(DestReceiver *self)
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
 	myState->transientrel = NULL;
+
+	/* release the slot used for decompressing the tuple */
+	if (myState->decompress_tuple_slot)
+		ExecDropSingleTupleTableSlot(myState->decompress_tuple_slot);
 }
 
 /*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b2457a6924..811dbc3741 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include "access/attmap.h"
+#include "access/compressamapi.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/heapam_xlog.h"
@@ -559,7 +560,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
 static List *GetParentedForeignKeyRefs(Relation partition);
 static void ATDetachCheckNoForeignKeyRefs(Relation partition);
 static void ATExecAlterCollationRefreshVersion(Relation rel, List *coll);
-
+static Oid GetAttributeCompression(Form_pg_attribute att, char *compression);
 
 /* ----------------------------------------------------------------
  *		DefineRelation
@@ -853,6 +854,18 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 
 		if (colDef->generated)
 			attr->attgenerated = colDef->generated;
+
+		/*
+		 * lookup attribute's compression method and store its Oid in the
+		 * attr->attcompression.
+		 */
+		if (relkind == RELKIND_RELATION ||
+			relkind == RELKIND_PARTITIONED_TABLE ||
+			relkind == RELKIND_MATVIEW)
+			attr->attcompression =
+				GetAttributeCompression(attr, colDef->compression);
+		else
+			attr->attcompression = InvalidOid;
 	}
 
 	/*
@@ -2397,6 +2410,21 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
 									   storage_name(def->storage),
 									   storage_name(attribute->attstorage))));
 
+				/* Copy/check compression parameter */
+				if (OidIsValid(attribute->attcompression))
+				{
+					char *compression = get_am_name(attribute->attcompression);
+
+					if (!def->compression)
+						def->compression = compression;
+					else if (strcmp(def->compression, compression) != 0)
+						ereport(ERROR,
+								(errcode(ERRCODE_DATATYPE_MISMATCH),
+								 errmsg("column \"%s\" has a compression method conflict",
+										attributeName),
+								 errdetail("%s versus %s", def->compression, compression)));
+				}
+
 				def->inhcount++;
 				/* Merge of NOT NULL constraints = OR 'em together */
 				def->is_not_null |= attribute->attnotnull;
@@ -2431,6 +2459,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
 				def->collOid = attribute->attcollation;
 				def->constraints = NIL;
 				def->location = -1;
+				def->compression = get_am_name(attribute->attcompression);
 				inhSchema = lappend(inhSchema, def);
 				newattmap->attnums[parent_attno - 1] = ++child_attno;
 			}
@@ -2676,6 +2705,19 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
 									   storage_name(def->storage),
 									   storage_name(newdef->storage))));
 
+				/* Copy compression parameter */
+				if (!def->compression)
+					def->compression = newdef->compression;
+				else if (newdef->compression)
+				{
+					if (strcmp(def->compression, newdef->compression))
+						ereport(ERROR,
+								(errcode(ERRCODE_DATATYPE_MISMATCH),
+								 errmsg("column \"%s\" has a compression method conflict",
+										attributeName),
+								 errdetail("%s versus %s", def->compression, newdef->compression)));
+				}
+
 				/* Mark the column as locally defined */
 				def->is_local = true;
 				/* Merge of NOT NULL constraints = OR 'em together */
@@ -6341,6 +6383,18 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	attribute.attislocal = colDef->is_local;
 	attribute.attinhcount = colDef->inhcount;
 	attribute.attcollation = collOid;
+
+	/*
+	 * lookup attribute's compression method and store its Oid in the
+	 * attr->attcompression.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_RELATION ||
+		rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		attribute.attcompression = GetAttributeCompression(&attribute,
+														   colDef->compression);
+	else
+		attribute.attcompression = InvalidOid;
+
 	/* attribute.attacl is handled by InsertPgAttributeTuples() */
 
 	ReleaseSysCache(typeTuple);
@@ -11860,6 +11914,22 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
 
 	ReleaseSysCache(typeTuple);
 
+	/* Setup attribute compression */
+	if (rel->rd_rel->relkind == RELKIND_RELATION ||
+		rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		/*
+		 * InvalidOid for the plain/external storage otherwise default
+		 * compression id.
+		 */
+		if (!IsStorageCompressible(tform->typstorage))
+			attTup->attcompression = InvalidOid;
+		else if (!OidIsValid(attTup->attcompression))
+			attTup->attcompression = DefaultCompressionOid;
+	}
+	else
+		attTup->attcompression = InvalidOid;
+
 	CatalogTupleUpdate(attrelation, &heapTup->t_self, heapTup);
 
 	table_close(attrelation, RowExclusiveLock);
@@ -17665,3 +17735,42 @@ ATExecAlterCollationRefreshVersion(Relation rel, List *coll)
 	index_update_collation_versions(rel->rd_id, get_collation_oid(coll, false));
 	CacheInvalidateRelcache(rel);
 }
+
+/*
+ * resolve column compression specification to an OID.
+ */
+static Oid
+GetAttributeCompression(Form_pg_attribute att, char *compression)
+{
+	char		typstorage = get_typstorage(att->atttypid);
+	Oid			amoid;
+
+	/*
+	 * No compression for the plain/external storage, refer comments atop
+	 * attcompression parameter in pg_attribute.h
+	 */
+	if (!IsStorageCompressible(typstorage))
+	{
+		if (compression == NULL)
+			return InvalidOid;
+
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("column data type %s does not support compression",
+						format_type_be(att->atttypid))));
+	}
+
+	/* fallback to default compression if it's not specified */
+	if (compression == NULL)
+		return DefaultCompressionOid;
+
+	amoid = get_compression_am_oid(compression, false);
+
+#ifndef HAVE_LIBLZ4
+	if (amoid == LZ4_COMPRESSION_AM_OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("not built with lz4 support")));
+#endif
+	return amoid;
+}
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 2993ba43e3..f77deee399 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -37,6 +37,8 @@
 
 #include "postgres.h"
 
+#include "access/compressamapi.h"
+#include "access/detoast.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/tableam.h"
@@ -2036,6 +2038,113 @@ ExecPrepareTupleRouting(ModifyTableState *mtstate,
 	return slot;
 }
 
+/*
+ * Compare the compression method of each compressed value with the
+ * compression method of the target attribute.  If the compression method
+ * of the compressed value is not supported in the target attribute then
+ * decompress the value.  If any of the value need to decompress then we
+ * need to store that into the new slot.
+ *
+ * The slot will hold the input slot but if any of the value need to be
+ * decompressed then the new slot will be stored into this and the old
+ * slot will be dropped.
+ */
+TupleTableSlot *
+CompareCompressionMethodAndDecompress(TupleTableSlot *slot,
+									  TupleTableSlot **outslot,
+									  TupleDesc targetTupDesc)
+{
+	int			i;
+	int			attnum;
+	int			natts = slot->tts_tupleDescriptor->natts;
+	bool		decompressed_any = false;
+	bool		slot_tup_deformed = false;
+	Oid			cmoid;
+	TupleDesc	tupleDesc = slot->tts_tupleDescriptor;
+
+	if (natts == 0)
+		return slot;
+
+	/*
+	 * Loop over all the attributes in the tuple and check if any attribute is
+	 * compressed and its compression method is not same as the target
+	 * atrribute's compression method then decompress it.
+	 */
+	for (i = 0; i < natts; i++)
+	{
+		attnum = tupleDesc->attrs[i].attnum;
+
+		if (TupleDescAttr(tupleDesc, i)->attlen == -1)
+		{
+			struct varlena *new_value;
+
+			/* fetch the values of all the attribute, if not already done */
+			if (!slot_tup_deformed)
+			{
+				slot_getallattrs(slot);
+				slot_tup_deformed = true;
+			}
+
+			/* nothing to be done, if the value is null */
+			if (slot->tts_isnull[attnum - 1])
+				continue;
+
+			new_value = (struct varlena *)
+				DatumGetPointer(slot->tts_values[attnum - 1]);
+
+			/*
+			 * Get the compression method Oid stored in the toast header and
+			 * compare it with the compression method of the target.
+			 */
+			cmoid = toast_get_compression_oid(new_value);
+			if (OidIsValid(cmoid) &&
+				targetTupDesc->attrs[i].attcompression != cmoid)
+			{
+				new_value = detoast_attr(new_value);
+				slot->tts_values[attnum - 1] = PointerGetDatum(new_value);
+				decompressed_any = true;
+			}
+		}
+	}
+
+	/*
+	 * If we have decompressed any of the fields then we need to copy the
+	 * values/null array from oldslot to the new slot and materialize the new
+	 * slot.
+	 */
+	if (decompressed_any)
+	{
+		TupleTableSlot *newslot = *outslot;
+
+		/*
+		 * If the called has passed an invalid slot then create a new slot.
+		 * Otherwise, just clear the existing tuple from the slot.  This slot
+		 * should be stored by the caller so that it can be reused for
+		 * decompressing the subsequent tuples.
+		 */
+		if (newslot == NULL)
+			newslot = MakeSingleTupleTableSlot(tupleDesc, slot->tts_ops);
+		else
+			ExecClearTuple(newslot);
+
+		/*
+		 * Copy the value/null array to the new slot and materialize it,
+		 * before clearing the tuple from the old slot.
+		 */
+		memcpy(newslot->tts_values, slot->tts_values, natts * sizeof(Datum));
+		memcpy(newslot->tts_isnull, slot->tts_isnull, natts * sizeof(bool));
+		ExecStoreVirtualTuple(newslot);
+		ExecMaterializeSlot(newslot);
+		ExecClearTuple(slot);
+
+		*outslot = newslot;
+
+		return newslot;
+	}
+
+	return slot;
+}
+
 /* ----------------------------------------------------------------
  *	   ExecModifyTable
  *
@@ -2244,6 +2353,15 @@ ExecModifyTable(PlanState *pstate)
 				slot = ExecFilterJunk(junkfilter, slot);
 		}
 
+		/*
+		 * Compare the compression method of the compressed attribute in the
+		 * source tuple with target attribute and if those are different then
+		 * decompress those attributes.
+		 */
+		slot = CompareCompressionMethodAndDecompress(slot,
+									&node->mt_decompress_tuple_slot,
+									resultRelInfo->ri_RelationDesc->rd_att);
+
 		switch (operation)
 		{
 			case CMD_INSERT:
@@ -2876,6 +2994,10 @@ ExecEndModifyTable(ModifyTableState *node)
 			ExecDropSingleTupleTableSlot(node->mt_root_tuple_slot);
 	}
 
+	/* release the slot used for decompressing the tuple */
+	if (node->mt_decompress_tuple_slot)
+		ExecDropSingleTupleTableSlot(node->mt_decompress_tuple_slot);
+
 	/*
 	 * Free the exprcontext
 	 */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 65bbc18ecb..1338e04409 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2966,6 +2966,7 @@ _copyColumnDef(const ColumnDef *from)
 
 	COPY_STRING_FIELD(colname);
 	COPY_NODE_FIELD(typeName);
+	COPY_STRING_FIELD(compression);
 	COPY_SCALAR_FIELD(inhcount);
 	COPY_SCALAR_FIELD(is_local);
 	COPY_SCALAR_FIELD(is_not_null);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index c2d73626fc..f3592003da 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2599,6 +2599,7 @@ _equalColumnDef(const ColumnDef *a, const ColumnDef *b)
 {
 	COMPARE_STRING_FIELD(colname);
 	COMPARE_NODE_FIELD(typeName);
+	COMPARE_STRING_FIELD(compression);
 	COMPARE_SCALAR_FIELD(inhcount);
 	COMPARE_SCALAR_FIELD(is_local);
 	COMPARE_SCALAR_FIELD(is_not_null);
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
index 49357ac5c2..38226530c6 100644
--- a/src/backend/nodes/nodeFuncs.c
+++ b/src/backend/nodes/nodeFuncs.c
@@ -3897,6 +3897,8 @@ raw_expression_tree_walker(Node *node,
 
 				if (walker(coldef->typeName, context))
 					return true;
+				if (walker(coldef->compression, context))
+					return true;
 				if (walker(coldef->raw_default, context))
 					return true;
 				if (walker(coldef->collClause, context))
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f5dcedf6e8..0605ef3f84 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2863,6 +2863,7 @@ _outColumnDef(StringInfo str, const ColumnDef *node)
 
 	WRITE_STRING_FIELD(colname);
 	WRITE_NODE_FIELD(typeName);
+	WRITE_STRING_FIELD(compression);
 	WRITE_INT_FIELD(inhcount);
 	WRITE_BOOL_FIELD(is_local);
 	WRITE_BOOL_FIELD(is_not_null);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index dd72a9fc3c..52d92df25d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -596,6 +596,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
 
+%type <str>	optColumnCompression
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -631,9 +633,9 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	CACHE CALL CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
 	CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
 	CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT
-	COMMITTED CONCURRENTLY CONFIGURATION CONFLICT CONNECTION CONSTRAINT
-	CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE
-	CROSS CSV CUBE CURRENT_P
+	COMMITTED COMPRESSION CONCURRENTLY CONFIGURATION CONFLICT
+	CONNECTION CONSTRAINT CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY
+	COST CREATE CROSS CSV CUBE CURRENT_P
 	CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA
 	CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
 
@@ -3421,11 +3423,12 @@ TypedTableElement:
 			| TableConstraint					{ $$ = $1; }
 		;
 
-columnDef:	ColId Typename create_generic_options ColQualList
+columnDef:	ColId Typename optColumnCompression create_generic_options ColQualList
 				{
 					ColumnDef *n = makeNode(ColumnDef);
 					n->colname = $1;
 					n->typeName = $2;
+					n->compression = $3;
 					n->inhcount = 0;
 					n->is_local = true;
 					n->is_not_null = false;
@@ -3434,8 +3437,8 @@ columnDef:	ColId Typename create_generic_options ColQualList
 					n->raw_default = NULL;
 					n->cooked_default = NULL;
 					n->collOid = InvalidOid;
-					n->fdwoptions = $3;
-					SplitColQualList($4, &n->constraints, &n->collClause,
+					n->fdwoptions = $4;
+					SplitColQualList($5, &n->constraints, &n->collClause,
 									 yyscanner);
 					n->location = @1;
 					$$ = (Node *)n;
@@ -3480,6 +3483,14 @@ columnOptions:	ColId ColQualList
 				}
 		;
 
+optColumnCompression:
+					COMPRESSION name
+					{
+						$$ = $2;
+					}
+					| /*EMPTY*/	{ $$ = NULL; }
+				;
+
 ColQualList:
 			ColQualList ColConstraint				{ $$ = lappend($1, $2); }
 			| /*EMPTY*/								{ $$ = NIL; }
@@ -3710,6 +3721,7 @@ TableLikeOption:
 				| INDEXES			{ $$ = CREATE_TABLE_LIKE_INDEXES; }
 				| STATISTICS		{ $$ = CREATE_TABLE_LIKE_STATISTICS; }
 				| STORAGE			{ $$ = CREATE_TABLE_LIKE_STORAGE; }
+				| COMPRESSION		{ $$ = CREATE_TABLE_LIKE_COMPRESSION; }
 				| ALL				{ $$ = CREATE_TABLE_LIKE_ALL; }
 		;
 
@@ -15285,6 +15297,7 @@ unreserved_keyword:
 			| COMMENTS
 			| COMMIT
 			| COMMITTED
+			| COMPRESSION
 			| CONFIGURATION
 			| CONFLICT
 			| CONNECTION
@@ -15805,6 +15818,7 @@ bare_label_keyword:
 			| COMMENTS
 			| COMMIT
 			| COMMITTED
+			| COMPRESSION
 			| CONCURRENTLY
 			| CONFIGURATION
 			| CONFLICT
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 75266caeb4..e262ec5bcb 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -27,6 +27,7 @@
 #include "postgres.h"
 
 #include "access/amapi.h"
+#include "access/compressamapi.h"
 #include "access/htup_details.h"
 #include "access/relation.h"
 #include "access/reloptions.h"
@@ -1082,6 +1083,13 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
 		else
 			def->storage = 0;
 
+		/* Likewise, copy compression if requested */
+		if ((table_like_clause->options & CREATE_TABLE_LIKE_COMPRESSION) != 0
+			&& OidIsValid(attribute->attcompression))
+			def->compression = get_am_name(attribute->attcompression);
+		else
+			def->compression = NULL;
+
 		/* Likewise, copy comment if requested */
 		if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) &&
 			(comment = GetComment(attribute->attrelid,
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index c2f910d606..fe133c76c2 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -383,6 +383,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(language_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler);
+PSEUDOTYPE_DUMMY_IO_FUNCS(compression_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(internal);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 479ed9ae54..a35abe5f1a 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -17,9 +17,11 @@
 #include <ctype.h>
 #include <limits.h>
 
+#include "access/compressamapi.h"
 #include "access/detoast.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_type.h"
+#include "commands/defrem.h"
 #include "common/hashfn.h"
 #include "common/hex.h"
 #include "common/int.h"
@@ -5299,6 +5301,46 @@ pg_column_size(PG_FUNCTION_ARGS)
 	PG_RETURN_INT32(result);
 }
 
+/*
+ * Return the compression method stored in the compressed attribute.  Return
+ * NULL for non varlena type or the uncompressed data.
+ */
+Datum
+pg_column_compression(PG_FUNCTION_ARGS)
+{
+	Datum		value = PG_GETARG_DATUM(0);
+	int			typlen;
+	Oid			cmoid;
+
+	/* On first call, get the input type's typlen, and save at *fn_extra */
+	if (fcinfo->flinfo->fn_extra == NULL)
+	{
+		/* Lookup the datatype of the supplied argument */
+		Oid			argtypeid = get_fn_expr_argtype(fcinfo->flinfo, 0);
+
+		typlen = get_typlen(argtypeid);
+		if (typlen == 0)		/* should not happen */
+			elog(ERROR, "cache lookup failed for type %u", argtypeid);
+
+		fcinfo->flinfo->fn_extra = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+													  sizeof(int));
+		*((int *) fcinfo->flinfo->fn_extra) = typlen;
+	}
+	else
+		typlen = *((int *) fcinfo->flinfo->fn_extra);
+
+	if (typlen != -1)
+		PG_RETURN_NULL();
+
+	cmoid =
+		toast_get_compression_oid((struct varlena *) DatumGetPointer(value));
+
+	if (!OidIsValid(cmoid))
+		PG_RETURN_NULL();
+	else
+		PG_RETURN_TEXT_P(cstring_to_text(get_am_name(cmoid)));
+}
+
 /*
  * string_agg - Concatenates values and returns string.
  *
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index eea9f30a79..7332f93a25 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -160,6 +160,7 @@ typedef struct _dumpOptions
 	int			no_subscriptions;
 	int			no_synchronized_snapshots;
 	int			no_unlogged_table_data;
+	int			no_compression_methods;
 	int			serializable_deferrable;
 	int			disable_triggers;
 	int			outputNoTablespaces;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index eb988d7eb4..46044cb92a 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -387,6 +387,7 @@ main(int argc, char **argv)
 		{"no-synchronized-snapshots", no_argument, &dopt.no_synchronized_snapshots, 1},
 		{"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
 		{"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
+		{"no-compression-methods", no_argument, &dopt.no_compression_methods, 1},
 		{"no-sync", no_argument, NULL, 7},
 		{"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
 		{"rows-per-insert", required_argument, NULL, 10},
@@ -1047,6 +1048,7 @@ help(const char *progname)
 	printf(_("  --no-publications            do not dump publications\n"));
 	printf(_("  --no-security-labels         do not dump security label assignments\n"));
 	printf(_("  --no-subscriptions           do not dump subscriptions\n"));
+	printf(_("  --no-compression-methods     do not dump compression methods\n"));
 	printf(_("  --no-synchronized-snapshots  do not use synchronized snapshots in parallel jobs\n"));
 	printf(_("  --no-tablespaces             do not dump tablespace assignments\n"));
 	printf(_("  --no-unlogged-table-data     do not dump unlogged table data\n"));
@@ -8617,6 +8619,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 {
 	DumpOptions *dopt = fout->dopt;
 	PQExpBuffer q = createPQExpBuffer();
+	bool		createWithCompression;
 
 	for (int i = 0; i < numTables; i++)
 	{
@@ -8702,6 +8705,15 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 			appendPQExpBufferStr(q,
 								 "'' AS attidentity,\n");
 
+		createWithCompression = (fout->remoteVersion >= 140000);
+
+		if (createWithCompression)
+			appendPQExpBuffer(q,
+							  "am.amname AS attcmname,\n");
+		else
+			appendPQExpBuffer(q,
+							  "NULL AS attcmname,\n");
+
 		if (fout->remoteVersion >= 110000)
 			appendPQExpBufferStr(q,
 								 "CASE WHEN a.atthasmissing AND NOT a.attisdropped "
@@ -8720,7 +8732,12 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 		/* need left join here to not fail on dropped columns ... */
 		appendPQExpBuffer(q,
 						  "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
-						  "ON a.atttypid = t.oid\n"
+						  "ON a.atttypid = t.oid\n");
+
+		if (createWithCompression)
+			appendPQExpBuffer(q, "LEFT JOIN pg_catalog.pg_am am "
+							  "ON a.attcompression = am.oid\n");
+		appendPQExpBuffer(q,
 						  "WHERE a.attrelid = '%u'::pg_catalog.oid "
 						  "AND a.attnum > 0::pg_catalog.int2\n"
 						  "ORDER BY a.attnum",
@@ -8747,6 +8764,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 		tbinfo->attcollation = (Oid *) pg_malloc(ntups * sizeof(Oid));
 		tbinfo->attfdwoptions = (char **) pg_malloc(ntups * sizeof(char *));
 		tbinfo->attmissingval = (char **) pg_malloc(ntups * sizeof(char *));
+		tbinfo->attcmnames = (char **) pg_malloc(ntups * sizeof(char *));
 		tbinfo->notnull = (bool *) pg_malloc(ntups * sizeof(bool));
 		tbinfo->inhNotNull = (bool *) pg_malloc(ntups * sizeof(bool));
 		tbinfo->attrdefs = (AttrDefInfo **) pg_malloc(ntups * sizeof(AttrDefInfo *));
@@ -8775,6 +8793,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 			tbinfo->attcollation[j] = atooid(PQgetvalue(res, j, PQfnumber(res, "attcollation")));
 			tbinfo->attfdwoptions[j] = pg_strdup(PQgetvalue(res, j, PQfnumber(res, "attfdwoptions")));
 			tbinfo->attmissingval[j] = pg_strdup(PQgetvalue(res, j, PQfnumber(res, "attmissingval")));
+			tbinfo->attcmnames[j] = pg_strdup(PQgetvalue(res, j, PQfnumber(res, "attcmname")));
 			tbinfo->attrdefs[j] = NULL; /* fix below */
 			if (PQgetvalue(res, j, PQfnumber(res, "atthasdef"))[0] == 't')
 				hasdefaults = true;
@@ -15832,6 +15851,7 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo)
 				{
 					bool		print_default;
 					bool		print_notnull;
+					bool		has_non_default_compression;
 
 					/*
 					 * Default value --- suppress if to be printed separately.
@@ -15856,6 +15876,9 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo)
 						!dopt->binary_upgrade)
 						continue;
 
+					has_non_default_compression = (tbinfo->attcmnames[j] &&
+												   (strcmp(tbinfo->attcmnames[j], "pglz") != 0));
+
 					/* Format properly if not first attr */
 					if (actual_atts == 0)
 						appendPQExpBufferStr(q, " (");
@@ -15891,6 +15914,17 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo)
 										  tbinfo->atttypnames[j]);
 					}
 
+					/*
+					 * Attribute compression
+					 */
+					if (!dopt->no_compression_methods &&
+						tbinfo->attcmnames[j] && strlen(tbinfo->attcmnames[j]) &&
+						(has_non_default_compression || dopt->binary_upgrade))
+					{
+						appendPQExpBuffer(q, " COMPRESSION %s",
+										  tbinfo->attcmnames[j]);
+					}
+
 					if (print_default)
 					{
 						if (tbinfo->attgenerated[j] == ATTRIBUTE_GENERATED_STORED)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 0a2213fb06..1789e18f46 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -326,6 +326,7 @@ typedef struct _tableInfo
 	char	   *partbound;		/* partition bound definition */
 	bool		needs_override; /* has GENERATED ALWAYS AS IDENTITY */
 	char	   *amname;			/* relation access method */
+	char	  **attcmnames;		/* per-attribute current compression method */
 
 	/*
 	 * Stuff computed only for dumpable tables.
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 737e46464a..97791f8dbe 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2284,9 +2284,9 @@ my %tests = (
 		regexp => qr/^
 			\QCREATE TABLE dump_test.test_table (\E\n
 			\s+\Qcol1 integer NOT NULL,\E\n
-			\s+\Qcol2 text,\E\n
-			\s+\Qcol3 text,\E\n
-			\s+\Qcol4 text,\E\n
+			\s+\Qcol2 text\E\D*,\n
+			\s+\Qcol3 text\E\D*,\n
+			\s+\Qcol4 text\E\D*,\n
 			\s+\QCONSTRAINT test_table_col1_check CHECK ((col1 <= 1000))\E\n
 			\Q)\E\n
 			\QWITH (autovacuum_enabled='false', fillfactor='80');\E\n/xm,
@@ -2326,7 +2326,7 @@ my %tests = (
 		regexp => qr/^
 			\QCREATE TABLE dump_test.test_second_table (\E
 			\n\s+\Qcol1 integer,\E
-			\n\s+\Qcol2 text\E
+			\n\s+\Qcol2 text\E\D*
 			\n\);
 			/xm,
 		like =>
@@ -2441,7 +2441,7 @@ my %tests = (
 			\n\s+\Qcol1 integer,\E
 			\n\s+\Qcol2 boolean,\E
 			\n\s+\Qcol3 boolean,\E
-			\n\s+\Qcol4 bit(5),\E
+			\n\s+\Qcol4 bit(5)\E\D*,
 			\n\s+\Qcol5 double precision\E
 			\n\);
 			/xm,
@@ -2459,7 +2459,7 @@ my %tests = (
 		regexp => qr/^
 			\QCREATE TABLE dump_test.test_table_identity (\E\n
 			\s+\Qcol1 integer NOT NULL,\E\n
-			\s+\Qcol2 text\E\n
+			\s+\Qcol2 text\E\D*\n
 			\);
 			.*
 			\QALTER TABLE dump_test.test_table_identity ALTER COLUMN col1 ADD GENERATED ALWAYS AS IDENTITY (\E\n
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 20af5a92b4..ba464d463e 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -170,10 +170,12 @@ describeAccessMethods(const char *pattern, bool verbose)
 					  "  CASE amtype"
 					  " WHEN 'i' THEN '%s'"
 					  " WHEN 't' THEN '%s'"
+					  " WHEN 'c' THEN '%s'"
 					  " END AS \"%s\"",
 					  gettext_noop("Name"),
 					  gettext_noop("Index"),
 					  gettext_noop("Table"),
+					  gettext_noop("Compression"),
 					  gettext_noop("Type"));
 
 	if (verbose)
@@ -1459,7 +1461,7 @@ describeOneTableDetails(const char *schemaname,
 	bool		printTableInitialized = false;
 	int			i;
 	char	   *view_def = NULL;
-	char	   *headers[11];
+	char	   *headers[12];
 	PQExpBufferData title;
 	PQExpBufferData tmpbuf;
 	int			cols;
@@ -1475,7 +1477,8 @@ describeOneTableDetails(const char *schemaname,
 				fdwopts_col = -1,
 				attstorage_col = -1,
 				attstattarget_col = -1,
-				attdescr_col = -1;
+				attdescr_col = -1,
+				attcompression_col = -1;
 	int			numrows;
 	struct
 	{
@@ -1892,6 +1895,20 @@ describeOneTableDetails(const char *schemaname,
 		appendPQExpBufferStr(&buf, ",\n  a.attstorage");
 		attstorage_col = cols++;
 
+		/* compresssion info */
+		if (pset.sversion >= 140000 &&
+			(tableinfo.relkind == RELKIND_RELATION ||
+			 tableinfo.relkind == RELKIND_PARTITIONED_TABLE ||
+			 tableinfo.relkind == RELKIND_MATVIEW))
+		{
+			appendPQExpBufferStr(&buf, ",\n  CASE WHEN attcompression = 0 THEN NULL ELSE "
+								 " (SELECT am.amname "
+								 "  FROM pg_catalog.pg_am am "
+								 "  WHERE am.oid = a.attcompression) "
+								 " END AS attcompression");
+			attcompression_col = cols++;
+		}
+
 		/* stats target, if relevant to relkind */
 		if (tableinfo.relkind == RELKIND_RELATION ||
 			tableinfo.relkind == RELKIND_INDEX ||
@@ -2018,6 +2035,8 @@ describeOneTableDetails(const char *schemaname,
 		headers[cols++] = gettext_noop("FDW options");
 	if (attstorage_col >= 0)
 		headers[cols++] = gettext_noop("Storage");
+	if (attcompression_col >= 0)
+		headers[cols++] = gettext_noop("Compression");
 	if (attstattarget_col >= 0)
 		headers[cols++] = gettext_noop("Stats target");
 	if (attdescr_col >= 0)
@@ -2097,6 +2116,11 @@ describeOneTableDetails(const char *schemaname,
 							  false, false);
 		}
 
+		/* Column compression */
+		if (attcompression_col >= 0)
+			printTableAddCell(&cont, PQgetvalue(res, i, attcompression_col),
+							  false, false);
+
 		/* Statistics target, if the relkind supports this feature */
 		if (attstattarget_col >= 0)
 			printTableAddCell(&cont, PQgetvalue(res, i, attstattarget_col),
diff --git a/src/include/access/compressamapi.h b/src/include/access/compressamapi.h
new file mode 100644
index 0000000000..5a8e23d926
--- /dev/null
+++ b/src/include/access/compressamapi.h
@@ -0,0 +1,99 @@
+/*-------------------------------------------------------------------------
+ *
+ * compressamapi.h
+ *	  API for Postgres compression methods.
+ *
+ * Portions Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ * src/include/access/compressamapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef COMPRESSAMAPI_H
+#define COMPRESSAMAPI_H
+
+#include "postgres.h"
+
+#include "catalog/pg_am_d.h"
+#include "nodes/nodes.h"
+
+/*
+ * Built-in compression method-id.  The toast compression header will store
+ * this in the first 2 bits of the raw length.  These built-in compression
+ * method-id are directly mapped to the built-in compression method oid.
+ */
+typedef enum CompressionId
+{
+	PGLZ_COMPRESSION_ID = 0,
+	LZ4_COMPRESSION_ID = 1
+} CompressionId;
+
+/* Use default compression method if it is not specified. */
+#define DefaultCompressionOid	PGLZ_COMPRESSION_AM_OID
+#define IsStorageCompressible(storage) ((storage) != TYPSTORAGE_PLAIN && \
+										(storage) != TYPSTORAGE_EXTERNAL)
+/* compression handler routines */
+typedef struct varlena *(*cmcompress_function) (const struct varlena *value);
+typedef struct varlena *(*cmdecompress_function) (const struct varlena *value);
+typedef struct varlena *(*cmdecompress_slice_function)
+			(const struct varlena *value, int32 slicelength);
+
+/*
+ * API struct for a compression AM.
+ *
+ * 'datum_compress' - varlena compression function.
+ * 'datum_decompress' - varlena decompression function.
+ * 'datum_decompress_slice' - varlena slice decompression functions.
+ */
+typedef struct CompressionAmRoutine
+{
+	NodeTag		type;
+
+	cmcompress_function datum_compress;
+	cmdecompress_function datum_decompress;
+	cmdecompress_slice_function datum_decompress_slice;
+} CompressionAmRoutine;
+
+extern const CompressionAmRoutine pglz_compress_methods;
+extern const CompressionAmRoutine lz4_compress_methods;
+
+/*
+ * CompressionOidToId - Convert compression Oid to built-in compression id.
+ *
+ * For more details refer comment atop CompressionId in compressamapi.h
+ */
+static inline CompressionId
+CompressionOidToId(Oid cmoid)
+{
+	switch (cmoid)
+	{
+		case PGLZ_COMPRESSION_AM_OID:
+			return PGLZ_COMPRESSION_ID;
+		case LZ4_COMPRESSION_AM_OID:
+			return LZ4_COMPRESSION_ID;
+		default:
+			elog(ERROR, "invalid compression method oid %u", cmoid);
+	}
+}
+
+/*
+ * CompressionIdToOid - Convert built-in compression id to Oid
+ *
+ * For more details refer comment atop CompressionId in compressamapi.h
+ */
+static inline Oid
+CompressionIdToOid(CompressionId cmid)
+{
+	switch (cmid)
+	{
+		case PGLZ_COMPRESSION_ID:
+			return PGLZ_COMPRESSION_AM_OID;
+		case LZ4_COMPRESSION_ID:
+			return LZ4_COMPRESSION_AM_OID;
+		default:
+			elog(ERROR, "invalid compression method id %d", cmid);
+	}
+}
+
+#endif							/* COMPRESSAMAPI_H */
diff --git a/src/include/access/detoast.h b/src/include/access/detoast.h
index 0adf53c77b..6cdc37542d 100644
--- a/src/include/access/detoast.h
+++ b/src/include/access/detoast.h
@@ -89,4 +89,12 @@ extern Size toast_raw_datum_size(Datum value);
  */
 extern Size toast_datum_size(Datum value);
 
+/* ----------
+ * toast_get_compression_oid -
+ *
+ *	Return the compression method oid from the compressed value
+ * ----------
+ */
+extern Oid toast_get_compression_oid(struct varlena *attr);
+
 #endif							/* DETOAST_H */
diff --git a/src/include/access/toast_helper.h b/src/include/access/toast_helper.h
index a9a6d644bc..dca0bc37f3 100644
--- a/src/include/access/toast_helper.h
+++ b/src/include/access/toast_helper.h
@@ -32,6 +32,7 @@ typedef struct
 	struct varlena *tai_oldexternal;
 	int32		tai_size;
 	uint8		tai_colflags;
+	Oid			tai_compression;
 } ToastAttrInfo;
 
 /*
diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h
index cedfb890d8..31ff91a09c 100644
--- a/src/include/access/toast_internals.h
+++ b/src/include/access/toast_internals.h
@@ -12,6 +12,7 @@
 #ifndef TOAST_INTERNALS_H
 #define TOAST_INTERNALS_H
 
+#include "access/compressamapi.h"
 #include "storage/lockdefs.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -22,22 +23,22 @@
 typedef struct toast_compress_header
 {
 	int32		vl_len_;		/* varlena header (do not touch directly!) */
-	int32		rawsize;
+	uint32		info;			/* 2 bits for compression method and 30 bits
+								 * rawsize */
 } toast_compress_header;
 
 /*
  * Utilities for manipulation of header information for compressed
  * toast entries.
  */
-#define TOAST_COMPRESS_HDRSZ		((int32) sizeof(toast_compress_header))
-#define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->rawsize)
-#define TOAST_COMPRESS_SIZE(ptr)	((int32) VARSIZE_ANY(ptr) - TOAST_COMPRESS_HDRSZ)
-#define TOAST_COMPRESS_RAWDATA(ptr) \
-	(((char *) (ptr)) + TOAST_COMPRESS_HDRSZ)
-#define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \
-	(((toast_compress_header *) (ptr))->rawsize = (len))
+#define TOAST_COMPRESS_METHOD(ptr)  (((toast_compress_header *) (ptr))->info >> VARLENA_RAWSIZE_BITS)
+#define TOAST_COMPRESS_SET_SIZE_AND_METHOD(ptr, len, cm_method) \
+	do { \
+		Assert((len) > 0 && (len) <= VARLENA_RAWSIZE_MASK); \
+		((toast_compress_header *) (ptr))->info = ((len) | (cm_method) << VARLENA_RAWSIZE_BITS); \
+	} while (0)
 
-extern Datum toast_compress_datum(Datum value);
+extern Datum toast_compress_datum(Datum value, Oid cmoid);
 extern Oid	toast_get_valid_index(Oid toastoid, LOCKMODE lock);
 
 extern void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
diff --git a/src/include/catalog/pg_am.dat b/src/include/catalog/pg_am.dat
index 6082f0e6a8..605684408c 100644
--- a/src/include/catalog/pg_am.dat
+++ b/src/include/catalog/pg_am.dat
@@ -33,5 +33,11 @@
 { oid => '3580', oid_symbol => 'BRIN_AM_OID',
   descr => 'block range index (BRIN) access method',
   amname => 'brin', amhandler => 'brinhandler', amtype => 'i' },
+{ oid => '4572', oid_symbol => 'PGLZ_COMPRESSION_AM_OID',
+  descr => 'pglz compression access method',
+  amname => 'pglz', amhandler => 'pglzhandler', amtype => 'c' },
+{ oid => '4573', oid_symbol => 'LZ4_COMPRESSION_AM_OID',
+  descr => 'lz4 compression access method',
+  amname => 'lz4', amhandler => 'lz4handler', amtype => 'c' },
 
 ]
diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h
index ced86faef8..65079f3821 100644
--- a/src/include/catalog/pg_am.h
+++ b/src/include/catalog/pg_am.h
@@ -59,6 +59,7 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_am_oid_index, 2652, on pg_am using btree(oid oid_op
  */
 #define AMTYPE_INDEX					'i' /* index access method */
 #define AMTYPE_TABLE					't' /* table access method */
+#define AMTYPE_COMPRESSION				'c'	/* compression access method */
 
 #endif							/* EXPOSE_TO_CLIENT_CODE */
 
diff --git a/src/include/catalog/pg_attribute.h b/src/include/catalog/pg_attribute.h
index 3db42abf08..797e78b17c 100644
--- a/src/include/catalog/pg_attribute.h
+++ b/src/include/catalog/pg_attribute.h
@@ -160,6 +160,12 @@ CATALOG(pg_attribute,1249,AttributeRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(75,
 	/* attribute's collation, if any */
 	Oid			attcollation BKI_LOOKUP_OPT(pg_collation);
 
+	/*
+	 * OID of compression AM.  Must be InvalidOid if and only if typstorage is
+	 * 'plain' or 'external'.
+	 */
+	Oid			attcompression BKI_DEFAULT(0);
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* NOTE: The following fields are not present in tuple descriptors. */
 
@@ -187,7 +193,7 @@ CATALOG(pg_attribute,1249,AttributeRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(75,
  * can access fields beyond attcollation except in a real tuple!
  */
 #define ATTRIBUTE_FIXED_PART_SIZE \
-	(offsetof(FormData_pg_attribute,attcollation) + sizeof(Oid))
+	(offsetof(FormData_pg_attribute,attcompression) + sizeof(Oid))
 
 /* ----------------
  *		Form_pg_attribute corresponds to a pointer to a tuple with
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1487710d59..4b3d34ae1a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -941,6 +941,15 @@
   prorettype => 'void', proargtypes => 'regclass int8',
   prosrc => 'brin_desummarize_range' },
 
+{ oid => '6015', descr => 'pglz compression access method handler',
+  proname => 'pglzhandler', provolatile => 'v',
+  prorettype => 'compression_am_handler', proargtypes => 'internal',
+  prosrc => 'pglzhandler' },
+{ oid => '6016', descr => 'lz4 compression access method handler',
+  proname => 'lz4handler', provolatile => 'v',
+  prorettype => 'compression_am_handler', proargtypes => 'internal',
+  prosrc => 'lz4handler' },
+
 { oid => '338', descr => 'validate an operator class',
   proname => 'amvalidate', provolatile => 'v', prorettype => 'bool',
   proargtypes => 'oid', prosrc => 'amvalidate' },
@@ -7095,6 +7104,10 @@
   descr => 'bytes required to store the value, perhaps with compression',
   proname => 'pg_column_size', provolatile => 's', prorettype => 'int4',
   proargtypes => 'any', prosrc => 'pg_column_size' },
+{ oid => '2121',
+  descr => 'compression method for the compressed datum',
+  proname => 'pg_column_compression', provolatile => 's', prorettype => 'text',
+  proargtypes => 'any', prosrc => 'pg_column_compression' },
 { oid => '2322',
   descr => 'total disk space usage for the specified tablespace',
   proname => 'pg_tablespace_size', provolatile => 'v', prorettype => 'int8',
@@ -7265,6 +7278,13 @@
 { oid => '268', descr => 'I/O',
   proname => 'table_am_handler_out', prorettype => 'cstring',
   proargtypes => 'table_am_handler', prosrc => 'table_am_handler_out' },
+{ oid => '560', descr => 'I/O',
+  proname => 'compression_am_handler_in', proisstrict => 'f',
+  prorettype => 'compression_am_handler', proargtypes => 'cstring',
+  prosrc => 'compression_am_handler_in' },
+{ oid => '561', descr => 'I/O',
+  proname => 'compression_am_handler_out', prorettype => 'cstring',
+  proargtypes => 'compression_am_handler', prosrc => 'compression_am_handler_out' },
 { oid => '5086', descr => 'I/O',
   proname => 'anycompatible_in', prorettype => 'anycompatible',
   proargtypes => 'cstring', prosrc => 'anycompatible_in' },
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index 8959c2f53b..306aabf6c1 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -626,6 +626,11 @@
   typcategory => 'P', typinput => 'index_am_handler_in',
   typoutput => 'index_am_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
+ { oid => '5559',
+  typname => 'compression_am_handler', typlen => '4', typbyval => 't', typtype => 'p',
+  typcategory => 'P', typinput => 'compression_am_handler_in',
+  typoutput => 'compression_am_handler_out', typreceive => '-', typsend => '-',
+  typalign => 'i' },
 { oid => '3310',
   descr => 'pseudo-type for the result of a tablesample method function',
   typname => 'tsm_handler', typlen => '4', typbyval => 't', typtype => 'p',
diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h
index 1a79540c94..e5aea8a240 100644
--- a/src/include/commands/defrem.h
+++ b/src/include/commands/defrem.h
@@ -139,6 +139,7 @@ extern Datum transformGenericOptions(Oid catalogId,
 extern ObjectAddress CreateAccessMethod(CreateAmStmt *stmt);
 extern Oid	get_index_am_oid(const char *amname, bool missing_ok);
 extern Oid	get_table_am_oid(const char *amname, bool missing_ok);
+extern Oid	get_compression_am_oid(const char *amname, bool missing_ok);
 extern Oid	get_am_oid(const char *amname, bool missing_ok);
 extern char *get_am_name(Oid amOid);
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 071e363d54..6495162a33 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -621,5 +621,7 @@ extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 
 extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
 									 const char *relname);
-
+extern TupleTableSlot *CompareCompressionMethodAndDecompress(TupleTableSlot *slot,
+												  TupleTableSlot **outslot,
+												  TupleDesc targetTupDesc);
 #endif							/* EXECUTOR_H  */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 943931f65d..ac3884b3a8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1185,6 +1185,12 @@ typedef struct ModifyTableState
 	 */
 	TupleTableSlot *mt_root_tuple_slot;
 
+	/*
+	 * Slot for storing the modified tuple, incase the target attribute's
+	 * compression method doesn't match with the source table.
+	 */
+	TupleTableSlot *mt_decompress_tuple_slot;
+
 	/* Tuple-routing support info */
 	struct PartitionTupleRouting *mt_partition_tuple_routing;
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 40ae489c23..20d6f96f62 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -512,6 +512,7 @@ typedef enum NodeTag
 	T_IndexAmRoutine,			/* in access/amapi.h */
 	T_TableAmRoutine,			/* in access/tableam.h */
 	T_TsmRoutine,				/* in access/tsmapi.h */
+	T_CompressionAmRoutine,		/* in access/compressamapi.h */
 	T_ForeignKeyCacheInfo,		/* in utils/rel.h */
 	T_CallContext,				/* in nodes/parsenodes.h */
 	T_SupportRequestSimplify,	/* in nodes/supportnodes.h */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 236832a2ca..19d2ba26bf 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -646,6 +646,7 @@ typedef struct ColumnDef
 	NodeTag		type;
 	char	   *colname;		/* name of column */
 	TypeName   *typeName;		/* type of column */
+	char	   *compression;	/* compression method for column */
 	int			inhcount;		/* number of times column is inherited */
 	bool		is_local;		/* column has local (non-inherited) def'n */
 	bool		is_not_null;	/* NOT NULL constraint specified? */
@@ -685,6 +686,7 @@ typedef enum TableLikeOption
 	CREATE_TABLE_LIKE_INDEXES = 1 << 5,
 	CREATE_TABLE_LIKE_STATISTICS = 1 << 6,
 	CREATE_TABLE_LIKE_STORAGE = 1 << 7,
+	CREATE_TABLE_LIKE_COMPRESSION = 1 << 8,
 	CREATE_TABLE_LIKE_ALL = PG_INT32_MAX
 } TableLikeOption;
 
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 28083aaac9..ca1f950cbe 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -88,6 +88,7 @@ PG_KEYWORD("comment", COMMENT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("comments", COMMENTS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("commit", COMMIT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("committed", COMMITTED, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("compression", COMPRESSION, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("concurrently", CONCURRENTLY, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("configuration", CONFIGURATION, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("conflict", CONFLICT, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 55cab4d2bf..53d378b67d 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -346,6 +346,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `lz4' library (-llz4). */
+#undef HAVE_LIBLZ4
+
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
diff --git a/src/include/postgres.h b/src/include/postgres.h
index 2ed572004d..667927fd7c 100644
--- a/src/include/postgres.h
+++ b/src/include/postgres.h
@@ -145,7 +145,8 @@ typedef union
 	struct						/* Compressed-in-line format */
 	{
 		uint32		va_header;
-		uint32		va_rawsize; /* Original data size (excludes header) */
+		uint32		va_info;	/* Original data size (excludes header) and
+								 * flags */
 		char		va_data[FLEXIBLE_ARRAY_MEMBER]; /* Compressed data */
 	}			va_compressed;
 } varattrib_4b;
@@ -274,14 +275,21 @@ typedef struct
 	(VARSIZE(PTR) - VARHDRSZ + VARHDRSZ_SHORT)
 
 #define VARHDRSZ_EXTERNAL		offsetof(varattrib_1b_e, va_data)
+#define VARHDRSZ_COMPRESS		offsetof(varattrib_4b, va_compressed.va_data)
 
 #define VARDATA_4B(PTR)		(((varattrib_4b *) (PTR))->va_4byte.va_data)
 #define VARDATA_4B_C(PTR)	(((varattrib_4b *) (PTR))->va_compressed.va_data)
 #define VARDATA_1B(PTR)		(((varattrib_1b *) (PTR))->va_data)
 #define VARDATA_1B_E(PTR)	(((varattrib_1b_e *) (PTR))->va_data)
 
+#define VARLENA_RAWSIZE_BITS	30
+#define VARLENA_RAWSIZE_MASK	((1U << VARLENA_RAWSIZE_BITS) - 1)
+
+/* va_info in va_compress contains raw size of datum and optional flags */
 #define VARRAWSIZE_4B_C(PTR) \
-	(((varattrib_4b *) (PTR))->va_compressed.va_rawsize)
+	(((varattrib_4b *) (PTR))->va_compressed.va_info & VARLENA_RAWSIZE_MASK)
+#define VARFLAGS_4B_C(PTR) \
+	(((varattrib_4b *) (PTR))->va_compressed.va_info >> VARLENA_RAWSIZE_BITS)
 
 /* Externally visible macros */
 
diff --git a/src/test/regress/expected/compression.out b/src/test/regress/expected/compression.out
new file mode 100644
index 0000000000..167878e78b
--- /dev/null
+++ b/src/test/regress/expected/compression.out
@@ -0,0 +1,201 @@
+\set HIDE_COMPRESSAM off
+-- test creating table with compression method
+CREATE TABLE cmdata(f1 text COMPRESSION pglz);
+CREATE INDEX idx ON cmdata(f1);
+INSERT INTO cmdata VALUES(repeat('1234567890',1000));
+\d+ cmdata
+                                 Table "public.cmdata"
+ Column | Type | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+------+-----------+----------+---------+----------+--------------+-------------
+ f1     | text |           |          |         | extended |              | 
+Indexes:
+    "idx" btree (f1)
+
+CREATE TABLE cmdata1(f1 TEXT COMPRESSION lz4);
+INSERT INTO cmdata1 VALUES(repeat('1234567890',1004));
+\d+ cmdata1
+                                 Table "public.cmdata1"
+ Column | Type | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+------+-----------+----------+---------+----------+--------------+-------------
+ f1     | text |           |          |         | extended |              | 
+
+-- try setting compression for incompressible data type
+CREATE TABLE cmdata2 (f1 int COMPRESSION pglz);
+ERROR:  column data type integer does not support compression
+-- verify stored compression method
+SELECT pg_column_compression(f1) FROM cmdata;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+SELECT pg_column_compression(f1) FROM cmdata1;
+ pg_column_compression 
+-----------------------
+ lz4
+(1 row)
+
+-- decompress data slice
+SELECT SUBSTR(f1, 200, 5) FROM cmdata;
+ substr 
+--------
+ 01234
+(1 row)
+
+SELECT SUBSTR(f1, 2000, 50) FROM cmdata1;
+                       substr                       
+----------------------------------------------------
+ 01234567890123456789012345678901234567890123456789
+(1 row)
+
+-- copy with table creation
+SELECT * INTO cmmove1 FROM cmdata;
+SELECT pg_column_compression(f1) FROM cmmove1;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+-- update using datum from different table
+CREATE TABLE cmmove2(f1 text COMPRESSION pglz);
+INSERT INTO cmmove2 VALUES (repeat('1234567890',1004));
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+UPDATE cmmove2 SET f1 = cmdata.f1 FROM cmdata;
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+UPDATE cmmove2 SET f1 = cmdata1.f1 FROM cmdata1;
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+-- copy to existing table
+CREATE TABLE cmmove3(f1 text COMPRESSION pglz);
+INSERT INTO cmmove3 SELECT * FROM cmdata;
+INSERT INTO cmmove3 SELECT * FROM cmdata1;
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+-- test external compressed data
+CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS
+'select array_agg(md5(g::text))::text from generate_series(1, 256) g';
+CREATE TABLE cmdata2 (f1 text COMPRESSION pglz);
+INSERT INTO cmdata2 select large_val() || repeat('a', 4000);
+SELECT pg_column_compression(f1) FROM cmdata2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+INSERT INTO cmdata1 SELECT * FROM cmdata2;
+SELECT pg_column_compression(f1) FROM cmdata1;
+ pg_column_compression 
+-----------------------
+ lz4
+ lz4
+(2 rows)
+
+DROP TABLE cmdata2;
+-- test LIKE INCLUDING COMPRESSION
+CREATE TABLE cmdata2 (LIKE cmdata1 INCLUDING COMPRESSION);
+\d+ cmdata2
+                                 Table "public.cmdata2"
+ Column | Type | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+------+-----------+----------+---------+----------+--------------+-------------
+ f1     | text |           |          |         | extended |              | 
+
+-- test compression with materialized view
+CREATE MATERIALIZED VIEW mv(x) AS SELECT * FROM cmdata1;
+\d+ mv
+                             Materialized view "public.mv"
+ Column | Type | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+------+-----------+----------+---------+----------+--------------+-------------
+ x      | text |           |          |         | extended |              | 
+View definition:
+ SELECT cmdata1.f1 AS x
+   FROM cmdata1;
+
+SELECT pg_column_compression(f1) FROM cmdata1;
+ pg_column_compression 
+-----------------------
+ lz4
+ lz4
+(2 rows)
+
+SELECT pg_column_compression(x) FROM mv;
+ pg_column_compression 
+-----------------------
+ pglz
+ pglz
+(2 rows)
+
+-- test compression with partition
+CREATE TABLE cmpart(f1 text COMPRESSION lz4) PARTITION BY HASH(f1);
+CREATE TABLE cmpart1 PARTITION OF cmpart FOR VALUES WITH (MODULUS 2, REMAINDER 0);
+CREATE TABLE cmpart2(f1 text COMPRESSION pglz);
+ALTER TABLE cmpart ATTACH PARTITION cmpart2 FOR VALUES WITH (MODULUS 2, REMAINDER 1);
+INSERT INTO cmpart VALUES (repeat('123456789',1004));
+INSERT INTO cmpart VALUES (repeat('123456789',4004));
+SELECT pg_column_compression(f1) FROM cmpart;
+ pg_column_compression 
+-----------------------
+ lz4
+ pglz
+(2 rows)
+
+-- test compression with inheritence, error
+CREATE TABLE cminh() INHERITS(cmdata, cmdata1);
+NOTICE:  merging multiple inherited definitions of column "f1"
+ERROR:  column "f1" has a compression method conflict
+DETAIL:  pglz versus lz4
+CREATE TABLE cminh(f1 TEXT COMPRESSION lz4) INHERITS(cmdata);
+NOTICE:  merging column "f1" with inherited definition
+ERROR:  column "f1" has a compression method conflict
+DETAIL:  pglz versus lz4
+-- check data is ok
+SELECT length(f1) FROM cmdata;
+ length 
+--------
+  10000
+(1 row)
+
+SELECT length(f1) FROM cmdata1;
+ length 
+--------
+  10040
+  12449
+(2 rows)
+
+SELECT length(f1) FROM cmmove1;
+ length 
+--------
+  10000
+(1 row)
+
+SELECT length(f1) FROM cmmove2;
+ length 
+--------
+  10040
+(1 row)
+
+SELECT length(f1) FROM cmmove3;
+ length 
+--------
+  10000
+  10040
+(2 rows)
+
+\set HIDE_COMPRESSAM on
diff --git a/src/test/regress/expected/compression_1.out b/src/test/regress/expected/compression_1.out
new file mode 100644
index 0000000000..329e7881b0
--- /dev/null
+++ b/src/test/regress/expected/compression_1.out
@@ -0,0 +1,189 @@
+\set HIDE_COMPRESSAM off
+-- test creating table with compression method
+CREATE TABLE cmdata(f1 text COMPRESSION pglz);
+CREATE INDEX idx ON cmdata(f1);
+INSERT INTO cmdata VALUES(repeat('1234567890',1000));
+\d+ cmdata
+                                 Table "public.cmdata"
+ Column | Type | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+------+-----------+----------+---------+----------+--------------+-------------
+ f1     | text |           |          |         | extended |              | 
+Indexes:
+    "idx" btree (f1)
+
+CREATE TABLE cmdata1(f1 TEXT COMPRESSION lz4);
+ERROR:  not built with lz4 support
+INSERT INTO cmdata1 VALUES(repeat('1234567890',1004));
+ERROR:  relation "cmdata1" does not exist
+LINE 1: INSERT INTO cmdata1 VALUES(repeat('1234567890',1004));
+                    ^
+\d+ cmdata1
+-- try setting compression for incompressible data type
+CREATE TABLE cmdata2 (f1 int COMPRESSION pglz);
+ERROR:  column data type integer does not support compression
+-- verify stored compression method
+SELECT pg_column_compression(f1) FROM cmdata;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+SELECT pg_column_compression(f1) FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: SELECT pg_column_compression(f1) FROM cmdata1;
+                                              ^
+-- decompress data slice
+SELECT SUBSTR(f1, 200, 5) FROM cmdata;
+ substr 
+--------
+ 01234
+(1 row)
+
+SELECT SUBSTR(f1, 2000, 50) FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: SELECT SUBSTR(f1, 2000, 50) FROM cmdata1;
+                                         ^
+-- copy with table creation
+SELECT * INTO cmmove1 FROM cmdata;
+SELECT pg_column_compression(f1) FROM cmmove1;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+-- update using datum from different table
+CREATE TABLE cmmove2(f1 text COMPRESSION pglz);
+INSERT INTO cmmove2 VALUES (repeat('1234567890',1004));
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+UPDATE cmmove2 SET f1 = cmdata.f1 FROM cmdata;
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+UPDATE cmmove2 SET f1 = cmdata1.f1 FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: UPDATE cmmove2 SET f1 = cmdata1.f1 FROM cmdata1;
+                                                ^
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+-- copy to existing table
+CREATE TABLE cmmove3(f1 text COMPRESSION pglz);
+INSERT INTO cmmove3 SELECT * FROM cmdata;
+INSERT INTO cmmove3 SELECT * FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: INSERT INTO cmmove3 SELECT * FROM cmdata1;
+                                          ^
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+-- test external compressed data
+CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS
+'select array_agg(md5(g::text))::text from generate_series(1, 256) g';
+CREATE TABLE cmdata2 (f1 text COMPRESSION pglz);
+INSERT INTO cmdata2 select large_val() || repeat('a', 4000);
+SELECT pg_column_compression(f1) FROM cmdata2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+INSERT INTO cmdata1 SELECT * FROM cmdata2;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: INSERT INTO cmdata1 SELECT * FROM cmdata2;
+                    ^
+SELECT pg_column_compression(f1) FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: SELECT pg_column_compression(f1) FROM cmdata1;
+                                              ^
+DROP TABLE cmdata2;
+-- test LIKE INCLUDING COMPRESSION
+CREATE TABLE cmdata2 (LIKE cmdata1 INCLUDING COMPRESSION);
+ERROR:  relation "cmdata1" does not exist
+LINE 1: CREATE TABLE cmdata2 (LIKE cmdata1 INCLUDING COMPRESSION);
+                                   ^
+\d+ cmdata2
+-- test compression with materialized view
+CREATE MATERIALIZED VIEW mv(x) AS SELECT * FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: CREATE MATERIALIZED VIEW mv(x) AS SELECT * FROM cmdata1;
+                                                        ^
+\d+ mv
+SELECT pg_column_compression(f1) FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: SELECT pg_column_compression(f1) FROM cmdata1;
+                                              ^
+SELECT pg_column_compression(x) FROM mv;
+ERROR:  relation "mv" does not exist
+LINE 1: SELECT pg_column_compression(x) FROM mv;
+                                             ^
+-- test compression with partition
+CREATE TABLE cmpart(f1 text COMPRESSION lz4) PARTITION BY HASH(f1);
+ERROR:  not built with lz4 support
+CREATE TABLE cmpart1 PARTITION OF cmpart FOR VALUES WITH (MODULUS 2, REMAINDER 0);
+ERROR:  relation "cmpart" does not exist
+CREATE TABLE cmpart2(f1 text COMPRESSION pglz);
+ALTER TABLE cmpart ATTACH PARTITION cmpart2 FOR VALUES WITH (MODULUS 2, REMAINDER 1);
+ERROR:  relation "cmpart" does not exist
+INSERT INTO cmpart VALUES (repeat('123456789',1004));
+ERROR:  relation "cmpart" does not exist
+LINE 1: INSERT INTO cmpart VALUES (repeat('123456789',1004));
+                    ^
+INSERT INTO cmpart VALUES (repeat('123456789',4004));
+ERROR:  relation "cmpart" does not exist
+LINE 1: INSERT INTO cmpart VALUES (repeat('123456789',4004));
+                    ^
+SELECT pg_column_compression(f1) FROM cmpart;
+ERROR:  relation "cmpart" does not exist
+LINE 1: SELECT pg_column_compression(f1) FROM cmpart;
+                                              ^
+-- test compression with inheritence, error
+CREATE TABLE cminh() INHERITS(cmdata, cmdata1);
+ERROR:  relation "cmdata1" does not exist
+CREATE TABLE cminh(f1 TEXT COMPRESSION lz4) INHERITS(cmdata);
+NOTICE:  merging column "f1" with inherited definition
+ERROR:  column "f1" has a compression method conflict
+DETAIL:  pglz versus lz4
+-- check data is ok
+SELECT length(f1) FROM cmdata;
+ length 
+--------
+  10000
+(1 row)
+
+SELECT length(f1) FROM cmdata1;
+ERROR:  relation "cmdata1" does not exist
+LINE 1: SELECT length(f1) FROM cmdata1;
+                               ^
+SELECT length(f1) FROM cmmove1;
+ length 
+--------
+  10000
+(1 row)
+
+SELECT length(f1) FROM cmmove2;
+ length 
+--------
+  10000
+(1 row)
+
+SELECT length(f1) FROM cmmove3;
+ length 
+--------
+  10000
+(1 row)
+
+\set HIDE_COMPRESSAM on
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 7204fdb0b4..14f3fa3fc8 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -4898,8 +4898,8 @@ Indexes:
 -- check printing info about access methods
 \dA
 List of access methods
-  Name  | Type  
---------+-------
+  Name  |    Type     
+--------+-------------
  brin   | Index
  btree  | Index
  gin    | Index
@@ -4907,13 +4907,15 @@ List of access methods
  hash   | Index
  heap   | Table
  heap2  | Table
+ lz4    | Compression
+ pglz   | Compression
  spgist | Index
-(8 rows)
+(10 rows)
 
 \dA *
 List of access methods
-  Name  | Type  
---------+-------
+  Name  |    Type     
+--------+-------------
  brin   | Index
  btree  | Index
  gin    | Index
@@ -4921,8 +4923,10 @@ List of access methods
  hash   | Index
  heap   | Table
  heap2  | Table
+ lz4    | Compression
+ pglz   | Compression
  spgist | Index
-(8 rows)
+(10 rows)
 
 \dA h*
 List of access methods
@@ -4947,32 +4951,36 @@ List of access methods
 
 \dA: extra argument "bar" ignored
 \dA+
-                             List of access methods
-  Name  | Type  |       Handler        |              Description               
---------+-------+----------------------+----------------------------------------
- brin   | Index | brinhandler          | block range index (BRIN) access method
- btree  | Index | bthandler            | b-tree index access method
- gin    | Index | ginhandler           | GIN index access method
- gist   | Index | gisthandler          | GiST index access method
- hash   | Index | hashhandler          | hash index access method
- heap   | Table | heap_tableam_handler | heap table access method
- heap2  | Table | heap_tableam_handler | 
- spgist | Index | spghandler           | SP-GiST index access method
-(8 rows)
+                                List of access methods
+  Name  |    Type     |       Handler        |              Description               
+--------+-------------+----------------------+----------------------------------------
+ brin   | Index       | brinhandler          | block range index (BRIN) access method
+ btree  | Index       | bthandler            | b-tree index access method
+ gin    | Index       | ginhandler           | GIN index access method
+ gist   | Index       | gisthandler          | GiST index access method
+ hash   | Index       | hashhandler          | hash index access method
+ heap   | Table       | heap_tableam_handler | heap table access method
+ heap2  | Table       | heap_tableam_handler | 
+ lz4    | Compression | lz4handler           | lz4 compression access method
+ pglz   | Compression | pglzhandler          | pglz compression access method
+ spgist | Index       | spghandler           | SP-GiST index access method
+(10 rows)
 
 \dA+ *
-                             List of access methods
-  Name  | Type  |       Handler        |              Description               
---------+-------+----------------------+----------------------------------------
- brin   | Index | brinhandler          | block range index (BRIN) access method
- btree  | Index | bthandler            | b-tree index access method
- gin    | Index | ginhandler           | GIN index access method
- gist   | Index | gisthandler          | GiST index access method
- hash   | Index | hashhandler          | hash index access method
- heap   | Table | heap_tableam_handler | heap table access method
- heap2  | Table | heap_tableam_handler | 
- spgist | Index | spghandler           | SP-GiST index access method
-(8 rows)
+                                List of access methods
+  Name  |    Type     |       Handler        |              Description               
+--------+-------------+----------------------+----------------------------------------
+ brin   | Index       | brinhandler          | block range index (BRIN) access method
+ btree  | Index       | bthandler            | b-tree index access method
+ gin    | Index       | ginhandler           | GIN index access method
+ gist   | Index       | gisthandler          | GiST index access method
+ hash   | Index       | hashhandler          | hash index access method
+ heap   | Table       | heap_tableam_handler | heap table access method
+ heap2  | Table       | heap_tableam_handler | 
+ lz4    | Compression | lz4handler           | lz4 compression access method
+ pglz   | Compression | pglzhandler          | pglz compression access method
+ spgist | Index       | spghandler           | SP-GiST index access method
+(10 rows)
 
 \dA+ h*
                      List of access methods
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 12bb67e491..e9c0fc9760 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -114,7 +114,7 @@ test: plancache limit plpgsql copy2 temp domain rangefuncs prepare conversion tr
 # ----------
 # Another group of parallel tests
 # ----------
-test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain
+test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain compression
 
 # event triggers cannot run concurrently with any test that runs DDL
 # oidjoins is read-only, though, and should run late for best coverage
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 59b416fd80..4d5577bae3 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -199,6 +199,7 @@ test: partition_aggregate
 test: partition_info
 test: tuplesort
 test: explain
+test: compression
 test: event_trigger
 test: oidjoins
 test: fast_default
diff --git a/src/test/regress/sql/compression.sql b/src/test/regress/sql/compression.sql
new file mode 100644
index 0000000000..450416ecb4
--- /dev/null
+++ b/src/test/regress/sql/compression.sql
@@ -0,0 +1,85 @@
+\set HIDE_COMPRESSAM off
+
+-- test creating table with compression method
+CREATE TABLE cmdata(f1 text COMPRESSION pglz);
+CREATE INDEX idx ON cmdata(f1);
+INSERT INTO cmdata VALUES(repeat('1234567890',1000));
+\d+ cmdata
+
+CREATE TABLE cmdata1(f1 TEXT COMPRESSION lz4);
+INSERT INTO cmdata1 VALUES(repeat('1234567890',1004));
+\d+ cmdata1
+
+-- try setting compression for incompressible data type
+CREATE TABLE cmdata2 (f1 int COMPRESSION pglz);
+
+-- verify stored compression method
+SELECT pg_column_compression(f1) FROM cmdata;
+SELECT pg_column_compression(f1) FROM cmdata1;
+
+-- decompress data slice
+SELECT SUBSTR(f1, 200, 5) FROM cmdata;
+SELECT SUBSTR(f1, 2000, 50) FROM cmdata1;
+
+-- copy with table creation
+SELECT * INTO cmmove1 FROM cmdata;
+SELECT pg_column_compression(f1) FROM cmmove1;
+
+-- update using datum from different table
+CREATE TABLE cmmove2(f1 text COMPRESSION pglz);
+INSERT INTO cmmove2 VALUES (repeat('1234567890',1004));
+SELECT pg_column_compression(f1) FROM cmmove2;
+
+UPDATE cmmove2 SET f1 = cmdata.f1 FROM cmdata;
+SELECT pg_column_compression(f1) FROM cmmove2;
+UPDATE cmmove2 SET f1 = cmdata1.f1 FROM cmdata1;
+SELECT pg_column_compression(f1) FROM cmmove2;
+
+-- copy to existing table
+CREATE TABLE cmmove3(f1 text COMPRESSION pglz);
+INSERT INTO cmmove3 SELECT * FROM cmdata;
+INSERT INTO cmmove3 SELECT * FROM cmdata1;
+SELECT pg_column_compression(f1) FROM cmmove2;
+
+-- test external compressed data
+CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS
+'select array_agg(md5(g::text))::text from generate_series(1, 256) g';
+CREATE TABLE cmdata2 (f1 text COMPRESSION pglz);
+INSERT INTO cmdata2 select large_val() || repeat('a', 4000);
+SELECT pg_column_compression(f1) FROM cmdata2;
+INSERT INTO cmdata1 SELECT * FROM cmdata2;
+SELECT pg_column_compression(f1) FROM cmdata1;
+DROP TABLE cmdata2;
+
+-- test LIKE INCLUDING COMPRESSION
+CREATE TABLE cmdata2 (LIKE cmdata1 INCLUDING COMPRESSION);
+\d+ cmdata2
+
+-- test compression with materialized view
+CREATE MATERIALIZED VIEW mv(x) AS SELECT * FROM cmdata1;
+\d+ mv
+SELECT pg_column_compression(f1) FROM cmdata1;
+SELECT pg_column_compression(x) FROM mv;
+
+-- test compression with partition
+CREATE TABLE cmpart(f1 text COMPRESSION lz4) PARTITION BY HASH(f1);
+CREATE TABLE cmpart1 PARTITION OF cmpart FOR VALUES WITH (MODULUS 2, REMAINDER 0);
+CREATE TABLE cmpart2(f1 text COMPRESSION pglz);
+
+ALTER TABLE cmpart ATTACH PARTITION cmpart2 FOR VALUES WITH (MODULUS 2, REMAINDER 1);
+INSERT INTO cmpart VALUES (repeat('123456789',1004));
+INSERT INTO cmpart VALUES (repeat('123456789',4004));
+SELECT pg_column_compression(f1) FROM cmpart;
+
+-- test compression with inheritence, error
+CREATE TABLE cminh() INHERITS(cmdata, cmdata1);
+CREATE TABLE cminh(f1 TEXT COMPRESSION lz4) INHERITS(cmdata);
+
+-- check data is ok
+SELECT length(f1) FROM cmdata;
+SELECT length(f1) FROM cmdata1;
+SELECT length(f1) FROM cmmove1;
+SELECT length(f1) FROM cmmove2;
+SELECT length(f1) FROM cmmove3;
+
+\set HIDE_COMPRESSAM on
diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm
index 2aa062b2c9..c64b369047 100644
--- a/src/tools/msvc/Solution.pm
+++ b/src/tools/msvc/Solution.pm
@@ -307,6 +307,7 @@ sub GenerateFiles
 		HAVE_LIBXML2                                => undef,
 		HAVE_LIBXSLT                                => undef,
 		HAVE_LIBZ                   => $self->{options}->{zlib} ? 1 : undef,
+		HAVE_LIBLZ4                 => undef,
 		HAVE_LINK                   => undef,
 		HAVE_LOCALE_T               => 1,
 		HAVE_LONG_INT_64            => undef,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bab4f3adb3..46cac11d39 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -395,6 +395,7 @@ CompositeIOData
 CompositeTypeStmt
 CompoundAffixFlag
 CompressionAlgorithm
+CompressionAmRoutine
 CompressorState
 ComputeXidHorizonsResult
 ConditionVariable
-- 
2.17.0

