From a31743fa72025c836283504a930fb173b5eabe81 Mon Sep 17 00:00:00 2001
From: Ashwin Agrawal <aagrawal@pivotal.io>
Date: Mon, 8 Apr 2019 16:54:21 -0700
Subject: [PATCH v1] Zedstore - compressed in-core columnar storage.

---
 configure                                     |  118 ++
 configure.in                                  |   19 +
 src/backend/access/Makefile                   |    2 +-
 src/backend/access/zedstore/Makefile          |   19 +
 src/backend/access/zedstore/README            |  253 +++
 src/backend/access/zedstore/zedstore_btree.c  | 1723 +++++++++++++++++
 .../access/zedstore/zedstore_compression.c    |  362 ++++
 .../access/zedstore/zedstore_inspect.c        |  445 +++++
 src/backend/access/zedstore/zedstore_meta.c   |  218 +++
 src/backend/access/zedstore/zedstore_toast.c  |  188 ++
 src/backend/access/zedstore/zedstore_undo.c   |  783 ++++++++
 .../access/zedstore/zedstore_visibility.c     |  259 +++
 .../access/zedstore/zedstoream_handler.c      | 1500 ++++++++++++++
 src/backend/executor/execScan.c               |   75 +
 src/backend/executor/nodeIndexonlyscan.c      |   16 +-
 src/backend/executor/nodeIndexscan.c          |    7 +
 src/backend/executor/nodeSeqscan.c            |   18 +-
 src/include/access/tableam.h                  |   44 +
 src/include/access/zedstore_compression.h     |   51 +
 src/include/access/zedstore_internal.h        |  409 ++++
 src/include/access/zedstore_undo.h            |  133 ++
 src/include/catalog/pg_am.dat                 |    3 +
 src/include/catalog/pg_proc.dat               |   24 +
 src/include/executor/executor.h               |    2 +
 src/include/nodes/execnodes.h                 |    1 +
 src/include/pg_config.h.in                    |    9 +
 src/test/regress/expected/create_am.out       |   11 +-
 src/test/regress/expected/zedstore.out        |  259 +++
 src/test/regress/parallel_schedule            |    2 +-
 src/test/regress/serial_schedule              |    1 +
 src/test/regress/sql/zedstore.sql             |  104 +
 31 files changed, 7045 insertions(+), 13 deletions(-)
 create mode 100644 src/backend/access/zedstore/Makefile
 create mode 100644 src/backend/access/zedstore/README
 create mode 100644 src/backend/access/zedstore/zedstore_btree.c
 create mode 100644 src/backend/access/zedstore/zedstore_compression.c
 create mode 100644 src/backend/access/zedstore/zedstore_inspect.c
 create mode 100644 src/backend/access/zedstore/zedstore_meta.c
 create mode 100644 src/backend/access/zedstore/zedstore_toast.c
 create mode 100644 src/backend/access/zedstore/zedstore_undo.c
 create mode 100644 src/backend/access/zedstore/zedstore_visibility.c
 create mode 100644 src/backend/access/zedstore/zedstoream_handler.c
 create mode 100644 src/include/access/zedstore_compression.h
 create mode 100644 src/include/access/zedstore_internal.h
 create mode 100644 src/include/access/zedstore_undo.h
 create mode 100644 src/test/regress/expected/zedstore.out
 create mode 100644 src/test/regress/sql/zedstore.sql

diff --git a/configure b/configure
index 806810817d..8ca80562c8 100755
--- a/configure
+++ b/configure
@@ -700,6 +700,7 @@ LDFLAGS_EX
 ELF_SYS
 EGREP
 GREP
+with_lz4
 with_zlib
 with_system_tzdata
 with_libxslt
@@ -864,6 +865,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_lz4
 with_gnu_ld
 enable_largefile
 enable_float4_byval
@@ -1570,6 +1572,7 @@ Optional Packages:
   --with-system-tzdata=DIR
                           use system time zone data in DIR
   --without-zlib          do not use Zlib
+  --with-lz4              build with LZ4 support
   --with-gnu-ld           assume the C compiler uses GNU ld [default=no]
 
 Some influential environment variables:
@@ -8306,6 +8309,41 @@ fi
 
 
 
+#
+# LZ4
+#
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with LZ4 support" >&5
+$as_echo_n "checking whether to build with LZ4 support... " >&6; }
+
+
+
+# Check whether --with-lz4 was given.
+if test "${with_lz4+set}" = set; then :
+  withval=$with_lz4;
+  case $withval in
+    yes)
+
+$as_echo "#define USE_LZ4 1" >>confdefs.h
+
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-lz4 option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_lz4=no
+
+fi
+
+
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_lz4" >&5
+$as_echo "$with_lz4" >&6; }
+
+
 #
 # Elf
 #
@@ -11828,6 +11866,56 @@ fi
 
 fi
 
+if test "$with_lz4" = yes; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for LZ4_compress_default in -llz4" >&5
+$as_echo_n "checking for LZ4_compress_default in -llz4... " >&6; }
+if ${ac_cv_lib_lz4_LZ4_compress_default+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-llz4  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char LZ4_compress_default ();
+int
+main ()
+{
+return LZ4_compress_default ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_lz4_LZ4_compress_default=yes
+else
+  ac_cv_lib_lz4_LZ4_compress_default=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_lz4_LZ4_compress_default" >&5
+$as_echo "$ac_cv_lib_lz4_LZ4_compress_default" >&6; }
+if test "x$ac_cv_lib_lz4_LZ4_compress_default" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBLZ4 1
+_ACEOF
+
+  LIBS="-llz4 $LIBS"
+
+else
+  as_fn_error $? "library 'lz4' is required for LZ4 support" "$LINENO" 5
+fi
+
+fi
+
 if test "$enable_spinlocks" = yes; then
 
 $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h
@@ -13027,6 +13115,36 @@ Use --without-zlib to disable zlib support." "$LINENO" 5
 fi
 
 
+fi
+
+if test "$with_lz4" = yes; then
+  for ac_header in lz4.h
+do :
+  ac_fn_c_check_header_mongrel "$LINENO" "lz4.h" "ac_cv_header_lz4_h" "$ac_includes_default"
+if test "x$ac_cv_header_lz4_h" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LZ4_H 1
+_ACEOF
+
+else
+  for ac_header in lz4.h
+do :
+  ac_fn_c_check_header_mongrel "$LINENO" "lz4.h" "ac_cv_header_lz4_h" "$ac_includes_default"
+if test "x$ac_cv_header_lz4_h" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LZ4_H 1
+_ACEOF
+
+else
+  as_fn_error $? "lz4.h header file is required for LZ4" "$LINENO" 5
+fi
+
+done
+
+fi
+
+done
+
 fi
 
 if test "$with_gssapi" = yes ; then
diff --git a/configure.in b/configure.in
index 9c7a9738bc..e229faf75a 100644
--- a/configure.in
+++ b/configure.in
@@ -964,6 +964,16 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# LZ4
+#
+AC_MSG_CHECKING([whether to build with LZ4 support])
+PGAC_ARG_BOOL(with, lz4, no,
+              [build with LZ4 support],
+              [AC_DEFINE([USE_LZ4], 1, [Define to 1 to build with LZ4 support. (--with-lz4)])])
+AC_MSG_RESULT([$with_lz4])
+AC_SUBST(with_lz4)
+
 #
 # Elf
 #
@@ -1174,6 +1184,10 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_lz4" = yes; then
+  AC_CHECK_LIB(lz4, LZ4_compress_default, [], [AC_MSG_ERROR([library 'lz4' is required for LZ4 support])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1387,6 +1401,11 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_lz4" = yes; then
+  AC_CHECK_HEADERS(lz4.h, [],
+	[AC_CHECK_HEADERS(lz4.h, [], [AC_MSG_ERROR([lz4.h header file is required for LZ4])])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile
index 0880e0a8bb..6d36f3bd26 100644
--- a/src/backend/access/Makefile
+++ b/src/backend/access/Makefile
@@ -9,6 +9,6 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 SUBDIRS	    = brin common gin gist hash heap index nbtree rmgrdesc spgist \
-			  table tablesample transam
+			  table tablesample transam zedstore
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/zedstore/Makefile b/src/backend/access/zedstore/Makefile
new file mode 100644
index 0000000000..52c2b2025c
--- /dev/null
+++ b/src/backend/access/zedstore/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for access/zedstore
+#
+# IDENTIFICATION
+#    src/backend/access/zedstore/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/zedstore
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = zedstore_btree.o zedstore_compression.o zedstoream_handler.o \
+       zedstore_meta.o zedstore_undo.o zedstore_toast.o zedstore_visibility.o \
+       zedstore_inspect.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/zedstore/README b/src/backend/access/zedstore/README
new file mode 100644
index 0000000000..96d50ae580
--- /dev/null
+++ b/src/backend/access/zedstore/README
@@ -0,0 +1,253 @@
+
+src/backend/access/zedstore/README
+
+ZedStore - compressed column (and row) store for PostgreSQL
+===========================================================
+
+The purpose of this README is to provide an overview of zedstore's
+design, the major requirements/objectives it intends to fulfill, and
+high-level implementation details.
+
+Objectives
+----------
+
+* Performance improvement for queries selecting subset of columns (reduced IO).
+
+* Reduced on-disk footprint compared to a heap table. Shorter tuple
+  headers, and also leveraging compression of similar-type data.
+
+* Be a first-class citizen in the Postgres architecture (table data can
+  just independently live in columnar storage) and not be at arm's
+  length through an opaque interface.
+
+* Fully MVCC compliant - basically all operations supported similar to
+  heap, like update, delete, serializable transactions etc...
+
+* All Indexes supported
+
+* Hybrid row-column store, where some columns are stored together, and
+  others separately. Provide flexibility of granularity on how to
+  divide the columns. Columns accessed together can be stored
+  together.
+
+* Provide better control over bloat (using zheap)
+
+* Eliminate need for separate toast tables
+
+* Faster add / drop column or changing data type of column by avoiding
+  full rewrite of the table.
+
+High-level design of ZedStore - B-trees for the win!
+----------------------------------------------------
+
+To start simple, let's ignore the column store aspect and consider it as
+a compressed row store. The column store is a natural extension of this
+concept, explained in the next section.
+
+The basic on-disk data structure leveraged is a B-tree, indexed by
+TID; the B-tree is a great data structure, fast and versatile. Note this
+is not referring to existing B-tree indexes, but instead a net-new B-tree
+used for table data storage.
+
+TID - used as a logical row identifier:
+TID is just a 48-bit row identifier. The traditional division into
+block and offset numbers is meaningless. In order to find a tuple with
+a given TID, one must always descend the B-tree. Having logical TIDs
+provides the flexibility to move tuples around to different pages when
+page splits or page merges are performed.
+
+The internal pages of the B-tree are super simple and boring. Each
+internal page just stores an array of TID/downlink pairs. Let's focus
+on the leaf level. Leaf blocks have short uncompressed header,
+followed by btree items. It contains two kinds of items:
+
+ - plain item, holds one tuple or one datum, uncompressed payload
+ - a "container item", holds multiple plain items, compressed payload
+
++-----------------------------
+| Fixed-size page header:
+|
+|   LSN
+|   TID low and hi key (for Lehman & Yao B-tree operations)
+|   left and right page pointers
+|
+| Items:
+|
+|   TID | size | flags | uncompressed size | lastTID | payload (container item)
+|   TID | size | flags | uncompressed size | lastTID | payload (container item)
+|   TID | size | flags | undo pointer | payload (plain item)
+|   TID | size | flags | undo pointer | payload (plain item)
+|   ...
+|
++----------------------------
+
+Row store
+---------
+
+The tuples are stored one after another, sorted by TID. For each
+tuple, we store its 48-bit TID, an undo record pointer, and the actual
+tuple data, uncompressed.
+
+In uncompressed form, the page can be arbitrarily large. But after
+compression, it must fit into a physical 8k block. If on insert or
+update of a tuple, the page cannot be compressed below 8k anymore, the
+page is split. Note that because TIDs are logical rather than physical
+identifiers, we can freely move tuples from one physical page to
+another during page split. A tuple's TID never changes.
+
+The buffer cache caches compressed blocks. Likewise, WAL-logging,
+full-page images etc. work on compressed blocks. Decompression is done
+on-the-fly, as and when needed, in backend-private memory when
+reading. For some compression methods, like run-length encoding or
+delta encoding, tuples can be constructed directly from compressed data.
+
+Column store
+------------
+
+A column store uses the same structure but we have *multiple* B-trees,
+one for each column, all indexed by TID. Imagine zedstore as a forest
+of B-trees. The B-trees for all columns are stored in the same
+physical file.
+
+A metapage at block 0 has links to the roots of the B-trees. Leaf
+pages look the same, but instead of storing the whole tuple, they store
+just a single attribute. To reconstruct a row with a given TID, a scan
+descends down the B-trees for all the columns using that TID, and
+fetches all attributes. Likewise, a sequential scan walks all the
+B-trees in lockstep.
+
+
+MVCC
+----
+
+Undo record pointers are used to implement MVCC, like in zheap. Hence,
+transaction information is not directly stored with the data. In
+zheap, there's a small, fixed number of "transaction slots" on each
+page, but zedstore has an undo pointer with each item directly; in
+normal cases, the compression squeezes this down to almost nothing. In
+the case of a bulk load, the undo record pointer can be maintained for
+a bulk of items and not per item.
+
+
+Insert:
+
+Inserting a new row splits the row into datums. Then, for the first
+column, decide which block to insert it into, pick a TID for it, and
+write an undo record for it. The rest of the columns are inserted
+using that TID and point to the same undo position.
+
+There's one subtle little issue here:
+
+Imagine that you load the table with very large rows, so that every page
+has just a single row. If you assign the TID ranges naively, as you add
+new leaf pages to the end, you will end up with leaf pages with only one
+TID each. So the first page covers TIDs [1, 2), the second [2, 3), and
+so forth. If you then delete a row, and try to insert 10 smaller rows to
+the same page, you can't, because there aren't enough unused TIDs in the
+page's range.
+
+We can avoid that by simply padding the TID ranges as we add new pages,
+so that each page is initially allocated e.g. 50000 TIDs, even if you
+only place one row on it. That gives a lot of breathing room. There
+might still be some corner cases, where repeated updates cause page
+splits, so that you still end up with very small TID ranges on the
+split pages. But that seems fine.
+
+Toast:
+When an overly large datum is stored, it is divided into chunks, and
+each chunk is stored on a dedicated toast page within the same
+physical file. The toast pages of a datum form a list; each page has a
+next/prev pointer.
+
+Select:
+A property is added to the Table AM to convey whether column projection
+is leveraged by the AM for scans. While scanning tables with an AM
+leveraging this property, the executor parses the plan, using the
+target list and quals to find the columns required for the query. This
+list is passed down to the AM on beginscan. Zedstore uses this column
+projection list to only pull data from the selected columns. A virtual
+tuple table slot is used to pass back the datums for the subset of columns.
+
+Current table am API requires enhancement here to pass down column
+projection to AM. The patch showcases two different ways for the same.
+
+* For sequential scans added new beginscan_with_column_projection()
+  API. Executor checks AM property and if it leverages column
+  projection uses this new API else normal beginscan() API.
+
+* For index scans instead of modifying the begin scan API, added new
+  API to specifically pass column projection list after calling begin
+  scan to populate the scan descriptor but before fetching the tuples.
+
+Update:
+
+Index Support:
+Building an index also leverages the columnar storage and only scans the
+columns required to build the index. Indexes work pretty similarly to
+heap tables. Data is inserted into the table, and the TID for the tuple
+gets stored in the index. On index scans, the required column B-trees
+are scanned for the given TID and datums passed back using a virtual tuple.
+
+Page Format
+-----------
+A ZedStore table contains different kinds of pages, all in the same
+file. Kinds of pages are meta-page, per-attribute btree internal and
+leaf pages, UNDO log page, and toast pages. Each page type has its own
+distinct data storage format.
+
+META Page:
+Block 0 is always a metapage. It contains the block numbers of the
+other data structures stored within the file, like the per-attribute
+B-trees, and the UNDO log.
+
+BTREE Page:
+
+UNDO Page:
+
+TOAST Page:
+
+
+Free Space Map
+--------------
+
+
+Enhancements
+------------
+
+Instead of compressing all the tuples on a page in one batch, store a
+small "dictionary", e.g. in page header or meta page or separate
+dedicated page, and use it to compress tuple by tuple. That could make
+random reads and updates of individual tuples faster. Need to find how
+to create the dictionary first.
+
+Only cache compressed pages in the page cache. If we want to cache
+uncompressed pages instead, or in addition to that, we need to invent
+a whole new kind of a buffer cache that can deal with the
+variable-size blocks. For a first version, I think we can live without
+it.
+
+Instead of storing all columns in the same file, we could store them in
+separate files (separate forks?). That would allow immediate reuse of
+space, after dropping a column. It's not clear how to use an FSM in that
+case, though. Might have to implement an integrated FSM, too. (Which
+might not be a bad idea, anyway).
+
+Design allows for hybrid row-column store, where some columns are
+stored together, and others have a dedicated B-tree. Need to have user
+facing syntax to allow specifying how to group the columns.
+
+Salient points for the design
+------------------------------
+
+* Lay out the data/tuples in a mapped fashion instead of keeping the
+  logical-to-physical mapping separate from the actual data. So, keep
+  all the meta-data and data logically in a single stream of the file,
+  avoiding the need for separate forks/files to store meta-data and data.
+
+* Handle/treat operations at tuple level and not block level.
+
+* Stick to fixed size physical blocks. Variable size blocks (for
+  possibly higher compression ratios) pose need for increased logical
+  to physical mapping maintenance, plus restrictions on concurrency of
+  writes and reads to files. Hence adopt compression to fit fixed size
+  blocks instead of other way round.
diff --git a/src/backend/access/zedstore/zedstore_btree.c b/src/backend/access/zedstore/zedstore_btree.c
new file mode 100644
index 0000000000..a1e0cb31b7
--- /dev/null
+++ b/src/backend/access/zedstore/zedstore_btree.c
@@ -0,0 +1,1723 @@
+/*
+ * zedstore_btree.c
+ *		Routines for handling B-trees structures in ZedStore
+ *
+ * A Zedstore table consists of multiple B-trees, one for each attribute. The
+ * functions in this file deal with one B-tree at a time, it is the caller's
+ * responsibility to tie together the scans of each btree.
+ *
+ * Operations:
+ *
+ * - Sequential scan in TID order
+ *  - must be efficient with scanning multiple trees in sync
+ *
+ * - random lookups, by TID (for index scan)
+ *
+ * - range scans by TID (for bitmap index scan)
+ *
+ * NOTES:
+ * - Locking order: child before parent, left before right
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstore_btree.c
+ */
+#include "postgres.h"
+
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "access/zedstore_compression.h"
+#include "access/zedstore_internal.h"
+#include "access/zedstore_undo.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/procarray.h"
+#include "utils/datum.h"
+#include "utils/rel.h"
+
+/* prototypes for local functions */
+static zstid zsbt_insert_item(Relation rel, AttrNumber attno, ZSUncompressedBtreeItem *newitem,
+							  TransactionId xid, CommandId cid, ZSUndoRecPtr *undorecptr);
+static Buffer zsbt_descend(Relation rel, BlockNumber rootblk, zstid key);
+static Buffer zsbt_find_downlink(Relation rel, AttrNumber attno,
+								 zstid key, BlockNumber childblk, int level,
+								 int *itemno);
+static void zsbt_recompress_replace(Relation rel, AttrNumber attno,
+									Buffer oldbuf, List *items);
+static void zsbt_insert_downlink(Relation rel, AttrNumber attno, Buffer leftbuf,
+								 zstid rightlokey, BlockNumber rightblkno);
+static void zsbt_split_internal_page(Relation rel, AttrNumber attno,
+									 Buffer leftbuf, Buffer childbuf,
+									 OffsetNumber newoff, zstid newkey, BlockNumber childblk);
+static void zsbt_newroot(Relation rel, AttrNumber attno, int level,
+						 zstid key1, BlockNumber blk1,
+						 zstid key2, BlockNumber blk2,
+						 Buffer leftchildbuf);
+static ZSUncompressedBtreeItem *zsbt_scan_next_internal(ZSBtreeScan *scan);
+static void zsbt_replace_item(Relation rel, AttrNumber attno, Buffer buf,
+							  ZSBtreeItem *olditem, ZSBtreeItem *replacementitem,
+							  ZSBtreeItem *newitem, List *newitems);
+
+static int zsbt_binsrch_internal(zstid key, ZSBtreeInternalPageItem *arr, int arr_elems);
+
+/* ----------------------------------------------------------------
+ *						 Public interface
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * Begin a scan of the btree.
+ */
+void
+zsbt_begin_scan(Relation rel, AttrNumber attno, zstid starttid, Snapshot snapshot, ZSBtreeScan *scan)
+{
+	BlockNumber	rootblk;
+	Buffer		buf;
+
+	rootblk = zsmeta_get_root_for_attribute(rel, attno, false);
+
+	if (rootblk == InvalidBlockNumber)
+	{
+		/* completely empty tree */
+		scan->rel = NULL;
+		scan->attno = InvalidAttrNumber;
+		scan->active = false;
+		scan->lastbuf = InvalidBuffer;
+		scan->lastbuf_is_locked = false;
+		scan->lastoff = InvalidOffsetNumber;
+		scan->snapshot = NULL;
+		memset(&scan->recent_oldest_undo, 0, sizeof(scan->recent_oldest_undo));
+		scan->nexttid = InvalidZSTid;
+		return;
+	}
+
+	buf = zsbt_descend(rel, rootblk, starttid);
+	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+	scan->rel = rel;
+	scan->attno = attno;
+	scan->snapshot = snapshot;
+	scan->for_update = false;		/* caller can change this */
+
+	scan->active = true;
+	scan->lastbuf = buf;
+	scan->lastbuf_is_locked = false;
+	scan->lastoff = InvalidOffsetNumber;
+	scan->nexttid = starttid;
+
+	scan->has_decompressed = false;
+	zs_decompress_init(&scan->decompressor);
+
+	memset(&scan->recent_oldest_undo, 0, sizeof(scan->recent_oldest_undo));
+}
+
+void
+zsbt_end_scan(ZSBtreeScan *scan)
+{
+	if (!scan->active)
+		return;
+
+	if (scan->lastbuf != InvalidBuffer)
+	{
+		if (scan->lastbuf_is_locked)
+			LockBuffer(scan->lastbuf, BUFFER_LOCK_UNLOCK);
+		ReleaseBuffer(scan->lastbuf);
+	}
+	zs_decompress_free(&scan->decompressor);
+
+	scan->active = false;
+}
+
+/*
+ * Return true if there was another tuple. The datum is returned in *datum,
+ * and its TID in *tid. For a pass-by-ref datum, it's a palloc'd copy.
+ */
+bool
+zsbt_scan_next(ZSBtreeScan *scan, Datum *datum, bool *isnull, zstid *tid)
+{
+	TupleDesc	desc;
+	Form_pg_attribute attr;
+	ZSUncompressedBtreeItem *item;
+
+	if (!scan->active)
+		return false;
+
+	desc = RelationGetDescr(scan->rel);
+	attr = &desc->attrs[scan->attno - 1];
+
+	while ((item = zsbt_scan_next_internal(scan)) != NULL)
+	{
+		if (zs_SatisfiesVisibility(scan, item))
+		{
+			char		*ptr = item->t_payload;
+
+			*tid = item->t_tid;
+			if (item->t_flags & ZSBT_NULL)
+				*isnull = true;
+			else
+			{
+				*isnull = false;
+				*datum = fetchatt(attr, ptr);
+				*datum = zs_datumCopy(*datum, attr->attbyval, attr->attlen);
+			}
+
+			if (scan->lastbuf_is_locked)
+			{
+				LockBuffer(scan->lastbuf, BUFFER_LOCK_UNLOCK);
+				scan->lastbuf_is_locked = false;
+			}
+
+			return true;
+		}
+	}
+	return false;
+}
+
+/*
+ * Get the last tid (plus one) in the tree.
+ */
+zstid
+zsbt_get_last_tid(Relation rel, AttrNumber attno)
+{
+	BlockNumber	rootblk;
+	zstid		rightmostkey;
+	zstid		tid;
+	Buffer		buf;
+	Page		page;
+	ZSBtreePageOpaque *opaque;
+	OffsetNumber maxoff;
+
+	/* Find the rightmost leaf */
+	rootblk = zsmeta_get_root_for_attribute(rel, attno, true);
+	rightmostkey = MaxZSTid;
+	buf = zsbt_descend(rel, rootblk, rightmostkey);
+	page = BufferGetPage(buf);
+	opaque = ZSBtreePageGetOpaque(page);
+
+	/*
+	 * Look at the last item, for its tid.
+	 */
+	maxoff = PageGetMaxOffsetNumber(page);
+	if (maxoff >= FirstOffsetNumber)
+	{
+		ItemId		iid = PageGetItemId(page, maxoff);
+		ZSBtreeItem	*hitup = (ZSBtreeItem *) PageGetItem(page, iid);
+
+		/* COMPRESSED items cover a range of TIDs */
+		if ((hitup->t_flags & ZSBT_COMPRESSED) != 0)
+			tid = ((ZSCompressedBtreeItem *) hitup)->t_lasttid;
+		else
+			tid = hitup->t_tid;
+		tid = ZSTidIncrementForInsert(tid);
+	}
+	else
+	{
+		tid = opaque->zs_lokey;
+	}
+	UnlockReleaseBuffer(buf);
+
+	return tid;
+}
+
+ZSUncompressedBtreeItem *
+zsbt_create_item(Form_pg_attribute attr, zstid tid, Datum datum, bool isnull)
+{
+	Size		datumsz;
+	Size		itemsz;
+	ZSUncompressedBtreeItem *newitem;
+	char	   *dataptr;
+
+	/*
+	 * Form a ZSBtreeItem to insert.
+	 */
+	if (isnull)
+		datumsz = 0;
+	else
+		datumsz = zs_datumGetSize(datum, attr->attbyval, attr->attlen);
+	itemsz = offsetof(ZSUncompressedBtreeItem, t_payload) + datumsz;
+
+	newitem = palloc(itemsz);
+	memset(newitem, 0, offsetof(ZSUncompressedBtreeItem, t_payload)); /* zero padding */
+	newitem->t_tid = tid;
+	newitem->t_flags = 0;
+	newitem->t_size = itemsz;
+	memset(&newitem->t_undo_ptr, 0, sizeof(ZSUndoRecPtr));
+
+	if (isnull)
+		newitem->t_flags |= ZSBT_NULL;
+	else
+	{
+		dataptr = ((char *) newitem) + offsetof(ZSUncompressedBtreeItem, t_payload);
+		if (attr->attbyval)
+			store_att_byval(dataptr, datum, attr->attlen);
+		else
+			memcpy(dataptr, DatumGetPointer(datum), datumsz);
+	}
+
+	return newitem;
+}
+
+/*
+ * Insert a new datum to the given attribute's btree.
+ *
+ * Returns the TID of the new tuple.
+ *
+ * If 'tid' is valid, then that TID is used. It better not be in use already. If
+ * it's invalid, then a new TID is allocated, as we see best. (When inserting the
+ * first column of the row, pass invalid, and for other columns, pass the TID
+ * you got for the first column.)
+ */
+zstid
+zsbt_insert(Relation rel, AttrNumber attno, Datum datum, bool isnull,
+			TransactionId xid, CommandId cid, zstid tid, ZSUndoRecPtr *undorecptr)
+{
+	TupleDesc	desc = RelationGetDescr(rel);
+	Form_pg_attribute attr = &desc->attrs[attno - 1];
+	ZSUncompressedBtreeItem *newitem;
+
+	newitem = zsbt_create_item(attr, tid, datum, isnull);
+	tid = zsbt_insert_item(rel, attno, newitem, xid, cid, undorecptr);
+
+	pfree(newitem);
+
+	return tid;
+}
+
+static zstid
+zsbt_insert_item(Relation rel, AttrNumber attno, ZSUncompressedBtreeItem *newitem,
+				 TransactionId xid, CommandId cid, ZSUndoRecPtr *undorecptr)
+{
+	zstid		tid = newitem->t_tid;
+	BlockNumber	rootblk;
+	Buffer		buf;
+	Page		page;
+	ZSBtreePageOpaque *opaque;
+	OffsetNumber maxoff;
+	zstid		lasttid;
+	zstid		insert_target_key;
+	ZSUndoRec_Insert undorec;
+
+	rootblk = zsmeta_get_root_for_attribute(rel, attno, true);
+
+	/*
+	 * If TID was given, find the right place for it. Otherwise, insert to
+	 * the rightmost leaf.
+	 *
+	 * TODO: use a Free Space Map to find suitable target.
+	 */
+	if (tid != InvalidZSTid)
+		insert_target_key = tid;
+	else
+		insert_target_key = MaxZSTid;
+
+	buf = zsbt_descend(rel, rootblk, insert_target_key);
+	page = BufferGetPage(buf);
+	opaque = ZSBtreePageGetOpaque(page);
+
+	/*
+	 * Look at the last item, for its tid.
+	 */
+	maxoff = PageGetMaxOffsetNumber(page);
+	if (maxoff >= FirstOffsetNumber)
+	{
+		ItemId		iid = PageGetItemId(page, maxoff);
+		ZSBtreeItem	*hitup = (ZSBtreeItem *) PageGetItem(page, iid);
+
+		if ((hitup->t_flags & ZSBT_COMPRESSED) != 0)
+			lasttid = ((ZSCompressedBtreeItem *) hitup)->t_lasttid;
+		else
+			lasttid = hitup->t_tid;
+
+		if (tid == InvalidZSTid)
+		{
+			tid = lasttid;
+			tid = ZSTidIncrementForInsert(tid);
+		}
+	}
+	else
+	{
+		lasttid = opaque->zs_lokey;
+		if (tid == InvalidZSTid)
+			tid = lasttid;
+	}
+
+	/* Form an undo record */
+	if (!IsZSUndoRecPtrValid(undorecptr))
+	{
+		undorec.rec.size = sizeof(ZSUndoRec_Insert);
+		undorec.rec.type = ZSUNDO_TYPE_INSERT;
+		undorec.rec.attno = attno;
+		undorec.rec.xid = xid;
+		undorec.rec.cid = cid;
+		undorec.rec.tid = tid;
+		undorec.endtid = tid;
+		*undorecptr = zsundo_insert(rel, &undorec.rec);
+	}
+
+	/* fill in the remaining fields in the item */
+	newitem->t_undo_ptr = *undorecptr;
+
+	if (newitem->t_tid == InvalidZSTid)
+		newitem->t_tid = tid;
+
+	/*
+	 * If there's enough space on the page, insert it directly. Otherwise, try to
+	 * compress all existing items. If that still doesn't create enough space, we
+	 * have to split the page.
+	 *
+	 * TODO: We also resort to the slow way, if the new TID is not at the end of
+	 * the page. Things get difficult, if the new TID is covered by the range of
+	 * an existing compressed item.
+	 */
+	if (PageGetFreeSpace(page) >= MAXALIGN(newitem->t_size) &&
+		(maxoff == InvalidOffsetNumber || tid > lasttid))
+	{
+		OffsetNumber off;
+
+		off = PageAddItemExtended(page, (Item) newitem, newitem->t_size, maxoff + 1, PAI_OVERWRITE);
+		if (off == InvalidOffsetNumber)
+			elog(ERROR, "didn't fit, after all?");
+		MarkBufferDirty(buf);
+		/* TODO: WAL-log */
+
+		UnlockReleaseBuffer(buf);
+	}
+	else
+	{
+		/* recompress and possibly split the page */
+		zsbt_replace_item(rel, attno, buf,
+						  NULL, NULL,
+						  (ZSBtreeItem *) newitem, NIL);
+		/* zsbt_replace_item unlocked 'buf' */
+		ReleaseBuffer(buf);
+	}
+
+	return tid;
+}
+
+/*
+ * Insert multiple items to the given attribute's btree.
+ *
+ * Populates the TIDs of the new tuples.
+ *
+ * If 'tid' in list is valid, then that TID is used. It better not be in use already. If
+ * it's invalid, then a new TID is allocated, as we see best. (When inserting the
+ * first column of the row, pass invalid, and for other columns, pass the TID
+ * you got for the first column.)
+ *
+ * TODO: this routine is very similar to zsbt_insert_item() and can be easily combined.
+ */
+void
+zsbt_insert_multi_items(Relation rel, AttrNumber attno, List *newitems,
+						TransactionId xid, CommandId cid,
+						ZSUndoRecPtr *undorecptr, zstid *tid_return_list)
+{
+	ZSUncompressedBtreeItem *firstItem = (ZSUncompressedBtreeItem *) linitial(newitems);
+	zstid		tid = firstItem->t_tid;
+	zstid       firsttid;
+	BlockNumber	rootblk;
+	Buffer		buf;
+	Page		page;
+	ZSBtreePageOpaque *opaque;
+	OffsetNumber maxoff;
+	zstid		lasttid;
+	zstid		insert_target_key;
+	ZSUndoRec_Insert undorec;
+	int			i;
+	ListCell   *lc;
+
+	rootblk = zsmeta_get_root_for_attribute(rel, attno, true);
+
+	/*
+	 * If TID was given, find the right place for it. Otherwise, insert to
+	 * the rightmost leaf.
+	 *
+	 * TODO: use a Free Space Map to find suitable target.
+	 */
+	if (tid != InvalidZSTid)
+		insert_target_key = tid;
+	else
+		insert_target_key = MaxZSTid;
+
+	buf = zsbt_descend(rel, rootblk, insert_target_key);
+	page = BufferGetPage(buf);
+	opaque = ZSBtreePageGetOpaque(page);
+
+	/*
+	 * Look at the last item, for its tid.
+	 */
+	maxoff = PageGetMaxOffsetNumber(page);
+	if (maxoff >= FirstOffsetNumber)
+	{
+		ItemId		iid = PageGetItemId(page, maxoff);
+		ZSBtreeItem	*hitup = (ZSBtreeItem *) PageGetItem(page, iid);
+
+		if ((hitup->t_flags & ZSBT_COMPRESSED) != 0)
+			lasttid = ((ZSCompressedBtreeItem *) hitup)->t_lasttid;
+		else
+			lasttid = hitup->t_tid;
+
+		if (tid == InvalidZSTid)
+		{
+			tid = lasttid;
+			tid = ZSTidIncrementForInsert(tid);
+		}
+	}
+	else
+	{
+		lasttid = opaque->zs_lokey;
+		if (tid == InvalidZSTid)
+			tid = lasttid;
+	}
+
+	firsttid = tid;
+	lasttid = InvalidZSTid;
+	i = 0;
+
+	/* assign tids for all the items */
+	foreach(lc, newitems)
+	{
+		ZSUncompressedBtreeItem *newitem = (ZSUncompressedBtreeItem *) lfirst(lc);
+
+		/* fill in the remaining fields in the item */
+		newitem->t_undo_ptr = *undorecptr;
+
+		if (newitem->t_tid == InvalidZSTid)
+		{
+			newitem->t_tid = tid;
+			tid_return_list[i++] = tid;
+			lasttid = tid;
+
+			tid = ZSTidIncrementForInsert(tid);
+		}
+	}
+
+	/* Form an undo record */
+	if (!IsZSUndoRecPtrValid(undorecptr))
+	{
+		undorec.rec.size = sizeof(ZSUndoRec_Insert);
+		undorec.rec.type = ZSUNDO_TYPE_INSERT;
+		undorec.rec.attno = attno;
+		undorec.rec.xid = xid;
+		undorec.rec.cid = cid;
+		undorec.rec.tid = firsttid;
+		undorec.endtid = lasttid;
+		*undorecptr = zsundo_insert(rel, &undorec.rec);
+	}
+
+	/*
+	 * update undo record pointer for all the items.
+	 *
+	 * TODO: refactor later to avoid this loop. Can assign above with tids as
+	 * undo pointer is known similar to tid for rest of the columns, but just
+	 * not for first attribute.
+	 */
+	foreach(lc, newitems)
+	{
+		ZSUncompressedBtreeItem *newitem = (ZSUncompressedBtreeItem *) lfirst(lc);
+		newitem->t_undo_ptr = *undorecptr;
+	}
+
+	while (list_length(newitems))
+	{
+		ZSUncompressedBtreeItem *newitem = (ZSUncompressedBtreeItem *) linitial(newitems);
+
+		/*
+		 * If there's enough space on the page, insert it directly. Otherwise, try to
+		 * compress all existing items. If that still doesn't create enough space, we
+		 * have to split the page.
+		 *
+		 * TODO: We also resort to the slow way, if the new TID is not at the end of
+		 * the page. Things get difficult, if the new TID is covered by the range of
+		 * an existing compressed item.
+		 */
+		if (PageGetFreeSpace(page) >= MAXALIGN(newitem->t_size) &&
+			(maxoff > FirstOffsetNumber || tid > lasttid))
+		{
+			OffsetNumber off;
+
+			off = PageAddItemExtended(page, (Item) newitem, newitem->t_size,
+									  maxoff + 1, PAI_OVERWRITE);
+			if (off == InvalidOffsetNumber)
+				elog(ERROR, "didn't fit, after all?");
+
+			maxoff = PageGetMaxOffsetNumber(page);
+			newitems = list_delete_first(newitems);
+		}
+		else
+			break;
+	}
+
+	if (list_length(newitems))
+	{
+		/* recompress and possibly split the page */
+		zsbt_replace_item(rel, attno, buf,
+						  NULL, NULL,
+						  NULL, newitems);
+		/* zsbt_replace_item unlocked 'buf' */
+		ReleaseBuffer(buf);
+	}
+	else
+	{
+		MarkBufferDirty(buf);
+		/* TODO: WAL-log */
+		UnlockReleaseBuffer(buf);
+	}
+}
+
+/*
+ * Delete the item with the given TID from the given attribute's btree.
+ *
+ * Returns TM_Ok on success.  If the tuple is not deletable under the usual
+ * UPDATE/DELETE visibility rules, returns the TM_Result reported by
+ * zs_SatisfiesUpdate() and changes nothing.
+ *
+ * The item is not physically removed.  It is replaced with a copy that
+ * carries the ZSBT_DELETED flag and points to a new UNDO record, so that
+ * concurrent snapshots can still see the old version.
+ */
+TM_Result
+zsbt_delete(Relation rel, AttrNumber attno, zstid tid,
+			TransactionId xid, CommandId cid,
+			Snapshot snapshot, Snapshot crosscheck, bool wait,
+			TM_FailureData *hufd, bool changingPart)
+{
+	ZSBtreeScan scan;
+	ZSUncompressedBtreeItem *item;
+	TM_Result	result;
+	ZSUndoRecPtr undorecptr;
+	ZSUncompressedBtreeItem *deleteditem;
+
+	zsbt_begin_scan(rel, attno, tid, snapshot, &scan);
+	scan.for_update = true;
+
+	/*
+	 * Find the item to delete. (It could be compressed.)
+	 *
+	 * The scan returns NULL if it runs off the end of the tree, so check
+	 * for that before dereferencing.
+	 */
+	item = zsbt_scan_next_internal(&scan);
+	if (item == NULL || item->t_tid != tid)
+	{
+		/*
+		 * or should this be TM_Invisible? The heapam at least just throws
+		 * an error, I think..
+		 */
+		elog(ERROR, "could not find tuple to delete with TID (%u, %u) for attribute %d",
+			 ZSTidGetBlockNumber(tid), ZSTidGetOffsetNumber(tid), attno);
+	}
+
+	/* Is the tuple visible and updatable by us? */
+	result = zs_SatisfiesUpdate(&scan, item);
+	if (result != TM_Ok)
+	{
+		zsbt_end_scan(&scan);
+		return result;
+	}
+
+	/* Create UNDO record, chained to the item's previous undo record. */
+	{
+		ZSUndoRec_Delete undorec;
+
+		undorec.rec.size = sizeof(ZSUndoRec_Delete);
+		undorec.rec.type = ZSUNDO_TYPE_DELETE;
+		undorec.rec.attno = attno;
+		undorec.rec.xid = xid;
+		undorec.rec.cid = cid;
+		undorec.rec.tid = tid;
+		undorec.prevundorec = item->t_undo_ptr;
+
+		undorecptr = zsundo_insert(rel, &undorec.rec);
+	}
+
+	/* Replace the ZSBreeItem with a DELETED item. */
+	deleteditem = palloc(item->t_size);
+	memcpy(deleteditem, item, item->t_size);
+	deleteditem->t_flags |= ZSBT_DELETED;
+	deleteditem->t_undo_ptr = undorecptr;
+
+	zsbt_replace_item(rel, attno, scan.lastbuf,
+					  (ZSBtreeItem *) item, (ZSBtreeItem *) deleteditem,
+					  NULL, NIL);
+	scan.lastbuf_is_locked = false;	/* zsbt_replace_item released */
+	zsbt_end_scan(&scan);
+
+	pfree(deleteditem);
+
+	return TM_Ok;
+}
+
+/*
+ * Update the item with TID 'otid', replacing its datum with 'newdatum'
+ * (or NULL, if 'newisnull').  Implemented as delete + insert: the old item
+ * is marked ZSBT_UPDATED and a new item with a freshly allocated TID is
+ * inserted on the same page.  The new TID is returned in *newtid_p.
+ *
+ * If 'newtid' is valid, then that TID is used for the new item. It better not
+ * be in use already. If it's invalid, then a new TID is allocated, as we see
+ * best. (When inserting the first column of the row, pass invalid, and for
+ * other columns, pass the TID you got for the first column.)
+ */
+TM_Result
+zsbt_update(Relation rel, AttrNumber attno, zstid otid, Datum newdatum,
+			bool newisnull, TransactionId xid, CommandId cid, Snapshot snapshot,
+			Snapshot crosscheck, bool wait, TM_FailureData *hufd,
+			zstid *newtid_p)
+{
+	TupleDesc	desc = RelationGetDescr(rel);
+	Form_pg_attribute attr = &desc->attrs[attno - 1];
+	ZSBtreeScan scan;
+	ZSUncompressedBtreeItem *olditem;
+	TM_Result	result;
+	ZSUndoRecPtr undorecptr;
+	ZSUncompressedBtreeItem *deleteditem;
+	ZSUncompressedBtreeItem *newitem;
+	OffsetNumber maxoff;
+	Buffer		buf;
+	Page		page;
+	ZSBtreePageOpaque *opaque;
+	zstid		newtid = *newtid_p;
+	Size		datumsz;
+	Size		newitemsz;
+	char	   *dataptr;
+
+	/*
+	 * Find the item to delete.  It could be part of a compressed item,
+	 * we let zsbt_scan_next_internal() handle that.
+	 */
+	zsbt_begin_scan(rel, attno, otid, snapshot, &scan);
+	scan.for_update = true;
+
+	olditem = zsbt_scan_next_internal(&scan);
+	/*
+	 * NOTE(review): zsbt_scan_next_internal() can return NULL at the end of
+	 * the tree; that would crash the check below.  Consider testing for
+	 * NULL here, like zsbt_delete() should.
+	 */
+	if (olditem->t_tid != otid)
+	{
+		/*
+		 * or should this be TM_Invisible? The heapam at least just throws
+		 * an error, I think..
+		 */
+		elog(ERROR, "could not find old tuple to update with TID (%u, %u) for attribute %d",
+			 ZSTidGetBlockNumber(otid), ZSTidGetOffsetNumber(otid), attno);
+	}
+
+	/*
+	 * Is it visible to us?
+	 */
+	result = zs_SatisfiesUpdate(&scan, olditem);
+	if (result != TM_Ok)
+	{
+		zsbt_end_scan(&scan);
+		return result;
+	}
+
+	/*
+	 * Look at the last item on the page, for its tid. We will use that + 1,
+	 * as the TID of the new item.  (The scan left 'lastbuf' locked, since
+	 * for_update was set.)
+	 */
+	buf = scan.lastbuf;
+	page = BufferGetPage(buf);
+	opaque = ZSBtreePageGetOpaque(page);
+	maxoff = PageGetMaxOffsetNumber(page);
+	if (maxoff >= FirstOffsetNumber)
+	{
+		ItemId		iid = PageGetItemId(page, maxoff);
+		ZSBtreeItem	*hitup = (ZSBtreeItem *) PageGetItem(page, iid);
+
+		/* a compressed item covers a range of TIDs; use its last TID */
+		if ((hitup->t_flags & ZSBT_COMPRESSED) != 0)
+			newtid = ((ZSCompressedBtreeItem *) hitup)->t_lasttid;
+		else
+			newtid = hitup->t_tid;
+		newtid = ZSTidIncrementForInsert(newtid);
+	}
+	else
+	{
+		newtid = opaque->zs_lokey;
+	}
+
+	if (newtid >= opaque->zs_hikey)
+	{
+		/* no more free TIDs on the page. Bail out */
+		/*
+		 * TODO: what we should do, is to find another target page for the
+		 * new tuple.
+		 */
+		elog(ERROR, "out of TID space on page");
+	}
+
+	/* Create UNDO record, chained to the old item's undo record. */
+	{
+		ZSUndoRec_Update undorec;
+
+		undorec.rec.size = sizeof(ZSUndoRec_Update);
+		undorec.rec.type = ZSUNDO_TYPE_UPDATE;
+		undorec.rec.attno = attno;
+		undorec.rec.xid = xid;
+		undorec.rec.cid = cid;
+		undorec.rec.tid = newtid;
+		undorec.prevundorec = olditem->t_undo_ptr;
+		undorec.otid = otid;
+
+		undorecptr = zsundo_insert(rel, &undorec.rec);
+	}
+
+	/* Replace the ZSBreeItem with an UPDATED item. */
+	deleteditem = palloc(olditem->t_size);
+	memcpy(deleteditem, olditem, olditem->t_size);
+	deleteditem->t_flags |= ZSBT_UPDATED;
+	deleteditem->t_undo_ptr = undorecptr;
+
+	/*
+	 * Form a ZSBtreeItem to insert.
+	 */
+	if (newisnull)
+		datumsz = 0;
+	else
+		datumsz = zs_datumGetSize(newdatum, attr->attbyval, attr->attlen);
+	newitemsz = offsetof(ZSUncompressedBtreeItem, t_payload) + datumsz;
+
+	newitem = palloc(newitemsz);
+	memset(newitem, 0, offsetof(ZSUncompressedBtreeItem, t_payload)); /* zero padding */
+	newitem->t_tid = newtid;
+	newitem->t_flags = 0;
+	newitem->t_size = newitemsz;
+	newitem->t_undo_ptr = undorecptr;
+
+	if (newisnull)
+		newitem->t_flags |= ZSBT_NULL;
+	else
+	{
+		/* copy the new datum into the item's payload area */
+		dataptr = ((char *) newitem) + offsetof(ZSUncompressedBtreeItem, t_payload);
+		if (attr->attbyval)
+			store_att_byval(dataptr, newdatum, attr->attlen);
+		else
+			memcpy(dataptr, DatumGetPointer(newdatum), datumsz);
+	}
+
+	/* replace old item with the UPDATED copy, and add the new item */
+	zsbt_replace_item(rel, attno, scan.lastbuf,
+					  (ZSBtreeItem *) olditem, (ZSBtreeItem *) deleteditem,
+					  (ZSBtreeItem *) newitem, NIL);
+	scan.lastbuf_is_locked = false;	/* zsbt_recompress_replace released */
+	zsbt_end_scan(&scan);
+
+	pfree(deleteditem);
+	pfree(newitem);
+
+	*newtid_p = newtid;
+	return TM_Ok;
+}
+
+/*
+ * Mark item with given TID as dead.
+ *
+ * This is used during VACUUM.
+ *
+ * If the item cannot be found, only a WARNING is raised; VACUUM should not
+ * fail hard on an inconsistency here.  If the item is already dead, this is
+ * a no-op.
+ */
+void
+zsbt_mark_item_dead(Relation rel, AttrNumber attno, zstid tid, ZSUndoRecPtr undoptr)
+{
+	ZSBtreeScan scan;
+	ZSUncompressedBtreeItem *item;
+	ZSUncompressedBtreeItem deaditem;
+
+	zsbt_begin_scan(rel, attno, tid, NULL, &scan);
+	scan.for_update = true;
+
+	/*
+	 * Find the item to mark dead. (It could be compressed.)  The scan
+	 * returns NULL if it runs off the end of the tree.
+	 */
+	item = zsbt_scan_next_internal(&scan);
+	if (item == NULL || item->t_tid != tid)
+	{
+		zsbt_end_scan(&scan);
+		elog(WARNING, "could not find tuple to remove with TID (%u, %u) for attribute %d",
+			 ZSTidGetBlockNumber(tid), ZSTidGetOffsetNumber(tid), attno);
+		return;
+	}
+
+	/* If the item is already dead, there's nothing to do. */
+	if ((item->t_flags & ZSBT_DEAD) != 0)
+	{
+		/*
+		 * Must still end the scan, to release the buffer lock and pin that
+		 * the for_update scan is holding.
+		 */
+		zsbt_end_scan(&scan);
+		return;
+	}
+
+	/* Replace the ZSBreeItem with a DEAD item. */
+	memset(&deaditem, 0, offsetof(ZSUncompressedBtreeItem, t_payload));
+	deaditem.t_tid = tid;
+	deaditem.t_size = sizeof(ZSUncompressedBtreeItem);
+	deaditem.t_flags = ZSBT_DEAD;
+	deaditem.t_undo_ptr = undoptr;
+
+	zsbt_replace_item(rel, attno, scan.lastbuf,
+					  (ZSBtreeItem *) item, (ZSBtreeItem *) &deaditem,
+					  NULL, NIL);
+	scan.lastbuf_is_locked = false;	/* zsbt_replace_item released */
+	zsbt_end_scan(&scan);
+}
+
+/* ----------------------------------------------------------------
+ *						 Internal routines
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * Descend the tree from the root, to the leaf page covering the given
+ * key TID.
+ *
+ * The returned leaf buffer is pinned and exclusive-locked.
+ */
+static Buffer
+zsbt_descend(Relation rel, BlockNumber rootblk, zstid key)
+{
+	BlockNumber blkno = rootblk;
+	int			expected_level = -1;
+
+	for (;;)
+	{
+		Buffer		buf;
+		Page		page;
+		ZSBtreePageOpaque *opaque;
+
+		buf = ReadBuffer(rel, blkno);
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);		/* TODO: shared */
+		page = BufferGetPage(buf);
+		opaque = ZSBtreePageGetOpaque(page);
+
+		/* sanity check: each step down must decrease the level by one */
+		if (expected_level == -1)
+			expected_level = opaque->zs_level;
+		else if (opaque->zs_level != expected_level)
+			elog(ERROR, "unexpected level encountered when descending tree");
+
+		/* reached the leaf level: return it locked */
+		if (opaque->zs_level == 0)
+			return buf;
+
+		/*
+		 * Do we need to walk right? This could happen if the page was concurrently split.
+		 */
+		if (key >= opaque->zs_hikey)
+		{
+			/* follow the right-link */
+			blkno = opaque->zs_next;
+			if (blkno == InvalidBlockNumber)
+				elog(ERROR, "fell off the end of btree");
+		}
+		else
+		{
+			/* follow the downlink */
+			ZSBtreeInternalPageItem *items = ZSBtreeInternalPageGetItems(page);
+			int			nitems = ZSBtreeInternalPageGetNumItems(page);
+			int			pos = zsbt_binsrch_internal(key, items, nitems);
+
+			if (pos < 0)
+				elog(ERROR, "could not descend tree for tid (%u, %u)",
+					 ZSTidGetBlockNumber(key), ZSTidGetOffsetNumber(key));
+			blkno = items[pos].childblk;
+			expected_level--;
+		}
+		UnlockReleaseBuffer(buf);
+	}
+}
+
+/*
+ * Re-find the parent page containing downlink for given block.
+ * The returned page is exclusive-locked, and *itemno_p is set to the
+ * position of the downlink in the parent.
+ *
+ * 'key' is the low key of the child page, and 'level' is its level;
+ * they are used to navigate to the parent.
+ *
+ * If 'childblk' is the root, returns InvalidBuffer.
+ */
+static Buffer
+zsbt_find_downlink(Relation rel, AttrNumber attno,
+				   zstid key, BlockNumber childblk, int level,
+				   int *itemno_p)
+{
+	BlockNumber rootblk;
+	BlockNumber next;
+	Buffer		buf;
+	Page		page;
+	ZSBtreePageOpaque *opaque;
+	ZSBtreeInternalPageItem *items;
+	int			nitems;
+	int			itemno;
+	int			nextlevel = -1;
+
+	/* start from root */
+	rootblk = zsmeta_get_root_for_attribute(rel, attno, true);
+	if (rootblk == childblk)
+		return InvalidBuffer;
+
+	/* XXX: this is mostly the same as zsbt_descend, but we stop at an internal
+	 * page instead of descending all the way down to leaf */
+	next = rootblk;
+	for (;;)
+	{
+		buf = ReadBuffer(rel, next);
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+		page = BufferGetPage(buf);
+		opaque = ZSBtreePageGetOpaque(page);
+
+		/* sanity check: each step down must decrease the level by one */
+		if (nextlevel == -1)
+			nextlevel = opaque->zs_level;
+		else if (nextlevel != opaque->zs_level)
+			elog(ERROR, "unexpected level encountered when descending tree");
+
+		/* we should never reach the child's level while descending */
+		if (opaque->zs_level <= level)
+			elog(ERROR, "unexpected page level encountered");
+
+		/*
+		 * Do we need to walk right? This could happen if the page was concurrently split.
+		 */
+		if (key >= opaque->zs_hikey)
+		{
+			next = opaque->zs_next;
+			if (next == InvalidBlockNumber)
+				elog(ERROR, "fell off the end of btree");
+		}
+		else
+		{
+			items = ZSBtreeInternalPageGetItems(page);
+			nitems = ZSBtreeInternalPageGetNumItems(page);
+
+			itemno = zsbt_binsrch_internal(key, items, nitems);
+			if (itemno < 0)
+				elog(ERROR, "could not descend tree for tid (%u, %u)",
+					 ZSTidGetBlockNumber(key), ZSTidGetOffsetNumber(key));
+
+			/* one level above the child: this page must hold the downlink */
+			if (opaque->zs_level == level + 1)
+			{
+				if (items[itemno].childblk != childblk)
+					elog(ERROR, "could not re-find downlink for block %u", childblk);
+				*itemno_p = itemno;
+				/* return with 'buf' still exclusive-locked and pinned */
+				return buf;
+			}
+
+			next = items[itemno].childblk;
+			nextlevel--;
+		}
+		UnlockReleaseBuffer(buf);
+	}
+}
+
+/*
+ * Create a new btree root page, containing two downlinks.
+ *
+ * On entry, 'leftchildbuf' must be exclusive-locked; it is unlocked and
+ * released on exit, as is the newly-allocated root and the metapage.
+ *
+ * NOTE: the very first root page of a btree, which is also the leaf, is created
+ * in zsmeta_get_root_for_attribute(), not here.
+ */
+static void
+zsbt_newroot(Relation rel, AttrNumber attno, int level,
+			 zstid key1, BlockNumber blk1,
+			 zstid key2, BlockNumber blk2,
+			 Buffer leftchildbuf)
+{
+	ZSBtreePageOpaque *opaque;
+	ZSBtreePageOpaque *leftchildopaque;
+	Buffer		buf;
+	Page		page;
+	ZSBtreeInternalPageItem *items;
+	Buffer		metabuf;
+
+	/* lock the metapage first, so we can point it at the new root below */
+	metabuf = ReadBuffer(rel, ZS_META_BLK);
+	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+	Assert(key1 < key2);
+
+	/* initialize the new root page, covering the whole TID range */
+	buf = zs_getnewbuf(rel);
+	page = BufferGetPage(buf);
+	PageInit(page, BLCKSZ, sizeof(ZSBtreePageOpaque));
+	opaque = ZSBtreePageGetOpaque(page);
+	opaque->zs_attno = attno;
+	opaque->zs_next = InvalidBlockNumber;
+	opaque->zs_lokey = MinZSTid;
+	opaque->zs_hikey = MaxPlusOneZSTid;
+	opaque->zs_level = level;
+	opaque->zs_flags = 0;
+	opaque->zs_page_id = ZS_BTREE_PAGE_ID;
+
+	items = ZSBtreeInternalPageGetItems(page);
+	items[0].tid = key1;
+	items[0].childblk =  blk1;
+	items[1].tid = key2;
+	items[1].childblk = blk2;
+	((PageHeader) page)->pd_lower += 2 * sizeof(ZSBtreeInternalPageItem);
+	Assert(ZSBtreeInternalPageGetNumItems(page) == 2);
+
+	/*
+	 * Clear the follow-right flag on left child: its downlink now exists in
+	 * the new root, so the split is complete.  (Note: must be "&= ~", as in
+	 * zsbt_insert_downlink(); "&= ZS_FOLLOW_RIGHT" would instead clear all
+	 * the *other* flags.)
+	 */
+	leftchildopaque = ZSBtreePageGetOpaque(BufferGetPage(leftchildbuf));
+	leftchildopaque->zs_flags &= ~ZS_FOLLOW_RIGHT;
+
+	/* TODO: wal-log all, including metapage */
+
+	MarkBufferDirty(buf);
+	MarkBufferDirty(leftchildbuf);
+
+	/* Before exiting, update the metapage */
+	zsmeta_update_root_for_attribute(rel, attno, metabuf, BufferGetBlockNumber(buf));
+
+	UnlockReleaseBuffer(leftchildbuf);
+	UnlockReleaseBuffer(buf);
+	UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * After page split, insert the downlink of 'rightblkno' to the parent.
+ *
+ * On entry, 'leftbuf' must be pinned exclusive-locked. It is released on exit
+ * (on every path: directly here, or by zsbt_newroot()/
+ * zsbt_split_internal_page()).
+ */
+static void
+zsbt_insert_downlink(Relation rel, AttrNumber attno, Buffer leftbuf,
+					 zstid rightlokey, BlockNumber rightblkno)
+{
+	BlockNumber	leftblkno = BufferGetBlockNumber(leftbuf);
+	Page		leftpage = BufferGetPage(leftbuf);
+	ZSBtreePageOpaque *leftopaque = ZSBtreePageGetOpaque(leftpage);
+	zstid		leftlokey = leftopaque->zs_lokey;
+	ZSBtreeInternalPageItem *items;
+	int			nitems;
+	int			itemno;
+	Buffer		parentbuf;
+	Page		parentpage;
+
+	/*
+	 * re-find parent
+	 *
+	 * TODO: this is a bit inefficient. Usually, we have just descended the
+	 * tree, and if we just remembered the path we descended, we could just
+	 * walk back up.
+	 */
+	parentbuf = zsbt_find_downlink(rel, attno, leftlokey, leftblkno, leftopaque->zs_level, &itemno);
+	if (parentbuf == InvalidBuffer)
+	{
+		/* 'leftbuf' was the root: grow the tree by one level */
+		zsbt_newroot(rel, attno, leftopaque->zs_level + 1,
+					 leftlokey, BufferGetBlockNumber(leftbuf),
+					 rightlokey, rightblkno, leftbuf);
+		return;
+	}
+	parentpage = BufferGetPage(parentbuf);
+
+	/* Find the position in the parent for the downlink */
+	items = ZSBtreeInternalPageGetItems(parentpage);
+	nitems = ZSBtreeInternalPageGetNumItems(parentpage);
+	itemno = zsbt_binsrch_internal(rightlokey, items, nitems);
+
+	/* sanity checks: the search must land on the left page's own downlink */
+	if (itemno < 1 || items[itemno].tid != leftlokey ||
+		items[itemno].childblk != leftblkno)
+	{
+		elog(ERROR, "could not find downlink");
+	}
+	/* the new downlink goes right after the left page's downlink */
+	itemno++;
+
+	if (ZSBtreeInternalPageIsFull(parentpage))
+	{
+		/* split internal page */
+		zsbt_split_internal_page(rel, attno, parentbuf, leftbuf, itemno, rightlokey, rightblkno);
+	}
+	else
+	{
+		/* insert the new downlink for the right page. */
+		memmove(&items[itemno + 1],
+				&items[itemno],
+				(nitems - itemno) * sizeof(ZSBtreeInternalPageItem));
+		items[itemno].tid = rightlokey;
+		items[itemno].childblk = rightblkno;
+		((PageHeader) parentpage)->pd_lower += sizeof(ZSBtreeInternalPageItem);
+
+		/* the downlink is in place, so the left page's split is complete */
+		leftopaque->zs_flags &= ~ZS_FOLLOW_RIGHT;
+
+		/* TODO: WAL-log */
+
+		MarkBufferDirty(leftbuf);
+		MarkBufferDirty(parentbuf);
+		UnlockReleaseBuffer(leftbuf);
+		UnlockReleaseBuffer(parentbuf);
+	}
+}
+
+/*
+ * Split an internal page.
+ *
+ * The new downlink specified by 'newkey' and 'childblk' is inserted to
+ * position 'newoff', on 'leftbuf'. The page is split.
+ *
+ * On entry, both 'leftbuf' and 'childbuf' are exclusive-locked; both are
+ * released before returning ('leftbuf' via the recursive
+ * zsbt_insert_downlink() call at the end).
+ */
+static void
+zsbt_split_internal_page(Relation rel, AttrNumber attno, Buffer leftbuf, Buffer childbuf,
+						 OffsetNumber newoff, zstid newkey, BlockNumber childblk)
+{
+	Buffer		rightbuf;
+	Page		origpage = BufferGetPage(leftbuf);
+	Page		leftpage;
+	Page		rightpage;
+	BlockNumber rightblkno;
+	ZSBtreePageOpaque *leftopaque;
+	ZSBtreePageOpaque *rightopaque;
+	ZSBtreeInternalPageItem *origitems;
+	ZSBtreeInternalPageItem *leftitems;
+	ZSBtreeInternalPageItem *rightitems;
+	int			orignitems;
+	int			leftnitems;
+	int			rightnitems;
+	int			splitpoint;
+	zstid		splittid;
+	bool		newitemonleft;
+	int			i;
+	ZSBtreeInternalPageItem newitem;
+
+	/* work on a temp copy of the left page, swapped in atomically at the end */
+	leftpage = PageGetTempPageCopySpecial(origpage);
+	leftopaque = ZSBtreePageGetOpaque(leftpage);
+	Assert(leftopaque->zs_level > 0);
+	/* any previous incomplete split must be finished first */
+	Assert((leftopaque->zs_flags & ZS_FOLLOW_RIGHT) == 0);
+
+	rightbuf = zs_getnewbuf(rel);
+	rightpage = BufferGetPage(rightbuf);
+	rightblkno = BufferGetBlockNumber(rightbuf);
+	PageInit(rightpage, BLCKSZ, sizeof(ZSBtreePageOpaque));
+	rightopaque = ZSBtreePageGetOpaque(rightpage);
+
+	/*
+	 * Figure out the split point.
+	 *
+	 * TODO: currently, always do 90/10 split.
+	 */
+	origitems = ZSBtreeInternalPageGetItems(origpage);
+	orignitems = ZSBtreeInternalPageGetNumItems(origpage);
+	splitpoint = orignitems * 0.9;
+	splittid = origitems[splitpoint].tid;
+	newitemonleft = (newkey < splittid);
+
+	/* Set up the page headers; the split TID is the boundary key */
+	rightopaque->zs_attno = attno;
+	rightopaque->zs_next = leftopaque->zs_next;
+	rightopaque->zs_lokey = splittid;
+	rightopaque->zs_hikey = leftopaque->zs_hikey;
+	rightopaque->zs_level = leftopaque->zs_level;
+	rightopaque->zs_flags = 0;
+	rightopaque->zs_page_id = ZS_BTREE_PAGE_ID;
+
+	/* left page's split stays incomplete until its downlink is inserted */
+	leftopaque->zs_next = rightblkno;
+	leftopaque->zs_hikey = splittid;
+	leftopaque->zs_flags |= ZS_FOLLOW_RIGHT;
+
+	/* copy the items */
+	leftitems = ZSBtreeInternalPageGetItems(leftpage);
+	leftnitems = 0;
+	rightitems = ZSBtreeInternalPageGetItems(rightpage);
+	rightnitems = 0;
+
+	newitem.tid = newkey;
+	newitem.childblk = childblk;
+
+	/* distribute old items to left/right, inserting the new one at 'newoff' */
+	for (i = 0; i < orignitems; i++)
+	{
+		if (i == newoff)
+		{
+			if (newitemonleft)
+				leftitems[leftnitems++] = newitem;
+			else
+				rightitems[rightnitems++] = newitem;
+		}
+
+		if (i < splitpoint)
+			leftitems[leftnitems++] = origitems[i];
+		else
+			rightitems[rightnitems++] = origitems[i];
+	}
+	/* cope with possibility that newitem goes at the end */
+	if (i <= newoff)
+	{
+		Assert(!newitemonleft);
+		rightitems[rightnitems++] = newitem;
+	}
+	((PageHeader) leftpage)->pd_lower += leftnitems * sizeof(ZSBtreeInternalPageItem);
+	((PageHeader) rightpage)->pd_lower += rightnitems * sizeof(ZSBtreeInternalPageItem);
+
+	/* every old item plus the new one must have landed somewhere */
+	Assert(leftnitems + rightnitems == orignitems + 1);
+
+	PageRestoreTempPage(leftpage, origpage);
+
+	/* TODO: WAL-logging */
+	MarkBufferDirty(leftbuf);
+	MarkBufferDirty(rightbuf);
+
+	/* the child's downlink is now in place (on this level), so its split is complete */
+	MarkBufferDirty(childbuf);
+	ZSBtreePageGetOpaque(BufferGetPage(childbuf))->zs_flags &= ~ZS_FOLLOW_RIGHT;
+	UnlockReleaseBuffer(childbuf);
+
+	UnlockReleaseBuffer(rightbuf);
+
+	/* recurse to insert downlink. (this releases 'leftbuf') */
+	zsbt_insert_downlink(rel, attno, leftbuf, splittid, rightblkno);
+}
+
+/*
+ * Returns the next item in the scan. This doesn't pay attention to visibility.
+ *
+ * Returns NULL when the scan reaches the end of the tree (and deactivates
+ * the scan).
+ *
+ * The returned pointer might point directly to a btree-buffer, or it might be
+ * palloc'd copy. If it points to a buffer, scan->lastbuf_is_locked is true,
+ * otherwise false.  When scan->for_update is set, the buffer is kept
+ * exclusive-locked on return, so the caller can modify the page.
+ */
+static ZSUncompressedBtreeItem *
+zsbt_scan_next_internal(ZSBtreeScan *scan)
+{
+	Buffer		buf;
+	Page		page;
+	ZSBtreePageOpaque *opaque;
+	OffsetNumber off;
+	OffsetNumber maxoff;
+	BlockNumber	next;
+
+	if (!scan->active)
+		return NULL;
+
+	for (;;)
+	{
+		/* First, drain any items remaining in the decompression buffer. */
+		while (scan->has_decompressed)
+		{
+			ZSUncompressedBtreeItem *item = zs_decompress_read_item(&scan->decompressor);
+
+			if (item == NULL)
+			{
+				scan->has_decompressed = false;
+				break;
+			}
+			/* skip decompressed items that precede the scan position */
+			if (item->t_tid >= scan->nexttid)
+			{
+				scan->nexttid = item->t_tid;
+				scan->nexttid = ZSTidIncrement(scan->nexttid);
+				return item;
+			}
+		}
+
+		buf = scan->lastbuf;
+		page = BufferGetPage(buf);
+		opaque = ZSBtreePageGetOpaque(page);
+
+		if (!scan->lastbuf_is_locked)
+			LockBuffer(buf, scan->for_update ? BUFFER_LOCK_EXCLUSIVE : BUFFER_LOCK_SHARE);
+		scan->lastbuf_is_locked = true;
+
+		/* TODO: check that the page is a valid zs btree page */
+
+		/* TODO: check the last offset first, as an optimization */
+		maxoff = PageGetMaxOffsetNumber(page);
+		for (off = FirstOffsetNumber; off <= maxoff; off++)
+		{
+			ItemId		iid = PageGetItemId(page, off);
+			ZSBtreeItem	*item = (ZSBtreeItem *) PageGetItem(page, iid);
+
+			if ((item->t_flags & ZSBT_COMPRESSED) != 0)
+			{
+				ZSCompressedBtreeItem *citem = (ZSCompressedBtreeItem *) item;
+
+				/* does this compressed item's TID range reach the scan position? */
+				if (citem->t_lasttid >= scan->nexttid)
+				{
+					zs_decompress_chunk(&scan->decompressor, citem);
+					scan->has_decompressed = true;
+					/*
+					 * The decompression buffer is a private copy, so we can
+					 * release the lock now -- unless the caller wants to
+					 * modify the page.
+					 */
+					if (!scan->for_update)
+					{
+						LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+						scan->lastbuf_is_locked = false;
+					}
+					break;
+				}
+			}
+			else
+			{
+				ZSUncompressedBtreeItem *uitem = (ZSUncompressedBtreeItem *) item;
+
+				if (uitem->t_tid >= scan->nexttid)
+				{
+					scan->nexttid = uitem->t_tid;
+					scan->nexttid = ZSTidIncrement(scan->nexttid);
+					/* points into the buffer; lastbuf_is_locked stays true */
+					return uitem;
+				}
+			}
+		}
+
+		/* go back to the top to drain the chunk we just decompressed */
+		if (scan->has_decompressed)
+			continue;
+
+		/* No more items on this page. Walk right, if possible */
+		next = opaque->zs_next;
+		if (next == BufferGetBlockNumber(buf))
+			elog(ERROR, "btree page %u next-pointer points to itself", next);
+		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+		scan->lastbuf_is_locked = false;
+
+		if (next == InvalidBlockNumber)
+		{
+			/* end of tree: deactivate the scan and drop the pin */
+			scan->active = false;
+			ReleaseBuffer(scan->lastbuf);
+			scan->lastbuf = InvalidBuffer;
+			return NULL;
+		}
+
+		scan->lastbuf = ReleaseAndReadBuffer(scan->lastbuf, scan->rel, next);
+	}
+}
+
+/*
+ * This helper function is used to implement INSERT, UPDATE and DELETE.
+ *
+ * If 'olditem' is not NULL, then 'olditem' on the page is replaced with
+ * 'replacementitem'. 'replacementitem' can be NULL, to remove an old item.
+ *
+ * If 'newitem' is not NULL, it is added to the page, to the correct position.
+ * 'newitems' is an alternative way to add multiple items, appended at the end.
+ *
+ * This function handles decompressing and recompressing items, and splitting
+ * the page if needed.
+ *
+ * On entry, 'buf' must be pinned and exclusive-locked. The lock is released
+ * on exit (by zsbt_recompress_replace()), but the pin is kept.
+ */
+static void
+zsbt_replace_item(Relation rel, AttrNumber attno, Buffer buf,
+				  ZSBtreeItem *olditem,
+				  ZSBtreeItem *replacementitem,
+				  ZSBtreeItem *newitem,
+				  List       *newitems)
+{
+	Page		page = BufferGetPage(buf);
+	OffsetNumber off;
+	OffsetNumber maxoff;
+	List	   *items;
+	bool		found_old_item = false;
+	/* We might need to decompress up to two previously compressed items */
+	ZSDecompressContext decompressors[2];
+	int			numdecompressors = 0;
+
+	/*
+	 * Helper routine, to append the given old item 'x' to the list.
+	 * If the 'x' matches the old item, then append 'replacementitem' instead.
+	 * And if the 'newitem' should go before 'x', then append that first.
+	 *
+	 * TODO: We could also leave out any old, deleted, items that are no longer
+	 * visible to anyone.
+	 */
+#define PROCESS_ITEM(x) \
+	do { \
+		if (newitem && (x)->t_tid >= newitem->t_tid) \
+		{ \
+			Assert((x)->t_tid != newitem->t_tid); \
+			items = lappend(items, newitem); \
+			newitem = NULL; \
+		} \
+		if (olditem && (x)->t_tid == olditem->t_tid) \
+		{ \
+			Assert(!found_old_item); \
+			found_old_item = true; \
+			if (replacementitem) \
+				items = lappend(items, replacementitem); \
+		} \
+		else \
+			items = lappend(items, x); \
+	} while(0)
+
+	/* Loop through all old items on the page */
+	items = NIL;
+	maxoff = PageGetMaxOffsetNumber(page);
+	for (off = FirstOffsetNumber; off <= maxoff; off++)
+	{
+		ItemId		iid = PageGetItemId(page, off);
+		ZSBtreeItem	*item = (ZSBtreeItem *) PageGetItem(page, iid);
+
+		if ((item->t_flags & ZSBT_COMPRESSED) != 0)
+		{
+			ZSCompressedBtreeItem *citem = (ZSCompressedBtreeItem *) item;
+
+			if ((olditem && citem->t_tid <= olditem->t_tid && olditem->t_tid <= citem->t_lasttid) ||
+				(newitem && citem->t_tid <= newitem->t_tid && newitem->t_tid <= citem->t_lasttid))
+			{
+				/* Found it, this compressed item covers the target or the new TID. */
+				/* We have to decompress it, and recompress */
+				ZSDecompressContext *decompressor = &decompressors[numdecompressors++];
+				ZSUncompressedBtreeItem *uitem;
+
+				/* only 'olditem' and 'newitem' can each hit one compressed item */
+				Assert(numdecompressors <= 2);
+
+				zs_decompress_init(decompressor);
+				zs_decompress_chunk(decompressor, citem);
+
+				while ((uitem = zs_decompress_read_item(decompressor)) != NULL)
+					PROCESS_ITEM(uitem);
+			}
+			else
+			{
+				/* this item does not cover the target, nor the newitem. Add as it is. */
+				items = lappend(items, item);
+				continue;
+			}
+		}
+		else
+			PROCESS_ITEM(item);
+	}
+
+	if (olditem && !found_old_item)
+		elog(ERROR, "could not find old item to replace");
+
+	/* if the new item was not added in the loop, it goes to the end */
+	if (newitem)
+		items = lappend(items, newitem);
+
+	if (newitems)
+		items = list_concat(items, newitems);
+
+	/*
+	 * Now pass the list to the recompressor.  Take an extra pin, because
+	 * zsbt_recompress_replace() releases one but our caller still holds one.
+	 */
+	IncrBufferRefCount(buf);
+	zsbt_recompress_replace(rel, attno, buf, items);
+
+	/*
+	 * We can now free the decompression contexts. The pointers in the 'items' list
+	 * point to decompression buffers, so we cannot free them until after writing out
+	 * the pages.
+	 */
+	for (int i = 0; i < numdecompressors; i++)
+		zs_decompress_free(&decompressors[i]);
+	list_free(items);
+}
+
+/*
+ * Recompressor routines
+ *
+ * State carried across zsbt_recompress_* calls while rewriting one leaf
+ * page's content onto one or more new page images.
+ */
+typedef struct
+{
+	Page		currpage;		/* in-memory page image currently being filled */
+	ZSCompressContext compressor;	/* holds items of the compression batch in progress */
+	int			compressed_items;	/* # of items in the current batch (0 = no batch open) */
+	List	   *pages;		/* first page writes over the old buffer,
+							 * subsequent pages get newly-allocated buffers */
+
+	/* statistics over the whole rewrite */
+	int			total_items;
+	int			total_compressed_items;
+	int			total_already_compressed_items;
+
+	AttrNumber	attno;		/* attribute whose btree is being rewritten */
+	zstid		hikey;		/* high key of the original page; used for the last new page */
+} zsbt_recompress_context;
+
+/*
+ * Start filling a fresh in-memory page image, whose low key is 'nexttid'.
+ * The previous page (if any) gets 'nexttid' as its high key.
+ */
+static void
+zsbt_recompress_newpage(zsbt_recompress_context *cxt, zstid nexttid)
+{
+	Page		page;
+	ZSBtreePageOpaque *opaque;
+
+	/* the page we were filling ends where the new one begins */
+	if (cxt->currpage)
+		ZSBtreePageGetOpaque(cxt->currpage)->zs_hikey = nexttid;
+
+	page = (Page) palloc(BLCKSZ);
+	PageInit(page, BLCKSZ, sizeof(ZSBtreePageOpaque));
+	cxt->currpage = page;
+	cxt->pages = lappend(cxt->pages, page);
+
+	opaque = ZSBtreePageGetOpaque(page);
+	opaque->zs_attno = cxt->attno;
+	opaque->zs_next = InvalidBlockNumber;	/* filled in later */
+	opaque->zs_lokey = nexttid;
+	opaque->zs_hikey = cxt->hikey;	/* overwritten later, unless this is the last page */
+	opaque->zs_level = 0;
+	opaque->zs_flags = 0;
+	opaque->zs_page_id = ZS_BTREE_PAGE_ID;
+}
+
+/*
+ * Append 'item' to the page image being filled, starting a new page first
+ * if it does not fit on the current one.
+ */
+static void
+zsbt_recompress_add_to_page(zsbt_recompress_context *cxt, ZSBtreeItem *item)
+{
+	OffsetNumber off;
+
+	if (PageGetFreeSpace(cxt->currpage) < MAXALIGN(item->t_size))
+		zsbt_recompress_newpage(cxt, item->t_tid);
+
+	off = PageAddItemExtended(cxt->currpage, (Item) item, item->t_size,
+							  PageGetMaxOffsetNumber(cxt->currpage) + 1,
+							  PAI_OVERWRITE);
+	if (off == InvalidOffsetNumber)
+		elog(ERROR, "could not add item to page while recompressing");
+
+	cxt->total_items++;
+}
+
+/*
+ * Try to add 'item' to the compression batch in progress (opening a new
+ * batch, sized to the current page's free space, if none is open).
+ *
+ * Returns true if the item was accepted, false if the batch is full.
+ */
+static bool
+zsbt_recompress_add_to_compressor(zsbt_recompress_context *cxt, ZSUncompressedBtreeItem *item)
+{
+	bool		added;
+
+	if (cxt->compressed_items == 0)
+		zs_compress_begin(&cxt->compressor, PageGetFreeSpace(cxt->currpage));
+
+	added = zs_compress_add(&cxt->compressor, item);
+	if (added)
+	{
+		cxt->compressed_items++;
+		cxt->total_compressed_items++;
+	}
+
+	return added;
+}
+
+/*
+ * Finish the compression batch in progress, if any, and write the resulting
+ * compressed item out to the page being filled.
+ */
+static void
+zsbt_recompress_flush(zsbt_recompress_context *cxt)
+{
+	ZSCompressedBtreeItem *citem;
+
+	/* nothing buffered in the compressor? */
+	if (cxt->compressed_items == 0)
+		return;
+
+	citem = zs_compress_finish(&cxt->compressor);
+	zsbt_recompress_add_to_page(cxt, (ZSBtreeItem *) citem);
+
+	cxt->compressed_items = 0;
+}
+
+/*
+ * Rewrite a leaf page, with given 'items' as the new content.
+ *
+ * If there are any uncompressed items in the list, we try to compress them.
+ * Any already-compressed items are added as is.
+ *
+ * If the items no longer fit on the page, then the page is split. It is
+ * entirely possible that they don't fit even on two pages; we split the page
+ * into as many pages as needed. Hopefully not more than a few pages, though,
+ * because otherwise you might hit limits on the number of buffer pins (with
+ * tiny shared_buffers).
+ *
+ * On entry, 'oldbuf' must be pinned and exclusive-locked. On exit, the lock
+ * is released, but it's still pinned.
+ */
+static void
+zsbt_recompress_replace(Relation rel, AttrNumber attno, Buffer oldbuf, List *items)
+{
+	ListCell   *lc;
+	ListCell   *lc2;
+	zsbt_recompress_context cxt;
+	ZSBtreePageOpaque *oldopaque = ZSBtreePageGetOpaque(BufferGetPage(oldbuf));
+	ZSUndoRecPtr recent_oldest_undo = { 0 };
+	List	   *bufs;
+	int			i;
+	BlockNumber orignextblk;
+
+	/*
+	 * Set up the context for building the replacement page(s) as in-memory
+	 * copies.  They inherit the old page's key range (lokey..hikey).
+	 */
+	cxt.currpage = NULL;
+	zs_compress_init(&cxt.compressor);
+	cxt.compressed_items = 0;
+	cxt.pages = NIL;
+	cxt.attno = attno;
+	cxt.hikey = oldopaque->zs_hikey;
+
+	cxt.total_items = 0;
+	cxt.total_compressed_items = 0;
+	cxt.total_already_compressed_items = 0;
+
+	zsbt_recompress_newpage(&cxt, oldopaque->zs_lokey);
+
+	/*
+	 * Pass 1: build the new page contents in memory.  Already-compressed
+	 * input items are kept as they are; uncompressed items are batched into
+	 * the compressor, flushing a compressed item to the page whenever the
+	 * batch fills up.
+	 */
+	foreach(lc, items)
+	{
+		ZSBtreeItem *item = (ZSBtreeItem *) lfirst(lc);
+
+		/* We can leave out any old-enough DEAD items */
+		if ((item->t_flags & ZSBT_DEAD) != 0)
+		{
+			ZSUncompressedBtreeItem *uitem = (ZSUncompressedBtreeItem *) item;
+
+			/* fetch the oldest-undo horizon only once, and only if needed */
+			if (recent_oldest_undo.counter == 0)
+				recent_oldest_undo = zsundo_get_oldest_undo_ptr(rel);
+
+			if (uitem->t_undo_ptr.counter < recent_oldest_undo.counter)
+				continue;
+		}
+
+		if ((item->t_flags & ZSBT_COMPRESSED) != 0)
+		{
+			/* already compressed, add as it is. */
+			zsbt_recompress_flush(&cxt);
+			cxt.total_already_compressed_items++;
+			zsbt_recompress_add_to_page(&cxt, item);
+		}
+		else
+		{
+			/* try to add this item to the compressor */
+			ZSUncompressedBtreeItem *uitem = (ZSUncompressedBtreeItem *) item;
+
+			if (!zsbt_recompress_add_to_compressor(&cxt, uitem))
+			{
+				if (cxt.compressed_items > 0)
+				{
+					/* flush, and retry */
+					zsbt_recompress_flush(&cxt);
+
+					if (!zsbt_recompress_add_to_compressor(&cxt, uitem))
+					{
+						/* could not compress, even on its own. Store it uncompressed, then */
+						zsbt_recompress_add_to_page(&cxt, item);
+					}
+				}
+				else
+				{
+					/* could not compress, even on its own. Store it uncompressed, then */
+					zsbt_recompress_add_to_page(&cxt, item);
+				}
+			}
+		}
+	}
+
+	/* flush the last one, if any */
+	zsbt_recompress_flush(&cxt);
+
+	zs_compress_free(&cxt.compressor);
+
+	/*
+	 * Ok, we now have a list of pages, to replace the original page, as private
+	 * in-memory copies. Allocate buffers for them, and write them out
+	 *
+	 * allocate all the pages before entering critical section, so that
+	 * out-of-disk-space doesn't lead to PANIC
+	 */
+	/* the first in-memory page overwrites the original buffer in place */
+	bufs = list_make1_int(oldbuf);
+	for (i = 0; i < list_length(cxt.pages) - 1; i++)
+	{
+		Buffer		newbuf = zs_getnewbuf(rel);
+
+		bufs = lappend_int(bufs, newbuf);
+	}
+
+	START_CRIT_SECTION();
+
+	/* remember the old right-sibling link; the last new page keeps it */
+	orignextblk = oldopaque->zs_next;
+	forboth(lc, cxt.pages, lc2, bufs)
+	{
+		Page		page_copy = (Page) lfirst(lc);
+		Buffer		buf = (Buffer) lfirst_int(lc2);
+		Page		page = BufferGetPage(buf);
+		ZSBtreePageOpaque *opaque;
+
+		PageRestoreTempPage(page_copy, page);
+		opaque = ZSBtreePageGetOpaque(page);
+
+		/* TODO: WAL-log */
+		if (lnext(lc2))
+		{
+			Buffer		nextbuf = (Buffer) lfirst_int(lnext(lc2));
+
+			/*
+			 * Chain the new pages together.  NOTE(review): ZS_FOLLOW_RIGHT
+			 * presumably marks that the right sibling has no downlink yet,
+			 * until zsbt_insert_downlink() runs below — confirm.
+			 */
+			opaque->zs_next = BufferGetBlockNumber(nextbuf);
+			opaque->zs_flags |= ZS_FOLLOW_RIGHT;
+		}
+		else
+		{
+			/* last one in the chain. */
+			opaque->zs_next = orignextblk;
+		}
+
+		MarkBufferDirty(buf);
+	}
+	list_free(cxt.pages);
+
+	END_CRIT_SECTION();
+
+	/* If we had to split, insert downlinks for the new pages. */
+	while (list_length(bufs) > 1)
+	{
+		Buffer		leftbuf = (Buffer) linitial_int(bufs);
+		Buffer		rightbuf = (Buffer) lsecond_int(bufs);
+
+		zsbt_insert_downlink(rel, attno, leftbuf,
+							 ZSBtreePageGetOpaque(BufferGetPage(leftbuf))->zs_hikey,
+							 BufferGetBlockNumber(rightbuf));
+		/* zsbt_insert_downlink() released leftbuf */
+		bufs = list_delete_first(bufs);
+	}
+	/* release the last page (the original 'oldbuf' stays pinned, per contract) */
+	UnlockReleaseBuffer((Buffer) linitial_int(bufs));
+	list_free(bufs);
+}
+
+/*
+ * Binary search over an array of internal-page items, ordered by tid.
+ *
+ * Returns the index of the last entry with tid <= key, or -1 if 'key' is
+ * smaller than the first entry's tid.
+ */
+static int
+zsbt_binsrch_internal(zstid key, ZSBtreeInternalPageItem *arr, int arr_elems)
+{
+	int			lo = 0;
+	int			hi = arr_elems;
+
+	/* invariant: arr[0 .. lo-1] have tid <= key; arr[hi ..] have tid > key */
+	while (lo < hi)
+	{
+		int			mid = lo + (hi - lo) / 2;
+
+		if (arr[mid].tid <= key)
+			lo = mid + 1;
+		else
+			hi = mid;
+	}
+	return lo - 1;
+}
diff --git a/src/backend/access/zedstore/zedstore_compression.c b/src/backend/access/zedstore/zedstore_compression.c
new file mode 100644
index 0000000000..2aeac56beb
--- /dev/null
+++ b/src/backend/access/zedstore/zedstore_compression.c
@@ -0,0 +1,362 @@
+/*
+ * zedstore_compression.c
+ *		Routines for compression
+ *
+ * There are two implementations at the moment: LZ4, and the Postgres
+ * pg_lzcompress(). LZ4 support requires that the server was compiled
+ * with --with-lz4.
+ *
+ * The compressor works on ZSUncompressedBtreeItems.
+ *
+ * Compression interface
+ * ---------------------
+ *
+ * Call zs_compress_init() to initialize.
+ *
+ * Call zs_compress_begin(), to begin compressing a group of items. Pass the
+ * maximum amount of space it's allowed to use after compression, as argument.
+ *
+ * Feed them to the compressor one by one with zs_compress_add(), until it
+ * returns false.
+ *
+ * Finally, call zs_compress_finish(). It returns a ZSCompressedBtreeItem,
+ * which contains all the plain items that were added (except for the last one
+ * for which zs_compress_add() returned false)
+ *
+ * Decompression interface
+ * -----------------------
+ *
+ * zs_decompress_chunk() takes a ZSCompressedBtreeItem as argument. It
+ * initializes a "context" with the given chunk.
+ *
+ * Call zs_decompress_read_item() to return the uncompressed items one by one.
+ *
+ *
+ * NOTES:
+ *
+ * Currently, the compressor accepts input, until the *uncompressed* size exceeds
+ * the *compressed* size available. I.e it assumes that the compressed size is never
+ * larger than uncompressed size.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstore_compression.c
+ */
+#include "postgres.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#include "access/zedstore_compression.h"
+#include "access/zedstore_internal.h"
+#include "common/pg_lzcompress.h"
+#include "utils/datum.h"
+
+
+/*
+ * There are two implementations at the moment: LZ4, and the Postgres
+ * pg_lzcompress(). LZ4 support requires that the server was compiled
+ * with --with-lz4.
+ */
+#ifdef USE_LZ4
+
+/*
+ * Initialize the compression context and allocate its working buffers.
+ * (The per-group size limit is given later, to zs_compress_begin().)
+ */
+void
+zs_compress_init(ZSCompressContext *context)
+{
+	/* scratch space for the raw input */
+	context->uncompressedbuffer = palloc(BLCKSZ * 10);	/* FIXME: arbitrary size */
+	/* output buffer; grown by zs_compress_begin() as needed */
+	context->buffer = palloc(BLCKSZ);
+
+	context->nitems = 0;
+	context->rawsize = 0;
+	context->maxCompressedSize = 0;
+}
+
+/*
+ * Begin compressing a new group of items, with the given maximum size for
+ * the resulting compressed item (header included).
+ */
+void
+zs_compress_begin(ZSCompressContext *context, int maxCompressedSize)
+{
+	/* make sure the output buffer can hold the whole compressed item */
+	context->buffer = repalloc(context->buffer, maxCompressedSize);
+
+	/* the item header takes a fixed bite out of the payload budget */
+	maxCompressedSize -= offsetof(ZSCompressedBtreeItem, t_payload);
+	if (maxCompressedSize < 0)
+		maxCompressedSize = 0;
+	context->maxCompressedSize = maxCompressedSize;
+
+	context->nitems = 0;
+	context->rawsize = 0;
+}
+
+/*
+ * Try to add some data to the compressed block.
+ *
+ * If it wouldn't fit, return false.
+ */
+/*
+ * Try to add one uncompressed item to the group being compressed.
+ *
+ * Returns false (without consuming the item) if the worst-case compressed
+ * size of the accumulated input would exceed the space given to
+ * zs_compress_begin().
+ */
+bool
+zs_compress_add(ZSCompressContext *context, ZSUncompressedBtreeItem *item)
+{
+	ZSCompressedBtreeItem *chunk = (ZSCompressedBtreeItem *) context->buffer;
+	int			newrawsize = context->rawsize + item->t_size;
+
+	Assert((item->t_flags & ZSBT_COMPRESSED) == 0);
+	Assert(item->t_tid != InvalidZSTid);
+
+	/* reject the item if the worst-case output could overflow the budget */
+	if (LZ4_COMPRESSBOUND(newrawsize) > context->maxCompressedSize)
+		return false;
+
+	memcpy(context->uncompressedbuffer + context->rawsize, item, item->t_size);
+
+	/* the first item added determines the chunk's starting TID */
+	if (context->nitems == 0)
+		chunk->t_tid = item->t_tid;
+	chunk->t_lasttid = item->t_tid;
+
+	context->nitems++;
+	context->rawsize = newrawsize;
+
+	return true;
+}
+
+/*
+ * Finish the current compression group.
+ *
+ * Compresses everything fed in with zs_compress_add(), and returns the
+ * resulting compressed item.  The item lives in the context's buffer and is
+ * only valid until the next zs_compress_begin() or zs_compress_free() call.
+ */
+ZSCompressedBtreeItem *
+zs_compress_finish(ZSCompressContext *context)
+{
+	ZSCompressedBtreeItem *chunk = (ZSCompressedBtreeItem *) context->buffer;
+	int32		compressed_size;
+
+	compressed_size = LZ4_compress_default(context->uncompressedbuffer,
+										   chunk->t_payload,
+										   context->rawsize,
+										   context->maxCompressedSize);
+
+	/*
+	 * LZ4_compress_default() reports failure by returning 0, never a
+	 * negative value, so the check must be <= 0 (the old '< 0' test could
+	 * never fire).  This shouldn't happen, though, since zs_compress_add()
+	 * only accepts input up to the worst-case LZ4_COMPRESSBOUND() limit.
+	 */
+	if (compressed_size <= 0)
+		elog(ERROR, "LZ4 compression failed");
+
+	chunk->t_size = offsetof(ZSCompressedBtreeItem, t_payload) + compressed_size;
+	chunk->t_flags = ZSBT_COMPRESSED;
+	chunk->t_uncompressedsize = context->rawsize;
+
+	return chunk;
+}
+
+/*
+ * Release all memory held by a compression context.
+ */
+void
+zs_compress_free(ZSCompressContext *context)
+{
+	pfree(context->buffer);
+	pfree(context->uncompressedbuffer);
+}
+
+/*
+ * Initialize a decompression context.  The working buffer is allocated
+ * lazily, on the first zs_decompress_chunk() call.
+ */
+void
+zs_decompress_init(ZSDecompressContext *context)
+{
+	context->uncompressedsize = 0;
+	context->bufsize = 0;
+	context->buffer = NULL;
+}
+
+/*
+ * Prepare to decompress the given compressed item.  The context's buffer is
+ * (re)allocated if it's too small to hold the uncompressed data.
+ */
+void
+zs_decompress_chunk(ZSDecompressContext *context, ZSCompressedBtreeItem *chunk)
+{
+	int			payloadsz = chunk->t_size - offsetof(ZSCompressedBtreeItem, t_payload);
+
+	Assert((chunk->t_flags & ZSBT_COMPRESSED) != 0);
+	Assert(chunk->t_uncompressedsize > 0);
+
+	/* grow the buffer if needed; it is reused across chunks */
+	if (context->bufsize < chunk->t_uncompressedsize)
+	{
+		if (context->buffer)
+			pfree(context->buffer);
+		context->bufsize = chunk->t_uncompressedsize;
+		context->buffer = palloc(context->bufsize);
+	}
+	context->uncompressedsize = chunk->t_uncompressedsize;
+
+	if (LZ4_decompress_safe(chunk->t_payload,
+							context->buffer,
+							payloadsz,
+							context->uncompressedsize) != context->uncompressedsize)
+		elog(ERROR, "could not decompress chunk");
+
+	/* position the read cursor at the first item */
+	context->bytesread = 0;
+}
+
+/*
+ * Return the next uncompressed item from the current chunk, or NULL when
+ * all items have been consumed.  The returned pointer points directly into
+ * the decompression buffer, and is only valid until the next
+ * zs_decompress_chunk() or zs_decompress_free() call.
+ */
+ZSUncompressedBtreeItem *
+zs_decompress_read_item(ZSDecompressContext *context)
+{
+	ZSUncompressedBtreeItem *item;
+
+	/* all decompressed data consumed? */
+	if (context->bytesread == context->uncompressedsize)
+		return NULL;
+
+	item = (ZSUncompressedBtreeItem *) (context->buffer + context->bytesread);
+	/* sanity check: the item must not overrun the decompressed data */
+	if (context->bytesread + item->t_size > context->uncompressedsize)
+		elog(ERROR, "invalid compressed item");
+	context->bytesread += item->t_size;
+
+	Assert(item->t_size >= sizeof(ZSUncompressedBtreeItem));
+	Assert(item->t_tid != InvalidZSTid);
+
+	return item;
+}
+
+/*
+ * Release the decompression buffer, resetting the context to its
+ * just-initialized state.
+ */
+void
+zs_decompress_free(ZSDecompressContext *context)
+{
+	if (context->buffer != NULL)
+		pfree(context->buffer);
+
+	context->buffer = NULL;
+	context->bufsize = 0;
+	context->uncompressedsize = 0;
+}
+
+
+#else
+/* PGLZ implementation */
+
+/*
+ * In the worst case, pg_lz outputs everything as "literals", and emits one
+ * "control byte" every 8 bytes. Also, it requires 4 bytes extra at the end
+ * of the buffer. And add 10 bytes of slop, for good measure.
+ */
+#define MAX_COMPRESS_EXPANSION_OVERHEAD	(8)
+#define MAX_COMPRESS_EXPANSION_BYTES	(4 + 10)
+
+/*
+ * Initialize the compression context and allocate its working buffers.
+ * (The per-group size limit is given later, to zs_compress_begin().)
+ */
+void
+zs_compress_init(ZSCompressContext *context)
+{
+	/* scratch space for the raw input */
+	context->uncompressedbuffer = palloc(BLCKSZ * 10);	/* FIXME: arbitrary size */
+	/* output buffer; grown by zs_compress_begin() as needed */
+	context->buffer = palloc(BLCKSZ);
+
+	context->nitems = 0;
+	context->rawsize = 0;
+	context->maxCompressedSize = 0;
+	context->maxUncompressedSize = 0;
+}
+
+/*
+ * Begin compressing a new group of items, with the given maximum size for
+ * the resulting compressed item (header included).
+ */
+void
+zs_compress_begin(ZSCompressContext *context, int maxCompressedSize)
+{
+	int			budget;
+
+	/* pg_lz requires 4 bytes of slop past the end of the output buffer */
+	context->buffer = repalloc(context->buffer, maxCompressedSize + 4 /* LZ slop */);
+	context->maxCompressedSize = maxCompressedSize;
+
+	/*
+	 * Work out how much raw input we can accept, assuming worst-case
+	 * expansion by pg_lz (see MAX_COMPRESS_EXPANSION_* above).
+	 */
+	budget = maxCompressedSize;
+	budget -= offsetof(ZSCompressedBtreeItem, t_payload);
+	budget -= budget / MAX_COMPRESS_EXPANSION_OVERHEAD;
+	budget -= MAX_COMPRESS_EXPANSION_BYTES;
+	if (budget < 0)
+		budget = 0;
+	context->maxUncompressedSize = budget;
+
+	context->nitems = 0;
+	context->rawsize = 0;
+}
+
+/*
+ * Try to add some data to the compressed block.
+ *
+ * If it wouldn't fit, return false.
+ */
+/*
+ * Try to add one uncompressed item to the group being compressed.
+ *
+ * Returns false (without consuming the item) if the accumulated raw input
+ * would exceed the limit computed by zs_compress_begin().
+ */
+bool
+zs_compress_add(ZSCompressContext *context, ZSUncompressedBtreeItem *item)
+{
+	ZSCompressedBtreeItem *chunk = (ZSCompressedBtreeItem *) context->buffer;
+	int			newrawsize = context->rawsize + item->t_size;
+
+	Assert((item->t_flags & ZSBT_COMPRESSED) == 0);
+
+	/* reject the item if the raw input would exceed the budget */
+	if (newrawsize > context->maxUncompressedSize)
+		return false;
+
+	memcpy(context->uncompressedbuffer + context->rawsize, item, item->t_size);
+
+	/* the first item added determines the chunk's starting TID */
+	if (context->nitems == 0)
+		chunk->t_tid = item->t_tid;
+	chunk->t_lasttid = item->t_tid;
+
+	context->nitems++;
+	context->rawsize = newrawsize;
+
+	return true;
+}
+
+/*
+ * Finish the current compression group.
+ *
+ * Compresses everything fed in with zs_compress_add(), and returns the
+ * resulting compressed item.  The item lives in the context's buffer and is
+ * only valid until the next zs_compress_begin() or zs_compress_free() call.
+ *
+ * pglz_compress() returns a negative value on failure.
+ */
+ZSCompressedBtreeItem *
+zs_compress_finish(ZSCompressContext *context)
+{
+	ZSCompressedBtreeItem *chunk = (ZSCompressedBtreeItem *) context->buffer;
+	int32		compressed_size;
+
+	compressed_size = pglz_compress(context->uncompressedbuffer, context->rawsize,
+									chunk->t_payload,
+									PGLZ_strategy_always);
+	if (compressed_size < 0)
+		elog(ERROR, "compression failed. what now?");
+
+	chunk->t_uncompressedsize = context->rawsize;
+	chunk->t_flags = ZSBT_COMPRESSED;
+	chunk->t_size = offsetof(ZSCompressedBtreeItem, t_payload) + compressed_size;
+
+	return chunk;
+}
+
+/*
+ * Release all memory held by a compression context.
+ */
+void
+zs_compress_free(ZSCompressContext *context)
+{
+	pfree(context->buffer);
+	pfree(context->uncompressedbuffer);
+}
+
+/*
+ * Initialize a decompression context.  The working buffer is allocated
+ * lazily, on the first zs_decompress_chunk() call.
+ */
+void
+zs_decompress_init(ZSDecompressContext *context)
+{
+	context->uncompressedsize = 0;
+	context->bufsize = 0;
+	context->buffer = NULL;
+}
+
+/*
+ * Prepare to decompress the given compressed item.  The context's buffer is
+ * (re)allocated if it's too small to hold the uncompressed data.
+ */
+void
+zs_decompress_chunk(ZSDecompressContext *context, ZSCompressedBtreeItem *chunk)
+{
+	int			payloadsz = chunk->t_size - offsetof(ZSCompressedBtreeItem, t_payload);
+
+	Assert((chunk->t_flags & ZSBT_COMPRESSED) != 0);
+	Assert(chunk->t_uncompressedsize > 0);
+
+	/* grow the buffer if needed; it is reused across chunks */
+	if (context->bufsize < chunk->t_uncompressedsize)
+	{
+		if (context->buffer)
+			pfree(context->buffer);
+		context->bufsize = chunk->t_uncompressedsize;
+		context->buffer = palloc(context->bufsize);
+	}
+	context->uncompressedsize = chunk->t_uncompressedsize;
+
+	if (pglz_decompress(chunk->t_payload,
+						payloadsz,
+						context->buffer,
+						context->uncompressedsize, true) != context->uncompressedsize)
+		elog(ERROR, "could not decompress chunk");
+
+	/* position the read cursor at the first item */
+	context->bytesread = 0;
+}
+
+/*
+ * Return the next uncompressed item from the current chunk, or NULL when
+ * all items have been consumed.  The returned pointer points directly into
+ * the decompression buffer, and is only valid until the next
+ * zs_decompress_chunk() or zs_decompress_free() call.
+ */
+ZSUncompressedBtreeItem *
+zs_decompress_read_item(ZSDecompressContext *context)
+{
+	ZSUncompressedBtreeItem *item;
+
+	/* all decompressed data consumed? */
+	if (context->bytesread == context->uncompressedsize)
+		return NULL;
+
+	item = (ZSUncompressedBtreeItem *) (context->buffer + context->bytesread);
+	/* sanity check: the item must not overrun the decompressed data */
+	if (context->bytesread + item->t_size > context->uncompressedsize)
+		elog(ERROR, "invalid compressed item");
+	context->bytesread += item->t_size;
+
+	Assert(item->t_size >= sizeof(ZSUncompressedBtreeItem));
+	Assert(item->t_tid != InvalidZSTid);
+
+	return item;
+}
+
+/*
+ * Release the decompression buffer, resetting the context to its
+ * just-initialized state.
+ */
+void
+zs_decompress_free(ZSDecompressContext *context)
+{
+	if (context->buffer != NULL)
+		pfree(context->buffer);
+
+	context->buffer = NULL;
+	context->bufsize = 0;
+	context->uncompressedsize = 0;
+}
+
+#endif		/* !USE_LZ4 */
diff --git a/src/backend/access/zedstore/zedstore_inspect.c b/src/backend/access/zedstore/zedstore_inspect.c
new file mode 100644
index 0000000000..b7c18bc04c
--- /dev/null
+++ b/src/backend/access/zedstore/zedstore_inspect.c
@@ -0,0 +1,445 @@
+/*-------------------------------------------------------------------------
+ *
+ * zedstore_inspect.c
+ *	  Debugging functions, for viewing ZedStore page contents
+ *
+ * These should probably be moved to contrib/, but it's handy to have them
+ * here during development.
+ *
+ * Example queries
+ * ---------------
+ *
+ * How many pages of each type a table has?
+ *
+ * select count(*), pg_zs_page_type('t_zedstore', g)
+ *   from generate_series(0, pg_table_size('t_zedstore') / 8192 - 1) g group by 2;
+ *
+ *  count | pg_zs_page_type 
+ * -------+-----------------
+ *      1 | META
+ *   3701 | BTREE
+ *      6 | UNDO
+ * (3 rows)
+ *
+ * Compression ratio of B-tree leaf pages (other pages are not compressed):
+ *
+ * select sum(uncompressedsz::numeric) / sum(totalsz) as compratio
+ *   from pg_zs_btree_pages('t_zedstore') ;
+ *      compratio      
+ * --------------------
+ *  3.6623829559208134
+ * (1 row)
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstore_inspect.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "miscadmin.h"
+
+#include "access/relscan.h"
+#include "access/table.h"
+#include "access/zedstore_internal.h"
+#include "access/zedstore_undo.h"
+#include "commands/vacuum.h"
+#include "funcapi.h"
+#include "utils/builtins.h"
+#include "utils/rel.h"
+
+Datum pg_zs_page_type(PG_FUNCTION_ARGS);
+Datum pg_zs_undo_pages(PG_FUNCTION_ARGS);
+Datum pg_zs_btree_pages(PG_FUNCTION_ARGS);
+
+/*
+ * pg_zs_page_type(relid, blkno) returns the type of the given zedstore page
+ * ("META", "BTREE", "UNDO" or "TOAST"), based on the page-id tag stored in
+ * the last two bytes of every zedstore page.
+ */
+Datum
+pg_zs_page_type(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	uint64		pageno = PG_GETARG_INT64(1);
+	Relation	rel;
+	uint16		zs_page_id;
+	Buffer		buf;
+	Page		page;
+	char	   *result;
+
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("must be superuser to use zedstore inspection functions"))));
+
+	rel = table_open(relid, AccessShareLock);
+
+	/*
+	 * Reject attempts to read non-local temporary relations; we would be
+	 * likely to get wrong data since we have no visibility into the owning
+	 * session's local buffers.
+	 */
+	if (RELATION_IS_OTHER_TEMP(rel))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot access temporary tables of other sessions")));
+
+	buf = ReadBuffer(rel, pageno);
+	LockBuffer(buf, BUFFER_LOCK_SHARE);
+	page = BufferGetPage(buf);
+
+	/* the page-id tag occupies the last two bytes of the page */
+	zs_page_id = *((uint16 *) ((char *) page + BLCKSZ - sizeof(uint16)));
+
+	UnlockReleaseBuffer(buf);
+
+	table_close(rel, AccessShareLock);
+
+	switch (zs_page_id)
+	{
+		case ZS_META_PAGE_ID:
+			result = "META";
+			break;
+		case ZS_BTREE_PAGE_ID:
+			result = "BTREE";
+			break;
+		case ZS_UNDO_PAGE_ID:
+			result = "UNDO";
+			break;
+		case ZS_TOAST_PAGE_ID:
+			/* was mislabeled "UNDO" before; toast pages are their own type */
+			result = "TOAST";
+			break;
+		default:
+			result = psprintf("UNKNOWN 0x%04x", zs_page_id);
+	}
+
+	PG_RETURN_TEXT_P(cstring_to_text(result));
+}
+
+/*
+ *  blkno int8
+ *  nrecords int4
+ *  freespace int4
+ *  firstrecptr int8
+ *  lastrecptr int8
+ */
+Datum
+pg_zs_undo_pages(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	Buffer		metabuf;
+	Page		metapage;
+	ZSMetaPageOpaque *metaopaque;
+	BlockNumber	firstblk;
+	BlockNumber	blkno;
+	char	   *ptr;
+	char	   *endptr;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("must be superuser to use zedstore inspection functions"))));
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	/* Switch into long-lived context to construct returned data structures */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	rel = table_open(relid, AccessShareLock);
+
+	/*
+	 * Reject attempts to read non-local temporary relations; we would be
+	 * likely to get wrong data since we have no visibility into the owning
+	 * session's local buffers.
+	 */
+	if (RELATION_IS_OTHER_TEMP(rel))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot access temporary tables of other sessions")));
+
+	/*
+	 * Get the current oldest undo page from the metapage.
+	 */
+	metabuf = ReadBuffer(rel, ZS_META_BLK);
+	metapage = BufferGetPage(metabuf);
+	LockBuffer(metabuf, BUFFER_LOCK_SHARE);
+	metaopaque = (ZSMetaPageOpaque *) PageGetSpecialPointer(metapage);
+
+	firstblk = metaopaque->zs_undo_head;
+
+	UnlockReleaseBuffer(metabuf);
+
+	/*
+	 * Loop through UNDO records, starting from the oldest page.
+	 * Each UNDO page yields one result row.
+	 */
+	blkno = firstblk;
+	while (blkno != InvalidBlockNumber)
+	{
+		Datum		values[5];
+		bool		nulls[5];
+		Buffer		buf;
+		Page		page;
+		ZSUndoPageOpaque *opaque;
+		int			nrecords;
+		ZSUndoRecPtr firstptr = { 0, 0, 0 };
+		ZSUndoRecPtr lastptr = { 0, 0, 0 };
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Read the UNDO page */
+		buf = ReadBuffer(rel, blkno);
+		page = BufferGetPage(buf);
+		/*
+		 * NOTE(review): an exclusive lock looks stronger than necessary for
+		 * a read-only inspection function — confirm whether a share lock
+		 * would suffice here.
+		 */
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+		opaque = (ZSUndoPageOpaque *) PageGetSpecialPointer(page);
+
+		if (opaque->zs_page_id != ZS_UNDO_PAGE_ID)
+		{
+			elog(WARNING, "unexpected page id on UNDO page %u", blkno);
+			break;
+		}
+
+		/* loop through all records on the page */
+		/* records are stored back-to-back, from the page header up to pd_lower */
+		endptr = (char *) page + ((PageHeader) page)->pd_lower;
+		ptr = (char *) page + SizeOfPageHeaderData;
+		nrecords = 0;
+		while (ptr < endptr)
+		{
+			ZSUndoRec *undorec = (ZSUndoRec *) ptr;
+
+			Assert(undorec->undorecptr.blkno == blkno);
+
+			lastptr = undorec->undorecptr;
+			if (nrecords == 0)
+				firstptr = lastptr;
+			nrecords++;
+
+			ptr += undorec->size;
+		}
+
+		/* (blkno, nrecords, freespace, firstrecptr, lastrecptr) */
+		values[0] = Int64GetDatum(blkno);
+		values[1] = Int32GetDatum(nrecords);
+		values[2] = Int32GetDatum(PageGetExactFreeSpace(page));
+		values[3] = Int64GetDatum(firstptr.counter);
+		values[4] = Int64GetDatum(lastptr.counter);
+
+		/* follow the chain to the next (newer) UNDO page */
+		blkno = opaque->next;
+		UnlockReleaseBuffer(buf);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+	tuplestore_donestoring(tupstore);
+
+	table_close(rel, AccessShareLock);
+
+	return (Datum) 0;
+}
+
+
+/*
+ *  blkno int8
+ *  nextblk int8
+ *  attno int4
+ *  level int4
+ *  
+ *  lokey int8
+ *  hikey int8
+ *
+ *  nitems int4
+ *  ncompressed int4
+ *  totalsz int4
+ *  uncompressedsz int4
+ *  freespace int4
+ */
+Datum
+pg_zs_btree_pages(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlockNumber	blkno;
+	BlockNumber	nblocks;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("must be superuser to use zedstore inspection functions"))));
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	/* Switch into long-lived context to construct returned data structures */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	rel = table_open(relid, AccessShareLock);
+
+	/*
+	 * Reject attempts to read non-local temporary relations; we would be
+	 * likely to get wrong data since we have no visibility into the owning
+	 * session's local buffers.
+	 */
+	if (RELATION_IS_OTHER_TEMP(rel))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot access temporary tables of other sessions")));
+
+	nblocks = RelationGetNumberOfBlocks(rel);
+
+	/* scan all blocks in physical order */
+	/* (starting from block 1, presumably skipping the metapage — see ZS_META_BLK) */
+	for (blkno = 1; blkno < nblocks; blkno++)
+	{
+		Datum		values[11];
+		bool		nulls[11];
+		OffsetNumber off;
+		OffsetNumber maxoff;
+		Buffer		buf;
+		Page		page;
+		ZSBtreePageOpaque *opaque;
+		int			nitems;
+		int			ncompressed;
+		int			totalsz;
+		int			uncompressedsz;
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Read the page */
+		buf = ReadBuffer(rel, blkno);
+		page = BufferGetPage(buf);
+		/*
+		 * NOTE(review): an exclusive lock looks stronger than necessary for
+		 * a read-only inspection function — confirm whether a share lock
+		 * would suffice here.
+		 */
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+		/*
+		 * we're only interested in B-tree pages. (Presumably, most of the
+		 * pages in the relation are b-tree pages, so it makes sense to
+		 * scan the whole relation in physical order)
+		 */
+		if (PageGetSpecialSize(page) != MAXALIGN(sizeof(ZSBtreePageOpaque)))
+		{
+			UnlockReleaseBuffer(buf);
+			continue;
+		}
+		opaque = (ZSBtreePageOpaque *) PageGetSpecialPointer(page);
+		if (opaque->zs_page_id != ZS_BTREE_PAGE_ID)
+		{
+			UnlockReleaseBuffer(buf);
+			continue;
+		}
+
+		/* gather per-page statistics */
+		nitems = 0;
+		ncompressed = 0;
+		totalsz = 0;
+		uncompressedsz = 0;
+		if (opaque->zs_level == 0)
+		{
+			/* leaf page */
+			maxoff = PageGetMaxOffsetNumber(page);
+			for (off = FirstOffsetNumber; off <= maxoff; off++)
+			{
+				ItemId		iid = PageGetItemId(page, off);
+				ZSBtreeItem	*item = (ZSBtreeItem *) PageGetItem(page, iid);
+
+				nitems++;
+				totalsz += item->t_size;
+
+				/* count compressed items at their uncompressed size */
+				if ((item->t_flags & ZSBT_COMPRESSED) != 0)
+				{
+					ZSCompressedBtreeItem *citem = (ZSCompressedBtreeItem *) PageGetItem(page, iid);
+
+					ncompressed++;
+					uncompressedsz += citem->t_uncompressedsize;
+				}
+				else
+					uncompressedsz += item->t_size;
+			}
+		}
+		else
+		{
+			/* internal page */
+			nitems = ZSBtreeInternalPageGetNumItems(page);
+		}
+		values[0] = Int64GetDatum(blkno);
+		values[1] = Int64GetDatum(opaque->zs_next);
+		values[2] = Int32GetDatum(opaque->zs_attno);
+		values[3] = Int32GetDatum(opaque->zs_level);
+		values[4] = Int64GetDatum(opaque->zs_lokey);
+		values[5] = Int64GetDatum(opaque->zs_hikey);
+		values[6] = Int32GetDatum(nitems);
+		if (opaque->zs_level == 0)
+		{
+			values[7] = Int32GetDatum(ncompressed);
+			values[8] = Int32GetDatum(totalsz);
+			values[9] = Int32GetDatum(uncompressedsz);
+		}
+		else
+		{
+			/* compression stats are only meaningful for leaf pages */
+			nulls[7] = true;
+			nulls[8] = true;
+			nulls[9] = true;
+		}
+		values[10] = Int32GetDatum(PageGetExactFreeSpace(page));
+
+		UnlockReleaseBuffer(buf);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+	tuplestore_donestoring(tupstore);
+
+	table_close(rel, AccessShareLock);
+
+	return (Datum) 0;
+}
diff --git a/src/backend/access/zedstore/zedstore_meta.c b/src/backend/access/zedstore/zedstore_meta.c
new file mode 100644
index 0000000000..b58b3bb31c
--- /dev/null
+++ b/src/backend/access/zedstore/zedstore_meta.c
@@ -0,0 +1,218 @@
+/*
+ * zedstore_meta.c
+ *		Routines for handling ZedStore metapage
+ *
+ * The metapage holds a directory of B-tree root block numbers, one for each
+ * column.
+ *
+ * TODO:
+ * - support ALTER TABLE ADD COLUMN.
+ * - extend the root block dir to an overflow page if there are too many
+ *   attributes to fit on one page
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstore_meta.c
+ */
+#include "postgres.h"
+
+#include "access/itup.h"
+#include "access/zedstore_internal.h"
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+#include "utils/rel.h"
+
+static void zs_initmetapage(Relation rel);
+
+/*
+ * Allocate a new page.
+ *
+ * The page is exclusive-locked, but not initialized.
+ *
+ * Currently, this just extends the relation, but we should have a free space
+ * map of some kind.
+ */
+Buffer
+zs_getnewbuf(Relation rel)
+{
+	Buffer		buf;
+	bool		needLock;
+
+	/*
+	 * Extend the relation by one page.
+	 *
+	 * We have to use a lock to ensure no one else is extending the rel at
+	 * the same time, else we will both try to initialize the same new
+	 * page.  We can skip locking for new or temp relations, however,
+	 * since no one else could be accessing them.
+	 */
+	needLock = !RELATION_IS_LOCAL(rel);
+
+	if (needLock)
+		LockRelationForExtension(rel, ExclusiveLock);
+
+	/* P_NEW allocates the block just past the current end of the relation */
+	buf = ReadBuffer(rel, P_NEW);
+
+	/* Acquire buffer lock on new page */
+	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+	/*
+	 * Release the file-extension lock; it's now OK for someone else to
+	 * extend the relation some more.  Note that we cannot release this
+	 * lock before we have buffer lock on the new page, or we risk a race
+	 * condition against btvacuumscan --- see comments therein.
+	 *
+	 * NOTE(review): the btvacuumscan reference is to nbtree; presumably the
+	 * same race applies to zedstore's own vacuum — confirm.
+	 */
+	if (needLock)
+		UnlockRelationForExtension(rel, ExclusiveLock);
+
+	return buf;
+}
+
+/*
+ * Initialize the metapage for an empty relation.
+ */
+/*
+ * Initialize the metapage for an empty relation.
+ *
+ * Block 0 (ZS_META_BLK) is allocated and set up with an all-invalid root
+ * block directory and the initial UNDO bookkeeping fields.  Errors out if
+ * the relation is not empty, or if the directory doesn't fit on one page.
+ */
+static void
+zs_initmetapage(Relation rel)
+{
+	int			natts = RelationGetNumberOfAttributes(rel);
+	Buffer		buf;
+	Page		page;
+	ZSMetaPage *metapg;
+	ZSMetaPageOpaque *opaque;
+	Size		freespace;
+	int			maxatts;
+
+	buf = ReadBuffer(rel, P_NEW);
+	if (BufferGetBlockNumber(buf) != ZS_META_BLK)
+		elog(ERROR, "relation is not empty");	/* was "index is not empty" — this is a table AM */
+	page = BufferGetPage(buf);
+	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+	PageInit(page, BLCKSZ, sizeof(ZSMetaPageOpaque));
+
+	/* Initialize the attribute root dir */
+	freespace = PageGetExactFreeSpace(page);
+	maxatts = freespace / sizeof(BlockNumber);
+	if (natts > maxatts)
+	{
+		/*
+		 * The root block directory must fit on the metapage.
+		 *
+		 * TODO: We could extend this by overflowing to another page.
+		 */
+		elog(ERROR, "too many attributes for zedstore");
+	}
+
+	/* one root pointer per attribute, none allocated yet */
+	metapg = (ZSMetaPage *) PageGetContents(page);
+	metapg->nattributes = natts;
+	for (int i = 0; i < natts; i++)
+		metapg->roots[i] = InvalidBlockNumber;
+	/* advance pd_lower so the directory is part of the page's contents area */
+	((PageHeader) page)->pd_lower += natts * sizeof(BlockNumber);
+
+	opaque = (ZSMetaPageOpaque *) PageGetSpecialPointer(page);
+	opaque->zs_flags = 0;
+	opaque->zs_page_id = ZS_META_PAGE_ID;
+
+	/* UNDO-related fields */
+	opaque->zs_undo_counter = 1; /* start at 1, so that 0 is always "old" */
+	opaque->zs_undo_head = InvalidBlockNumber;
+	opaque->zs_undo_tail = InvalidBlockNumber;
+	opaque->zs_undo_oldestptr.counter = 1;
+
+	MarkBufferDirty(buf);
+	/* TODO: WAL-log */
+
+	UnlockReleaseBuffer(buf);
+}
+
+/*
+ * Get the block number of the b-tree root for given attribute.
+ *
+ * If 'forupdate' is true, and the root doesn't exist yet (ie. it's an empty
+ * table), a new root is allocated. Otherwise, returns InvalidBlockNumber if
+ * the root doesn't exist.
+ */
+BlockNumber
+zsmeta_get_root_for_attribute(Relation rel, AttrNumber attno, bool forupdate)
+{
+	Buffer		metabuf;
+	ZSMetaPage *metapg;
+	BlockNumber	rootblk;
+
+	/*
+	 * A zero-block relation has no metapage yet; create one if we're
+	 * allowed to modify the relation.
+	 *
+	 * NOTE(review): two backends could both see zero blocks here and race
+	 * to create the metapage; presumably the ZS_META_BLK check inside
+	 * zs_initmetapage() makes the loser error out — confirm.
+	 */
+	if (RelationGetNumberOfBlocks(rel) == 0)
+	{
+		if (!forupdate)
+			return InvalidBlockNumber;
+
+		zs_initmetapage(rel);
+	}
+
+	metabuf = ReadBuffer(rel, ZS_META_BLK);
+
+	/* TODO: get share lock to begin with */
+	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+	metapg = (ZSMetaPage *) PageGetContents(BufferGetPage(metabuf));
+
+	if (attno <= 0 || attno > metapg->nattributes)
+		elog(ERROR, "invalid attribute number %d (table has only %d attributes)", attno, metapg->nattributes);
+
+	rootblk = metapg->roots[attno - 1];
+
+	if (forupdate && rootblk == InvalidBlockNumber)
+	{
+		/* try to allocate one */
+		Buffer		rootbuf;
+		Page		rootpage;
+		ZSBtreePageOpaque *opaque;
+
+		/* TODO: release lock on metapage while we do I/O */
+		rootbuf = zs_getnewbuf(rel);
+		rootblk = BufferGetBlockNumber(rootbuf);
+
+		/* record the new root in the metapage's directory */
+		metapg->roots[attno - 1] = rootblk;
+
+		/* initialize the page to look like a root leaf */
+		/* a root leaf covers the full TID key space */
+		rootpage = BufferGetPage(rootbuf);
+		PageInit(rootpage, BLCKSZ, sizeof(ZSBtreePageOpaque));
+		opaque = ZSBtreePageGetOpaque(rootpage);
+		opaque->zs_attno = attno;
+		opaque->zs_next = InvalidBlockNumber;
+		opaque->zs_lokey = MinZSTid;
+		opaque->zs_hikey = MaxPlusOneZSTid;
+		opaque->zs_level = 0;
+		opaque->zs_flags = 0;
+		opaque->zs_page_id = ZS_BTREE_PAGE_ID;
+
+		MarkBufferDirty(rootbuf);
+		MarkBufferDirty(metabuf);
+		/* TODO: WAL-log both pages */
+
+		UnlockReleaseBuffer(rootbuf);
+	}
+
+	UnlockReleaseBuffer(metabuf);
+
+	return rootblk;
+}
+
+/*
+ *
+ * Caller is responsible for WAL-logging this.
+ */
+/*
+ * Set the B-tree root block for the given attribute in the metapage.
+ *
+ * The caller must hold an exclusive lock on 'metabuf', and is responsible
+ * for WAL-logging this change.
+ */
+void
+zsmeta_update_root_for_attribute(Relation rel, AttrNumber attno,
+								 Buffer metabuf, BlockNumber rootblk)
+{
+	ZSMetaPage *metapg = (ZSMetaPage *) PageGetContents(BufferGetPage(metabuf));
+
+	/* sanity-check the attribute number against the directory */
+	if (attno <= 0 || attno > metapg->nattributes)
+		elog(ERROR, "invalid attribute number %d (table has only %d attributes)", attno, metapg->nattributes);
+
+	metapg->roots[attno - 1] = rootblk;
+
+	MarkBufferDirty(metabuf);
+}
diff --git a/src/backend/access/zedstore/zedstore_toast.c b/src/backend/access/zedstore/zedstore_toast.c
new file mode 100644
index 0000000000..f1e90f0b4b
--- /dev/null
+++ b/src/backend/access/zedstore/zedstore_toast.c
@@ -0,0 +1,188 @@
+/*
+ * zedstore_toast.c
+ *		Routines for Toasting oversized tuples in Zedstore
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstore_toast.c
+ */
+#include "postgres.h"
+
+#include "access/zedstore_compression.h"
+#include "access/zedstore_internal.h"
+#include "storage/bufmgr.h"
+#include "utils/datum.h"
+#include "utils/rel.h"
+
+/*
+ * Toast a datum, inside the ZedStore file.
+ *
+ * This is similar to regular toasting, but instead of using a separate index and
+ * heap, the datum is stored within the same ZedStore file as all the btrees and
+ * stuff. A chain of "toast-pages" is allocated for the datum, and each page is filled
+ * with as much of the datum as possible.
+ *
+ * Returns a ZedStore-specific toast pointer (varatt_zs_toastptr) that records
+ * the first block of the chain.
+ *
+ * Note: You must call zedstore_toast_finish() after this,
+ * to set the TID in the toast-chain's first block. Otherwise, it's considered recyclable.
+ */
+Datum
+zedstore_toast_datum(Relation rel, AttrNumber attno, Datum value)
+{
+	varatt_zs_toastptr *toastptr;
+	BlockNumber firstblk = InvalidBlockNumber;
+	Buffer		buf = InvalidBuffer;
+	Page		page;
+	ZSToastPageOpaque *opaque;
+	Buffer		prevbuf = InvalidBuffer;
+	ZSToastPageOpaque *prevopaque = NULL;
+	char	   *ptr;
+	int32		total_size;
+	int32		offset;
+
+	/* TODO: try to compress it in place first. Maybe just call toast_compress_datum? */
+
+	/*
+	 * If that doesn't reduce it enough, allocate a toast page
+	 * for it.
+	 */
+	ptr = VARDATA_ANY(value);
+	total_size = VARSIZE_ANY_EXHDR(value);
+	offset = 0;
+
+	/*
+	 * Copy the payload into a chain of newly-allocated toast pages, one
+	 * page-full at a time.
+	 *
+	 * NOTE(review): if total_size is 0 this loop never runs, 'buf' stays
+	 * InvalidBuffer, and the UnlockReleaseBuffer() below would misbehave.
+	 * Presumably callers never toast empty datums -- confirm.
+	 */
+	while (total_size - offset > 0)
+	{
+		Size		thisbytes;
+
+		buf = zs_getnewbuf(rel);
+		if (prevbuf == InvalidBuffer)
+			firstblk = BufferGetBlockNumber(buf);
+
+		page = BufferGetPage(buf);
+		PageInit(page, BLCKSZ, sizeof(ZSToastPageOpaque));
+
+		/* fill the page with as much of the remaining data as fits */
+		thisbytes = Min(total_size - offset, PageGetExactFreeSpace(page));
+
+		opaque = (ZSToastPageOpaque *) PageGetSpecialPointer(page);
+		opaque->zs_attno = attno;
+		/* TID is stamped later, by zedstore_toast_finish() */
+		opaque->zs_tid = InvalidZSTid;
+		opaque->zs_total_size = total_size;
+		opaque->zs_slice_offset = offset;
+		opaque->zs_prev = BufferIsValid(prevbuf) ? BufferGetBlockNumber(prevbuf) : InvalidBlockNumber;
+		opaque->zs_next = InvalidBlockNumber;
+		opaque->zs_flags = 0;
+		opaque->zs_page_id = ZS_TOAST_PAGE_ID;
+
+		/* the slice is stored right after the page header; pd_lower tracks its end */
+		memcpy((char *) page + SizeOfPageHeaderData, ptr, thisbytes);
+		((PageHeader) page)->pd_lower += thisbytes;
+		ptr += thisbytes;
+		offset += thisbytes;
+
+		/* link the previous page of the chain to this new one */
+		if (prevbuf != InvalidBuffer)
+		{
+			prevopaque->zs_next = BufferGetBlockNumber(buf);
+			MarkBufferDirty(prevbuf);
+		}
+
+		/* TODO: WAL-log */
+		MarkBufferDirty(buf);
+
+		if (prevbuf != InvalidBuffer)
+			UnlockReleaseBuffer(prevbuf);
+		prevbuf = buf;
+		prevopaque = opaque;
+	}
+
+	UnlockReleaseBuffer(buf);
+
+	/* build the toast pointer that the caller stores in place of the datum */
+	toastptr = palloc0(sizeof(varatt_zs_toastptr));
+	SET_VARTAG_1B_E(toastptr, VARTAG_ZEDSTORE);
+	toastptr->zst_block = firstblk;
+
+	return PointerGetDatum(toastptr);
+}
+
+/*
+ * Finalize a toast chain created by zedstore_toast_datum(), by stamping the
+ * owning tuple's TID into the chain's first page.  Until this is done, the
+ * chain is considered recyclable (see zedstore_toast_datum()).
+ */
+void
+zedstore_toast_finish(Relation rel, AttrNumber attno, Datum toasted, zstid tid)
+{
+	varatt_zs_toastptr *toastptr = (varatt_zs_toastptr *) DatumGetPointer(toasted);
+	Buffer		buf;
+	Page		page;
+	ZSToastPageOpaque *opaque;
+
+	Assert(toastptr->va_tag == VARTAG_ZEDSTORE);
+
+	buf = ReadBuffer(rel, toastptr->zst_block);
+	page = BufferGetPage(buf);
+	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+	opaque = (ZSToastPageOpaque *) PageGetSpecialPointer(page);
+
+	/* must be the (unfinished) first page of a chain for this attribute */
+	Assert(opaque->zs_tid == InvalidZSTid);
+	Assert(opaque->zs_attno == attno);
+	Assert(opaque->zs_prev == InvalidBlockNumber);
+
+	opaque->zs_tid = tid;
+
+	/* TODO: WAL-log */
+	MarkBufferDirty(buf);
+
+	UnlockReleaseBuffer(buf);
+}
+
+/*
+ * De-toast a datum: walk the toast-page chain starting at the block recorded
+ * in the toast pointer, and reassemble the slices into a single palloc'd
+ * varlena, which is returned.
+ */
+Datum
+zedstore_toast_flatten(Relation rel, AttrNumber attno, zstid tid, Datum toasted)
+{
+	varatt_zs_toastptr *toastptr = (varatt_zs_toastptr *) DatumGetPointer(toasted);
+	BlockNumber	nextblk;
+	BlockNumber	prevblk;
+	char	   *result = NULL;
+	char	   *ptr = NULL;
+	int32		total_size = 0;
+
+	Assert(toastptr->va_tag == VARTAG_ZEDSTORE);
+
+	prevblk = InvalidBlockNumber;
+	nextblk = toastptr->zst_block;
+
+	while (nextblk != InvalidBlockNumber)
+	{
+		Buffer		buf;
+		Page		page;
+		ZSToastPageOpaque *opaque;
+		uint32		size;
+
+		buf = ReadBuffer(rel, nextblk);
+		page = BufferGetPage(buf);
+		LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+		opaque = (ZSToastPageOpaque *) PageGetSpecialPointer(page);
+
+		/* cross-check the chain's back-links as we walk forward */
+		Assert(opaque->zs_attno == attno);
+		Assert(opaque->zs_prev == prevblk);
+
+		/* first page of the chain: it knows the total size, so allocate the result */
+		if (prevblk == InvalidBlockNumber)
+		{
+			Assert(opaque->zs_tid == tid);
+
+			total_size = opaque->zs_total_size;
+
+			result = palloc(total_size + VARHDRSZ);
+			SET_VARSIZE(result, total_size + VARHDRSZ);
+			ptr = result + VARHDRSZ;
+		}
+
+		/* this page's slice runs from just after the page header up to pd_lower */
+		size = ((PageHeader) page)->pd_lower - SizeOfPageHeaderData;
+		memcpy(ptr, (char *) page + SizeOfPageHeaderData, size);
+		ptr += size;
+
+		prevblk = nextblk;
+		nextblk = opaque->zs_next;
+		UnlockReleaseBuffer(buf);
+	}
+	Assert(total_size > 0);
+	Assert(ptr == result + total_size + VARHDRSZ);
+
+	return PointerGetDatum(result);
+}
diff --git a/src/backend/access/zedstore/zedstore_undo.c b/src/backend/access/zedstore/zedstore_undo.c
new file mode 100644
index 0000000000..65bf67137e
--- /dev/null
+++ b/src/backend/access/zedstore/zedstore_undo.c
@@ -0,0 +1,783 @@
+/*
+ * zedstore_undo.c
+ *		Temporary UNDO-logging for zedstore.
+ *
+ * XXX: This is hopefully replaced with an upstream UNDO facility later.
+ *
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstore_undo.c
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/multixact.h"
+#include "access/zedstore_internal.h"
+#include "access/zedstore_undo.h"
+#include "commands/progress.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "postmaster/autovacuum.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/pg_rusage.h"
+#include "utils/rel.h"
+#include "utils/lsyscache.h"
+
+/*
+ * Working area for zsundo_scan().
+ */
+typedef struct ZSUndoTrimStats
+{
+	/* List of TIDs of tuples we intend to delete */
+	/* NB: this list is ordered by TID address */
+	int			num_dead_tuples;	/* current # of entries */
+	int			max_dead_tuples;	/* # slots allocated in array */
+	ItemPointer dead_tuples;	/* array of ItemPointerData */
+	bool		dead_tuples_overflowed;	/* ran out of slots in dead_tuples? */
+
+	BlockNumber	deleted_undo_pages;	/* # of UNDO pages whose records were all trimmed */
+
+	/* did the scan trim past at least one record? */
+	bool		can_advance_oldestundorecptr;
+} ZSUndoTrimStats;
+
+/*
+ * Working area for VACUUM.
+ *
+ * NOTE(review): this appears to mirror LVRelStats in
+ * src/backend/access/heap/vacuumlazy.c; several fields (e.g. the page
+ * counters) are not obviously updated in this file -- confirm which are live.
+ */
+typedef struct ZSVacRelStats
+{
+	int			elevel;			/* message level for vacuum progress reports */
+	BufferAccessStrategy vac_strategy;
+
+	/* hasindex = true means two-pass strategy; false means one-pass */
+	bool		hasindex;
+	/* Overall statistics about rel */
+	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
+	BlockNumber rel_pages;		/* total number of pages */
+	BlockNumber scanned_pages;	/* number of pages we examined */
+	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
+	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
+	BlockNumber tupcount_pages; /* pages whose tuples we counted */
+	double		old_live_tuples;	/* previous value of pg_class.reltuples */
+	double		new_rel_tuples; /* new estimated total # of tuples */
+	double		new_live_tuples;	/* new estimated total # of live tuples */
+	double		new_dead_tuples;	/* new estimated total # of dead tuples */
+	BlockNumber pages_removed;
+	double		tuples_deleted;
+	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+
+	ZSUndoTrimStats trimstats;	/* dead-TID collection shared with zsundo_scan() */
+} ZSVacRelStats;
+
+/*
+ * Guesstimation of number of dead tuples per page.  This is used to
+ * provide an upper limit to memory allocated when vacuuming small
+ * tables.
+ */
+#define LAZY_ALLOC_TUPLES		MaxHeapTuplesPerPage
+
+/* prototypes for local functions */
+static int zs_vac_cmp_itemptr(const void *left, const void *right);
+static bool zs_lazy_tid_reaped(ItemPointer itemptr, void *state);
+static void lazy_space_alloc(ZSVacRelStats *vacrelstats, BlockNumber relblocks);
+static void lazy_vacuum_index(Relation indrel,
+				  IndexBulkDeleteResult **stats,
+				  ZSVacRelStats *vacrelstats);
+static void lazy_cleanup_index(Relation indrel,
+				   IndexBulkDeleteResult *stats,
+				   ZSVacRelStats *vacrelstats);
+static ZSUndoRecPtr zsundo_scan(Relation rel, TransactionId OldestXmin, ZSUndoTrimStats *trimstats, BlockNumber *oldest_undopage);
+static void zsundo_update_oldest_ptr(Relation rel, ZSUndoRecPtr oldest_undorecptr, BlockNumber oldest_undopage);
+static void zsundo_record_dead_tuple(ZSUndoTrimStats *trimstats, zstid tid);
+
+/*
+ * Insert the given UNDO record to the UNDO log.
+ *
+ * The record is appended to the tail UNDO page (allocating a new page and
+ * linking it into the chain if the tail is full or doesn't exist yet).
+ * On return, rec->undorecptr has been filled in with the record's location,
+ * and the same pointer is returned.
+ */
+ZSUndoRecPtr
+zsundo_insert(Relation rel, ZSUndoRec *rec)
+{
+	Buffer		metabuf;
+	Page		metapage;
+	ZSMetaPageOpaque *metaopaque;
+	BlockNumber	tail_blk;
+	Buffer		tail_buf = InvalidBuffer;
+	Page		tail_pg = NULL;
+	ZSUndoPageOpaque *tail_opaque = NULL;
+	char	   *dst;
+	ZSUndoRecPtr undorecptr;
+	int			offset;
+	uint64		undo_counter;
+
+	metabuf = ReadBuffer(rel, ZS_META_BLK);
+	metapage = BufferGetPage(metabuf);
+
+	/* TODO: get share lock to begin with, for more concurrency */
+	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+	metaopaque = (ZSMetaPageOpaque *) PageGetSpecialPointer(metapage);
+
+	tail_blk = metaopaque->zs_undo_tail;
+
+	/* Is there space on the tail page? */
+	if (tail_blk != InvalidBlockNumber)
+	{
+		tail_buf = ReadBuffer(rel, tail_blk);
+		LockBuffer(tail_buf, BUFFER_LOCK_EXCLUSIVE);
+		tail_pg = BufferGetPage(tail_buf);
+		tail_opaque = (ZSUndoPageOpaque *) PageGetSpecialPointer(tail_pg);
+	}
+	if (tail_blk == InvalidBlockNumber || PageGetExactFreeSpace(tail_pg) < rec->size)
+	{
+		Buffer 		newbuf;
+		BlockNumber newblk;
+		Page		newpage;
+		ZSUndoPageOpaque *newopaque;
+
+		/* new page */
+		newbuf = zs_getnewbuf(rel);
+		newblk = BufferGetBlockNumber(newbuf);
+		newpage = BufferGetPage(newbuf);
+		PageInit(newpage, BLCKSZ, sizeof(ZSUndoPageOpaque));
+		newopaque = (ZSUndoPageOpaque *) PageGetSpecialPointer(newpage);
+		newopaque->next = InvalidBlockNumber;
+		newopaque->zs_page_id = ZS_UNDO_PAGE_ID;
+
+		/* make the new page the tail; if the log was empty, also the head */
+		metaopaque->zs_undo_tail = newblk;
+		if (tail_blk == InvalidBlockNumber)
+			metaopaque->zs_undo_head = newblk;
+
+		MarkBufferDirty(metabuf);
+
+		/* link the old tail page to the new one, and let go of it */
+		if (tail_blk != InvalidBlockNumber)
+		{
+			tail_opaque->next = newblk;
+			MarkBufferDirty(tail_buf);
+			UnlockReleaseBuffer(tail_buf);
+		}
+
+		tail_blk = newblk;
+		tail_buf = newbuf;
+		tail_pg = newpage;
+		tail_opaque = newopaque;
+	}
+
+	/*
+	 * Assign a unique, monotonically increasing id to the record, while
+	 * still holding the metapage lock.
+	 */
+	undo_counter = metaopaque->zs_undo_counter++;
+	MarkBufferDirty(metabuf);
+
+	/* the tail page remains exclusively locked; the metapage can be let go */
+	UnlockReleaseBuffer(metabuf);
+
+	/* insert the record to this page */
+	offset = ((PageHeader) tail_pg)->pd_lower;
+
+	undorecptr.counter = undo_counter;
+	undorecptr.blkno = tail_blk;
+	undorecptr.offset = offset;
+	/* stamp the pointer into the record first, so the stored copy contains it
+	 * (zsundo_fetch() validates against it) */
+	rec->undorecptr = undorecptr;
+	dst = ((char *) tail_pg) + offset;
+	memcpy(dst, rec, rec->size);
+	((PageHeader) tail_pg)->pd_lower += rec->size;
+	MarkBufferDirty(tail_buf);
+	UnlockReleaseBuffer(tail_buf);
+
+	return undorecptr;
+}
+
+/*
+ * Fetch the UNDO record with the given undo-pointer.
+ *
+ * The returned record is a palloc'd copy.
+ *
+ * Errors out if the pointer does not resolve to a valid UNDO record
+ * (wrong page type, out-of-bounds offset, or a record whose stored
+ * self-pointer does not match).
+ */
+ZSUndoRec *
+zsundo_fetch(Relation rel, ZSUndoRecPtr undoptr)
+{
+	Buffer		buf;
+	Page		page;
+	PageHeader	pagehdr;
+	ZSUndoPageOpaque *opaque;
+	ZSUndoRec  *undorec;
+	ZSUndoRec  *undorec_copy;
+
+	buf = ReadBuffer(rel, undoptr.blkno);
+	page = BufferGetPage(buf);
+	pagehdr = (PageHeader) page;
+
+	LockBuffer(buf, BUFFER_LOCK_SHARE);
+	opaque = (ZSUndoPageOpaque *) PageGetSpecialPointer(page);
+	if (opaque->zs_page_id != ZS_UNDO_PAGE_ID)
+		elog(ERROR, "could not find UNDO record " UINT64_FORMAT " at blk %u offset %u; not an UNDO page",
+			 undoptr.counter, undoptr.blkno, undoptr.offset);
+
+	/* Sanity check that the pointer pointed to a valid place */
+	if (undoptr.offset < SizeOfPageHeaderData ||
+		undoptr.offset + sizeof(ZSUndoRec) > pagehdr->pd_lower)
+		elog(ERROR, "could not find UNDO record " UINT64_FORMAT " at blk %u offset %u",
+			 undoptr.counter, undoptr.blkno, undoptr.offset);
+
+	undorec = (ZSUndoRec *) (((char *) page) + undoptr.offset);
+
+	/* each record stores its own pointer (set in zsundo_insert); cross-check it */
+	if (memcmp(&undorec->undorecptr, &undoptr, sizeof(ZSUndoRecPtr)) != 0)
+		elog(ERROR, "could not find UNDO record");
+
+	/* copy the record out while still holding the share lock */
+	undorec_copy = palloc(undorec->size);
+	memcpy(undorec_copy, undorec, undorec->size);
+
+	UnlockReleaseBuffer(buf);
+
+	return undorec_copy;
+}
+
+/*
+ *	zs_lazy_tid_reaped() -- is a particular tid deletable?
+ *
+ *		Has the right signature to be an IndexBulkDeleteCallback
+ *		(see lazy_vacuum_index()).  Assumes the dead_tuples array is
+ *		sorted, as done by the pg_qsort() in zsundo_vacuum().
+ */
+static bool
+zs_lazy_tid_reaped(ItemPointer itemptr, void *state)
+{
+	ZSVacRelStats *vacrelstats = (ZSVacRelStats *) state;
+	ItemPointer res;
+
+	res = (ItemPointer) bsearch((void *) itemptr,
+								(void *) vacrelstats->trimstats.dead_tuples,
+								vacrelstats->trimstats.num_dead_tuples,
+								sizeof(ItemPointerData),
+								zs_vac_cmp_itemptr);
+
+	return (res != NULL);
+}
+
+/*
+ * Comparator routines for use with qsort() and bsearch().
+ *
+ * Orders ItemPointers by block number first, then by offset number.
+ */
+static int
+zs_vac_cmp_itemptr(const void *left, const void *right)
+{
+	BlockNumber lblk,
+				rblk;
+	OffsetNumber loff,
+				roff;
+
+	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
+	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
+
+	if (lblk < rblk)
+		return -1;
+	if (lblk > rblk)
+		return 1;
+
+	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
+	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
+
+	if (loff < roff)
+		return -1;
+	if (loff > roff)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * VACUUM entry point for zedstore: trim the UNDO log, collecting TIDs of
+ * dead tuples, remove their index entries, and mark the items dead in the
+ * attribute b-trees.  Repeats until the whole UNDO log has been processed
+ * (the dead-TID array may fill up, forcing multiple rounds).
+ */
+void
+zsundo_vacuum(Relation rel, VacuumParams *params, BufferAccessStrategy bstrategy,
+			  TransactionId OldestXmin)
+{
+	ZSVacRelStats *vacrelstats;
+	ZSUndoTrimStats *trimstats;
+	Relation   *Irel;
+	int			nindexes;
+	IndexBulkDeleteResult **indstats;
+	BlockNumber	nblocks;
+
+	nblocks = RelationGetNumberOfBlocks(rel);
+	if (nblocks == 0)
+		return;		/* empty table */
+
+	vacrelstats = (ZSVacRelStats *) palloc0(sizeof(ZSVacRelStats));
+	trimstats = &vacrelstats->trimstats;
+
+	if (params->options & VACOPT_VERBOSE)
+		vacrelstats->elevel = INFO;
+	else
+		vacrelstats->elevel = DEBUG2;
+	vacrelstats->vac_strategy = bstrategy;
+
+	/* Open all indexes of the relation */
+	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &Irel);
+	vacrelstats->hasindex = (nindexes > 0);
+	indstats = (IndexBulkDeleteResult **)
+		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+
+	/* size the dead-TID array from maintenance_work_mem */
+	lazy_space_alloc(vacrelstats, nblocks);
+
+	ereport(vacrelstats->elevel,
+			(errmsg("vacuuming \"%s.%s\"",
+					get_namespace_name(RelationGetNamespace(rel)),
+					RelationGetRelationName(rel))));
+
+	do
+	{
+		ZSUndoRecPtr reaped_upto;
+		BlockNumber oldest_undopage;
+		int			j;
+
+		/* reset per-round state; the array is reused across rounds */
+		trimstats->dead_tuples_overflowed = false;
+		trimstats->num_dead_tuples = 0;
+		trimstats->deleted_undo_pages = 0;
+
+		reaped_upto = zsundo_scan(rel, OldestXmin, trimstats, &oldest_undopage);
+
+		if (trimstats->num_dead_tuples > 0)
+		{
+			pg_qsort(trimstats->dead_tuples, trimstats->num_dead_tuples,
+					 sizeof(ItemPointerData), zs_vac_cmp_itemptr);
+			/* TODO: currently, we write a separate UNDO record for each attribute, so there will
+			 * be duplicates. Eliminate them. */
+			/* in-place dedup of the sorted array, keeping the first of each run */
+			j = 1;
+			for (int i = 1; i < trimstats->num_dead_tuples; i++)
+			{
+				if (!ItemPointerEquals(&trimstats->dead_tuples[j - 1],
+									   &trimstats->dead_tuples[i]))
+					trimstats->dead_tuples[j++] = trimstats->dead_tuples[i];
+			}
+			trimstats->num_dead_tuples = j;
+
+			/* Remove index entries */
+			for (int i = 0; i < nindexes; i++)
+				lazy_vacuum_index(Irel[i],
+								  &indstats[i],
+								  vacrelstats);
+
+			/*
+			 * Mark the items as dead in the attribute b-trees.
+			 *
+			 * We cannot remove them immediately, because we must prevent the TIDs from
+			 * being reused, until we have trimmed the UNDO records. Otherwise, this might
+			 * happen:
+			 *
+			 * 1. We remove items from all the B-trees.
+			 * 2. An inserter reuses the now-unused TID for a new tuple
+			 * 3. We abort the VACUUM, for some reason
+			 * 4. We start VACUUM again. We will now try to remove the item again, but
+			 *    we will remove the new item with the same TID instead.
+			 *
+			 * There would be other ways to deal with it. One easy optimization would be
+			 * to leave the DEAD item in only one of the attributes, and remove all others
+			 * completely. Or in step #4, we could refrain from removing items, whose
+			 * UNDO pointers are newer than expected. But that's tricky, because we scan
+			 * the indexes first, and we must refrain from removing index entries for
+			 * new items, too.
+			 */
+			for (int attno = 1; attno <= RelationGetNumberOfAttributes(rel); attno++)
+			{
+				for (int i = 0; i < trimstats->num_dead_tuples; i++)
+					zsbt_mark_item_dead(rel, attno,
+										ZSTidFromItemPointer(trimstats->dead_tuples[i]),
+										reaped_upto);
+			}
+		}
+
+		/*
+		 * The UNDO records for the tuple versions we just removed are no longer
+		 * interesting to anyone. Advance the UNDO tail, so that the UNDO pages
+		 * can be recycled.
+		 */
+		zsundo_update_oldest_ptr(rel, reaped_upto, oldest_undopage);
+
+		ereport(vacrelstats->elevel,
+				(errmsg("\"%s\": removed %d row versions and %d undo pages",
+						RelationGetRelationName(rel),
+						trimstats->num_dead_tuples,
+						trimstats->deleted_undo_pages)));
+	} while(trimstats->dead_tuples_overflowed);
+
+	/* Do post-vacuum cleanup and statistics update for each index */
+	for (int i = 0; i < nindexes; i++)
+		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+
+	/* Done with indexes */
+	vac_close_indexes(nindexes, Irel, NoLock);
+}
+
+
+/*
+ * lazy_space_alloc - space allocation decisions for lazy vacuum
+ *
+ * NOTE(review): the original comment said "see the comments at the head of
+ * this file for rationale", but no such rationale exists in this file; this
+ * logic appears adapted from lazy_space_alloc() in vacuumlazy.c -- confirm.
+ */
+static void
+lazy_space_alloc(ZSVacRelStats *vacrelstats, BlockNumber relblocks)
+{
+	long		maxtuples;
+	/* autovacuum workers may have their own memory budget */
+	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
+	autovacuum_work_mem != -1 ?
+	autovacuum_work_mem : maintenance_work_mem;
+
+	if (vacrelstats->hasindex)
+	{
+		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
+		maxtuples = Min(maxtuples, INT_MAX);
+		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+
+		/* curious coding here to ensure the multiplication can't overflow */
+		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
+			maxtuples = relblocks * LAZY_ALLOC_TUPLES;
+
+		/* stay sane if small maintenance_work_mem */
+		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
+	}
+	else
+	{
+		/* no indexes: one page's worth is enough, since no two-pass strategy */
+		maxtuples = MaxHeapTuplesPerPage;
+	}
+
+	vacrelstats->trimstats.num_dead_tuples = 0;
+	vacrelstats->trimstats.max_dead_tuples = (int) maxtuples;
+	vacrelstats->trimstats.dead_tuples = (ItemPointer)
+		palloc(maxtuples * sizeof(ItemPointerData));
+}
+
+/*
+ *	lazy_vacuum_index() -- vacuum one index relation.
+ *
+ *		Delete all the index entries pointing to tuples listed in
+ *		vacrelstats->dead_tuples, and update running statistics.
+ *		The dead_tuples array must already be sorted (zs_lazy_tid_reaped
+ *		uses bsearch on it).
+ */
+static void
+lazy_vacuum_index(Relation indrel,
+				  IndexBulkDeleteResult **stats,
+				  ZSVacRelStats *vacrelstats)
+{
+	IndexVacuumInfo ivinfo;
+	PGRUsage	ru0;
+
+	pg_rusage_init(&ru0);
+
+	ivinfo.index = indrel;
+	ivinfo.analyze_only = false;
+	ivinfo.estimated_count = true;
+	ivinfo.message_level = vacrelstats->elevel;
+	/* We can only provide an approximate value of num_heap_tuples here */
+	ivinfo.num_heap_tuples = vacrelstats->old_live_tuples;
+	ivinfo.strategy = vacrelstats->vac_strategy;
+
+	/* Do bulk deletion */
+	*stats = index_bulk_delete(&ivinfo, *stats,
+							   zs_lazy_tid_reaped, (void *) vacrelstats);
+
+	ereport(vacrelstats->elevel,
+			(errmsg("scanned index \"%s\" to remove %d row versions",
+					RelationGetRelationName(indrel),
+					vacrelstats->trimstats.num_dead_tuples),
+			 errdetail_internal("%s", pg_rusage_show(&ru0))));
+}
+
+/*
+ *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
+ *
+ *		Also updates the index's pg_class statistics when the index AM
+ *		reports an accurate tuple count.  Frees 'stats' when done.
+ */
+static void
+lazy_cleanup_index(Relation indrel,
+				   IndexBulkDeleteResult *stats,
+				   ZSVacRelStats *vacrelstats)
+{
+	IndexVacuumInfo ivinfo;
+	PGRUsage	ru0;
+
+	pg_rusage_init(&ru0);
+
+	ivinfo.index = indrel;
+	ivinfo.analyze_only = false;
+	ivinfo.estimated_count = (vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+	ivinfo.message_level = vacrelstats->elevel;
+
+	/*
+	 * Now we can provide a better estimate of total number of surviving
+	 * tuples (we assume indexes are more interested in that than in the
+	 * number of nominally live tuples).
+	 */
+	ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
+	ivinfo.strategy = vacrelstats->vac_strategy;
+
+	stats = index_vacuum_cleanup(&ivinfo, stats);
+
+	/* the index AM may return NULL if there is nothing to report */
+	if (!stats)
+		return;
+
+	/*
+	 * Now update statistics in pg_class, but only if the index says the count
+	 * is accurate.
+	 */
+	if (!stats->estimated_count)
+		vac_update_relstats(indrel,
+							stats->num_pages,
+							stats->num_index_tuples,
+							0,
+							false,
+							InvalidTransactionId,
+							InvalidMultiXactId,
+							false);
+
+	ereport(vacrelstats->elevel,
+			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
+					RelationGetRelationName(indrel),
+					stats->num_index_tuples,
+					stats->num_pages),
+			 errdetail("%.0f index row versions were removed.\n"
+					   "%u index pages have been deleted, %u are currently reusable.\n"
+					   "%s.",
+					   stats->tuples_removed,
+					   stats->pages_deleted, stats->pages_free,
+					   pg_rusage_show(&ru0))));
+
+	pfree(stats);
+}
+
+/*
+ * Scan the UNDO log, starting from oldest entry. For every tuple that is
+ * now considered dead, add it to 'dead_tuples'. Records for committed
+ * transactions can be trimmed away immediately.
+ *
+ * Returns the value that the oldest UNDO ptr can be trimmed upto, after
+ * removing all the dead TIDs.  On return, *oldest_undopage is the oldest
+ * UNDO page still needed (or InvalidBlockNumber if the log was fully
+ * processed).
+ *
+ * The caller must initialize ZSUndoTrimStats. This function updates the
+ * counters, and adds dead TIDs that can be removed to trimstats->dead_tuples.
+ * If there are more dead TIDs than fit in the dead_tuples array, this
+ * function sets the trimstats->dead_tuples_overflowed flag, and stops just
+ * before the UNDO record for the TID that did not fit. An important special
+ * case is calling this with trimstats->max_dead_tuples == 0. In that case,
+ * we scan as much as is possible without scanning the indexes (i.e. only
+ * UNDO records belonging to committed transactions at the tail of the UNDO
+ * log). IOW, it returns the oldest UNDO rec pointer that is still needed by
+ * active snapshots.
+ */
+static ZSUndoRecPtr
+zsundo_scan(Relation rel, TransactionId OldestXmin, ZSUndoTrimStats *trimstats,
+			BlockNumber *oldest_undopage)
+{
+	/* Scan the undo log from oldest to newest */
+	Buffer		metabuf;
+	Page		metapage;
+	ZSMetaPageOpaque *metaopaque;
+	BlockNumber	firstblk;
+	BlockNumber	lastblk;
+	ZSUndoRecPtr oldest_undorecptr;
+	bool		can_advance_oldestundorecptr;
+	char	   *ptr;
+	char	   *endptr;
+
+	/*
+	 * Get the current oldest undo page from the metapage.
+	 */
+	metabuf = ReadBuffer(rel, ZS_META_BLK);
+	metapage = BufferGetPage(metabuf);
+	LockBuffer(metabuf, BUFFER_LOCK_SHARE);
+	metaopaque = (ZSMetaPageOpaque *) PageGetSpecialPointer(metapage);
+
+	firstblk = metaopaque->zs_undo_head;
+
+	oldest_undorecptr = metaopaque->zs_undo_oldestptr;
+
+	/*
+	 * If we assume that only one process can call TRIM at a time, then we
+	 * don't need to hold the metapage locked. Alternatively, if multiple
+	 * concurrent trims is possible, we could check after reading the head
+	 * page, that it is the page we expect, and re-read the metapage if it's
+	 * not.
+	 *
+	 * FIXME: Currently this works even if two backends call zsundo_trim()
+	 * concurrently, because we never recycle UNDO pages.
+	 */
+	UnlockReleaseBuffer(metabuf);
+
+	/*
+	 * Loop through UNDO records, starting from the oldest page, until we
+	 * hit a record that we cannot remove.
+	 */
+	lastblk = firstblk;
+	can_advance_oldestundorecptr = false;
+	while (lastblk != InvalidBlockNumber && !trimstats->dead_tuples_overflowed)
+	{
+		Buffer		buf;
+		Page		page;
+		ZSUndoPageOpaque *opaque;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Read the UNDO page */
+		buf = ReadBuffer(rel, lastblk);
+		page = BufferGetPage(buf);
+		LockBuffer(buf, BUFFER_LOCK_SHARE);
+		opaque = (ZSUndoPageOpaque *) PageGetSpecialPointer(page);
+
+		if (opaque->zs_page_id != ZS_UNDO_PAGE_ID)
+			elog(ERROR, "unexpected page id on UNDO page");
+
+		/* loop through all records on the page */
+		endptr = (char *) page + ((PageHeader) page)->pd_lower;
+		ptr = (char *) page + SizeOfPageHeaderData;
+		while (ptr < endptr && !trimstats->dead_tuples_overflowed)
+		{
+			ZSUndoRec *undorec = (ZSUndoRec *) ptr;
+			bool		did_commit;
+
+			Assert(undorec->undorecptr.blkno == lastblk);
+
+			/*
+			 * Skip records older than the oldest UNDO pointer recorded in
+			 * the metapage; they have already been logically trimmed.
+			 *
+			 * Note: this check must be made *before* advancing
+			 * 'oldest_undorecptr' to this record's pointer, otherwise the
+			 * condition can never be true.
+			 */
+			if (undorec->undorecptr.counter < oldest_undorecptr.counter)
+			{
+				ptr += undorec->size;
+				continue;
+			}
+
+			/* this is now the oldest record we have looked at */
+			oldest_undorecptr = undorec->undorecptr;
+
+			if (!TransactionIdPrecedes(undorec->xid, OldestXmin))
+			{
+				/* This is still needed. Bail out */
+				break;
+			}
+
+			/*
+			 * No one thinks this transaction is in-progress anymore. If it
+			 * committed, we can just trim away its UNDO record. If it aborted,
+			 * we need to apply the UNDO record first.
+			 *
+			 * TODO: applying UNDO actions has not been implemented, so if we
+			 * encounter an aborted record, we just stop dead right there, and
+			 * never trim past that point.
+			 */
+			did_commit = TransactionIdDidCommit(undorec->xid);
+
+			switch (undorec->type)
+			{
+				case ZSUNDO_TYPE_INSERT:
+					/* aborted insert: the tuple never became visible */
+					if (!did_commit)
+						zsundo_record_dead_tuple(trimstats, undorec->tid);
+					break;
+				case ZSUNDO_TYPE_DELETE:
+					/* committed delete: the tuple is dead to everyone */
+					if (did_commit)
+						zsundo_record_dead_tuple(trimstats, undorec->tid);
+					break;
+				case ZSUNDO_TYPE_UPDATE:
+					/* committed update kills the old version; aborted one kills the new */
+					if (did_commit)
+						zsundo_record_dead_tuple(trimstats, ((ZSUndoRec_Update *) undorec)->otid);
+					else
+						zsundo_record_dead_tuple(trimstats, undorec->tid);
+					break;
+			}
+			ptr += undorec->size;
+
+			can_advance_oldestundorecptr = true;
+		}
+
+		if (ptr < endptr)
+		{
+			/* stopped mid-page: this page is still needed */
+			UnlockReleaseBuffer(buf);
+			break;
+		}
+		else
+		{
+			/* We processed all records on the page. Step to the next one, if any. */
+			Assert(ptr == endptr);
+			lastblk = opaque->next;
+			UnlockReleaseBuffer(buf);
+			if (lastblk != InvalidBlockNumber)
+				trimstats->deleted_undo_pages++;
+		}
+	}
+
+	trimstats->can_advance_oldestundorecptr = can_advance_oldestundorecptr;
+	*oldest_undopage = lastblk;
+	return oldest_undorecptr;
+}
+
+/*
+ * Update metapage with the oldest value.
+ *
+ * 'oldest_undorecptr' becomes the new oldest UNDO record pointer, and
+ * 'oldest_undopage' the new head of the UNDO page chain.  If the whole log
+ * was consumed (oldest_undopage == InvalidBlockNumber), both head and tail
+ * are cleared.
+ */
+static void
+zsundo_update_oldest_ptr(Relation rel, ZSUndoRecPtr oldest_undorecptr, BlockNumber oldest_undopage)
+{
+	/* Scan the undo log from oldest to newest */
+	Buffer		metabuf;
+	Page		metapage;
+	ZSMetaPageOpaque *metaopaque;
+
+	metabuf = ReadBuffer(rel, ZS_META_BLK);
+	metapage = BufferGetPage(metabuf);
+	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+	metaopaque = (ZSMetaPageOpaque *) PageGetSpecialPointer(metapage);
+
+	metaopaque->zs_undo_oldestptr = oldest_undorecptr;
+	if (oldest_undopage == InvalidBlockNumber)
+	{
+		metaopaque->zs_undo_head = InvalidBlockNumber;
+		metaopaque->zs_undo_tail = InvalidBlockNumber;
+	}
+	else
+		metaopaque->zs_undo_head = oldest_undopage;
+
+	/* TODO: WAL-log */
+
+	MarkBufferDirty(metabuf);
+	UnlockReleaseBuffer(metabuf);
+
+	/* TODO: we leak all the removed undo pages */
+}
+
+/*
+ * zsundo_record_dead_tuple - remember one deletable tuple
+ *
+ * Appends the TID to trimstats->dead_tuples; if the array is full, sets
+ * dead_tuples_overflowed instead so the caller knows another round is needed.
+ */
+static void
+zsundo_record_dead_tuple(ZSUndoTrimStats *trimstats, zstid tid)
+{
+	/*
+	 * The array shouldn't overflow under normal behavior, but perhaps it
+	 * could if we are given a really small maintenance_work_mem. In that
+	 * case, just forget the last few tuples (we'll get 'em next time).
+	 */
+	if (trimstats->num_dead_tuples < trimstats->max_dead_tuples)
+	{
+		trimstats->dead_tuples[trimstats->num_dead_tuples] = ItemPointerFromZSTid(tid);
+		 trimstats->num_dead_tuples++;
+		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
+									 trimstats->num_dead_tuples);
+	}
+	else
+		trimstats->dead_tuples_overflowed = true;
+}
+
+/*
+ * Return the current "Oldest undo pointer". The effects of any actions with
+ * undo pointer older than this is known to be visible to everyone. (i.e.
+ * an inserted tuple is known to be visible, and a deleted tuple is known to
+ * be invisible.)
+ *
+ * For a zero-block (brand new) relation, returns a zeroed-out pointer.
+ */
+ZSUndoRecPtr
+zsundo_get_oldest_undo_ptr(Relation rel)
+{
+	ZSUndoRecPtr result;
+	ZSUndoTrimStats trimstats;
+	BlockNumber oldest_undopage;
+
+	if (RelationGetNumberOfBlocks(rel) == 0)
+	{
+		memset(&result, 0, sizeof(ZSUndoRecPtr));
+		return result;
+	}
+
+	/*
+	 * Call zsundo_scan, with max_dead_tuples = 0. It scans the UNDO log,
+	 * starting from the oldest record, and advances the oldest UNDO pointer
+	 * past as many committed, visible-to-all transactions as possible.
+	 *
+	 * TODO:
+	 * We could get the latest cached value directly from the metapage, but
+	 * this allows trimming the UNDO log more aggressively, whenever we're
+	 * scanning. Fetching records from the UNDO log is pretty expensive,
+	 * so until that is somehow sped up, it is a good tradeoff to be
+	 * aggressive about that.
+	 */
+	trimstats.num_dead_tuples = 0;
+	trimstats.max_dead_tuples = 0;
+	trimstats.dead_tuples = NULL;
+	trimstats.dead_tuples_overflowed = false;
+	trimstats.deleted_undo_pages = 0;
+	result = zsundo_scan(rel, RecentGlobalXmin, &trimstats, &oldest_undopage);
+
+	/* opportunistically persist the advanced pointer, if the scan trimmed anything */
+	if (trimstats.can_advance_oldestundorecptr)
+		zsundo_update_oldest_ptr(rel, result, oldest_undopage);
+
+	return result;
+}
diff --git a/src/backend/access/zedstore/zedstore_visibility.c b/src/backend/access/zedstore/zedstore_visibility.c
new file mode 100644
index 0000000000..302331f2cb
--- /dev/null
+++ b/src/backend/access/zedstore/zedstore_visibility.c
@@ -0,0 +1,259 @@
+/*
+ * zedstore_visibility.c
+ *		Routines for MVCC in Zedstore
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstore_visibility.c
+ */
+#include "postgres.h"
+
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "access/zedstore_internal.h"
+#include "access/zedstore_undo.h"
+#include "storage/procarray.h"
+
+/*
+ * Like HeapTupleSatisfiesUpdate.
+ */
+/*
+ * Like HeapTupleSatisfiesUpdate.
+ *
+ * Decide whether 'item' may be updated/deleted by the current command,
+ * by consulting the UNDO record that its undo pointer refers to.  The
+ * ZSBT_UPDATED/ZSBT_DELETED flags tell us whether the undo pointer leads
+ * to the inserting or the deleting/updating transaction's record.
+ */
+TM_Result
+zs_SatisfiesUpdate(ZSBtreeScan *scan, ZSUncompressedBtreeItem *item)
+{
+	Relation	rel = scan->rel;
+	Snapshot	snapshot = scan->snapshot;
+	ZSUndoRecPtr recent_oldest_undo;
+	bool		is_deleted;
+	ZSUndoRec  *undorec;
+
+	/* Lazily fetch and cache the oldest-undo pointer on first use. */
+	if (scan->recent_oldest_undo.counter == 0)
+		scan->recent_oldest_undo = zsundo_get_oldest_undo_ptr(scan->rel);
+	recent_oldest_undo = scan->recent_oldest_undo;
+
+	is_deleted = (item->t_flags & (ZSBT_UPDATED | ZSBT_DELETED)) != 0;
+
+	/* Is it visible? */
+	/*
+	 * If the item's UNDO record has already been trimmed away, the
+	 * originating transaction is known to be committed and visible to all.
+	 */
+	if (item->t_undo_ptr.counter < recent_oldest_undo.counter)
+	{
+		if (is_deleted)
+		{
+			/* this probably shouldn't happen.. */
+			return  TM_Invisible;
+		}
+		else
+			return  TM_Ok;
+	}
+
+	/* have to fetch the UNDO record */
+	undorec = zsundo_fetch(rel, item->t_undo_ptr);
+
+	if (!is_deleted)
+	{
+		/* Inserted tuple */
+		Assert(undorec->type == ZSUNDO_TYPE_INSERT ||
+			   undorec->type == ZSUNDO_TYPE_UPDATE);
+
+		if (TransactionIdIsCurrentTransactionId(undorec->xid))
+		{
+			if (undorec->cid >= snapshot->curcid)
+				return TM_Invisible;	/* inserted after scan started */
+			return TM_Ok;
+		}
+		else if (TransactionIdIsInProgress(undorec->xid))
+			return TM_Invisible;		/* inserter has not committed yet */
+		else if (TransactionIdDidCommit(undorec->xid))
+			return TM_Ok;
+		else
+		{
+			/* it must have aborted or crashed */
+			return TM_Invisible;
+		}
+	}
+	else
+	{
+		/* deleted or updated-away tuple */
+		Assert(undorec->type == ZSUNDO_TYPE_DELETE ||
+			   undorec->type == ZSUNDO_TYPE_UPDATE);
+
+		if (TransactionIdIsCurrentTransactionId(undorec->xid))
+		{
+			if (undorec->cid >= snapshot->curcid)
+				return TM_SelfModified;	/* deleted/updated after scan started */
+			else
+				return TM_Invisible;	/* deleted before scan started */
+		}
+
+		if (TransactionIdIsInProgress(undorec->xid))
+			return TM_BeingModified;
+
+		if (!TransactionIdDidCommit(undorec->xid))
+		{
+			/* deleter must have aborted or crashed */
+			return TM_Ok;
+		}
+
+		/* Deletion committed: report whether the row was deleted or moved. */
+		if (undorec->type == ZSUNDO_TYPE_DELETE)
+			return TM_Deleted;
+		else
+			return TM_Updated;
+	}
+}
+
+/*
+ * Counterpart of HeapTupleSatisfiesAny: under SnapshotAny every item that
+ * reaches this point is considered visible, unconditionally.
+ */
+static bool
+zs_SatisfiesAny(ZSBtreeScan *scan, ZSUncompressedBtreeItem *item)
+{
+	/* All tuples are visible to SnapshotAny. */
+	return true;
+}
+
+/*
+ * helper function to zs_SatisfiesMVCC(), to check if the given XID
+ * is visible to the snapshot.
+ *
+ * For our own transaction, only the effects of commands earlier than the
+ * snapshot's current command are visible; for other transactions we
+ * consult the snapshot and then the commit log.
+ */
+static bool
+xid_is_visible(Snapshot snapshot, TransactionId xid, CommandId cid)
+{
+	/* Our own transaction: visible iff issued by an earlier command. */
+	if (TransactionIdIsCurrentTransactionId(xid))
+		return cid < snapshot->curcid;
+
+	/* Still in progress according to the snapshot: not visible. */
+	if (XidInMVCCSnapshot(xid, snapshot))
+		return false;
+
+	/* Committed: visible.  Aborted or crashed: not visible. */
+	return TransactionIdDidCommit(xid);
+}
+
+/*
+ * Like HeapTupleSatisfiesMVCC
+ *
+ * Returns true if 'item' is visible to the scan's MVCC snapshot.  For a
+ * deleted/updated item this may require walking one step back in the UNDO
+ * chain, to check whether the *insertion* is visible even though the
+ * deletion is not.
+ *
+ * NOTE(review): assumes scan->recent_oldest_undo has already been
+ * initialized by the caller (zs_SatisfiesVisibility does so) — confirm no
+ * other callers exist.
+ */
+static bool
+zs_SatisfiesMVCC(ZSBtreeScan *scan, ZSUncompressedBtreeItem *item)
+{
+	Relation	rel = scan->rel;
+	Snapshot	snapshot = scan->snapshot;
+	ZSUndoRecPtr recent_oldest_undo = scan->recent_oldest_undo;
+	ZSUndoRec  *undorec;
+	bool		is_deleted;
+
+	Assert (snapshot->snapshot_type == SNAPSHOT_MVCC);
+
+	is_deleted = (item->t_flags & (ZSBT_UPDATED | ZSBT_DELETED)) != 0;
+
+	/*
+	 * UNDO record already trimmed away: the last modification is visible
+	 * to everyone, so an inserted item is visible and a deleted one isn't.
+	 */
+	if (item->t_undo_ptr.counter < recent_oldest_undo.counter)
+	{
+		if (!is_deleted)
+			return true;
+		else
+			return false;
+	}
+
+	/* have to fetch the UNDO record */
+	undorec = zsundo_fetch(rel, item->t_undo_ptr);
+
+	if (!is_deleted)
+	{
+		/* Inserted tuple */
+		Assert(undorec->type == ZSUNDO_TYPE_INSERT ||
+			   undorec->type == ZSUNDO_TYPE_UPDATE);
+
+		return xid_is_visible(snapshot, undorec->xid, undorec->cid);
+	}
+	else
+	{
+		/* deleted or updated-away tuple */
+		Assert(undorec->type == ZSUNDO_TYPE_DELETE ||
+			   undorec->type == ZSUNDO_TYPE_UPDATE);
+
+		if (xid_is_visible(snapshot, undorec->xid, undorec->cid))
+		{
+			/* we can see the deletion */
+			return false;
+		}
+		else
+		{
+			/*
+			 * The deleting XID is not visible to us. But before concluding
+			 * that the tuple is visible, we have to check if the inserting
+			 * XID is visible to us.
+			 */
+			ZSUndoRecPtr	prevptr;
+
+			if (undorec->type == ZSUNDO_TYPE_DELETE)
+				prevptr = ((ZSUndoRec_Delete *) undorec)->prevundorec;
+			else
+				prevptr = ((ZSUndoRec_Update *) undorec)->prevundorec;
+
+			/* Insert record trimmed away: the insertion is visible to all. */
+			if (prevptr.counter < recent_oldest_undo.counter)
+				return true;
+
+			undorec = zsundo_fetch(rel, prevptr);
+
+			Assert(undorec->type == ZSUNDO_TYPE_INSERT ||
+				   undorec->type == ZSUNDO_TYPE_UPDATE);
+			if (xid_is_visible(snapshot, undorec->xid, undorec->cid))
+				return true;	/* we can see the insert, but not the delete */
+			else
+				return false;	/* we cannot see the insert */
+		}
+	}
+}
+
+/*
+ * Like HeapTupleSatisfiesVisibility
+ *
+ * Dispatches to the appropriate satisfies-routine based on the scan's
+ * snapshot type.  Only MVCC and "any" snapshots are implemented so far;
+ * other snapshot types error out.
+ */
+bool
+zs_SatisfiesVisibility(ZSBtreeScan *scan, ZSUncompressedBtreeItem *item)
+{
+	/*
+	 * If we don't have a cached oldest-undo-ptr value yet, fetch it
+	 * from the metapage. (TODO: In the final EDB's UNDO-log implementation
+	 * this will probably be just a global variable, like RecentGlobalXmin.)
+	 */
+	if (scan->recent_oldest_undo.counter == 0)
+		scan->recent_oldest_undo = zsundo_get_oldest_undo_ptr(scan->rel);
+
+	/* dead items are never considered visible. */
+	if ((item->t_flags & ZSBT_DEAD) != 0)
+		return false;
+
+	/* No default: case, so the compiler warns if a snapshot type is added. */
+	switch (scan->snapshot->snapshot_type)
+	{
+		case SNAPSHOT_MVCC:
+			return zs_SatisfiesMVCC(scan, item);
+			break;
+		case SNAPSHOT_SELF:
+			elog(ERROR, "SnapshotSelf not implemented in zedstore yet");
+			break;
+		case SNAPSHOT_ANY:
+			return zs_SatisfiesAny(scan, item);
+			break;
+		case SNAPSHOT_TOAST:
+			elog(ERROR, "SnapshotToast not implemented in zedstore");
+			break;
+		case SNAPSHOT_DIRTY:
+			elog(ERROR, "SnapshotDirty not implemented in zedstore yet");
+			break;
+		case SNAPSHOT_HISTORIC_MVCC:
+			elog(ERROR, "SnapshotHistoricMVCC not implemented in zedstore yet");
+			break;
+		case SNAPSHOT_NON_VACUUMABLE:
+			elog(ERROR, "SnapshotNonVacuumable not implemented in zedstore yet");
+			break;
+	}
+
+	return false;				/* keep compiler quiet */
+}
diff --git a/src/backend/access/zedstore/zedstoream_handler.c b/src/backend/access/zedstore/zedstoream_handler.c
new file mode 100644
index 0000000000..84575260fb
--- /dev/null
+++ b/src/backend/access/zedstore/zedstoream_handler.c
@@ -0,0 +1,1500 @@
+/*-------------------------------------------------------------------------
+ *
+ * zedstoream_handler.c
+ *	  ZedStore table access method code
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/zedstore/zedstoream_handler.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "miscadmin.h"
+
+#include "access/heapam.h"
+#include "access/multixact.h"
+#include "access/relscan.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "access/zedstore_internal.h"
+#include "access/zedstore_undo.h"
+#include "catalog/catalog.h"
+#include "catalog/index.h"
+#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
+#include "commands/vacuum.h"
+#include "executor/executor.h"
+#include "optimizer/plancat.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/rel.h"
+
+
+typedef enum
+{
+	ZSSCAN_STATE_UNSTARTED,
+	ZSSCAN_STATE_SCANNING,
+	ZSSCAN_STATE_FINISHED_RANGE,
+	ZSSCAN_STATE_FINISHED
+} zs_scan_state;
+
+typedef struct ZedStoreDescData
+{
+	/* scan parameters */
+	TableScanDescData rs_scan;  /* */
+	int		   *proj_atts;
+	ZSBtreeScan *btree_scans;
+	int			num_proj_atts;
+
+	zs_scan_state state;
+	zstid		cur_range_start;
+	zstid		cur_range_end;
+	bool		finished;
+
+	/* These fields are used for bitmap scans, to hold a "block's" worth of data */
+#define	MAX_ITEMS_PER_LOGICAL_BLOCK		MaxHeapTuplesPerPage
+	int			bmscan_ntuples;
+	zstid	   *bmscan_tids;
+	Datum	  **bmscan_datums;
+	bool	  **bmscan_isnulls;
+	int			bmscan_nexttuple;
+
+} ZedStoreDescData;
+
+typedef struct ZedStoreDescData *ZedStoreDesc;
+
+typedef struct ZedStoreIndexFetchData
+{
+	IndexFetchTableData idx_fetch_data;
+	int		   *proj_atts;
+	int			num_proj_atts;
+} ZedStoreIndexFetchData;
+
+typedef struct ZedStoreIndexFetchData *ZedStoreIndexFetch;
+
+typedef struct ParallelZSScanDescData *ParallelZSScanDesc;
+
+static Size zs_parallelscan_estimate(Relation rel);
+static Size zs_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan);
+static void zs_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan);
+static bool zs_parallelscan_nextrange(Relation rel, ParallelZSScanDesc pzscan,
+									  zstid *start, zstid *end);
+
+
+/* ----------------------------------------------------------------
+ *				storage AM support routines for zedstoream
+ * ----------------------------------------------------------------
+ */
+
+/* Table AM callback: fetch one row version by TID.  Not implemented yet. */
+static bool
+zedstoream_fetch_row_version(Relation relation,
+							 ItemPointer tid,
+							 Snapshot snapshot,
+							 TupleTableSlot *slot)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+	return false;				/* keep compiler quiet; ereport does not return */
+}
+
+/* Table AM callback: find the latest version of a row chain.  Not implemented yet. */
+static void
+zedstoream_get_latest_tid(Relation relation,
+							 Snapshot snapshot,
+							 ItemPointer tid)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+}
+
+/*
+ * Table AM callback: insert one tuple.
+ *
+ * Inserts column-at-a-time into the per-attribute btrees.  The first
+ * zsbt_insert() call (with InvalidZSTid) assigns the row's TID, and that
+ * TID is then passed to the remaining attributes so all columns of the
+ * row share it.  Likewise, the first call creates the UNDO record and the
+ * rest reuse 'undorecptr'.  Oversized varlena datums are ZS-toasted first.
+ */
+static void
+zedstoream_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
+				  int options, struct BulkInsertStateData *bistate)
+{
+	AttrNumber	attno;
+	Datum	   *d;
+	bool	   *isnulls;
+	zstid		tid;
+	ZSUndoRecPtr undorecptr;
+	TransactionId xid = GetCurrentTransactionId();
+
+	slot_getallattrs(slot);
+	d = slot->tts_values;
+	isnulls = slot->tts_isnull;
+
+	tid = InvalidZSTid;
+	ZSUndoRecPtrInitialize(&undorecptr);
+	for (attno = 1; attno <= relation->rd_att->natts; attno++)
+	{
+		Form_pg_attribute attr = &relation->rd_att->attrs[attno - 1];
+		Datum		datum = d[attno - 1];
+		bool		isnull = isnulls[attno - 1];
+		Datum		toastptr = (Datum) 0;
+
+		/* If this datum is too large, toast it */
+		if (!isnull && attr->attlen < 0 &&
+			VARSIZE_ANY_EXHDR(datum) > MaxZedStoreDatumSize)
+		{
+			toastptr = datum = zedstore_toast_datum(relation, attno, datum);
+		}
+
+		tid = zsbt_insert(relation, attno, datum, isnull, xid, cid, tid, &undorecptr);
+
+		/* The toast chain can only be stamped once the TID is known. */
+		if (toastptr != (Datum) 0)
+			zedstore_toast_finish(relation, attno, toastptr, tid);
+	}
+
+	slot->tts_tableOid = RelationGetRelid(relation);
+	slot->tts_tid = ItemPointerFromZSTid(tid);
+}
+
+/* Table AM callback: speculative insert (for INSERT ON CONFLICT).  Not implemented yet. */
+static void
+zedstoream_insert_speculative(Relation relation, TupleTableSlot *slot, CommandId cid,
+							  int options, BulkInsertState bistate, uint32 specToken)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+}
+
+/* Table AM callback: finish/abort a speculative insert.  Not implemented yet. */
+static void
+zedstoream_complete_speculative(Relation relation, TupleTableSlot *slot, uint32 spekToken,
+								bool succeeded)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+}
+
+/*
+ * Table AM callback: insert a batch of tuples (e.g. for COPY).
+ *
+ * Works column-at-a-time: for each attribute, the values of all 'ntuples'
+ * rows are gathered into a list of btree items and inserted with a single
+ * zsbt_insert_multi_items() call.  The TIDs assigned while inserting the
+ * first attribute are reused for the later attributes, so that all columns
+ * of a row carry the same TID.  Oversized varlena datums are ZS-toasted.
+ */
+static void
+zedstoream_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
+						CommandId cid, int options, BulkInsertState bistate)
+{
+	AttrNumber	attno;
+	zstid	   *tid;
+	ZSUndoRecPtr undorecptr;
+	int			i;
+	bool		slotgetandset = true;
+	TransactionId xid = GetCurrentTransactionId();
+	int		   *tupletoasted;	/* tuple number of each toasted datum */
+	Datum	   *toastdatum;		/* toast pointer of each toasted datum */
+
+	tid = palloc(ntuples * sizeof(zstid));
+	tupletoasted = palloc(ntuples * sizeof(int));
+	toastdatum = palloc(ntuples * sizeof(Datum));
+
+	ZSUndoRecPtrInitialize(&undorecptr);
+
+	for (attno = 1; attno <= relation->rd_att->natts; attno++)
+	{
+		Form_pg_attribute attr = &relation->rd_att->attrs[attno - 1];
+		int			ntupletoasted = 0;
+		List	   *zitems = NIL;
+
+		for (i = 0; i < ntuples; i++)
+		{
+			ZSUncompressedBtreeItem *zitem;
+			Datum		datum;
+			bool		isnull;
+
+			/*
+			 * On the first attribute's pass, deform the slot *before*
+			 * reading any values from it.  (Previously the datum was
+			 * fetched before slot_getallattrs(), reading potentially
+			 * undeformed tts_values.)
+			 */
+			if (slotgetandset)
+			{
+				slot_getallattrs(slots[i]);
+				tid[i] = InvalidZSTid;
+			}
+
+			datum = slots[i]->tts_values[attno - 1];
+			isnull = slots[i]->tts_isnull[attno - 1];
+
+			/* If this datum is too large, toast it */
+			if (!isnull && attr->attlen < 0 &&
+				VARSIZE_ANY_EXHDR(datum) > MaxZedStoreDatumSize)
+			{
+				datum = toastdatum[ntupletoasted] = zedstore_toast_datum(relation, attno, datum);
+				tupletoasted[ntupletoasted++] = i;
+			}
+
+			zitem = zsbt_create_item(attr, tid[i], datum, isnull);
+			zitems = lappend(zitems, zitem);
+		}
+
+		zsbt_insert_multi_items(relation, attno, zitems, xid, cid, &undorecptr, tid);
+
+		for (i = 0; i < ntupletoasted; i++)
+		{
+			/*
+			 * toastdatum[] is indexed by toast slot (i), while tid[] is
+			 * indexed by tuple number (tupletoasted[i]).  The old code
+			 * indexed toastdatum[] by tuple number as well, finishing the
+			 * wrong toast chain whenever a non-toasted tuple preceded a
+			 * toasted one in the batch.
+			 */
+			zedstore_toast_finish(relation, attno, toastdatum[i],
+								  tid[tupletoasted[i]]);
+		}
+
+		slotgetandset = false;
+	}
+
+	for (i = 0; i < ntuples; i++)
+	{
+		slots[i]->tts_tableOid = RelationGetRelid(relation);
+		slots[i]->tts_tid = ItemPointerFromZSTid(tid[i]);
+	}
+
+	pfree(tid);
+	pfree(tupletoasted);
+	pfree(toastdatum);
+}
+
+/*
+ * Table AM callback: delete a tuple.
+ *
+ * Deletes the row column-at-a-time, calling zsbt_delete() for each
+ * attribute.  A failure on the first attribute is reported back to the
+ * caller; a failure on a later attribute errors out, because earlier
+ * attributes have already been marked deleted and cannot be rolled back
+ * here.
+ */
+static TM_Result
+zedstoream_delete(Relation relation, ItemPointer tid_p, CommandId cid,
+				  Snapshot snapshot, Snapshot crosscheck, bool wait,
+				  TM_FailureData *hufd, bool changingPart)
+{
+	zstid		tid = ZSTidFromItemPointer(*tid_p);
+	TransactionId xid = GetCurrentTransactionId();
+	AttrNumber	attno;
+
+	for (attno = 1; attno <= relation->rd_att->natts; attno++)
+	{
+		TM_Result result;
+
+		result = zsbt_delete(relation, attno, tid, xid, cid,
+							 snapshot, crosscheck, wait, hufd, changingPart);
+
+		/*
+		 * TODO: Here, we should check for TM_BeingModified, like heap_delete()
+		 * does
+		 */
+
+		if (result != TM_Ok)
+		{
+			if (attno != 1)
+			{
+				/* failed to delete this attribute, but we might already have
+				 * deleted other attributes. */
+				elog(ERROR, "could not delete all columns of row");
+			}
+			return result;
+		}
+	}
+
+	return TM_Ok;
+}
+
+
+/* Table AM callback: lock a tuple (SELECT FOR UPDATE etc.).  Not implemented yet. */
+static TM_Result
+zedstoream_lock_tuple(Relation relation, ItemPointer tid, Snapshot snapshot,
+					  TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
+					  LockWaitPolicy wait_policy, uint8 flags,
+					  TM_FailureData *hufd)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+	return TM_Ok;				/* keep compiler quiet; ereport does not return */
+}
+
+
+/*
+ * Table AM callback: update a tuple.
+ *
+ * Updates the row column-at-a-time via zsbt_update().  The first call
+ * assigns the new row's TID ('newtid' starts invalid), and later calls
+ * reuse it so all columns of the new version share a TID.  Oversized
+ * varlena datums are ZS-toasted before insertion.
+ */
+static TM_Result
+zedstoream_update(Relation relation, ItemPointer otid_p, TupleTableSlot *slot,
+				  CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+				  bool wait, TM_FailureData *hufd,
+				  LockTupleMode *lockmode, bool *update_indexes)
+{
+	zstid		otid = ZSTidFromItemPointer(*otid_p);
+	TransactionId xid = GetCurrentTransactionId();
+	AttrNumber	attno;
+	Datum	   *d;
+	bool	   *isnulls;
+	TM_Result	result;
+	zstid		newtid;
+
+	slot_getallattrs(slot);
+	d = slot->tts_values;
+	isnulls = slot->tts_isnull;
+
+	/*
+	 * TODO: Since we have visibility information on each column, we could skip
+	 * updating columns whose value didn't change.
+	 */
+
+	result = TM_Ok;
+	newtid = InvalidZSTid;
+	for (attno = 1; attno <= relation->rd_att->natts; attno++)
+	{
+		Form_pg_attribute attr = &relation->rd_att->attrs[attno - 1];
+		Datum		newdatum = d[attno - 1];
+		bool		newisnull = isnulls[attno - 1];
+		Datum		toastptr = (Datum) 0;
+		TM_Result this_result;
+
+		/* If this datum is too large, toast it */
+		if (!newisnull && attr->attlen < 0 &&
+			VARSIZE_ANY_EXHDR(newdatum) > MaxZedStoreDatumSize)
+		{
+			toastptr = newdatum = zedstore_toast_datum(relation, attno, newdatum);
+		}
+
+		this_result = zsbt_update(relation, attno, otid, newdatum, newisnull,
+								  xid, cid, snapshot, crosscheck,
+								  wait, hufd, &newtid);
+
+		if (this_result != TM_Ok)
+		{
+			/* FIXME: hmm, failed to delete this attribute, but we might already have
+			 * deleted other attributes. Error? */
+			/* FIXME: this leaks the toast chain on failure */
+			result = this_result;
+			break;
+		}
+
+		/* The toast chain can only be stamped once the new TID is known. */
+		if (toastptr != (Datum) 0)
+			zedstore_toast_finish(relation, attno, toastptr, newtid);
+	}
+	slot->tts_tid = ItemPointerFromZSTid(newtid);
+
+	/* TODO: could we do HOT udates? */
+	/* TODO: What should we set lockmode to? */
+
+	return result;
+}
+
+/* Table AM callback: zedstore always uses virtual tuple slots. */
+static const TupleTableSlotOps *
+zedstoream_slot_callbacks(Relation relation)
+{
+	return &TTSOpsVirtual;
+}
+
+/*
+ * Begin a sequential scan, optionally restricted to a subset of columns.
+ *
+ * 'project_columns' is a per-attribute boolean array (0-based); NULL means
+ * all columns.  It is translated into proj_atts[], a dense list of the
+ * 0-based attribute numbers to fetch; btree_scans[] is indexed by position
+ * in that list.  For bitmap scans, per-"block" arrays of TIDs, datums and
+ * null flags are allocated up front.
+ */
+static TableScanDesc
+zedstoream_beginscan_with_column_projection(Relation relation, Snapshot snapshot,
+											int nkeys, ScanKey key,
+											ParallelTableScanDesc parallel_scan,
+											bool *project_columns,
+											bool allow_strat,
+											bool allow_sync,
+											bool allow_pagemode,
+											bool is_bitmapscan,
+											bool is_samplescan,
+											bool temp_snap)
+{
+	ZedStoreDesc scan;
+
+	/*
+	 * allocate and initialize scan descriptor
+	 */
+	scan = (ZedStoreDesc) palloc(sizeof(ZedStoreDescData));
+
+	scan->rs_scan.rs_rd = relation;
+	scan->rs_scan.rs_snapshot = snapshot;
+	scan->rs_scan.rs_nkeys = nkeys;
+	scan->rs_scan.rs_bitmapscan = is_bitmapscan;
+	scan->rs_scan.rs_samplescan = is_samplescan;
+	scan->rs_scan.rs_allow_strat = allow_strat;
+	scan->rs_scan.rs_allow_sync = allow_sync;
+	scan->rs_scan.rs_temp_snap = temp_snap;
+	scan->rs_scan.rs_parallel = parallel_scan;
+
+	/*
+	 * we can use page-at-a-time mode if it's an MVCC-safe snapshot
+	 */
+	scan->rs_scan.rs_pageatatime = allow_pagemode && snapshot && IsMVCCSnapshot(snapshot);
+
+	scan->state = ZSSCAN_STATE_UNSTARTED;
+
+	/*
+	 * we do this here instead of in initscan() because heap_rescan also calls
+	 * initscan() and we don't want to allocate memory again
+	 */
+	/* NOTE(review): rs_key is allocated but 'key' is never copied into it —
+	 * confirm whether scan keys are meant to be supported here. */
+	if (nkeys > 0)
+		scan->rs_scan.rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
+	else
+		scan->rs_scan.rs_key = NULL;
+
+	scan->proj_atts = palloc(relation->rd_att->natts * sizeof(int));
+
+	/* palloc0, so that unused entries are zeroed */
+	scan->btree_scans = palloc0(relation->rd_att->natts * sizeof(ZSBtreeScan));
+	scan->num_proj_atts = 0;
+
+	/*
+	 * convert booleans array into an array of the attribute numbers of the
+	 * required columns.
+	 */
+	for (int i = 0; i < relation->rd_att->natts; i++)
+	{
+		/* project_columns empty also conveys need all the columns */
+		if (project_columns == NULL || project_columns[i])
+		{
+			scan->proj_atts[scan->num_proj_atts++] = i;
+		}
+	}
+
+	/* Extra setup for bitmap scans */
+	if (is_bitmapscan)
+	{
+		scan->bmscan_ntuples = 0;
+		scan->bmscan_tids = palloc(MAX_ITEMS_PER_LOGICAL_BLOCK * sizeof(zstid));
+
+		scan->bmscan_datums = palloc(scan->num_proj_atts * sizeof(Datum *));
+		scan->bmscan_isnulls = palloc(scan->num_proj_atts * sizeof(bool *));
+		for (int i = 0; i < scan->num_proj_atts; i++)
+		{
+			scan->bmscan_datums[i] = palloc(MAX_ITEMS_PER_LOGICAL_BLOCK * sizeof(Datum));
+			scan->bmscan_isnulls[i] = palloc(MAX_ITEMS_PER_LOGICAL_BLOCK * sizeof(bool));
+		}
+	}
+
+	return (TableScanDesc) scan;
+}
+
+/*
+ * Table AM callback: begin a sequential scan of all columns.
+ * Thin wrapper that passes a NULL projection (= every column).
+ */
+static TableScanDesc
+zedstoream_beginscan(Relation relation, Snapshot snapshot,
+					 int nkeys, ScanKey key,
+					 ParallelTableScanDesc parallel_scan,
+					 bool allow_strat,
+					 bool allow_sync,
+					 bool allow_pagemode,
+					 bool is_bitmapscan,
+					 bool is_samplescan,
+					 bool temp_snap)
+{
+	return zedstoream_beginscan_with_column_projection(relation, snapshot, nkeys, key, parallel_scan,
+													   NULL, allow_strat, allow_sync, allow_pagemode,
+													   is_bitmapscan, is_samplescan, temp_snap);
+}
+
+/*
+ * Table AM callback: end a sequential scan and free the descriptor.
+ */
+static void
+zedstoream_endscan(TableScanDesc sscan)
+{
+	ZedStoreDesc scan = (ZedStoreDesc) sscan;
+
+	if (scan->proj_atts)
+		pfree(scan->proj_atts);
+
+	/*
+	 * NOTE(review): this ends a scan for every attribute, although only
+	 * num_proj_atts scans were ever begun; relies on btree_scans[] having
+	 * been palloc0'd so zsbt_end_scan() on an unused entry is a no-op —
+	 * confirm.
+	 */
+	for (int i = 0; i < sscan->rs_rd->rd_att->natts; i++)
+		zsbt_end_scan(&scan->btree_scans[i]);
+
+	if (scan->rs_scan.rs_temp_snap)
+		UnregisterSnapshot(scan->rs_scan.rs_snapshot);
+
+	pfree(scan->btree_scans);
+	pfree(scan);
+}
+
+/*
+ * Table AM callback: return the next visible tuple, as a virtual tuple
+ * in 'slot'.
+ *
+ * The scan proceeds over one TID range at a time.  In a parallel scan
+ * the ranges are handed out by zs_parallelscan_nextrange(); otherwise a
+ * single range covering the whole table is used.  Within a range, one
+ * btree scan per projected attribute is advanced in lockstep, and all of
+ * them must yield the same TID for each row.
+ */
+static bool
+zedstoream_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
+{
+	ZedStoreDesc scan = (ZedStoreDesc) sscan;
+
+	Assert(scan->num_proj_atts <= slot->tts_tupleDescriptor->natts);
+
+	slot->tts_nvalid = 0;
+	slot->tts_flags |= TTS_FLAG_EMPTY;
+
+	while (scan->state != ZSSCAN_STATE_FINISHED)
+	{
+		zstid		this_tid;
+
+		if (scan->state == ZSSCAN_STATE_UNSTARTED ||
+			scan->state == ZSSCAN_STATE_FINISHED_RANGE)
+		{
+			if (scan->rs_scan.rs_parallel)
+			{
+				/* Allocate next range of TIDs to scan */
+				if (!zs_parallelscan_nextrange(scan->rs_scan.rs_rd,
+											   (ParallelZSScanDesc) scan->rs_scan.rs_parallel,
+											   &scan->cur_range_start, &scan->cur_range_end))
+				{
+					scan->state = ZSSCAN_STATE_FINISHED;
+					break;
+				}
+			}
+			else
+			{
+				/* Serial scan: one range covers the whole table. */
+				if (scan->state == ZSSCAN_STATE_FINISHED_RANGE)
+				{
+					scan->state = ZSSCAN_STATE_FINISHED;
+					break;
+				}
+				scan->cur_range_start = MinZSTid;
+				scan->cur_range_end = MaxPlusOneZSTid;
+			}
+
+			/* btree_scans[] is indexed by projection position, not attno. */
+			for (int i = 0; i < scan->num_proj_atts; i++)
+			{
+				int			natt = scan->proj_atts[i];
+
+				zsbt_begin_scan(scan->rs_scan.rs_rd, natt + 1,
+								scan->cur_range_start,
+								scan->rs_scan.rs_snapshot,
+								&scan->btree_scans[i]);
+			}
+			scan->state = ZSSCAN_STATE_SCANNING;
+		}
+
+		/* We now have a range to scan */
+		Assert(scan->state == ZSSCAN_STATE_SCANNING);
+		this_tid = InvalidZSTid;
+		for (int i = 0; i < scan->num_proj_atts; i++)
+		{
+			int			natt = scan->proj_atts[i];
+			Form_pg_attribute att = &scan->rs_scan.rs_rd->rd_att->attrs[natt];
+			Datum		datum;
+			bool		isnull;
+			zstid		tid;
+
+			if (!zsbt_scan_next(&scan->btree_scans[i], &datum, &isnull, &tid))
+			{
+				scan->state = ZSSCAN_STATE_FINISHED_RANGE;
+				break;
+			}
+			if (tid >= scan->cur_range_end)
+			{
+				scan->state = ZSSCAN_STATE_FINISHED_RANGE;
+				break;
+			}
+
+			/* All attribute scans must agree on the row's TID. */
+			if (i == 0)
+				this_tid = tid;
+			else if (this_tid != tid)
+			{
+				elog(ERROR, "scans on different attributes out of sync");
+			}
+
+			/*
+			 * flatten any ZS-TOASTed values, because the rest of the system
+			 * doesn't know how to deal with them.
+			 */
+			if (!isnull && att->attlen == -1 &&
+				VARATT_IS_EXTERNAL(datum) && VARTAG_EXTERNAL(datum) == VARTAG_ZEDSTORE)
+			{
+				datum = zedstore_toast_flatten(scan->rs_scan.rs_rd, natt + 1, tid, datum);
+			}
+
+			slot->tts_values[natt] = datum;
+			slot->tts_isnull[natt] = isnull;
+		}
+
+		if (scan->state == ZSSCAN_STATE_FINISHED_RANGE)
+		{
+			/*
+			 * End the per-attribute btree scans.  btree_scans[] is indexed
+			 * by projection position 'i', matching zsbt_begin_scan() above.
+			 * (The old code indexed it by attribute number proj_atts[i],
+			 * ending the wrong — possibly never-begun — scans whenever the
+			 * projection list was not a prefix of the attribute list.)
+			 */
+			for (int i = 0; i < scan->num_proj_atts; i++)
+				zsbt_end_scan(&scan->btree_scans[i]);
+		}
+		else
+		{
+			Assert(scan->state == ZSSCAN_STATE_SCANNING);
+			slot->tts_tid = ItemPointerFromZSTid(this_tid);
+			slot->tts_nvalid = slot->tts_tupleDescriptor->natts;
+			slot->tts_flags &= ~TTS_FLAG_EMPTY;
+			return true;
+		}
+	}
+
+	ExecClearTuple(slot);
+	return false;
+}
+
+/* Table AM callback: re-check a tuple's visibility.  Not implemented yet. */
+static bool
+zedstoream_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
+									Snapshot snapshot)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+	return false;				/* keep compiler quiet; ereport does not return */
+}
+
+/* Table AM callback: compute an XID horizon for dead index tuples.  Not implemented yet. */
+static TransactionId
+zedstoream_compute_xid_horizon_for_tuples(Relation rel,
+										  ItemPointerData *items,
+										  int nitems)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+	return InvalidTransactionId;	/* keep compiler quiet; ereport does not return */
+}
+
+/*
+ * Table AM callback: set up a descriptor for fetching tuples by TID
+ * (index scans).  The projection starts out empty; if the executor never
+ * sets one, zedstoream_index_fetch_tuple() falls back to all columns.
+ */
+static IndexFetchTableData *
+zedstoream_begin_index_fetch(Relation rel)
+{
+	ZedStoreIndexFetch zfetch;
+
+	zfetch = (ZedStoreIndexFetch) palloc0(sizeof(ZedStoreIndexFetchData));
+	zfetch->idx_fetch_data.rel = rel;
+	zfetch->num_proj_atts = 0;
+	zfetch->proj_atts = (int *) palloc(rel->rd_att->natts * sizeof(int));
+
+	return (IndexFetchTableData *) zfetch;
+}
+
+/*
+ * Restrict an index-fetch descriptor to a subset of columns.
+ *
+ * 'project_column' is a 0-based per-attribute boolean array; NULL means
+ * every column is wanted.  It is translated into proj_atts[], a dense
+ * list of the 0-based attribute numbers to fetch.
+ */
+static void
+zedstoream_fetch_set_column_projection(struct IndexFetchTableData *scan,
+									   bool *project_column)
+{
+	ZedStoreIndexFetch zfetch = (ZedStoreIndexFetch) scan;
+	int			natts = zfetch->idx_fetch_data.rel->rd_att->natts;
+	int			attno;
+
+	zfetch->num_proj_atts = 0;
+	for (attno = 0; attno < natts; attno++)
+	{
+		if (project_column == NULL || project_column[attno])
+			zfetch->proj_atts[zfetch->num_proj_atts++] = attno;
+	}
+}
+
+/*
+ * Table AM callback: reset an index-fetch descriptor between scans.
+ * Nothing to do for zedstore; per-fetch btree scans are short-lived.
+ */
+static void
+zedstoream_reset_index_fetch(IndexFetchTableData *scan)
+{
+}
+
+/*
+ * Table AM callback: tear down an index-fetch descriptor, releasing the
+ * projection array and the descriptor itself.
+ */
+static void
+zedstoream_end_index_fetch(IndexFetchTableData *scan)
+{
+	ZedStoreIndexFetch zfetch = (ZedStoreIndexFetch) scan;
+
+	pfree(zfetch->proj_atts);
+	pfree(zfetch);
+}
+
+/*
+ * Table AM callback: fetch one tuple by TID (used by index scans).
+ *
+ * Fetches each projected column with a short single-row btree scan.  The
+ * tuple is considered found only if every projected column yields the
+ * requested TID.  'call_again' and 'all_dead' are currently ignored;
+ * zedstore has no HOT chains yet.
+ */
+static bool
+zedstoream_index_fetch_tuple(struct IndexFetchTableData *scan,
+							 ItemPointer tid_p,
+							 Snapshot snapshot,
+							 TupleTableSlot *slot,
+							 bool *call_again, bool *all_dead)
+{
+	ZedStoreIndexFetch zscan = (ZedStoreIndexFetch) scan;
+	Relation	rel = zscan->idx_fetch_data.rel;
+	zstid		tid = ZSTidFromItemPointer(*tid_p);
+	bool		found = true;
+
+	/*
+	 * if executor didn't set the column projections, need to return all the
+	 * columns.
+	 */
+	if (zscan->num_proj_atts == 0)
+	{
+		for (int i = 0; i < rel->rd_att->natts; i++)
+			zscan->proj_atts[zscan->num_proj_atts++] = i;
+	}
+
+	/* Stop at the first column that fails to produce the requested TID. */
+	for (int i = 0; i < zscan->num_proj_atts && found; i++)
+	{
+		int         natt = zscan->proj_atts[i];
+		Form_pg_attribute att = &rel->rd_att->attrs[natt];
+		ZSBtreeScan btree_scan;
+		Datum		datum;
+		bool        isnull;
+		zstid		this_tid;
+
+		/* NOTE(review): dropped columns are skipped, leaving their slot
+		 * entries untouched — confirm callers never read those. */
+		if (att->attisdropped)
+			continue;
+
+		zsbt_begin_scan(rel, natt + 1, tid, snapshot, &btree_scan);
+
+		if (zsbt_scan_next(&btree_scan, &datum, &isnull, &this_tid))
+		{
+			if (this_tid != tid)
+				found = false;
+			else
+			{
+				slot->tts_values[natt] = datum;
+				slot->tts_isnull[natt] = isnull;
+			}
+		}
+		else
+			found = false;
+
+		zsbt_end_scan(&btree_scan);
+	}
+
+	if (found)
+	{
+		slot->tts_tid = ItemPointerFromZSTid(tid);
+		slot->tts_nvalid = slot->tts_tupleDescriptor->natts;
+		slot->tts_flags &= ~TTS_FLAG_EMPTY;
+		return true;
+	}
+	else
+	{
+		/*
+		 * not found
+		 *
+		 * TODO: as a sanity check, it would be good to check if we
+		 * get *any* of the columns. Currently, if any of the columns
+		 * is missing, we treat the tuple as non-existent
+		 */
+		return false;
+	}
+}
+
+/* Table AM callback: second pass of CREATE INDEX CONCURRENTLY.  Not implemented yet. */
+static void
+zedstoream_index_validate_scan(Relation heapRelation,
+							   Relation indexRelation,
+							   IndexInfo *indexInfo,
+							   Snapshot snapshot,
+							   ValidateIndexState *state)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("function %s not implemented yet", __func__)));
+}
+
+/*
+ * Table AM callback: scan (a range of) the base relation and feed each
+ * qualifying tuple to the index build 'callback'.  Modeled on
+ * heapam's IndexBuildHeapRangeScan.  Returns the number of tuples scanned.
+ */
+static double
+zedstoream_index_build_range_scan(Relation baseRelation,
+								  Relation indexRelation,
+								  IndexInfo *indexInfo,
+								  bool allow_sync,
+								  bool anyvisible,
+								  bool progress,
+								  BlockNumber start_blockno,
+								  BlockNumber numblocks,
+								  IndexBuildCallback callback,
+								  void *callback_state,
+								  TableScanDesc scan)
+{
+	bool		checking_uniqueness;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	double		reltuples;
+	ExprState  *predicate;
+	TupleTableSlot *slot;
+	EState	   *estate;
+	ExprContext *econtext;
+	Snapshot	snapshot;
+	bool		need_unregister_snapshot = false;
+	TransactionId OldestXmin;
+
+	/*
+	 * sanity checks
+	 */
+	Assert(OidIsValid(indexRelation->rd_rel->relam));
+
+	/* See whether we're verifying uniqueness/exclusion properties */
+	checking_uniqueness = (indexInfo->ii_Unique ||
+						   indexInfo->ii_ExclusionOps != NULL);
+
+	/*
+	 * "Any visible" mode is not compatible with uniqueness checks; make sure
+	 * only one of those is requested.
+	 */
+	Assert(!(anyvisible && checking_uniqueness));
+
+	/*
+	 * Need an EState for evaluation of index expressions and partial-index
+	 * predicates.  Also a slot to hold the current tuple.
+	 */
+	estate = CreateExecutorState();
+	econtext = GetPerTupleExprContext(estate);
+	slot = table_slot_create(baseRelation, NULL);
+
+	/* Arrange for econtext's scan tuple to be the tuple under test */
+	econtext->ecxt_scantuple = slot;
+
+	/* Set up execution state for predicate, if any. */
+	predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
+
+	/*
+	 * Prepare for scan of the base relation.  In a normal index build, we use
+	 * SnapshotAny because we must retrieve all tuples and do our own time
+	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
+	 * and index whatever's live according to that.
+	 */
+	OldestXmin = InvalidTransactionId;
+
+	/* okay to ignore lazy VACUUMs here */
+	if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
+		OldestXmin = GetOldestXmin(baseRelation, PROCARRAY_FLAGS_VACUUM);
+
+	/*
+	 * TODO: It would be very good to fetch only the columns we need.
+	 */
+	if (!scan)
+	{
+		bool	   *proj;
+		int			attno;
+
+		/*
+		 * Serial index build.
+		 *
+		 * Must begin our own zedstore scan in this case.  We may also need to
+		 * register a snapshot whose lifetime is under our direct control.
+		 */
+		if (!TransactionIdIsValid(OldestXmin))
+		{
+			snapshot = RegisterSnapshot(GetTransactionSnapshot());
+			need_unregister_snapshot = true;
+		}
+		else
+			snapshot = SnapshotAny;
+
+		/*
+		 * Build a projection covering the indexed columns plus any columns
+		 * referenced by index expressions.
+		 */
+		proj = palloc0(baseRelation->rd_att->natts * sizeof(bool));
+		for (attno = 0; attno < indexInfo->ii_NumIndexKeyAttrs; attno++)
+		{
+			Assert(indexInfo->ii_IndexAttrNumbers[attno] <= baseRelation->rd_att->natts);
+			/* skip expressions */
+			if (indexInfo->ii_IndexAttrNumbers[attno] > 0)
+				proj[indexInfo->ii_IndexAttrNumbers[attno] - 1] = true;
+		}
+
+		GetNeededColumnsForNode((Node *)indexInfo->ii_Expressions, proj,
+								baseRelation->rd_att->natts);
+
+		scan = table_beginscan_with_column_projection(baseRelation,	/* relation */
+													  snapshot,	/* snapshot */
+													  0, /* number of keys */
+													  NULL,	/* scan key */
+													  proj);
+
+		/*
+		 * Partial-range build: position the scan at the requested block
+		 * range by starting the btree scans directly.
+		 *
+		 * NOTE(review): cur_range_end is computed from 'numblocks' as an
+		 * absolute end block, not as a count relative to start_blockno —
+		 * confirm that matches callers' expectations.
+		 */
+		if (start_blockno != 0 || numblocks != InvalidBlockNumber)
+		{
+			ZedStoreDesc zscan = (ZedStoreDesc) scan;
+
+			zscan->cur_range_start = ZSTidFromBlkOff(start_blockno, 1);
+			zscan->cur_range_end = ZSTidFromBlkOff(numblocks, 1);
+
+			for (int i = 0; i < zscan->num_proj_atts; i++)
+			{
+				int			natt = zscan->proj_atts[i];
+
+				zsbt_begin_scan(zscan->rs_scan.rs_rd, natt + 1,
+								zscan->cur_range_start,
+								zscan->rs_scan.rs_snapshot,
+								&zscan->btree_scans[i]);
+			}
+			zscan->state = ZSSCAN_STATE_SCANNING;
+		}
+	}
+	else
+	{
+		/*
+		 * Parallel index build.
+		 *
+		 * Parallel case never registers/unregisters own snapshot.  Snapshot
+		 * is taken from parallel zedstore scan, and is SnapshotAny or an MVCC
+		 * snapshot, based on same criteria as serial case.
+		 */
+		Assert(!IsBootstrapProcessingMode());
+		Assert(allow_sync);
+		Assert(start_blockno == 0);
+		Assert(numblocks == InvalidBlockNumber);
+		snapshot = scan->rs_snapshot;
+	}
+
+	/*
+	 * Must call GetOldestXmin() with SnapshotAny.  Should never call
+	 * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
+	 * this for parallel builds, since ambuild routines that support parallel
+	 * builds must work these details out for themselves.)
+	 */
+	Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
+	Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
+		   !TransactionIdIsValid(OldestXmin));
+	Assert(snapshot == SnapshotAny || !anyvisible);
+
+	reltuples = 0;
+
+	/*
+	 * Scan all tuples in the base relation.
+	 */
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
+	{
+		bool		tupleIsAlive;
+		HeapTuple	heapTuple;
+
+		if (numblocks != InvalidBlockNumber &&
+			ItemPointerGetBlockNumber(&slot->tts_tid) >= numblocks)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* table_scan_getnextslot did the visibility check */
+		tupleIsAlive = true;
+		reltuples += 1;
+
+		/*
+		 * TODO: Once we have in-place updates, like HOT, this will need
+		 * to work harder, to figure out which tuple version to index.
+		 */
+
+		MemoryContextReset(econtext->ecxt_per_tuple_memory);
+
+		/*
+		 * In a partial index, discard tuples that don't satisfy the
+		 * predicate.
+		 */
+		if (predicate != NULL)
+		{
+			if (!ExecQual(predicate, econtext))
+				continue;
+		}
+
+		/*
+		 * For the current heap tuple, extract all the attributes we use in
+		 * this index, and note which are null.  This also performs evaluation
+		 * of any expressions needed.
+		 */
+		FormIndexDatum(indexInfo,
+					   slot,
+					   estate,
+					   values,
+					   isnull);
+
+		/* Call the AM's callback routine to process the tuple */
+		heapTuple = ExecCopySlotHeapTuple(slot);
+		heapTuple->t_self = slot->tts_tid;
+		callback(indexRelation, heapTuple, values, isnull, tupleIsAlive,
+				 callback_state);
+		pfree(heapTuple);
+	}
+
+	table_endscan(scan);
+
+	/* we can now forget our snapshot, if set and registered by us */
+	if (need_unregister_snapshot)
+		UnregisterSnapshot(snapshot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	FreeExecutorState(estate);
+
+	/* These may have been pointing to the now-gone estate */
+	indexInfo->ii_ExpressionsState = NIL;
+	indexInfo->ii_PredicateState = NULL;
+
+	return reltuples;
+}
+
+/*
+ * Callback invoked at the end of a bulk insert (e.g. COPY).
+ */
+static void
+zedstoream_finish_bulk_insert(Relation relation, int options)
+{
+	/*
+	 * If we skipped writing WAL, then we need to sync the zedstore (but not
+	 * indexes since those use WAL anyway / don't go through tableam)
+	 *
+	 * NOTE(review): this reuses heapam's HEAP_INSERT_SKIP_WAL flag and
+	 * heap_sync() -- confirm that heap_sync is sufficient for zedstore's
+	 * forks as well.
+	 */
+	if (options & HEAP_INSERT_SKIP_WAL)
+		heap_sync(relation);
+}
+
+/* ------------------------------------------------------------------------
+ * DDL related callbacks for zedstore AM.
+ * ------------------------------------------------------------------------
+ */
+
+/*
+ * Create the initial physical storage for a new zedstore table, and report
+ * the initial XID and multixact horizons to be stored in pg_class.
+ */
+static void
+zedstoream_relation_set_new_filenode(Relation rel, char persistence,
+									 TransactionId *freezeXid,
+									 MultiXactId *minmulti)
+{
+	/*
+	 * Initialize to the minimum XID that could put tuples in the table. We
+	 * know that no xacts older than RecentXmin are still running, so that
+	 * will do.
+	 */
+	*freezeXid = RecentXmin;
+
+	/*
+	 * Similarly, initialize the minimum Multixact to the first value that
+	 * could possibly be stored in tuples in the table.  Running transactions
+	 * could reuse values from their local cache, so we are careful to
+	 * consider all currently running multis.
+	 *
+	 * XXX this could be refined further, but is it worth the hassle?
+	 */
+	*minmulti = GetOldestMultiXactId();
+
+	/* Create the main fork. */
+	RelationCreateStorage(rel->rd_node, persistence);
+
+	/*
+	 * If required, set up an init fork for an unlogged table so that it can
+	 * be correctly reinitialized on restart.  An immediate sync is required
+	 * even if the page has been logged, because the write did not go through
+	 * shared_buffers and therefore a concurrent checkpoint may have moved the
+	 * redo pointer past our xlog record.  Recovery may as well remove it
+	 * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
+	 * record. Therefore, logging is necessary even if wal_level=minimal.
+	 */
+	if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
+	{
+		Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
+			   rel->rd_rel->relkind == RELKIND_MATVIEW ||
+			   rel->rd_rel->relkind == RELKIND_TOASTVALUE);
+		RelationOpenSmgr(rel);
+		smgrcreate(rel->rd_smgr, INIT_FORKNUM, false);
+		log_smgrcreate(&rel->rd_smgr->smgr_rnode.node, INIT_FORKNUM);
+		smgrimmedsync(rel->rd_smgr, INIT_FORKNUM);
+	}
+}
+
+/*
+ * Non-transactionally truncate the relation to zero blocks
+ * (e.g. for ON COMMIT DELETE ROWS temp tables).
+ */
+static void
+zedstoream_relation_nontransactional_truncate(Relation rel)
+{
+	RelationTruncate(rel, 0);
+}
+
+/*
+ * Copy the relation's data to a new relfilenode (e.g. ALTER TABLE SET
+ * TABLESPACE).  Not implemented yet; fail loudly rather than lose data.
+ */
+static void
+zedstoream_relation_copy_data(Relation rel, RelFileNode newrnode)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("function %s not implemented yet", __func__)));
+}
+
+/*
+ * Rewrite the table for CLUSTER / VACUUM FULL.
+ *
+ * Not implemented for zedstore yet.  Error out instead of silently doing
+ * nothing: an empty stub would leave the rewritten relfilenode empty
+ * (discarding all data) and the output counters (*num_tuples etc.)
+ * unset.  This matches the behavior of zedstoream_relation_copy_data().
+ */
+static void
+zedstoream_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
+									 Relation OldIndex, bool use_sort,
+									 TransactionId OldestXmin,
+									 TransactionId FreezeXid,
+									 MultiXactId MultiXactCutoff,
+									 double *num_tuples,
+									 double *tups_vacuumed,
+									 double *tups_recently_dead)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("function %s not implemented yet", __func__)));
+}
+
+/* ANALYZE block sampling: not implemented for zedstore yet. */
+static bool
+zedstoream_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
+								   BufferAccessStrategy bstrategy)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("function %s not implemented yet", __func__)));
+	return false;				/* not reached; keeps compiler quiet */
+}
+
+/* ANALYZE tuple sampling: not implemented for zedstore yet. */
+static bool
+zedstoream_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
+								   double *liverows, double *deadrows,
+								   TupleTableSlot *slot)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("function %s not implemented yet", __func__)));
+	return false;				/* not reached; keeps compiler quiet */
+}
+
+/* ------------------------------------------------------------------------
+ * Planner related callbacks for the zedstore AM
+ * ------------------------------------------------------------------------
+ */
+
+/*
+ * currently this is exact duplicate of heapam_estimate_rel_size().
+ * TODO fix to tune it based on zedstore storage.
+ */
+/*
+ * Estimate the relation's current size for the planner.
+ *
+ * Outputs: *pages (size in blocks), *tuples (estimated row count) and
+ * *allvisfrac (fraction of pages marked all-visible).
+ */
+static void
+zedstoream_relation_estimate_size(Relation rel, int32 *attr_widths,
+								  BlockNumber *pages, double *tuples,
+								  double *allvisfrac)
+{
+	BlockNumber curpages;
+	BlockNumber relpages;
+	double		reltuples;
+	BlockNumber relallvisible;
+	double		density;
+
+	/* it has storage, ok to call the smgr */
+	curpages = RelationGetNumberOfBlocks(rel);
+
+	/* coerce values in pg_class to more desirable types */
+	relpages = (BlockNumber) rel->rd_rel->relpages;
+	reltuples = (double) rel->rd_rel->reltuples;
+	relallvisible = (BlockNumber) rel->rd_rel->relallvisible;
+
+	/*
+	 * HACK: if the relation has never yet been vacuumed, use a minimum size
+	 * estimate of 10 pages.  The idea here is to avoid assuming a
+	 * newly-created table is really small, even if it currently is, because
+	 * that may not be true once some data gets loaded into it.  Once a vacuum
+	 * or analyze cycle has been done on it, it's more reasonable to believe
+	 * the size is somewhat stable.
+	 *
+	 * (Note that this is only an issue if the plan gets cached and used again
+	 * after the table has been filled.  What we're trying to avoid is using a
+	 * nestloop-type plan on a table that has grown substantially since the
+	 * plan was made.  Normally, autovacuum/autoanalyze will occur once enough
+	 * inserts have happened and cause cached-plan invalidation; but that
+	 * doesn't happen instantaneously, and it won't happen at all for cases
+	 * such as temporary tables.)
+	 *
+	 * We approximate "never vacuumed" by "has relpages = 0", which means this
+	 * will also fire on genuinely empty relations.  Not great, but
+	 * fortunately that's a seldom-seen case in the real world, and it
+	 * shouldn't degrade the quality of the plan too much anyway to err in
+	 * this direction.
+	 *
+	 * If the table has inheritance children, we don't apply this heuristic.
+	 * Totally empty parent tables are quite common, so we should be willing
+	 * to believe that they are empty.
+	 */
+	if (curpages < 10 &&
+		relpages == 0 &&
+		!rel->rd_rel->relhassubclass)
+		curpages = 10;
+
+	/* report estimated # pages */
+	*pages = curpages;
+	/* quick exit if rel is clearly empty */
+	if (curpages == 0)
+	{
+		*tuples = 0;
+		*allvisfrac = 0;
+		return;
+	}
+
+	/* estimate number of tuples from previous tuple density */
+	if (relpages > 0)
+		density = reltuples / (double) relpages;
+	else
+	{
+		/*
+		 * When we have no data because the relation was truncated, estimate
+		 * tuple width from attribute datatypes.  We assume here that the
+		 * pages are completely full, which is OK for tables (since they've
+		 * presumably not been VACUUMed yet) but is probably an overestimate
+		 * for indexes.  Fortunately get_relation_info() can clamp the
+		 * overestimate to the parent table's size.
+		 *
+		 * Note: this code intentionally disregards alignment considerations,
+		 * because (a) that would be gilding the lily considering how crude
+		 * the estimate is, and (b) it creates platform dependencies in the
+		 * default plans which are kind of a headache for regression testing.
+		 *
+		 * NOTE(review): SizeofHeapTupleHeader/ItemIdData are heap overheads,
+		 * not zedstore's; see the TODO above this function.
+		 */
+		int32		tuple_width;
+
+		tuple_width = get_rel_data_width(rel, attr_widths);
+		tuple_width += MAXALIGN(SizeofHeapTupleHeader);
+		tuple_width += sizeof(ItemIdData);
+		/* note: integer division is intentional here */
+		density = (BLCKSZ - SizeOfPageHeaderData) / tuple_width;
+	}
+	*tuples = rint(density * (double) curpages);
+
+	/*
+	 * We use relallvisible as-is, rather than scaling it up like we do for
+	 * the pages and tuples counts, on the theory that any pages added since
+	 * the last VACUUM are most likely not marked all-visible.  But costsize.c
+	 * wants it converted to a fraction.
+	 */
+	/* (curpages <= 0 is dead here -- we returned above -- kept to mirror heapam) */
+	if (relallvisible == 0 || curpages <= 0)
+		*allvisfrac = 0;
+	else if ((double) relallvisible >= curpages)
+		*allvisfrac = 1;
+	else
+		*allvisfrac = (double) relallvisible / curpages;
+}
+
+/* ------------------------------------------------------------------------
+ * Executor related callbacks for the zedstore AM
+ * ------------------------------------------------------------------------
+ */
+
+/*
+ * Prefetch all matching tuples of a bitmap page into the scan struct.
+ *
+ * Returns true if any tuples on the page match the bitmap, false otherwise.
+ */
+static bool
+zedstoream_scan_bitmap_next_block(TableScanDesc sscan,
+								  TBMIterateResult *tbmres)
+{
+	ZedStoreDesc scan = (ZedStoreDesc) sscan;
+	BlockNumber tid_blkno = tbmres->blockno;
+	int			ntuples;
+	int			first_ntuples = 0;
+	bool		firstcol;
+
+	/*
+	 * Our strategy for a bitmap scan is to scan the tree of each attribute,
+	 * starting at the given logical block number, and store all the datums
+	 * in the scan struct. zedstoream_scan_bitmap_next_tuple() then just
+	 * needs to store the datums of the next TID in the slot.
+	 *
+	 * An alternative would be to keep the scans of each attribute open,
+	 * like in a sequential scan. I'm not sure which is better.
+	 */
+	firstcol = true;
+	for (int i = 0; i < scan->num_proj_atts; i++)
+	{
+		int			natt = scan->proj_atts[i];
+		ZSBtreeScan	btree_scan;
+		Datum		datum;
+		bool        isnull;
+		zstid		tid;
+		Datum	   *datums = scan->bmscan_datums[natt];
+		bool	   *isnulls = scan->bmscan_isnulls[natt];
+		int			noff = 0;
+
+		zsbt_begin_scan(scan->rs_scan.rs_rd, natt + 1,
+						ZSTidFromBlkOff(tid_blkno, 1),
+						scan->rs_scan.rs_snapshot,
+						&btree_scan);
+
+		/*
+		 * TODO: it would be good to pass the next expected TID down to zsbt_scan_next,
+		 * so that it could skip over to it more efficiently.
+		 */
+		ntuples = 0;
+		while (zsbt_scan_next(&btree_scan, &datum, &isnull, &tid))
+		{
+			if (ZSTidGetBlockNumber(tid) != tid_blkno)
+			{
+				Assert(ZSTidGetBlockNumber(tid) > tid_blkno);
+				break;
+			}
+
+			/* ntuples == -1 means a lossy page: every TID on it matches */
+			if (tbmres->ntuples != -1)
+			{
+				/*
+				 * Advance to the first listed offset >= this TID's offset.
+				 * The bound must be checked *before* reading offsets[noff]:
+				 * the previous condition order read one element past the end
+				 * of the array once all offsets had been consumed.
+				 */
+				while (noff < tbmres->ntuples &&
+					   ZSTidGetOffsetNumber(tid) > tbmres->offsets[noff])
+					noff++;
+
+				if (noff == tbmres->ntuples)
+					break;
+
+				if (ZSTidGetOffsetNumber(tid) < tbmres->offsets[noff])
+					continue;
+			}
+
+			datums[ntuples] = datum;
+			isnulls[ntuples] = isnull;
+			if (firstcol)
+				scan->bmscan_tids[ntuples] = tid;
+			else if (tid != scan->bmscan_tids[ntuples])
+				elog(ERROR, "scans on different attributes out of sync");
+
+			ntuples++;
+		}
+		if (firstcol)
+			first_ntuples = ntuples;
+		else if (ntuples != first_ntuples)
+			elog(ERROR, "scans on different attributes out of sync");
+
+		zsbt_end_scan(&btree_scan);
+
+		firstcol = false;
+	}
+
+	scan->bmscan_nexttuple = 0;
+	scan->bmscan_ntuples = first_ntuples;
+
+	return first_ntuples > 0;
+}
+
+/*
+ * Return the next tuple prefetched by zedstoream_scan_bitmap_next_block(),
+ * storing its (projected) attributes into 'slot'.
+ */
+static bool
+zedstoream_scan_bitmap_next_tuple(TableScanDesc sscan,
+								  TBMIterateResult *tbmres,
+								  TupleTableSlot *slot)
+{
+	ZedStoreDesc scan = (ZedStoreDesc) sscan;
+	zstid		tid;
+
+	if (scan->bmscan_nexttuple >= scan->bmscan_ntuples)
+		return false;
+
+	tid = scan->bmscan_tids[scan->bmscan_nexttuple];
+	for (int i = 0; i < scan->num_proj_atts; i++)
+	{
+		int			natt = scan->proj_atts[i];
+		Form_pg_attribute att = &scan->rs_scan.rs_rd->rd_att->attrs[natt];
+		Datum		datum;
+		bool        isnull;
+
+		/*
+		 * zedstoream_scan_bitmap_next_block() stores the per-attribute
+		 * arrays indexed by attribute number (natt), so index them the same
+		 * way here.  (The old code used the projection slot 'i' for
+		 * bmscan_datums/bmscan_isnulls and attrs, which only worked when the
+		 * projection covered a prefix of the attributes.)
+		 */
+		datum = (scan->bmscan_datums[natt])[scan->bmscan_nexttuple];
+		isnull = (scan->bmscan_isnulls[natt])[scan->bmscan_nexttuple];
+
+		/*
+		 * flatten any ZS-TOASTed values, because the rest of the system
+		 * doesn't know how to deal with them.
+		 */
+		if (!isnull && att->attlen == -1 &&
+			VARATT_IS_EXTERNAL(datum) && VARTAG_EXTERNAL(datum) == VARTAG_ZEDSTORE)
+		{
+			datum = zedstore_toast_flatten(scan->rs_scan.rs_rd, natt + 1, tid, datum);
+		}
+
+		slot->tts_values[natt] = datum;
+		slot->tts_isnull[natt] = isnull;
+	}
+	slot->tts_tid = ItemPointerFromZSTid(tid);
+	slot->tts_nvalid = slot->tts_tupleDescriptor->natts;
+	slot->tts_flags &= ~TTS_FLAG_EMPTY;
+
+	scan->bmscan_nexttuple++;
+
+	return true;
+}
+
+/* TABLESAMPLE block selection: not implemented for zedstore yet. */
+static bool
+zedstoream_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("function %s not implemented yet", __func__)));
+	return false;				/* not reached; keeps compiler quiet */
+}
+
+/* TABLESAMPLE tuple selection: not implemented for zedstore yet. */
+static bool
+zedstoream_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
+								  TupleTableSlot *slot)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("function %s not implemented yet", __func__)));
+	return false;				/* not reached; keeps compiler quiet */
+}
+
+/*
+ * VACUUM callback.  Delegates to the zedstore UNDO-log vacuum code, using
+ * the oldest xmin as the horizon for discarding old UNDO records.
+ */
+static void
+zedstoream_vacuum_rel(Relation onerel, VacuumParams *params,
+					  BufferAccessStrategy bstrategy)
+{
+	zsundo_vacuum(onerel, params, bstrategy,
+				  GetOldestXmin(onerel, PROCARRAY_FLAGS_VACUUM));
+}
+
+/* Dispatch table with all the table-AM callbacks for zedstore. */
+static const TableAmRoutine zedstoream_methods = {
+	.type = T_TableAmRoutine,
+	.scans_leverage_column_projection = true,
+
+	.slot_callbacks = zedstoream_slot_callbacks,
+
+	.scan_begin = zedstoream_beginscan,
+	.scan_begin_with_column_projection = zedstoream_beginscan_with_column_projection,
+	.scan_end = zedstoream_endscan,
+	/*
+	 * NOTE(review): this points at heapam's heap_rescan, which expects a
+	 * heap scan descriptor rather than a ZedStoreDesc -- looks wrong;
+	 * confirm whether a zedstore-specific rescan is needed.
+	 */
+	.scan_rescan = heap_rescan,
+	.scan_getnextslot = zedstoream_getnextslot,
+
+	.parallelscan_estimate = zs_parallelscan_estimate,
+	.parallelscan_initialize = zs_parallelscan_initialize,
+	.parallelscan_reinitialize = zs_parallelscan_reinitialize,
+
+	.index_fetch_begin = zedstoream_begin_index_fetch,
+	.index_fetch_reset = zedstoream_reset_index_fetch,
+	.index_fetch_end = zedstoream_end_index_fetch,
+	.index_fetch_set_column_projection = zedstoream_fetch_set_column_projection,
+	.index_fetch_tuple = zedstoream_index_fetch_tuple,
+
+	.tuple_insert = zedstoream_insert,
+	.tuple_insert_speculative = zedstoream_insert_speculative,
+	.tuple_complete_speculative = zedstoream_complete_speculative,
+	.multi_insert = zedstoream_multi_insert,
+	.tuple_delete = zedstoream_delete,
+	.tuple_update = zedstoream_update,
+	.tuple_lock = zedstoream_lock_tuple,
+	.finish_bulk_insert = zedstoream_finish_bulk_insert,
+
+	.tuple_fetch_row_version = zedstoream_fetch_row_version,
+	.tuple_get_latest_tid = zedstoream_get_latest_tid,
+	.tuple_satisfies_snapshot = zedstoream_tuple_satisfies_snapshot,
+	.compute_xid_horizon_for_tuples = zedstoream_compute_xid_horizon_for_tuples,
+
+	.relation_set_new_filenode = zedstoream_relation_set_new_filenode,
+	.relation_nontransactional_truncate = zedstoream_relation_nontransactional_truncate,
+	.relation_copy_data = zedstoream_relation_copy_data,
+	.relation_copy_for_cluster = zedstoream_relation_copy_for_cluster,
+	.relation_vacuum = zedstoream_vacuum_rel,
+	.scan_analyze_next_block = zedstoream_scan_analyze_next_block,
+	.scan_analyze_next_tuple = zedstoream_scan_analyze_next_tuple,
+
+	.index_build_range_scan = zedstoream_index_build_range_scan,
+	.index_validate_scan = zedstoream_index_validate_scan,
+
+	.relation_estimate_size = zedstoream_relation_estimate_size,
+
+	.scan_bitmap_next_block = zedstoream_scan_bitmap_next_block,
+	.scan_bitmap_next_tuple = zedstoream_scan_bitmap_next_tuple,
+	.scan_sample_next_block = zedstoream_scan_sample_next_block,
+	.scan_sample_next_tuple = zedstoream_scan_sample_next_tuple
+};
+
+/* SQL-callable handler function: returns the zedstore TableAmRoutine. */
+Datum
+zedstore_tableam_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&zedstoream_methods);
+}
+
+
+/*
+ * Routines for dividing up the TID range for parallel seq scans
+ */
+
+/*
+ * Shared-memory state for a parallel zedstore scan.  The TID space is
+ * handed out to workers in chunks of 2^16 TIDs ("blocks"); see
+ * zs_parallelscan_nextrange().
+ */
+typedef struct ParallelZSScanDescData
+{
+	ParallelTableScanDescData base;
+
+	zstid		pzs_endtid;		/* last tid + 1 in relation at start of scan */
+	pg_atomic_uint64 pzs_allocatedtid_blk;	/* TID space allocated to workers so far. (in  65536 increments) */
+} ParallelZSScanDescData;
+typedef struct ParallelZSScanDescData *ParallelZSScanDesc;
+
+/* Shared-memory size needed for a parallel zedstore scan. */
+static Size
+zs_parallelscan_estimate(Relation rel)
+{
+	return sizeof(ParallelZSScanDescData);
+}
+
+/*
+ * Initialize the shared parallel-scan state.  The end of the TID range is
+ * sampled once here, so the scan covers the relation as of scan start.
+ */
+static Size
+zs_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+{
+	ParallelZSScanDesc zpscan = (ParallelZSScanDesc) pscan;
+
+	zpscan->base.phs_relid = RelationGetRelid(rel);
+	/* FIXME: if attribute 1 is dropped, should use another attribute */
+	zpscan->pzs_endtid = zsbt_get_last_tid(rel, 1);
+	pg_atomic_init_u64(&zpscan->pzs_allocatedtid_blk, 0);
+
+	return sizeof(ParallelZSScanDescData);
+}
+
+/* Reset the allocator so the TID space is handed out from the start again. */
+static void
+zs_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
+{
+	ParallelZSScanDesc bpscan = (ParallelZSScanDesc) pscan;
+
+	pg_atomic_write_u64(&bpscan->pzs_allocatedtid_blk, 0);
+}
+
+/*
+ * get the next TID range to scan
+ *
+ * Returns true if there is more to scan, false otherwise.
+ *
+ * Get the next TID range to scan.  Even if there are no TIDs left to scan,
+ * another backend could have grabbed a range to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the first
+ * backend gets 'false' return.
+ */
+static bool
+zs_parallelscan_nextrange(Relation rel, ParallelZSScanDesc pzscan,
+						  zstid *start, zstid *end)
+{
+	uint64		allocatedtid_blk;
+
+	/*
+	 * pzs_allocatedtid_blk tracks how much of the TID space has been
+	 * allocated to workers already.  When the allocated range reaches
+	 * pzs_endtid, all TIDs have been allocated.
+	 *
+	 * Because we use an atomic fetch-and-add to fetch the current value, the
+	 * pzs_allocatedtid_blk counter will keep growing past the end of the
+	 * relation, because workers will still increment the value, when they
+	 * try to allocate the next block but all blocks have been allocated
+	 * already. The counter must be 64 bits wide because of that, to avoid
+	 * wrapping around when pzs_endtid is close to the maximum TID.  That's
+	 * also one reason we do this at granularity of 2^16 TIDs, even though
+	 * zedstore isn't block-oriented.
+	 *
+	 * TODO: we divide the TID space into chunks of 2^16 TIDs each. That's
+	 * pretty inefficient, there's a fair amount of overhead in re-starting
+	 * the B-tree scans between each range. We probably should use much larger
+	 * ranges. But this is good for testing.
+	 */
+	allocatedtid_blk = pg_atomic_fetch_add_u64(&pzscan->pzs_allocatedtid_blk, 1);
+	*start = ZSTidFromBlkOff(allocatedtid_blk, 1);
+	*end = ZSTidFromBlkOff(allocatedtid_blk + 1, 1);
+
+	return *start < pzscan->pzs_endtid;
+}
diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c
index 881131aff2..cb212e49ce 100644
--- a/src/backend/executor/execScan.c
+++ b/src/backend/executor/execScan.c
@@ -20,6 +20,7 @@
 
 #include "executor/executor.h"
 #include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
 #include "utils/memutils.h"
 
 
@@ -300,3 +301,77 @@ ExecScanReScan(ScanState *node)
 		}
 	}
 }
+
+/*
+ * Walker context for neededColumnContextWalker: mask[attno - 1] is set
+ * for every user column the walked expression references.
+ */
+typedef struct neededColumnContext
+{
+	bool *mask;		/* output: one flag per attribute */
+	int n;			/* number of entries in mask */
+} neededColumnContext;
+
+/*
+ * expression_tree_walker() callback: set c->mask[attno - 1] for every user
+ * column referenced by the expression tree.  A whole-row Var (varattno == 0)
+ * marks all columns; system columns (varattno < 0) are ignored, since the
+ * mask only covers user attributes.
+ */
+static bool
+neededColumnContextWalker(Node *node, neededColumnContext *c)
+{
+	if (node == NULL)
+		return false;
+
+	if (IsA(node, Var))
+	{
+		Var *var = (Var *)node;
+
+		if (var->varattno > 0)
+		{
+			Assert(var->varattno <= c->n);
+			c->mask[var->varattno - 1] = true;
+		}
+		/*
+		 * If all attributes are included, set all entries in mask to true.
+		 * Spell out the byte count; the previous code passed just c->n,
+		 * which is only correct because sizeof(bool) happens to be 1.
+		 */
+		else if (var->varattno == 0)
+			memset(c->mask, true, c->n * sizeof(bool));
+
+		return false;
+	}
+	return expression_tree_walker(node, neededColumnContextWalker, (void *) c);
+}
+
+/*
+ * Mark in 'mask' (one bool per attribute, indexed by attno - 1) every
+ * column that expression 'expr' references.
+ *
+ * n specifies the number of allowed entries in mask: we use
+ * it for bounds-checking in the walker above.
+ */
+void
+GetNeededColumnsForNode(Node *expr, bool *mask, int n)
+{
+	neededColumnContext c;
+
+	c.mask = mask;
+	c.n = n;
+
+	neededColumnContextWalker(expr, &c);
+}
+
+/*
+ * Compute which of the scan relation's 'ncol' columns are needed by the
+ * scan node's targetlist or quals.  Returns a palloc'd array of ncol
+ * booleans, indexed by attno - 1.
+ */
+bool *
+GetNeededColumnsForScan(ScanState *scanstate, int ncol)
+{
+	Plan	   *plan = scanstate->ps.plan;
+	bool	   *mask = palloc0(ncol * sizeof(bool));
+	bool		any_needed = false;
+
+	GetNeededColumnsForNode((Node *) plan->targetlist, mask, ncol);
+	GetNeededColumnsForNode((Node *) plan->qual, mask, ncol);
+
+	for (int col = 0; col < ncol; col++)
+	{
+		if (mask[col])
+		{
+			any_needed = true;
+			break;
+		}
+	}
+
+	/*
+	 * In some cases (for example, count(*)), no columns are specified.
+	 * We always scan the first column.
+	 */
+	if (!any_needed)
+		mask[0] = true;
+
+	return mask;
+}
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 7711728495..5833d683b3 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -166,10 +166,10 @@ IndexOnlyNext(IndexOnlyScanState *node)
 			 * Rats, we have to visit the heap to check visibility.
 			 */
 			InstrCountTuples2(node, 1);
-			if (!index_fetch_heap(scandesc, slot))
+			if (!index_fetch_heap(scandesc, node->ioss_TableSlot))
 				continue;		/* no visible tuple, try next index entry */
 
-			ExecClearTuple(slot);
+			ExecClearTuple(node->ioss_TableSlot);
 
 			/*
 			 * Only MVCC snapshots are supported here, so there should be no
@@ -528,7 +528,17 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags)
 	 */
 	tupDesc = ExecTypeFromTL(node->indextlist);
 	ExecInitScanTupleSlot(estate, &indexstate->ss, tupDesc,
-						  table_slot_callbacks(currentRelation));
+						  &TTSOpsVirtual);
+
+	/*
+	 * We need another slot, in a format that's suitable for the table AM,
+	 * for when we need to fetch a tuple from the table for rechecking
+	 * visibility.
+	 */
+	indexstate->ioss_TableSlot =
+		ExecAllocTableSlot(&estate->es_tupleTable,
+						   RelationGetDescr(currentRelation),
+						   table_slot_callbacks(currentRelation));
 
 	/*
 	 * Initialize result type and projection info.  The node's targetlist will
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 399ac0109c..82903bc708 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -115,6 +115,13 @@ IndexNext(IndexScanState *node)
 								   node->iss_NumScanKeys,
 								   node->iss_NumOrderByKeys);
 
+		if (table_scans_leverage_column_projection(node->ss.ss_currentRelation))
+		{
+			bool *proj;
+			proj = GetNeededColumnsForScan(&node->ss, node->ss.ss_currentRelation->rd_att->natts);
+			table_index_fetch_set_column_projection(scandesc->xs_heapfetch, proj);
+		}
+
 		node->iss_ScanDesc = scandesc;
 
 		/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 8bd7430a91..7f5a2c107c 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -31,6 +31,7 @@
 #include "access/tableam.h"
 #include "executor/execdebug.h"
 #include "executor/nodeSeqscan.h"
+#include "nodes/nodeFuncs.h"
 #include "utils/rel.h"
 
 static TupleTableSlot *SeqNext(SeqScanState *node);
@@ -68,9 +69,20 @@ SeqNext(SeqScanState *node)
 		 * We reach here if the scan is not parallel, or if we're serially
 		 * executing a scan that was planned to be parallel.
 		 */
-		scandesc = table_beginscan(node->ss.ss_currentRelation,
-								   estate->es_snapshot,
-								   0, NULL);
+		if (table_scans_leverage_column_projection(node->ss.ss_currentRelation))
+		{
+			bool *proj;
+			proj = GetNeededColumnsForScan(&node->ss, node->ss.ss_currentRelation->rd_att->natts);
+			scandesc = table_beginscan_with_column_projection(node->ss.ss_currentRelation,
+															  estate->es_snapshot,
+															  0, NULL, proj);
+		}
+		else
+		{
+			scandesc = table_beginscan(node->ss.ss_currentRelation,
+									   estate->es_snapshot,
+									   0, NULL);
+		}
 		node->ss.ss_currentScanDesc = scandesc;
 	}
 
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index a647e7db32..2929165e2a 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -138,6 +138,7 @@ typedef struct TableAmRoutine
 {
 	/* this must be set to T_TableAmRoutine */
 	NodeTag		type;
+	bool scans_leverage_column_projection;
 
 
 	/* ------------------------------------------------------------------------
@@ -187,6 +188,18 @@ typedef struct TableAmRoutine
 								 bool is_samplescan,
 								 bool temp_snap);
 
+	TableScanDesc (*scan_begin_with_column_projection)(Relation relation,
+													   Snapshot snapshot,
+													   int nkeys, struct ScanKeyData *key,
+													   ParallelTableScanDesc parallel_scan,
+													   bool *project_column,
+													   bool allow_strat,
+													   bool allow_sync,
+													   bool allow_pagemode,
+													   bool is_bitmapscan,
+													   bool is_samplescan,
+													   bool temp_snap);
+
 	/*
 	 * Release resources and deallocate scan. If TableScanDesc.temp_snap,
 	 * TableScanDesc.rs_snapshot needs to be unregistered.
@@ -262,6 +275,13 @@ typedef struct TableAmRoutine
 	 */
 	void		(*index_fetch_end) (struct IndexFetchTableData *data);
 
+	/*
+	 * Set column projections for AM which leverage column projections for
+	 * scanning.
+	 */
+	void (*index_fetch_set_column_projection) (struct IndexFetchTableData *data,
+											   bool *project_column);
+
 	/*
 	 * Fetch tuple at `tid` into `slot`, after doing a visibility test
 	 * according to `snapshot`. If a tuple was found and passed the visibility
@@ -683,6 +703,12 @@ table_beginscan(Relation rel, Snapshot snapshot,
 									   true, true, true, false, false, false);
 }
 
+/*
+ * Does this relation's table AM make use of per-column projection
+ * information during scans?
+ */
+static inline bool
+table_scans_leverage_column_projection(Relation relation)
+{
+	return relation->rd_tableam->scans_leverage_column_projection;
+}
+
 /*
  * Like table_beginscan(), but for scanning catalog. It'll automatically use a
  * snapshot appropriate for scanning catalog relations.
@@ -707,6 +733,17 @@ table_beginscan_strat(Relation rel, Snapshot snapshot,
 									   false, false, false);
 }
 
+/*
+ * Like table_beginscan(), but also passes 'project_column', an array of
+ * booleans (one per attribute, indexed by attno - 1) flagging which columns
+ * the scan needs.  Only valid for AMs that set
+ * scans_leverage_column_projection.
+ */
+static inline TableScanDesc
+table_beginscan_with_column_projection(Relation relation, Snapshot snapshot,
+									   int nkeys, struct ScanKeyData *key,
+									   bool *project_column)
+{
+	Assert(relation->rd_tableam->scans_leverage_column_projection);
+
+	return relation->rd_tableam->scan_begin_with_column_projection(
+		relation, snapshot, nkeys, key, NULL, project_column,
+		true, true, true, false, false, false);
+}
 
 /*
  * table_beginscan_bm is an alternative entry point for setting up a
@@ -886,6 +923,13 @@ table_index_fetch_end(struct IndexFetchTableData *scan)
 	scan->rel->rd_tableam->index_fetch_end(scan);
 }
 
+/*
+ * Restrict an index-driven table fetch to the columns flagged in
+ * 'project_column' (indexed by attno - 1).  Only meaningful for AMs that
+ * set scans_leverage_column_projection.
+ */
+static inline void
+table_index_fetch_set_column_projection(struct IndexFetchTableData *scan,
+										 bool *project_column)
+{
+	scan->rel->rd_tableam->index_fetch_set_column_projection(scan, project_column);
+}
+
 /*
  * Fetches, as part of an index scan, tuple at `tid` into `slot`, after doing
  * a visibility test according to `snapshot`. If a tuple was found and passed
diff --git a/src/include/access/zedstore_compression.h b/src/include/access/zedstore_compression.h
new file mode 100644
index 0000000000..9958c02eac
--- /dev/null
+++ b/src/include/access/zedstore_compression.h
@@ -0,0 +1,51 @@
+/*
+ * zedstore_compression.h
+ *		internal declarations for ZedStore compression
+ *
+ * Copyright (c) 2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/zedstore_compression.h
+ */
+#ifndef ZEDSTORE_COMPRESSION_H
+#define ZEDSTORE_COMPRESSION_H
+
+#include "storage/itemptr.h"
+
+/*
+ * Working state for decompressing compressed b-tree items; see the
+ * zs_decompress_* functions declared below for the lifecycle.
+ */
+typedef struct ZSDecompressContext
+{
+	char	   *buffer;
+	int			bufsize;		/* allocated size of 'buffer' */
+	int			uncompressedsize;
+	int			bytesread;
+} ZSDecompressContext;
+
+/*
+ * Working state for compressing a batch of uncompressed b-tree items into
+ * one compressed item; see the zs_compress_* functions declared below.
+ */
+typedef struct ZSCompressContext
+{
+	char	   *uncompressedbuffer;
+
+	int			maxCompressedSize;
+	int			maxUncompressedSize;
+	char	   *buffer;
+	int			nitems;		/* number of items added since zs_compress_begin */
+	int			rawsize;
+} ZSCompressContext;
+
+typedef struct ZSBtreeItem ZSBtreeItem;
+typedef struct ZSCompressedBtreeItem ZSCompressedBtreeItem;
+typedef struct ZSUncompressedBtreeItem ZSUncompressedBtreeItem;
+
+/* compression functions */
+extern void zs_compress_init(ZSCompressContext *context);
+extern void zs_compress_begin(ZSCompressContext *context, int maxCompressedSize);
+extern bool zs_compress_add(ZSCompressContext *context, ZSUncompressedBtreeItem *item);
+extern ZSCompressedBtreeItem *zs_compress_finish(ZSCompressContext *context);
+extern void zs_compress_free(ZSCompressContext *context);
+
+/* decompression functions */
+extern void zs_decompress_init(ZSDecompressContext *context);
+extern void zs_decompress_chunk(ZSDecompressContext *context, ZSCompressedBtreeItem *chunk);
+extern ZSUncompressedBtreeItem *zs_decompress_read_item(ZSDecompressContext *context);
+extern void zs_decompress_free(ZSDecompressContext *context);
+
+#endif							/* ZEDSTORE_COMPRESSION_H */
diff --git a/src/include/access/zedstore_internal.h b/src/include/access/zedstore_internal.h
new file mode 100644
index 0000000000..ab09b4a09b
--- /dev/null
+++ b/src/include/access/zedstore_internal.h
@@ -0,0 +1,409 @@
+/*
+ * zedstore_internal.h
+ *		internal declarations for ZedStore tables
+ *
+ * Copyright (c) 2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/zedstore_internal.h
+ */
+#ifndef ZEDSTORE_INTERNAL_H
+#define ZEDSTORE_INTERNAL_H
+
+#include "access/tableam.h"
+#include "access/zedstore_compression.h"
+#include "access/zedstore_undo.h"
+#include "storage/bufmgr.h"
+#include "utils/datum.h"
+
+/*
+ * Throughout ZedStore, we pass around TIDs as uint64's, rather than ItemPointers,
+ * for speed.
+ */
+typedef uint64	zstid;
+
+#define InvalidZSTid		0
+#define MinZSTid			1		/* blk 0, off 1 */
+#define MaxZSTid			((uint64) MaxBlockNumber << 16 | 0xffff)
+/* note: if this is converted to ItemPointer, it is invalid */
+#define MaxPlusOneZSTid		(MaxZSTid + 1)
+
+static inline zstid
+ZSTidFromItemPointer(ItemPointerData iptr)
+{
+	Assert(ItemPointerIsValid(&iptr));
+	return ((uint64) iptr.ip_blkid.bi_hi << 32 |
+			(uint64) iptr.ip_blkid.bi_lo << 16 |
+			(uint64) iptr.ip_posid);
+}
+
+static inline zstid
+ZSTidFromBlkOff(BlockNumber blk, OffsetNumber off)
+{
+	Assert(off != 0);
+	return ((uint64) blk << 16 | off);
+}
+
+static inline ItemPointerData
+ItemPointerFromZSTid(zstid tid)
+{
+	ItemPointerData iptr;
+
+	iptr.ip_blkid.bi_hi = (tid >> 32) & 0xffff;
+	iptr.ip_blkid.bi_lo = (tid >> 16) & 0xffff;
+	iptr.ip_posid = (tid) & 0xffff;
+	Assert(ItemPointerIsValid(&iptr));
+	return iptr;
+}
+
+static inline BlockNumber
+ZSTidGetBlockNumber(zstid tid)
+{
+	return (BlockNumber) (tid >> 16);
+}
+
+static inline OffsetNumber
+ZSTidGetOffsetNumber(zstid tid)
+{
+	return (OffsetNumber) tid;
+}
+
+/*
+ * Helper function to "increment" a TID by one.
+ *
+ * Skips over values that would be invalid ItemPointers.
+ */
+static inline zstid
+ZSTidIncrement(zstid tid)
+{
+	tid++;
+	if ((tid & 0xffff) == 0)
+		tid++;
+	return tid;
+}
+
+static inline zstid
+ZSTidIncrementForInsert(zstid tid)
+{
+	tid++;
+	if (ZSTidGetOffsetNumber(tid) >= MaxHeapTuplesPerPage)
+		tid = ZSTidFromBlkOff(ZSTidGetBlockNumber(tid) + 1, 1);
+	return tid;
+}
+
+/*
+ * A ZedStore table contains different kinds of pages, all in the same file.
+ *
+ * Block 0 is always a metapage. It contains the block numbers of the other
+ * data structures stored within the file, like the per-attribute B-trees,
+ * and the UNDO log. In addition, if there are overly large datums in the
+ * table, they are chopped into separate "toast" pages.
+ */
+#define	ZS_META_PAGE_ID		0xF083
+#define	ZS_BTREE_PAGE_ID	0xF084
+#define	ZS_UNDO_PAGE_ID		0xF085
+#define	ZS_TOAST_PAGE_ID	0xF086
+
+/* like nbtree/gist FOLLOW_RIGHT flag, used to detect concurrent page splits */
+#define ZS_FOLLOW_RIGHT		0x0002
+
+typedef struct ZSBtreePageOpaque
+{
+	AttrNumber	zs_attno;
+	BlockNumber zs_next;
+	zstid		zs_lokey;		/* inclusive */
+	zstid		zs_hikey;		/* exclusive */
+	uint16		zs_level;			/* 0 = leaf */
+	uint16		zs_flags;
+	uint16		padding;			/* padding, to put zs_page_id last */
+	uint16		zs_page_id;			/* always ZS_BTREE_PAGE_ID */
+} ZSBtreePageOpaque;
+
+#define ZSBtreePageGetOpaque(page) ((ZSBtreePageOpaque *) PageGetSpecialPointer(page))
+
+/*
+ * Internal B-tree page layout.
+ *
+ * The "contents" of the page is an array of ZSBtreeInternalPageItem. The number
+ * of items can be deduced from pd_lower.
+ */
+typedef struct ZSBtreeInternalPageItem
+{
+	zstid		tid;
+	BlockNumber childblk;
+} ZSBtreeInternalPageItem;
+
+static inline ZSBtreeInternalPageItem *
+ZSBtreeInternalPageGetItems(Page page)
+{
+	ZSBtreeInternalPageItem *items;
+
+	items = (ZSBtreeInternalPageItem *) PageGetContents(page);
+
+	return items;
+}
+static inline int
+ZSBtreeInternalPageGetNumItems(Page page)
+{
+	ZSBtreeInternalPageItem *begin;
+	ZSBtreeInternalPageItem *end;
+
+	begin = (ZSBtreeInternalPageItem *) PageGetContents(page);
+	end = (ZSBtreeInternalPageItem *) ((char *) page + ((PageHeader) page)->pd_lower);
+
+	return end - begin;
+}
+
+static inline bool
+ZSBtreeInternalPageIsFull(Page page)
+{
+	PageHeader phdr = (PageHeader) page;
+
+	return phdr->pd_upper - phdr->pd_lower < sizeof(ZSBtreeInternalPageItem);
+}
+
+/*
+ * Leaf B-tree page layout
+ *
+ * Leaf pages are packed with ZSBtreeItems. There are three kinds of items:
+ *
+ * 1. plain item, holds one tuple (or rather, one datum).
+ *
+ * 2. A "container item", which holds multiple plain items, compressed.
+ *
+ * 3. A "dead item". A dead item prevents the TID from being reused. It's
+ *    used during VACUUM, to mark items for which there are no index pointers
+ *    anymore. But it cannot be removed until the undo record has been
+ *    trimmed away, because if the TID was reused for a new record, vacuum
+ *    might remove the new tuple version instead. After t_undo_ptr becomes
+ *    older than "oldest undo ptr", the item can be removed and the TID
+ *    recycled.
+ *
+ * TODO: squeeze harder: eliminate padding, use high bits of t_tid for flags or size
+ */
+typedef struct ZSBtreeItem
+{
+	zstid		t_tid;
+	uint16		t_size;
+	uint16		t_flags;
+} ZSBtreeItem;
+
+typedef struct ZSUncompressedBtreeItem
+{
+	/* these fields must match ZSBtreeItem */
+	zstid		t_tid;
+	uint16		t_size;
+	uint16		t_flags;
+
+	ZSUndoRecPtr t_undo_ptr;
+
+	char		t_payload[FLEXIBLE_ARRAY_MEMBER];
+} ZSUncompressedBtreeItem;
+
+typedef struct ZSCompressedBtreeItem
+{
+	/* these fields must match ZSBtreeItem */
+	zstid		t_tid;
+	uint16		t_size;
+	uint16		t_flags;
+
+	uint16		t_uncompressedsize;
+	zstid		t_lasttid;	/* inclusive */
+
+	char		t_payload[FLEXIBLE_ARRAY_MEMBER];
+} ZSCompressedBtreeItem;
+
+
+#define ZSBT_COMPRESSED		0x0001
+#define ZSBT_DELETED		0x0002
+#define ZSBT_UPDATED		0x0004
+#define ZSBT_NULL			0x0008
+#define ZSBT_DEAD			0x0010
+
+/*
+ * Toast page layout.
+ *
+ * When an overly large datum is stored, it is divided into chunks, and each
+ * chunk is stored on a dedicated toast page. The toast pages of a datum form
+ * a list; each page has a next/prev pointer.
+ */
+/*
+ * Maximum size of an individual untoasted Datum stored in ZedStore. Datums
+ * larger than this need to be toasted.
+ *
+ * A datum needs to fit on a B-tree page, with page and item headers.
+ *
+ * XXX: 500 accounts for all the headers. Need to compute this correctly...
+ */
+#define		MaxZedStoreDatumSize		(BLCKSZ - 500)
+
+typedef struct ZSToastPageOpaque
+{
+	AttrNumber	zs_attno;
+
+	/* these are only set on the first page. */
+	zstid		zs_tid;
+	uint32		zs_total_size;
+
+	uint32		zs_slice_offset;
+	BlockNumber	zs_prev;
+	BlockNumber	zs_next;
+	uint16		zs_flags;
+	uint16		padding1;			/* padding, to put zs_page_id last */
+	uint16		padding2;			/* padding, to put zs_page_id last */
+	uint16		zs_page_id;
+} ZSToastPageOpaque;
+
+/*
+ * "Toast pointer" of a datum that's stored in zedstore toast pages.
+ *
+ * This looks somewhat like a normal TOAST pointer, but we mustn't let these
+ * escape out of zedstore code, because the rest of the system doesn't know
+ * how to deal with them.
+ *
+ * This must look like varattrib_1b_e!
+ */
+typedef struct varatt_zs_toastptr
+{
+	/* varattrib_1b_e */
+	uint8		va_header;
+	uint8		va_tag;			/* VARTAG_ZEDSTORE in zedstore toast datums */
+
+	/* first block */
+	BlockNumber	zst_block;
+} varatt_zs_toastptr;
+
+/*
+ * va_tag value. this should be distinguishable from the values in
+ * vartag_external
+ */
+#define		VARTAG_ZEDSTORE		10
+
+/*
+ * Versions of datumGetSize and datumCopy that know about ZedStore-toasted
+ * datums.
+ */
+static inline Size
+zs_datumGetSize(Datum value, bool typByVal, int typLen)
+{
+	if (typLen < 0 && VARATT_IS_EXTERNAL(value) && VARTAG_EXTERNAL(value) == VARTAG_ZEDSTORE)
+		return sizeof(varatt_zs_toastptr);
+	return datumGetSize(value, typByVal, typLen);
+}
+
+static inline Datum
+zs_datumCopy(Datum value, bool typByVal, int typLen)
+{
+	if (typLen < 0 && VARATT_IS_EXTERNAL(value) && VARTAG_EXTERNAL(value) == VARTAG_ZEDSTORE)
+	{
+		char	   *result = palloc(sizeof(varatt_zs_toastptr));
+
+		memcpy(result, DatumGetPointer(value), sizeof(varatt_zs_toastptr));
+
+		return PointerGetDatum(result);
+	}
+	else
+		return datumCopy(value, typByVal, typLen);
+}
+
+/*
+ * Block 0 on every ZedStore table is a metapage.
+ *
+ * It contains a directory of b-tree roots for each attribute, and lots more.
+ */
+#define ZS_META_BLK		0
+
+typedef struct ZSMetaPage
+{
+	int			nattributes;
+	BlockNumber	roots[FLEXIBLE_ARRAY_MEMBER];	/* one for each attribute */
+} ZSMetaPage;
+
+/*
+ * it's not clear what we should store in the "opaque" special area, and what
+ * as page contents, on a metapage. But have at least the page_id field here,
+ * so that tools like pg_filedump can recognize it as a zedstore metapage.
+ */
+typedef struct ZSMetaPageOpaque
+{
+	uint64		zs_undo_counter;
+	BlockNumber	zs_undo_head;
+	BlockNumber	zs_undo_tail;
+	ZSUndoRecPtr zs_undo_oldestptr;
+
+	uint16		zs_flags;
+	uint16		padding1;			/* padding, to put zs_page_id last */
+	uint16		padding2;			/* padding, to put zs_page_id last */
+	uint16		zs_page_id;
+} ZSMetaPageOpaque;
+
+
+/*
+ * Holds the state of an in-progress scan on a zedstore btree.
+ */
+typedef struct ZSBtreeScan
+{
+	Relation	rel;
+	AttrNumber	attno;
+
+	bool		for_update;
+
+	bool		active;
+	Buffer		lastbuf;
+	bool		lastbuf_is_locked;
+	OffsetNumber lastoff;
+	zstid		nexttid;
+	Snapshot	snapshot;
+
+	/* in the "real" UNDO-log, this would probably be a global variable */
+	ZSUndoRecPtr recent_oldest_undo;
+
+	/*
+	 * if we have remaining items from a compressed "container" tuple, they
+	 * are kept in the decompressor context, and 'has_decompressed' is true.
+	 */
+	ZSDecompressContext decompressor;
+	bool		has_decompressed;
+} ZSBtreeScan;
+
+/* prototypes for functions in zedstore_btree.c */
+extern ZSUncompressedBtreeItem *zsbt_create_item(Form_pg_attribute attr, zstid tid,
+												 Datum datum, bool isnull);
+extern zstid zsbt_insert(Relation rel, AttrNumber attno, Datum datum,
+						 bool isnull, TransactionId xmin, CommandId cmin,
+						 zstid tid, ZSUndoRecPtr *undorecptr);
+extern void zsbt_insert_multi_items(Relation rel, AttrNumber attno, List *newitems,
+									TransactionId xid, CommandId cid,
+									ZSUndoRecPtr *undorecptr, zstid *tid);
+extern TM_Result zsbt_delete(Relation rel, AttrNumber attno, zstid tid,
+							 TransactionId xid, CommandId cid,
+							 Snapshot snapshot, Snapshot crosscheck, bool wait,
+							 TM_FailureData *hufd, bool changingPart);
+extern TM_Result zsbt_update(Relation rel, AttrNumber attno, zstid otid,
+							 Datum newdatum, bool newisnull, TransactionId xid,
+							 CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+							 bool wait, TM_FailureData *hufd, zstid *newtid_p);
+extern void zsbt_mark_item_dead(Relation rel, AttrNumber attno, zstid tid, ZSUndoRecPtr);
+extern void zsbt_begin_scan(Relation rel, AttrNumber attno, zstid starttid, Snapshot snapshot, ZSBtreeScan *scan);
+extern bool zsbt_scan_next(ZSBtreeScan *scan, Datum *datum, bool *isnull, zstid *tid);
+extern void zsbt_end_scan(ZSBtreeScan *scan);
+extern zstid zsbt_get_last_tid(Relation rel, AttrNumber attno);
+
+
+
+
+/* prototypes for functions in zedstore_meta.c */
+extern Buffer zs_getnewbuf(Relation rel);
+extern BlockNumber zsmeta_get_root_for_attribute(Relation rel, AttrNumber attno, bool for_update);
+extern void zsmeta_update_root_for_attribute(Relation rel, AttrNumber attno, Buffer metabuf, BlockNumber rootblk);
+
+/* prototypes for functions in zedstore_visibility.c */
+extern TM_Result zs_SatisfiesUpdate(ZSBtreeScan *scan, ZSUncompressedBtreeItem *item);
+extern bool zs_SatisfiesVisibility(ZSBtreeScan *scan, ZSUncompressedBtreeItem *item);
+
+/* prototypes for functions in zedstore_toast.c */
+extern Datum zedstore_toast_datum(Relation rel, AttrNumber attno, Datum value);
+extern void zedstore_toast_finish(Relation rel, AttrNumber attno, Datum toasted, zstid tid);
+extern Datum zedstore_toast_flatten(Relation rel, AttrNumber attno, zstid tid, Datum toasted);
+
+#endif							/* ZEDSTORE_INTERNAL_H */
diff --git a/src/include/access/zedstore_undo.h b/src/include/access/zedstore_undo.h
new file mode 100644
index 0000000000..ea41f3cfd1
--- /dev/null
+++ b/src/include/access/zedstore_undo.h
@@ -0,0 +1,133 @@
+/*
+ * zedstore_undo.h
+ *		internal declarations for ZedStore undo logging
+ *
+ * Copyright (c) 2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/zedstore_undo.h
+ */
+#ifndef ZEDSTORE_UNDO_H
+#define ZEDSTORE_UNDO_H
+
+#include "commands/vacuum.h"
+#include "utils/relcache.h"
+
+/* this must match the definition in zedstore_internal.h */
+typedef uint64	zstid;
+
+/*
+ * An UNDO-pointer.
+ *
+ * In the "real" UNDO-logging work from EDB, an UndoRecPtr is only 64 bits.
+ * But we make life easier for us, by encoding more information in it.
+ *
+ * 'counter' is a number that's incremented every time a new undo record is
+ * created. It can be used to determine if an undo pointer is too old to be
+ * of interest to anyone.
+ *
+ * 'blkno' and 'offset' are the physical location of the UNDO record. They
+ * can be used to easily fetch a given record.
+ */
+typedef struct
+{
+	uint64		counter;
+	BlockNumber blkno;
+	int32		offset;
+} ZSUndoRecPtr;
+
+/* TODO: assert that blkno and offset match, too, if counter matches */
+#define ZSUndoRecPtrEquals(a, b) ((a).counter == (b).counter)
+
+typedef struct
+{
+	int16		size;			/* size of this record, including header */
+	uint8		type;			/* ZSUNDO_TYPE_* */
+	AttrNumber	attno;
+	ZSUndoRecPtr undorecptr;
+	TransactionId xid;
+	CommandId	cid;
+	zstid		tid;
+} ZSUndoRec;
+
+#define ZSUNDO_TYPE_INSERT		1
+#define ZSUNDO_TYPE_DELETE		2
+#define ZSUNDO_TYPE_UPDATE		3
+
+/*
+ * Type-specific record formats.
+ *
+ * We store similar info as zheap for INSERT/UPDATE/DELETE. See zheap README.
+ */
+typedef struct
+{
+	ZSUndoRec	rec;
+	zstid       endtid; /* inclusive */
+} ZSUndoRec_Insert;
+
+typedef struct
+{
+	ZSUndoRec	rec;
+
+	/*
+	 * UNDO-record of the inserter. This is needed if a row is inserted, and
+	 * deleted, and there are still some active snapshots that don't consider even
+	 * the insertion as visible.
+	 */
+	ZSUndoRecPtr prevundorec;
+
+	/*
+	 * TODO: It might be good to move the deleted tuple to the undo-log, so
+	 * that the space can immediately be reused. But currently, we don't do
+	 * that. (or even better, move the old tuple to the undo-log lazily, if
+	 * the space is needed for a new insertion, before the old tuple becomes
+	 * recyclable.)
+	 */
+} ZSUndoRec_Delete;
+
+typedef struct
+{
+	ZSUndoRec	rec;
+
+	/* Like in ZSUndoRec_Delete. */
+	ZSUndoRecPtr prevundorec;
+
+	/* old version of the datum */
+	/* TODO: currently, we only do "cold" updates, so the old tuple is
+	 * left in the old place. Once we start supporting in-place updates,
+	 * the old tuple should be stored here.
+	 */
+	zstid		otid;
+
+} ZSUndoRec_Update;
+
+typedef struct
+{
+	BlockNumber	next;
+	uint16		padding;			/* padding, to put zs_page_id last */
+	uint16		zs_page_id; /* ZS_UNDO_PAGE_ID */
+} ZSUndoPageOpaque;
+
+static inline void
+ZSUndoRecPtrInitialize(ZSUndoRecPtr *uptr)
+{
+	uptr->blkno = InvalidBlockNumber;
+	uptr->offset = InvalidOffsetNumber;
+	uptr->counter = 0;
+}
+
+static inline bool
+IsZSUndoRecPtrValid(ZSUndoRecPtr *uptr)
+{
+	return (uptr->blkno != InvalidBlockNumber &&
+			uptr->offset != InvalidOffsetNumber);
+}
+
+/* prototypes for functions in zedstore_undo.c */
+extern ZSUndoRecPtr zsundo_insert(Relation rel, ZSUndoRec *rec);
+extern ZSUndoRec *zsundo_fetch(Relation rel, ZSUndoRecPtr undorecptr);
+extern void zsundo_vacuum(Relation rel, VacuumParams *params, BufferAccessStrategy bstrategy,
+			  TransactionId OldestXmin);
+extern ZSUndoRecPtr zsundo_get_oldest_undo_ptr(Relation rel);
+
+#endif							/* ZEDSTORE_UNDO_H */
diff --git a/src/include/catalog/pg_am.dat b/src/include/catalog/pg_am.dat
index 393b41dd68..f370f63460 100644
--- a/src/include/catalog/pg_am.dat
+++ b/src/include/catalog/pg_am.dat
@@ -33,5 +33,8 @@
 { oid => '3580', oid_symbol => 'BRIN_AM_OID',
   descr => 'block range index (BRIN) access method',
   amname => 'brin', amhandler => 'brinhandler', amtype => 'i' },
+{ oid => '6668', oid_symbol => 'ZEDSTORE_TABLE_AM_OID',
+  descr => 'zedstore table access method',
+  amname => 'zedstore', amhandler => 'zedstore_tableam_handler', amtype => 't' },
 
 ]
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ad4519e001..e08d62754d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -873,6 +873,11 @@
   proname => 'heap_tableam_handler', provolatile => 'v',
   prorettype => 'table_am_handler', proargtypes => 'internal',
   prosrc => 'heap_tableam_handler' },
+{ oid => '6669', oid_symbol => 'ZEDSTORE_TABLE_AM_HANDLER_OID',
+  descr => 'column-oriented table access method handler',
+  proname => 'zedstore_tableam_handler', provolatile => 'v',
+  prorettype => 'table_am_handler', proargtypes => 'internal',
+  prosrc => 'zedstore_tableam_handler' },
 
 # Index access method handlers
 { oid => '330', descr => 'btree index access method handler',
@@ -10672,4 +10677,23 @@
   proname => 'pg_partition_root', prorettype => 'regclass',
   proargtypes => 'regclass', prosrc => 'pg_partition_root' },
 
+# zedstore inspection functions
+{ oid => '7000', descr => 'get zedstore page type',
+  proname => 'pg_zs_page_type', prorettype => 'text',
+  proargtypes => 'regclass int8', prosrc => 'pg_zs_page_type' },
+{ oid => '7001', descr => 'show stats about active zedstore undo pages',
+  proname => 'pg_zs_undo_pages', prorows => '1000', proretset => 't',
+  prorettype => 'record', proargtypes => 'regclass',
+  proallargtypes => '{regclass,int8,int4,int4,int8,int8}',
+  proargmodes => '{i,o,o,o,o,o}',
+  proargnames => '{relid,blkno,nrecords,freespace,firstrecptr,lastrecptr}',
+  prosrc => 'pg_zs_undo_pages' },
+{ oid => '7002', descr => 'show stats about zedstore btree pages',
+  proname => 'pg_zs_btree_pages', prorows => '1000', proretset => 't',
+  prorettype => 'record', proargtypes => 'regclass',
+  proallargtypes => '{regclass,int8,int8,int4,int4,int8,int8,int4,int4,int4,int4,int4}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{relid,blkno,nextblk,attno,level,lokey,hikey,nitems,ncompressed,totalsz,uncompressedsz,freespace}',
+  prosrc => 'pg_zs_btree_pages' },
+
 ]
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index eb4c8b5e79..509017aa56 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -597,5 +597,7 @@ extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 
 extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
 						 const char *relname);
+extern void GetNeededColumnsForNode(Node *expr, bool *mask, int n);
+extern bool *GetNeededColumnsForScan(ScanState *scanstate, int ncol);
 
 #endif							/* EXECUTOR_H  */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a5e4b7ef2e..108dee61e2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1424,6 +1424,7 @@ typedef struct IndexOnlyScanState
 	struct IndexScanDescData *ioss_ScanDesc;
 	Buffer		ioss_VMBuffer;
 	Size		ioss_PscanLen;
+	TupleTableSlot *ioss_TableSlot;
 } IndexOnlyScanState;
 
 /* ----------------
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 6cd4cfed0a..ad7870a0bb 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -350,6 +350,9 @@
 /* Define to 1 if you have the `ldap_r' library (-lldap_r). */
 #undef HAVE_LIBLDAP_R
 
+/* Define to 1 if you have the `lz4' library (-llz4). */
+#undef HAVE_LIBLZ4
+
 /* Define to 1 if you have the `m' library (-lm). */
 #undef HAVE_LIBM
 
@@ -389,6 +392,9 @@
 /* Define to 1 if `long long int' works and is 64 bits. */
 #undef HAVE_LONG_LONG_INT_64
 
+/* Define to 1 if you have the <lz4.h> header file. */
+#undef HAVE_LZ4_H
+
 /* Define to 1 if you have the <mbarrier.h> header file. */
 #undef HAVE_MBARRIER_H
 
@@ -932,6 +938,9 @@
 /* Define to 1 to build with LLVM based JIT support. (--with-llvm) */
 #undef USE_LLVM
 
+/* Define to 1 to build with LZ4 support. (--with-lz4) */
+#undef USE_LZ4
+
 /* Define to select named POSIX semaphores. */
 #undef USE_NAMED_POSIX_SEMAPHORES
 
diff --git a/src/test/regress/expected/create_am.out b/src/test/regress/expected/create_am.out
index 352959b751..6eae2bab97 100644
--- a/src/test/regress/expected/create_am.out
+++ b/src/test/regress/expected/create_am.out
@@ -126,11 +126,12 @@ ERROR:  function int4in(internal) does not exist
 CREATE ACCESS METHOD bogus TYPE TABLE HANDLER bthandler;
 ERROR:  function bthandler must return type table_am_handler
 SELECT amname, amhandler, amtype FROM pg_am where amtype = 't' ORDER BY 1, 2;
- amname |      amhandler       | amtype 
---------+----------------------+--------
- heap   | heap_tableam_handler | t
- heap2  | heap_tableam_handler | t
-(2 rows)
+  amname  |        amhandler         | amtype 
+----------+--------------------------+--------
+ heap     | heap_tableam_handler     | t
+ heap2    | heap_tableam_handler     | t
+ zedstore | zedstore_tableam_handler | t
+(3 rows)
 
 -- First create tables employing the new AM using USING
 -- plain CREATE TABLE
diff --git a/src/test/regress/expected/zedstore.out b/src/test/regress/expected/zedstore.out
new file mode 100644
index 0000000000..37ffd6a98a
--- /dev/null
+++ b/src/test/regress/expected/zedstore.out
@@ -0,0 +1,259 @@
+-- simple tests to iteratively build the zedstore
+-- create and drop works
+create table t_zedstore(c1 int, c2 int, c3 int) USING zedstore;
+drop table t_zedstore;
+-- insert and select works
+create table t_zedstore(c1 int, c2 int, c3 int) USING zedstore;
+insert into t_zedstore select i,i+1,i+2 from generate_series(1, 10)i;
+select * from t_zedstore;
+ c1 | c2 | c3 
+----+----+----
+  1 |  2 |  3
+  2 |  3 |  4
+  3 |  4 |  5
+  4 |  5 |  6
+  5 |  6 |  7
+  6 |  7 |  8
+  7 |  8 |  9
+  8 |  9 | 10
+  9 | 10 | 11
+ 10 | 11 | 12
+(10 rows)
+
+-- selecting only few columns work
+select c1, c3 from t_zedstore;
+ c1 | c3 
+----+----
+  1 |  3
+  2 |  4
+  3 |  5
+  4 |  6
+  5 |  7
+  6 |  8
+  7 |  9
+  8 | 10
+  9 | 11
+ 10 | 12
+(10 rows)
+
+-- only few columns in output and where clause work
+select c3 from t_zedstore where c2 > 5;
+ c3 
+----
+  7
+  8
+  9
+ 10
+ 11
+ 12
+(6 rows)
+
+-- Test abort works
+begin;
+insert into t_zedstore select i,i+1,i+2 from generate_series(21, 25)i;
+abort;
+insert into t_zedstore select i,i+1,i+2 from generate_series(31, 35)i;
+select * from t_zedstore;
+ c1 | c2 | c3 
+----+----+----
+  1 |  2 |  3
+  2 |  3 |  4
+  3 |  4 |  5
+  4 |  5 |  6
+  5 |  6 |  7
+  6 |  7 |  8
+  7 |  8 |  9
+  8 |  9 | 10
+  9 | 10 | 11
+ 10 | 11 | 12
+ 31 | 32 | 33
+ 32 | 33 | 34
+ 33 | 34 | 35
+ 34 | 35 | 36
+ 35 | 36 | 37
+(15 rows)
+
+--
+-- Test indexing
+--
+create index on t_zedstore (c1);
+set enable_seqscan=off;
+set enable_indexscan=on;
+set enable_bitmapscan=off;
+-- index scan
+select * from t_zedstore where c1 = 5;
+ c1 | c2 | c3 
+----+----+----
+  5 |  6 |  7
+(1 row)
+
+-- index-only scan
+select c1 from t_zedstore where c1 = 5;
+ c1 
+----
+  5
+(1 row)
+
+-- bitmap scan
+set enable_indexscan=off;
+set enable_bitmapscan=on;
+select c1, c2 from t_zedstore where c1 between 5 and 10;
+ c1 | c2 
+----+----
+  5 |  6
+  6 |  7
+  7 |  8
+  8 |  9
+  9 | 10
+ 10 | 11
+(6 rows)
+
+--
+-- Test DELETE and UPDATE
+--
+delete from t_zedstore where c2 = 5;
+select * from t_zedstore;
+ c1 | c2 | c3 
+----+----+----
+  1 |  2 |  3
+  2 |  3 |  4
+  3 |  4 |  5
+  5 |  6 |  7
+  6 |  7 |  8
+  7 |  8 |  9
+  8 |  9 | 10
+  9 | 10 | 11
+ 10 | 11 | 12
+ 31 | 32 | 33
+ 32 | 33 | 34
+ 33 | 34 | 35
+ 34 | 35 | 36
+ 35 | 36 | 37
+(14 rows)
+
+delete from t_zedstore where c2 < 5;
+select * from t_zedstore;
+ c1 | c2 | c3 
+----+----+----
+  5 |  6 |  7
+  6 |  7 |  8
+  7 |  8 |  9
+  8 |  9 | 10
+  9 | 10 | 11
+ 10 | 11 | 12
+ 31 | 32 | 33
+ 32 | 33 | 34
+ 33 | 34 | 35
+ 34 | 35 | 36
+ 35 | 36 | 37
+(11 rows)
+
+update t_zedstore set c2 = 100 where c1 = 8;
+select * from t_zedstore;
+ c1 | c2  | c3 
+----+-----+----
+  5 |   6 |  7
+  6 |   7 |  8
+  7 |   8 |  9
+  9 |  10 | 11
+ 10 |  11 | 12
+ 31 |  32 | 33
+ 32 |  33 | 34
+ 33 |  34 | 35
+ 34 |  35 | 36
+ 35 |  36 | 37
+  8 | 100 | 10
+(11 rows)
+
+--
+-- Test VACUUM
+--
+vacuum t_zedstore;
+select * from t_zedstore;
+ c1 | c2  | c3 
+----+-----+----
+  5 |   6 |  7
+  6 |   7 |  8
+  7 |   8 |  9
+  9 |  10 | 11
+ 10 |  11 | 12
+ 31 |  32 | 33
+ 32 |  33 | 34
+ 33 |  34 | 35
+ 34 |  35 | 36
+ 35 |  36 | 37
+  8 | 100 | 10
+(11 rows)
+
+--
+-- Test toasting
+--
+create table t_zedtoast(c1 int, t text) USING zedstore;
+insert into t_zedtoast select i, repeat('x', 10000) from generate_series(1, 10) i;
+select c1, length(t) from t_zedtoast;
+ c1 | length 
+----+--------
+  1 |  10000
+  2 |  10000
+  3 |  10000
+  4 |  10000
+  5 |  10000
+  6 |  10000
+  7 |  10000
+  8 |  10000
+  9 |  10000
+ 10 |  10000
+(10 rows)
+
+--
+-- Test NULL values
+--
+create table t_zednullvalues(c1 int, c2 int) USING zedstore;
+insert into t_zednullvalues values(1, NULL), (NULL, 2);
+select * from t_zednullvalues;
+ c1 | c2 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+select c2 from t_zednullvalues;
+ c2 
+----
+   
+  2
+(2 rows)
+
+update t_zednullvalues set c1 = 1, c2 = NULL;
+select * from t_zednullvalues;
+ c1 | c2 
+----+----
+  1 |   
+  1 |   
+(2 rows)
+
+--
+-- Test COPY
+--
+create table t_zedcopy(a serial, b int, c text not null default 'stuff', d text,e text) USING zedstore;
+COPY t_zedcopy (a, b, c, d, e) from stdin;
+COPY t_zedcopy (b, d) from stdin;
+COPY t_zedcopy (b, d) from stdin;
+COPY t_zedcopy (a, b, c, d, e) from stdin;
+select * from t_zedcopy;
+   a   | b  |   c   |   d    | e  
+-------+----+-------+--------+----
+  9999 |    | \N    | NN     | 
+ 10000 | 21 | 31    | 41     | 51
+     1 |  1 | stuff | test_1 | 
+     2 |  2 | stuff | test_2 | 
+     3 |  3 | stuff | test_3 | 
+     4 |  4 | stuff | test_4 | 
+     5 |  5 | stuff | test_5 | 
+ 10001 | 22 | 32    | 42     | 52
+ 10002 | 23 | 33    | 43     | 53
+ 10003 | 24 | 34    | 44     | 54
+ 10004 | 25 | 35    | 45     | 55
+ 10005 | 26 | 36    | 46     | 56
+(12 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index f320fb6ef3..1668c251db 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -89,7 +89,7 @@ test: brin gin gist spgist privileges init_privs security_label collate matview
 # ----------
 # Another group of parallel tests
 # ----------
-test: create_table_like alter_generic alter_operator misc psql async dbsize misc_functions sysviews tsrf tidscan stats_ext
+test: create_table_like alter_generic alter_operator misc psql async dbsize misc_functions sysviews tsrf tidscan stats_ext zedstore
 
 # rules cannot run concurrently with any test that creates a view
 test: rules psql_crosstab amutils
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 36644aa963..a81722e412 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -135,6 +135,7 @@ test: sysviews
 test: tsrf
 test: tidscan
 test: stats_ext
+test: zedstore
 test: rules
 test: psql_crosstab
 test: select_parallel
diff --git a/src/test/regress/sql/zedstore.sql b/src/test/regress/sql/zedstore.sql
new file mode 100644
index 0000000000..5240a9efeb
--- /dev/null
+++ b/src/test/regress/sql/zedstore.sql
@@ -0,0 +1,104 @@
+-- simple tests to iteratively build the zedstore
+-- create and drop works
+create table t_zedstore(c1 int, c2 int, c3 int) USING zedstore;
+drop table t_zedstore;
+-- insert and select works
+create table t_zedstore(c1 int, c2 int, c3 int) USING zedstore;
+insert into t_zedstore select i,i+1,i+2 from generate_series(1, 10)i;
+select * from t_zedstore;
+-- selecting only few columns work
+select c1, c3 from t_zedstore;
+-- only few columns in output and where clause work
+select c3 from t_zedstore where c2 > 5;
+
+-- Test abort works
+begin;
+insert into t_zedstore select i,i+1,i+2 from generate_series(21, 25)i;
+abort;
+insert into t_zedstore select i,i+1,i+2 from generate_series(31, 35)i;
+select * from t_zedstore;
+
+--
+-- Test indexing
+--
+create index on t_zedstore (c1);
+set enable_seqscan=off;
+set enable_indexscan=on;
+set enable_bitmapscan=off;
+
+-- index scan
+select * from t_zedstore where c1 = 5;
+
+-- index-only scan
+select c1 from t_zedstore where c1 = 5;
+
+-- bitmap scan
+set enable_indexscan=off;
+set enable_bitmapscan=on;
+select c1, c2 from t_zedstore where c1 between 5 and 10;
+
+--
+-- Test DELETE and UPDATE
+--
+delete from t_zedstore where c2 = 5;
+select * from t_zedstore;
+delete from t_zedstore where c2 < 5;
+select * from t_zedstore;
+
+update t_zedstore set c2 = 100 where c1 = 8;
+select * from t_zedstore;
+
+--
+-- Test VACUUM
+--
+vacuum t_zedstore;
+select * from t_zedstore;
+
+--
+-- Test toasting
+--
+create table t_zedtoast(c1 int, t text) USING zedstore;
+insert into t_zedtoast select i, repeat('x', 10000) from generate_series(1, 10) i;
+
+select c1, length(t) from t_zedtoast;
+
+--
+-- Test NULL values
+--
+create table t_zednullvalues(c1 int, c2 int) USING zedstore;
+insert into t_zednullvalues values(1, NULL), (NULL, 2);
+select * from t_zednullvalues;
+select c2 from t_zednullvalues;
+update t_zednullvalues set c1 = 1, c2 = NULL;
+select * from t_zednullvalues;
+
+--
+-- Test COPY
+--
+create table t_zedcopy(a serial, b int, c text not null default 'stuff', d text,e text) USING zedstore;
+
+COPY t_zedcopy (a, b, c, d, e) from stdin;
+9999	\N	\\N	\NN	\N
+10000	21	31	41	51
+\.
+
+COPY t_zedcopy (b, d) from stdin;
+1	test_1
+\.
+
+COPY t_zedcopy (b, d) from stdin;
+2	test_2
+3	test_3
+4	test_4
+5	test_5
+\.
+
+COPY t_zedcopy (a, b, c, d, e) from stdin;
+10001	22	32	42	52
+10002	23	33	43	53
+10003	24	34	44	54
+10004	25	35	45	55
+10005	26	36	46	56
+\.
+
+select * from t_zedcopy;

base-commit: 80a96e066eecb6bd1788964b5911a405d932a784
-- 
2.19.1

