diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index d025ff7..7765cae 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -53,6 +53,7 @@
 #include "access/xact.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -86,6 +87,7 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 				ItemPointerData from, Buffer newbuf, HeapTuple newtup,
 				bool all_visible_cleared, bool new_all_visible_cleared);
+static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
 					   HeapTuple oldtup, HeapTuple newtup);
 
@@ -1618,10 +1620,16 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 		 */
 		if (!skip)
 		{
+			/* setup the redirected t_self for the benefit of timetravel access */
+			ItemPointerSet(&(heapTuple->t_self), BufferGetBlockNumber(buffer), offnum);
+
 			/* If it's visible per the snapshot, we must return it */
 			valid = HeapTupleSatisfiesVisibility(heapTuple, snapshot, buffer);
 			CheckForSerializableConflictOut(valid, relation, heapTuple,
 											buffer, snapshot);
+			/* reset original, non-redirected, tid */
+			heapTuple->t_self = *tid;
+
 			if (valid)
 			{
 				ItemPointerSetOffsetNumber(tid, offnum);
@@ -1960,10 +1968,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[3];
+		XLogRecData rdata[4];
 		Page		page = BufferGetPage(buffer);
 		uint8		info = XLOG_HEAP_INSERT;
 
+		/*
+		 * For the logical replication case we need the tuple even if were
+		 * doing a full page write. We could alternatively store a pointer into
+		 * the fpw though.
+		 * For that to work we add another rdata entry for the buffer in that
+		 * case.
+		 */
+		bool        need_tuple_data = wal_level >= WAL_LEVEL_LOGICAL
+			&& RelationGetRelid(relation)  >= FirstNormalObjectId;
+
+		/* For logical decode we need combocids to properly decode the catalog */
+		if (wal_level >= WAL_LEVEL_LOGICAL && RelationGetRelid(relation)  < FirstNormalObjectId)
+			log_heap_new_cid(relation, heaptup);
+
 		xlrec.all_visible_cleared = all_visible_cleared;
 		xlrec.target.node = relation->rd_node;
 		xlrec.target.tid = heaptup->t_self;
@@ -1983,18 +2005,33 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		 */
 		rdata[1].data = (char *) &xlhdr;
 		rdata[1].len = SizeOfHeapHeader;
-		rdata[1].buffer = buffer;
+		rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[1].buffer_std = true;
 		rdata[1].next = &(rdata[2]);
 
 		/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
 		rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
 		rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-		rdata[2].buffer = buffer;
+		rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[2].buffer_std = true;
 		rdata[2].next = NULL;
 
 		/*
+		 * add record for the buffer without actual content thats removed if
+		 * fpw is done for that buffer
+		 */
+		if (need_tuple_data)
+		{
+			rdata[2].next = &(rdata[3]);
+
+			rdata[3].data = NULL;
+			rdata[3].len = 0;
+			rdata[3].buffer = buffer;
+			rdata[3].buffer_std = true;
+			rdata[3].next = NULL;
+		}
+
+		/*
 		 * If this is the single and first tuple on page, we can reinit the
 		 * page instead of restoring the whole thing.  Set flag, and hide
 		 * buffer references from XLogInsert.
@@ -2003,7 +2040,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 		{
 			info |= XLOG_HEAP_INIT_PAGE;
-			rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+			rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
 		}
 
 		recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2123,6 +2160,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	Page		page;
 	bool		needwal;
 	Size		saveFreeSpace;
+	bool        need_tuple_data = wal_level >= WAL_LEVEL_LOGICAL
+		&& RelationGetRelid(relation)  >= FirstNormalObjectId;
+	bool        need_cids = wal_level >= WAL_LEVEL_LOGICAL &&
+		RelationGetRelid(relation)  < FirstNormalObjectId;
 
 	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
@@ -2205,7 +2246,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		{
 			XLogRecPtr	recptr;
 			xl_heap_multi_insert *xlrec;
-			XLogRecData rdata[2];
+			XLogRecData rdata[3];
 			uint8		info = XLOG_HEAP2_MULTI_INSERT;
 			char	   *tupledata;
 			int			totaldatalen;
@@ -2267,6 +2308,15 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 					   datalen);
 				tuphdr->datalen = datalen;
 				scratchptr += datalen;
+
+				/*
+				 * We don't use heap_multi_insert for catalog tuples yet, but
+				 * better be prepared...
+				 */
+				if (need_cids)
+				{
+					log_heap_new_cid(relation, heaptup);
+				}
 			}
 			totaldatalen = scratchptr - tupledata;
 			Assert((scratchptr - scratch) < BLCKSZ);
@@ -2278,17 +2328,32 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 
 			rdata[1].data = tupledata;
 			rdata[1].len = totaldatalen;
-			rdata[1].buffer = buffer;
+			rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 			rdata[1].buffer_std = true;
 			rdata[1].next = NULL;
 
 			/*
+			 * add record for the buffer without actual content thats removed if
+			 * fpw is done for that buffer
+			 */
+			if (need_tuple_data)
+			{
+				rdata[1].next = &(rdata[2]);
+
+				rdata[2].data = NULL;
+				rdata[2].len = 0;
+				rdata[2].buffer = buffer;
+				rdata[2].buffer_std = true;
+				rdata[2].next = NULL;
+			}
+
+			/*
 			 * If we're going to reinitialize the whole page using the WAL
 			 * record, hide buffer reference from XLogInsert.
 			 */
 			if (init)
 			{
-				rdata[1].buffer = InvalidBuffer;
+				rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
 				info |= XLOG_HEAP_INIT_PAGE;
 			}
 
@@ -2595,7 +2660,14 @@ l1:
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[2];
+		XLogRecData rdata[4];
+
+		bool need_tuple_data = wal_level >= WAL_LEVEL_LOGICAL &&
+			RelationGetRelid(relation) >= FirstNormalObjectId;
+
+		/* For logical decode we need combocids to properly decode the catalog */
+		if (wal_level >= WAL_LEVEL_LOGICAL && RelationGetRelid(relation)  < FirstNormalObjectId)
+			log_heap_new_cid(relation, &tp);
 
 		xlrec.all_visible_cleared = all_visible_cleared;
 		xlrec.target.node = relation->rd_node;
@@ -2611,6 +2683,76 @@ l1:
 		rdata[1].buffer_std = true;
 		rdata[1].next = NULL;
 
+		/*
+		 * XXX: We could decide not to log changes when the origin is not the
+		 * local node, that should reduce redundant logging.
+		 */
+		if (need_tuple_data)
+		{
+			xl_heap_header xlhdr;
+
+			Oid indexoid = InvalidOid;
+			int16 pknratts;
+			int16 pkattnum[INDEX_MAX_KEYS];
+			Oid pktypoid[INDEX_MAX_KEYS];
+			Oid pkopclass[INDEX_MAX_KEYS];
+			TupleDesc desc = RelationGetDescr(relation);
+			Relation index_rel;
+			TupleDesc indexdesc;
+			int natt;
+
+			Datum idxvals[INDEX_MAX_KEYS];
+			bool idxisnull[INDEX_MAX_KEYS];
+			HeapTuple idxtuple;
+
+			MemSet(pkattnum, 0, sizeof(pkattnum));
+			MemSet(pktypoid, 0, sizeof(pktypoid));
+			MemSet(pkopclass, 0, sizeof(pkopclass));
+			MemSet(idxvals, 0, sizeof(idxvals));
+			MemSet(idxisnull, 0, sizeof(idxisnull));
+			relationFindPrimaryKey(relation, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+			if (!indexoid)
+			{
+				elog(WARNING, "Could not find primary key for table with oid %u",
+					 RelationGetRelid(relation));
+				goto no_index_found;
+			}
+
+			index_rel = index_open(indexoid, AccessShareLock);
+
+			indexdesc = RelationGetDescr(index_rel);
+
+			for (natt = 0; natt < indexdesc->natts; natt++)
+			{
+				idxvals[natt] =
+					fastgetattr(&tp, pkattnum[natt], desc, &idxisnull[natt]);
+				Assert(!idxisnull[natt]);
+			}
+
+			idxtuple = heap_form_tuple(indexdesc, idxvals, idxisnull);
+
+			xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2;
+			xlhdr.t_infomask = idxtuple->t_data->t_infomask;
+			xlhdr.t_hoff = idxtuple->t_data->t_hoff;
+
+			rdata[1].next = &(rdata[2]);
+			rdata[2].data = (char*)&xlhdr;
+			rdata[2].len = SizeOfHeapHeader;
+			rdata[2].buffer = InvalidBuffer;
+			rdata[2].next = NULL;
+
+			rdata[2].next = &(rdata[3]);
+			rdata[3].data = (char *) idxtuple->t_data + offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].len = idxtuple->t_len - offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].buffer = InvalidBuffer;
+			rdata[3].next = NULL;
+
+			heap_close(index_rel, NoLock);
+		no_index_found:
+			;
+		}
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
 
 		PageSetLSN(page, recptr);
@@ -3203,10 +3345,20 @@ l2:
 	/* XLOG stuff */
 	if (RelationNeedsWAL(relation))
 	{
-		XLogRecPtr	recptr = log_heap_update(relation, buffer, oldtup.t_self,
-											 newbuf, heaptup,
-											 all_visible_cleared,
-											 all_visible_cleared_new);
+		XLogRecPtr	recptr;
+
+		/* For logical decode we need combocids to properly decode the catalog */
+		if (wal_level >= WAL_LEVEL_LOGICAL &&
+			RelationGetRelid(relation)  < FirstNormalObjectId)
+		{
+			log_heap_new_cid(relation, &oldtup);
+			log_heap_new_cid(relation, heaptup);
+		}
+
+		recptr = log_heap_update(relation, buffer, oldtup.t_self,
+								 newbuf, heaptup,
+								 all_visible_cleared,
+								 all_visible_cleared_new);
 
 		if (newbuf != buffer)
 		{
@@ -4445,9 +4597,15 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
 	xl_heap_header xlhdr;
 	uint8		info;
 	XLogRecPtr	recptr;
-	XLogRecData rdata[4];
+	XLogRecData rdata[5];
 	Page		page = BufferGetPage(newbuf);
 
+	/*
+	 * Just as for XLOG_HEAP_INSERT we need to make sure the tuple
+	 */
+	bool        need_tuple_data = wal_level >= WAL_LEVEL_LOGICAL
+		&& RelationGetRelid(reln) >= FirstNormalObjectId;
+
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
@@ -4478,28 +4636,44 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
 	xlhdr.t_hoff = newtup->t_data->t_hoff;
 
 	/*
-	 * As with insert records, we need not store the rdata[2] segment if we
-	 * decide to store the whole buffer instead.
+	 * As with insert's logging , we need not store the the Datum containing
+	 * tuples separately from the buffer if we do logical replication that
+	 * is...
 	 */
 	rdata[2].data = (char *) &xlhdr;
 	rdata[2].len = SizeOfHeapHeader;
-	rdata[2].buffer = newbuf;
+	rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[2].buffer_std = true;
 	rdata[2].next = &(rdata[3]);
 
 	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
 	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
 	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-	rdata[3].buffer = newbuf;
+	rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[3].buffer_std = true;
 	rdata[3].next = NULL;
 
+	/*
+	 * Separate rdata entry referencing the new page's buffer: it carries no
+	 * payload itself, but lets XLogInsert account for a full-page image of
+	 * the buffer while the tuple data above stays in the record proper
+	 * (wal_level >= logical case).
+	 */
+	if (need_tuple_data)
+	{
+		rdata[3].next = &(rdata[4]);
+
+		rdata[4].data = NULL;	/* fixed: was "NULL," — comma-operator typo */
+		rdata[4].len = 0;
+		rdata[4].buffer = newbuf;
+		rdata[4].buffer_std = true;
+		rdata[4].next = NULL;
+	}
+
 	/* If new tuple is the single and first tuple on page... */
 	if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
 		PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 	{
 		info |= XLOG_HEAP_INIT_PAGE;
-		rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+		rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = InvalidBuffer;
 	}
 
 	recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -4608,6 +4782,64 @@ log_newpage_buffer(Buffer buffer)
 }
 
 /*
+ * Perform XLogInsert of a XLOG_HEAP2_NEW_CID record
+ *
+ * The HeapTuple really needs to already have a ComboCid set otherwise we
+ * cannot detect combocid/cmin/cmax.
+ *
+ * This is only used in wal_level >= WAL_LEVEL_LOGICAL
+ */
+static XLogRecPtr
+log_heap_new_cid(Relation relation, HeapTuple tup)
+{
+	xl_heap_new_cid xlrec;
+
+	XLogRecPtr	recptr;
+	XLogRecData rdata[1];
+	HeapTupleHeader hdr = tup->t_data;
+
+	Assert(ItemPointerIsValid(&tup->t_self));
+	Assert(tup->t_tableOid != InvalidOid);
+
+	xlrec.top_xid = GetTopTransactionId();
+	xlrec.target.node = relation->rd_node;
+	xlrec.target.tid = tup->t_self;
+
+	if (hdr->t_infomask & HEAP_COMBOCID)
+	{
+		xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
+		xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
+		xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr);
+	}
+	else
+	{
+		/* tuple inserted */
+		if (hdr->t_infomask & HEAP_XMAX_INVALID)
+		{
+			xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
+			xlrec.cmax = InvalidCommandId;
+		}
+		/* tuple from a different tx updated or deleted */
+		else
+		{
+			xlrec.cmin = InvalidCommandId;
+			xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
+
+		}
+		xlrec.combocid = InvalidCommandId;
+	}
+
+	rdata[0].data = (char *) &xlrec;
+	rdata[0].len = SizeOfHeapNewCid;
+	rdata[0].buffer = InvalidBuffer;
+	rdata[0].next = NULL;
+
+	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
+
+	return recptr;
+}
+
+/*
  * Handles CLEANUP_INFO
  */
 static void
@@ -5676,6 +5908,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
 		case XLOG_HEAP2_MULTI_INSERT:
 			heap_xlog_multi_insert(lsn, record);
 			break;
+		case XLOG_HEAP2_NEW_CID:
+			/* nothing to do on a real replay, only during logical decoding */
+			break;
 		default:
 			elog(PANIC, "heap2_redo: unknown op code %u", info);
 	}
@@ -5825,6 +6060,15 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
 				xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
 						 xlrec->blkno, xlrec->ntuples);
 	}
+	else if (info == XLOG_HEAP2_NEW_CID)
+	{
+		xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec;
+
+		appendStringInfo(buf, "new_cid: ");
+		out_target(buf, &(xlrec->target));
+		appendStringInfo(buf, "; cmin: %u, cmax: %u, combo: %u",
+						 xlrec->cmin, xlrec->cmax, xlrec->combocid);
+	}
 	else
 		appendStringInfo(buf, "UNKNOWN");
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1749f46..e6fb04e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -106,6 +106,7 @@ const struct config_enum_entry wal_level_options[] = {
 	{"minimal", WAL_LEVEL_MINIMAL, false},
 	{"archive", WAL_LEVEL_ARCHIVE, false},
 	{"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+	{"logical", WAL_LEVEL_LOGICAL, false},
 	{NULL, 0, false}
 };
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 18d0c5a..1d86b87 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -50,6 +50,7 @@
 #include "nodes/nodeFuncs.h"
 #include "optimizer/clauses.h"
 #include "parser/parser.h"
+#include "parser/parse_relation.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -3426,3 +3427,76 @@ ResetReindexPending(void)
 {
 	pendingReindexedIndexes = NIL;
 }
+
+/*
+ * relationFindPrimaryKey
+ *		Find primary key for a relation if it exists.
+ *
+ * If no primary key is found *indexOid is set to InvalidOid
+ *
+ * On success, *nratts and the attnums/atttypids/opclasses output arrays
+ * (each sized INDEX_MAX_KEYS by the caller) describe the key columns.
+ *
+ * This is quite similar to tablecmd.c's transformFkeyGetPrimaryKey.
+ *
+ * XXX: the original comment suggested changing pg_class.relhaspkey "into a
+ * bool" — it already is one; presumably the intent was to store the pkey
+ * index oid in pg_class so this index-list scan could be skipped (verify).
+ *
+ * NOTE(review): no lock is taken on the index here; presumably the caller
+ * holds a lock on pkrel that prevents concurrent index drops — confirm.
+ */
+void
+relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                       int16 *nratts, int16 *attnums, Oid *atttypids,
+                       Oid *opclasses){
+	List *indexoidlist;
+	ListCell *indexoidscan;
+	HeapTuple indexTuple = NULL;
+	Datum indclassDatum;
+	bool isnull;
+	oidvector  *indclass;
+	int i;
+	Form_pg_index indexStruct = NULL;
+
+	*indexOid = InvalidOid;
+
+	indexoidlist = RelationGetIndexList(pkrel);
+
+	foreach(indexoidscan, indexoidlist)
+	{
+		Oid indexoid = lfirst_oid(indexoidscan);
+
+		indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+		if(!HeapTupleIsValid(indexTuple))
+			elog(ERROR, "cache lookup failed for index %u", indexoid);
+
+		/* only an immediate primary key qualifies (no deferred uniqueness) */
+		indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+		if(indexStruct->indisprimary && indexStruct->indimmediate)
+		{
+			/* keep indexTuple pinned; released after we copy the columns */
+			*indexOid = indexoid;
+			break;
+		}
+		ReleaseSysCache(indexTuple);
+
+	}
+	list_free(indexoidlist);
+
+	if (!OidIsValid(*indexOid))
+		return;
+
+	/* Must get indclass the hard way */
+	indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+									Anum_pg_index_indclass, &isnull);
+	Assert(!isnull);
+	indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+	*nratts = indexStruct->indnatts;
+	/*
+	 * Now build the list of PK attributes from the indkey definition (we
+	 * assume a primary key cannot have expressional elements)
+	 */
+	for (i = 0; i < indexStruct->indnatts; i++)
+	{
+		int			pkattno = indexStruct->indkey.values[i];
+
+		attnums[i] = pkattno;
+		atttypids[i] = attnumTypeId(pkrel, pkattno);
+		opclasses[i] = indclass->values[i];
+	}
+
+	ReleaseSysCache(indexTuple);
+}
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 2dde011..2e13e27 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
 	repl_gram.o syncrep.o
 
+SUBDIRS = logical
+
 include $(top_srcdir)/src/backend/common.mk
 
 # repl_scanner is compiled as part of repl_gram
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
new file mode 100644
index 0000000..cf040ef
--- /dev/null
+++ b/src/backend/replication/logical/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/logical
+#
+# IDENTIFICATION
+#    src/backend/replication/logical/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/logical
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
+
+OBJS = decode.o logicalfuncs.o reorderbuffer.o snapbuild.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..15d261b
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,496 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *		Decodes wal records from an xlogreader.h callback into an reorderbuffer
+ *		while building an appropriate snapshots to decode those
+ *
+ * NOTE:
+ * Its possible that the separation between decode.c and snapbuild.c is a
+ * bit too strict, in the end they just about have the same switch.
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/decode.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+#include "access/xlogreader.h"
+
+#include "catalog/pg_control.h"
+
+#include "replication/reorderbuffer.h"
+#include "replication/decode.h"
+#include "replication/snapbuild.h"
+#include "replication/logicalfuncs.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
+
+static void DecodeInsert(ReorderBuffer *cache, XLogRecordBuffer *buf);
+
+static void DecodeUpdate(ReorderBuffer *cache, XLogRecordBuffer *buf);
+
+static void DecodeDelete(ReorderBuffer *cache, XLogRecordBuffer *buf);
+
+static void DecodeMultiInsert(ReorderBuffer *cache, XLogRecordBuffer *buf);
+
+static void DecodeCommit(ReaderApplyState *state, XLogRecordBuffer *buf, TransactionId xid,
+						 TransactionId *sub_xids, int nsubxacts);
+
+static void DecodeAbort(ReorderBuffer * cache, XLogRecPtr lsn, TransactionId xid,
+						TransactionId *sub_xids, int nsubxacts);
+
+
+/*
+ * Entry point for decoding one WAL record into the reorder buffer.
+ *
+ * The snapshot builder is consulted first; only when it reports the record
+ * as decodable (action != SNAPBUILD_SKIP) do we dispatch to the per-rmgr
+ * decode routines.
+ */
+void
+DecodeRecordIntoReorderBuffer(XLogReaderState *reader,
+							  ReaderApplyState *state,
+							  XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	uint8 info = r->xl_info & ~XLR_INFO_MASK;
+	ReorderBuffer *reorder = state->reorderbuffer;
+	SnapBuildAction action;
+
+	/*
+	 * FIXME: The existence of the snapshot builder is pretty obvious to the
+	 * outside right now, that doesn't seem to be very good...
+	 */
+	if (!state->snapstate)
+	{
+		state->snapstate = AllocateSnapshotBuilder(reorder);
+	}
+
+	/*---------
+	 * Call the snapshot builder. It needs to be called before we analyze
+	 * tuples for two reasons:
+	 *
+	 * * Only in the snapshot building logic we know whether we have enough
+	 *   information to decode a particular tuple
+	 *
+	 * * The Snapshot/CommandIds computed by the SnapshotBuilder need to be
+	 *   added to the ReorderBuffer before we add tuples using them
+	 *---------
+	 */
+	action = SnapBuildDecodeCallback(reorder, state->snapstate, buf);
+
+	if (state->stop_after_consistent && state->snapstate->state == SNAPBUILD_CONSISTENT)
+	{
+		/* Assert(action == SNAPBUILD_SKIP); XXX: doesn't hold yet — verify */
+		reader->stop_at_record_boundary = true;
+		elog(WARNING, "reached consistent point, stopping!");
+		return;
+	}
+
+	if (action == SNAPBUILD_SKIP)
+		return;
+
+	switch (r->xl_rmid)
+	{
+		case RM_HEAP_ID:
+			{
+				info &= XLOG_HEAP_OPMASK;
+				switch (info)
+				{
+					case XLOG_HEAP_INSERT:
+						DecodeInsert(reorder, buf);
+						break;
+
+						/*
+						 * no guarantee that we get a HOT update again, so
+						 * handle it as a normal update
+						 */
+					case XLOG_HEAP_HOT_UPDATE:
+					case XLOG_HEAP_UPDATE:
+						DecodeUpdate(reorder, buf);
+						break;
+
+					case XLOG_HEAP_NEWPAGE:
+						/*
+						 * XXX: There doesn't seem to be a use case for
+						 * decoding HEAP_NEWPAGE's. It's only used in various
+						 * indexams and CLUSTER, neither of which should be
+						 * relevant for the logical changestream.
+						 */
+						break;
+
+					case XLOG_HEAP_DELETE:
+						DecodeDelete(reorder, buf);
+						break;
+					default:
+						break;
+				}
+				break;
+			}
+		case RM_HEAP2_ID:
+			{
+				info &= XLOG_HEAP_OPMASK;
+				switch (info)
+				{
+					case XLOG_HEAP2_MULTI_INSERT:
+						DecodeMultiInsert(reorder, buf);
+						break;
+					default:
+						/*
+						 * everything else here is just physical stuff we're
+						 * not interested in
+						 */
+						break;
+				}
+				break;
+			}
+
+		case RM_XACT_ID:
+			{
+				switch (info)
+				{
+					case XLOG_XACT_COMMIT:
+						{
+							TransactionId *sub_xids;
+							xl_xact_commit *xlrec =
+								(xl_xact_commit *) buf->record_data;
+
+							/*
+							 * FIXME: theoretically computing this address is
+							 * not really allowed if there are no
+							 * subtransactions
+							 */
+							sub_xids = (TransactionId *) &(
+								xlrec->xnodes[xlrec->nrels]);
+
+							DecodeCommit(state, buf, r->xl_xid,
+										 sub_xids, xlrec->nsubxacts);
+							break;
+						}
+					case XLOG_XACT_COMMIT_PREPARED:
+						{
+							TransactionId *sub_xids;
+							xl_xact_commit_prepared *xlrec =
+								(xl_xact_commit_prepared *) buf->record_data;
+
+							sub_xids = (TransactionId *) &(
+								xlrec->crec.xnodes[xlrec->crec.nrels]);
+
+							/*
+							 * Fixed: commit the *prepared* transaction's xid
+							 * (xlrec->xid), not r->xl_xid, which belongs to
+							 * the backend executing COMMIT PREPARED (compare
+							 * the ABORT_PREPARED branch below).
+							 */
+							DecodeCommit(state, buf, xlrec->xid, sub_xids,
+										 xlrec->crec.nsubxacts);
+							break;
+						}
+					case XLOG_XACT_COMMIT_COMPACT:
+						{
+							xl_xact_commit_compact *xlrec =
+								(xl_xact_commit_compact *) buf->record_data;
+
+							DecodeCommit(state, buf, r->xl_xid,
+										 xlrec->subxacts, xlrec->nsubxacts);
+							break;
+						}
+					case XLOG_XACT_ABORT:
+						{
+							TransactionId *sub_xids;
+							xl_xact_abort *xlrec =
+								(xl_xact_abort *) buf->record_data;
+
+							sub_xids = (TransactionId *) &(
+								xlrec->xnodes[xlrec->nrels]);
+
+							/*
+							 * Fixed: DecodeAbort takes (reorder, lsn, xid);
+							 * the lsn and xid arguments were swapped here.
+							 */
+							DecodeAbort(reorder, buf->origptr, r->xl_xid,
+										sub_xids, xlrec->nsubxacts);
+							break;
+						}
+					case XLOG_XACT_ABORT_PREPARED:
+						{
+							TransactionId *sub_xids;
+							xl_xact_abort_prepared *xlrec =
+								(xl_xact_abort_prepared *) buf->record_data;
+							xl_xact_abort *arec = &xlrec->arec;
+
+							sub_xids = (TransactionId *) &(
+								arec->xnodes[arec->nrels]);
+
+							/* Fixed: lsn/xid argument order, as above. */
+							DecodeAbort(reorder, buf->origptr, xlrec->xid,
+										sub_xids, arec->nsubxacts);
+							/* XXX: any reason for also aborting r->xl_xid? */
+							break;
+						}
+
+					case XLOG_XACT_ASSIGNMENT:
+						/*
+						 * XXX: We could reassign transactions to the parent
+						 * here to save space and effort when merging
+						 * transactions at commit.
+						 */
+						break;
+					case XLOG_XACT_PREPARE:
+						/*
+						 * FIXME: we should replay the transaction and
+						 * prepare it as well.
+						 */
+						break;
+					default:
+						break;
+				}
+				break;
+			}
+		case RM_XLOG_ID:
+			{
+				switch (info)
+				{
+					/* this is also used in END_OF_RECOVERY checkpoints */
+					case XLOG_CHECKPOINT_SHUTDOWN:
+						/*
+						 * abort all transactions that still are in progress,
+						 * they aren't in progress anymore.  do not abort
+						 * prepared transactions that have been prepared for
+						 * commit.
+						 *
+						 * FIXME: implement.
+						 */
+						break;
+				}
+				/* Fixed: missing break fell through into default. */
+				break;
+			}
+		default:
+			break;
+	}
+}
+
+/*
+ * Replay (or discard) a committed transaction and its subtransactions.
+ *
+ * Commits at or before snapstate->transactions_after predate the point
+ * from which a consistent snapshot exists, so their accumulated changes
+ * are thrown away via DecodeAbort instead of being replayed.
+ */
+static void
+DecodeCommit(ReaderApplyState *state, XLogRecordBuffer *buf, TransactionId xid,
+			 TransactionId *sub_xids, int nsubxacts)
+{
+	int i;
+
+	/* not interested in that part of the stream */
+	if (XLByteLE(buf->origptr, state->snapstate->transactions_after))
+	{
+		DecodeAbort(state->reorderbuffer, buf->origptr, xid,
+					sub_xids, nsubxacts);
+		return;
+	}
+
+	/* fold every subtransaction's changes into the top-level transaction */
+	for (i = 0; i < nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(state->reorderbuffer, xid, *sub_xids,
+								 buf->origptr);
+		sub_xids++;
+	}
+
+	/* replay actions of all transaction + subtransactions in order */
+	ReorderBufferCommit(state->reorderbuffer, xid, buf->origptr);
+}
+
+/*
+ * Mark a transaction and all of the passed subtransactions as aborted in
+ * the reorder buffer so their accumulated changes are discarded.
+ *
+ * Argument order is (reorder, lsn, xid), matching the forward declaration.
+ */
+static void
+DecodeAbort(ReorderBuffer *reorder, XLogRecPtr lsn, TransactionId xid,
+			TransactionId *sub_xids, int nsubxacts)
+{
+	int i;
+
+	/* demoted from WARNING: aborts are normal operation, not noteworthy */
+	elog(DEBUG1, "ABORT %u", xid);
+
+	for (i = 0; i < nsubxacts; i++)
+	{
+		ReorderBufferAbort(reorder, sub_xids[i], lsn);
+	}
+
+	ReorderBufferAbort(reorder, xid, lsn);
+}
+
+/*
+ * Decode a XLOG_HEAP_INSERT record into an INSERT change and queue it in
+ * the reorder buffer under the record's xid.
+ */
+static void
+DecodeInsert(ReorderBuffer *reorder, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+
+	ReorderBufferChange *change;
+
+	/*
+	 * If the tuple data was elided in favour of a full-page image we cannot
+	 * decode anything; with wal_level = logical heap_insert adds an extra
+	 * rdata entry precisely to avoid this.
+	 *
+	 * Fixed: the guard used SizeOfHeapUpdate (copy-paste from DecodeUpdate);
+	 * the record payload here is SizeOfHeapInsert, as used below.
+	 */
+	if (r->xl_info & XLR_BKP_BLOCK(0)
+		&& r->xl_len < (SizeOfHeapInsert + SizeOfHeapHeader))
+	{
+		elog(DEBUG2, "huh, no tuple data on wal_level = logical?");
+		return;
+	}
+
+	change = ReorderBufferGetChange(reorder);
+	change->action = REORDER_BUFFER_CHANGE_INSERT;
+
+	memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
+
+	change->newtuple = ReorderBufferGetTupleBuf(reorder);
+
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
+					r->xl_len - SizeOfHeapInsert,
+					change->newtuple);
+
+	ReorderBufferAddChange(reorder, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * Decode XLOG_HEAP_UPDATE / XLOG_HEAP_HOT_UPDATE into an UPDATE change.
+ *
+ * Only the new tuple version is decoded; see the FIXME below regarding the
+ * old tuple and primary key changes.
+ */
+static void
+DecodeUpdate(ReorderBuffer *reorder, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_update *xlrec = (xl_heap_update *) buf->record_data;
+
+
+	ReorderBufferChange *change;
+
+	/*
+	 * Tuple data may be missing when either the old or the new page went
+	 * out as a full-page image.  NOTE(review): silently dropping the change
+	 * at DEBUG2 loses data if this can actually happen with wal_level =
+	 * logical — verify the extra rdata entry in log_heap_update prevents it.
+	 */
+	if ((r->xl_info & XLR_BKP_BLOCK(0) || r->xl_info & XLR_BKP_BLOCK(1)) &&
+		(r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader)))
+	{
+		elog(DEBUG2, "huh, no tuple data on wal_level = logical?");
+		return;
+	}
+
+	change = ReorderBufferGetChange(reorder);
+	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+
+	memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
+
+	/*
+	 * FIXME: need to get/save the old tuple as well if we want primary key
+	 * changes to work.
+	 */
+	change->newtuple = ReorderBufferGetTupleBuf(reorder);
+
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapUpdate,
+					r->xl_len - SizeOfHeapUpdate,
+					change->newtuple);
+
+	ReorderBufferAddChange(reorder, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * Decode XLOG_HEAP_DELETE into a DELETE change carrying the primary-key
+ * tuple that heap_delete appends when wal_level = logical.
+ */
+static void
+DecodeDelete(ReorderBuffer *reorder, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+
+	xl_heap_delete *xlrec = (xl_heap_delete *) buf->record_data;
+
+	ReorderBufferChange *change;
+
+	/*
+	 * Check for the key data before allocating anything: the original
+	 * allocated the change first and returned early here, leaking the
+	 * change slot.
+	 */
+	if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+	{
+		elog(DEBUG2, "huh, no primary key for a delete on wal_level = logical?");
+		return;
+	}
+
+	change = ReorderBufferGetChange(reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+
+	memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
+
+	change->oldtuple = ReorderBufferGetTupleBuf(reorder);
+
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
+					r->xl_len - SizeOfHeapDelete,
+					change->oldtuple);
+
+	ReorderBufferAddChange(reorder, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * Decode xl_heap_multi_insert record into multiple changes.
+ *
+ * Due to slightly different layout we can't reuse DecodeXLogTuple without
+ * making that even harder to understand than already is.
+ *
+ * NOTE(review): unlike DecodeInsert there is no XLR_BKP_BLOCK guard here;
+ * presumably heap_multi_insert's extra rdata entry guarantees the tuple
+ * data is always present with wal_level = logical — confirm.
+ */
+static void
+DecodeMultiInsert(ReorderBuffer *reorder, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *)buf->record_data;
+	int i;
+	char *data = buf->record_data;
+	bool		isinit = (r->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+
+	data += SizeOfHeapMultiInsert;
+
+	/* OffsetNumber's are only stored if its not a HEAP_INIT_PAGE record */
+	if (!isinit)
+		data += sizeof(OffsetNumber) * xlrec->ntuples;
+
+	/* one INSERT change per tuple in the record */
+	for (i = 0; i < xlrec->ntuples; i++)
+	{
+		ReorderBufferChange *change;
+		xl_multi_insert_tuple *xlhdr;
+		int datalen;
+		ReorderBufferTupleBuf *tuple;
+
+		change = ReorderBufferGetChange(reorder);
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+		change->newtuple = ReorderBufferGetTupleBuf(reorder);
+		memcpy(&change->relnode, &xlrec->node, sizeof(RelFileNode));
+
+		tuple = change->newtuple;
+		/* not a disk based tuple */
+		ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+		/* per-tuple headers are SHORTALIGN'ed inside the record */
+		xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+		data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+		datalen = xlhdr->datalen;
+
+		/* we can only figure this out after reassembling the transactions */
+		tuple->tuple.t_tableOid = InvalidOid;
+		tuple->tuple.t_data = &tuple->header;
+		tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+		memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+		/* reassemble the tuple: zeroed header + logged bitmap/data */
+		memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+			   (char *) data,
+			   datalen);
+		data += datalen;
+
+		tuple->header.t_infomask = xlhdr->t_infomask;
+		tuple->header.t_infomask2 = xlhdr->t_infomask2;
+		tuple->header.t_hoff = xlhdr->t_hoff;
+
+		ReorderBufferAddChange(reorder, r->xl_xid, buf->origptr, change);
+	}
+}
+
+
+/*
+ * Reassemble a HeapTuple from tuple data logged in a WAL record
+ * (xl_heap_header followed by the tuple's bitmap + data, stored unaligned).
+ *
+ * NOTE(review): len is an unsigned Size, so len < SizeOfHeapHeader would
+ * wrap before the signed assignment; only the Assert below catches that,
+ * and only in assert-enabled builds.
+ */
+static void
+DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+	xl_heap_header xlhdr;
+	int datalen = len - SizeOfHeapHeader;
+
+	Assert(datalen >= 0);
+	Assert(datalen <= MaxHeapTupleSize);
+
+	tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	tuple->tuple.t_tableOid = InvalidOid;
+	tuple->tuple.t_data = &tuple->header;
+
+	/* data is not stored aligned */
+	memcpy((char *) &xlhdr,
+		   data,
+		   SizeOfHeapHeader);
+
+	/* zero the header first; only infomask/hoff fields are restored below */
+	memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+	memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+		   data + SizeOfHeapHeader,
+		   datalen);
+
+	tuple->header.t_infomask = xlhdr.t_infomask;
+	tuple->header.t_infomask2 = xlhdr.t_infomask2;
+	tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
new file mode 100644
index 0000000..41f2ec8
--- /dev/null
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -0,0 +1,247 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalfuncs.c
+ *
+ *     Support functions for using xlog decoding
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/logicalfuncs.c
+ *
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+
+#include "replication/logicalfuncs.h"
+
+#include "catalog/pg_class.h"
+#include "catalog/pg_type.h"
+
+#include "replication/reorderbuffer.h"
+#include "replication/decode.h"
+/*FIXME: XLogRead*/
+#include "replication/walsender_private.h"
+#include "replication/snapbuild.h"
+
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+
+/*
+ * XLogReader callbacks
+ */
+/* XLogReader callback: filter records to decode -- we decode everything */
+static bool
+replay_record_is_interesting(XLogReaderState* state, XLogRecord* r)
+{
+	return true;
+}
+
+/* XLogReader callback: write out raw record data -- intentionally a no-op */
+static void
+replay_writeout_data(XLogReaderState* state, char* data, Size len)
+{
+	return;
+}
+
+/* XLogReader callback: hand a fully reassembled record to the decoder */
+static void
+replay_finished_record(XLogReaderState* state, XLogRecordBuffer* buf)
+{
+	ReaderApplyState* apply_state = state->private_data;
+	DecodeRecordIntoReorderBuffer(state, apply_state, buf);
+}
+
+/*
+ * XLogReader callback: read one XLOG_BLCKSZ-sized page starting at the
+ * page-aligned 'startptr' into 'cur_page', and sanity-check its magic.
+ */
+static void
+replay_read_page(XLogReaderState* state, char* cur_page, XLogRecPtr startptr)
+{
+	XLogPageHeader page_header;
+
+	Assert((startptr % XLOG_BLCKSZ) == 0);
+
+	/* FIXME: more sensible/efficient implementation */
+	XLogRead(cur_page, startptr, XLOG_BLCKSZ);
+
+	page_header = (XLogPageHeader)cur_page;
+
+	/* a bad magic means we read garbage or an incompatible WAL version */
+	if (page_header->xlp_magic != XLOG_PAGE_MAGIC)
+	{
+		elog(FATAL, "page header magic %x, should be %x at %X/%X", page_header->xlp_magic,
+		     XLOG_PAGE_MAGIC, (uint32)(startptr >> 32), (uint32)startptr);
+	}
+}
+
+/*
+ * Callbacks for ReorderBuffer which add in some more information and then call
+ * output_plugin.h plugins.
+ */
+static void
+begin_txn_wrapper(ReorderBuffer* cache, ReorderBufferTXN* txn)
+{
+	ReaderApplyState *state = cache->private_data;
+	bool send;
+
+	/* reset the output buffer and let the walsender prepare a write at txn->lsn */
+	resetStringInfo(state->out);
+	WalSndPrepareWrite(state->out, txn->lsn);
+
+	/* the plugin's return value decides whether the message is sent at all */
+	send = state->begin_cb(state->user_private, state->out, txn);
+
+	if (send)
+	{
+		WalSndWriteData(state->out);
+	}
+}
+
+/*
+ * ReorderBuffer commit callback: invoke the output plugin's commit_cb and,
+ * if it asks for it, ship the resulting message via the walsender.
+ */
+static void
+commit_txn_wrapper(ReorderBuffer* cache, ReorderBufferTXN* txn, XLogRecPtr commit_lsn)
+{
+	ReaderApplyState *state = cache->private_data;
+	bool send;
+
+	resetStringInfo(state->out);
+	WalSndPrepareWrite(state->out, commit_lsn);
+
+	send = state->commit_cb(state->user_private, state->out, txn, commit_lsn);
+
+	if (send)
+	{
+		WalSndWriteData(state->out);
+	}
+}
+
+/*
+ * ReorderBuffer change callback: resolve the change's relfilenode to the
+ * relation's pg_class OID, then invoke the output plugin's change_cb and,
+ * if it asks for it, ship the resulting message via the walsender.
+ */
+static void
+change_wrapper(ReorderBuffer* cache, ReorderBufferTXN* txn, ReorderBufferChange* change)
+{
+	ReaderApplyState *state = cache->private_data;
+	bool send;
+	HeapTuple table;
+	Oid reloid;
+
+	resetStringInfo(state->out);
+	WalSndPrepareWrite(state->out, change->lsn);
+
+	/* the plugin wants the relation's OID, not its physical relfilenode */
+	table = LookupTableByRelFileNode(&change->relnode);
+	Assert(table);
+	reloid = HeapTupleHeaderGetOid(table->t_data);
+	ReleaseSysCache(table);
+
+
+	send = state->change_cb(state->user_private, state->out, txn,
+							reloid, change);
+
+	if (send)
+	{
+		WalSndWriteData(state->out);
+	}
+}
+
+/*
+ * Build a snapshot reader that never outputs/decodes anything; it just waits
+ * for the first point in the LSN stream at which it reaches a consistent
+ * state.
+ */
+XLogReaderState *
+initial_snapshot_reader(void)
+{
+	ReorderBuffer *reorder;
+	XLogReaderState *xlogreader_state = XLogReaderAllocate();
+	ReaderApplyState *apply_state;
+
+	xlogreader_state->is_record_interesting = replay_record_is_interesting;
+	xlogreader_state->finished_record = replay_finished_record;
+	xlogreader_state->writeout_data = replay_writeout_data;
+	xlogreader_state->read_page = replay_read_page;
+	xlogreader_state->private_data = calloc(1, sizeof(ReaderApplyState));
+
+	if (!xlogreader_state->private_data)
+		elog(ERROR, "Could not allocate the ReaderApplyState struct");
+
+	apply_state = (ReaderApplyState*)xlogreader_state->private_data;
+
+	/* no output plugin callbacks: we are only building snapshots, not decoding */
+	reorder = ReorderBufferAllocate();
+	reorder->begin = NULL; /* not decoding yet */
+	reorder->apply_change = NULL;
+	reorder->commit = NULL;
+	reorder->private_data = xlogreader_state->private_data;
+
+	apply_state->reorderbuffer = reorder;
+	apply_state->stop_after_consistent = true;
+
+	return xlogreader_state;
+}
+
+/*
+ * Build a snapshot reader with callbacks found in the shared library "plugin"
+ * under the symbol names found in output_plugin.h.
+ * It wraps those callbacks so they send out their changes via a logical
+ * walsender.
+ */
+XLogReaderState *
+normal_snapshot_reader(char *plugin, XLogRecPtr valid_after)
+{
+	ReorderBuffer *reorder;
+	XLogReaderState *xlogreader_state = XLogReaderAllocate();
+	ReaderApplyState *apply_state;
+
+	xlogreader_state->is_record_interesting = replay_record_is_interesting;
+	xlogreader_state->finished_record = replay_finished_record;
+	xlogreader_state->writeout_data = replay_writeout_data;
+	xlogreader_state->read_page = replay_read_page;
+	xlogreader_state->private_data = calloc(1, sizeof(ReaderApplyState));
+
+	if (!xlogreader_state->private_data)
+		elog(ERROR, "Could not allocate the ReaderApplyState struct");
+
+	apply_state = (ReaderApplyState*)xlogreader_state->private_data;
+
+	reorder = ReorderBufferAllocate();
+
+	apply_state->reorderbuffer = reorder;
+	apply_state->stop_after_consistent = false;
+
+	/* only decode transactions committed after valid_after */
+	apply_state->snapstate = AllocateSnapshotBuilder(reorder);
+	apply_state->snapstate->transactions_after = valid_after;
+
+	/* route decoded changes through the walsender wrappers above */
+	reorder->begin = begin_txn_wrapper;
+	reorder->apply_change = change_wrapper;
+	reorder->commit = commit_txn_wrapper;
+
+	/* lookup symbols in the shared library */
+
+	/* optional */
+	apply_state->init_cb = (LogicalDecodeInitCB)
+		load_external_function(plugin, "pg_decode_init", false, NULL);
+
+	apply_state->begin_cb = (LogicalDecodeBeginCB)
+		load_external_function(plugin, "pg_decode_begin_txn", true, NULL);
+
+	apply_state->change_cb = (LogicalDecodeChangeCB)
+		load_external_function(plugin, "pg_decode_change", true, NULL);
+
+	apply_state->commit_cb = (LogicalDecodeCommitCB)
+		load_external_function(plugin, "pg_decode_commit_txn", true, NULL);
+
+	/* optional */
+	apply_state->cleanup_cb = (LogicalDecodeCleanupCB)
+		load_external_function(plugin, "pg_decode_clean", false, NULL);
+
+	reorder->private_data = xlogreader_state->private_data;
+
+	apply_state->out = makeStringInfo();
+
+	/* initialize output plugin */
+	if (apply_state->init_cb)
+		apply_state->init_cb(&apply_state->user_private);
+
+	return xlogreader_state;
+}
+
+/* has the initial snapshot found a consistent state? */
+bool
+initial_snapshot_ready(XLogReaderState *reader)
+{
+	ReaderApplyState* state = reader->private_data;
+	return state->snapstate->state == SNAPBUILD_CONSISTENT;
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
new file mode 100644
index 0000000..b80b054
--- /dev/null
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -0,0 +1,1156 @@
+/*-------------------------------------------------------------------------
+ *
+ * reorderbuffer.c
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/reorderbuffer.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/xact.h"
+
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+
+#include "lib/simpleheap.h"
+
+#include "replication/reorderbuffer.h"
+#include "replication/snapbuild.h"
+
+#include "storage/sinval.h"
+#include "storage/bufmgr.h"
+
+#include "utils/builtins.h"
+#include "utils/combocid.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+#include "utils/tqual.h"
+#include "utils/syscache.h"
+
+
+
+/* NOTE(review): appears unreferenced in this file -- confirm before removing */
+const Size max_memtries = 1 << 16;
+
+/* upper bounds on the number of preallocated structs kept around for reuse */
+const size_t max_cached_changes = 1024;
+const size_t max_cached_tuplebufs = 1024; /* ~8MB */
+const size_t max_cached_transactions = 512;
+
+/* entry for a hash table we use to map from xid to our transaction state */
+typedef struct ReorderBufferTXNByIdEnt
+{
+	TransactionId xid;
+	ReorderBufferTXN *txn;
+}  ReorderBufferTXNByIdEnt;
+
+/* hash key: a tuple's physical location, (relfilenode, tid) */
+typedef struct ReorderBufferTupleCidKey
+{
+	RelFileNode relnode;
+	ItemPointerData tid;
+} ReorderBufferTupleCidKey;
+
+/* maps a tuple's location to the cmin/cmax/combocid it was changed with */
+typedef struct ReorderBufferTupleCidEnt
+{
+	ReorderBufferTupleCidKey key;
+	CommandId cmin;
+	CommandId cmax;
+	CommandId combocid;
+} ReorderBufferTupleCidEnt;
+
+/*
+ * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
+ * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
+ * changes. We don't want to leak those internal values to external users
+ * though (they would just use switch()...default:) because that would make it
+ * harder to add to new user visible values.
+ *
+ * This needs to be synchronized with ReorderBufferChangeType! Adjust the
+ * StaticAssertExpr's in ReorderBufferAllocate if you add anything!
+ */
+enum ReorderBufferChangeTypeInternal
+{
+	REORDER_BUFFER_CHANGE_INTERNAL_INSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_UPDATE,
+	REORDER_BUFFER_CHANGE_INTERNAL_DELETE,
+	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
+	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
+	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
+};
+
+/* Get an unused, but potentially cached, ReorderBufferTXN entry */
+static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *cache);
+
+/* Return an unused ReorderBufferTXN entry */
+static void ReorderBufferReturnTXN(ReorderBuffer *cache, ReorderBufferTXN *txn);
+
+/* Look up (and optionally create) the per-xid transaction state */
+static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *cache, TransactionId xid,
+                                         bool create, bool *is_new);
+
+
+/*
+ * support functions for lsn-order iterating over the ->changes of a
+ * transaction and its subtransactions
+ */
+
+/*
+ * used for iteration over the k-way heap merge of a transaction and its
+ * subtransactions
+ */
+typedef struct ReorderBufferIterTXNState
+{
+	simpleheap *heap;
+} ReorderBufferIterTXNState;
+
+/* allocate & initialize an iterator */
+static ReorderBufferIterTXNState *
+ReorderBufferIterTXNInit(ReorderBuffer *cache, ReorderBufferTXN *txn);
+
+/* get the next change */
+static ReorderBufferChange *
+ReorderBufferIterTXNNext(ReorderBuffer *cache, ReorderBufferIterTXNState *state);
+
+/* deallocate iterator */
+static void
+ReorderBufferIterTXNFinish(ReorderBuffer *cache, ReorderBufferIterTXNState *state);
+
+/* where to put this? NOTE(review): processes a txn's invalidations -- confirm */
+static void
+ReorderBufferProcessInvalidations(ReorderBuffer *cache, ReorderBufferTXN *txn);
+
+/*
+ * Allocate and initialize a ReorderBuffer: its private memory context, the
+ * xid->txn hash table and the (initially empty) caches of reusable structs.
+ */
+ReorderBuffer *
+ReorderBufferAllocate(void)
+{
+	ReorderBuffer *cache = (ReorderBuffer *) malloc(sizeof(ReorderBuffer));
+	HASHCTL hash_ctl;
+
+	/* keep the externally visible change types in sync with our internal ones */
+	StaticAssertExpr((int)REORDER_BUFFER_CHANGE_INTERNAL_INSERT == (int)REORDER_BUFFER_CHANGE_INSERT, "out of sync enums");
+	StaticAssertExpr((int)REORDER_BUFFER_CHANGE_INTERNAL_UPDATE == (int)REORDER_BUFFER_CHANGE_UPDATE, "out of sync enums");
+	StaticAssertExpr((int)REORDER_BUFFER_CHANGE_INTERNAL_DELETE == (int)REORDER_BUFFER_CHANGE_DELETE, "out of sync enums");
+
+	if (!cache)
+		elog(ERROR, "Could not allocate the ReorderBuffer");
+
+	cache->build_snapshots = true;
+
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+	cache->context = AllocSetContextCreate(TopMemoryContext,
+	                                       "ReorderBuffer",
+	                                       ALLOCSET_DEFAULT_MINSIZE,
+	                                       ALLOCSET_DEFAULT_INITSIZE,
+	                                       ALLOCSET_DEFAULT_MAXSIZE);
+
+	/* hash from xid to ReorderBufferTXNByIdEnt, kept in our own context */
+	hash_ctl.keysize = sizeof(TransactionId);
+	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
+	hash_ctl.hash = tag_hash;
+	hash_ctl.hcxt = cache->context;
+
+	cache->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
+	                            HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	cache->nr_cached_transactions = 0;
+	cache->nr_cached_changes = 0;
+	cache->nr_cached_tuplebufs = 0;
+
+	dlist_init(&cache->cached_transactions);
+	dlist_init(&cache->cached_changes);
+	slist_init(&cache->cached_tuplebufs);
+
+	return cache;
+}
+
+/*
+ * Free a ReorderBuffer and the resources it owns.
+ */
+void
+ReorderBufferFree(ReorderBuffer *cache)
+{
+	/* FIXME: check for in-progress transactions */
+	/* FIXME: clean up cached transaction */
+	/* FIXME: clean up cached changes */
+	/* FIXME: clean up cached tuplebufs */
+	hash_destroy(cache->by_txn);
+
+	/*
+	 * Delete the private memory context created by ReorderBufferAllocate;
+	 * previously it was never released, leaking it on every Free.
+	 */
+	MemoryContextDelete(cache->context);
+
+	free(cache);
+}
+
+/*
+ * Get a unused, possibly preallocated, ReorderBufferTXN.
+ *
+ * The returned struct is always zeroed and its lists initialized, regardless
+ * of whether it came from the cache or a fresh malloc.
+ */
+static ReorderBufferTXN *
+ReorderBufferGetTXN(ReorderBuffer *cache)
+{
+	ReorderBufferTXN *txn;
+
+	/* reuse a cached entry if available, otherwise allocate */
+	if (cache->nr_cached_transactions)
+	{
+		cache->nr_cached_transactions--;
+		txn = dlist_container(ReorderBufferTXN, node,
+		                      dlist_pop_head_node(&cache->cached_transactions));
+	}
+	else
+	{
+		txn = (ReorderBufferTXN *)
+			malloc(sizeof(ReorderBufferTXN));
+
+		if (!txn)
+			elog(ERROR, "Could not allocate a ReorderBufferTXN struct");
+	}
+
+	memset(txn, 0, sizeof(ReorderBufferTXN));
+
+	dlist_init(&txn->changes);
+	dlist_init(&txn->tuplecids);
+	dlist_init(&txn->subtxns);
+
+	return txn;
+}
+
+/*
+ * Free an ReorderBufferTXN. Deallocation might be delayed for efficiency
+ * purposes.
+ *
+ * Declared static at the top of the file; the definition previously lacked
+ * the storage class, which was inconsistent with its forward declaration.
+ */
+static void
+ReorderBufferReturnTXN(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	/* release per-txn resources that are not worth caching */
+	if (txn->tuplecid_hash != NULL)
+	{
+		hash_destroy(txn->tuplecid_hash);
+		txn->tuplecid_hash = NULL;
+	}
+
+	if (txn->invalidations)
+	{
+		free(txn->invalidations);
+		txn->invalidations = NULL;
+	}
+
+
+	/* stash the struct for reuse, or free it if the cache is full */
+	if (cache->nr_cached_transactions < max_cached_transactions)
+	{
+		cache->nr_cached_transactions++;
+		dlist_push_head(&cache->cached_transactions, &txn->node);
+	}
+	else
+	{
+		free(txn);
+	}
+}
+
+/*
+ * Get a unused, possibly preallocated, ReorderBufferChange.
+ *
+ * The returned struct is always zeroed.
+ */
+ReorderBufferChange *
+ReorderBufferGetChange(ReorderBuffer *cache)
+{
+	ReorderBufferChange *change;
+
+	/* reuse a cached entry if available, otherwise allocate */
+	if (cache->nr_cached_changes)
+	{
+		cache->nr_cached_changes--;
+		change = dlist_container(ReorderBufferChange, node,
+								 dlist_pop_head_node(&cache->cached_changes));
+	}
+	else
+	{
+		change = (ReorderBufferChange *) malloc(sizeof(ReorderBufferChange));
+
+		if (!change)
+			elog(ERROR, "Could not allocate a ReorderBufferChange struct");
+	}
+
+
+	memset(change, 0, sizeof(ReorderBufferChange));
+	return change;
+}
+
+/*
+ * Free an ReorderBufferChange. Deallocation might be delayed for efficiency
+ * purposes.
+ *
+ * Any payload owned by the change (tuple buffers, snapshot refcount) is
+ * released here before the struct itself is cached or freed.
+ */
+void
+ReorderBufferReturnChange(ReorderBuffer *cache, ReorderBufferChange *change)
+{
+	switch ((enum ReorderBufferChangeTypeInternal)change->action_internal)
+	{
+		case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+		case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
+			if (change->newtuple)
+			{
+				ReorderBufferReturnTupleBuf(cache, change->newtuple);
+				change->newtuple = NULL;
+			}
+
+			if (change->oldtuple)
+			{
+				ReorderBufferReturnTupleBuf(cache, change->oldtuple);
+				change->oldtuple = NULL;
+			}
+			break;
+		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
+			if (change->snapshot)
+			{
+				SnapBuildSnapDecRefcount(change->snapshot);
+				change->snapshot = NULL;
+			}
+			/* previously fell through to the next case without a marker */
+			break;
+		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
+			break;
+		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+			break;
+	}
+
+	/* stash the struct for reuse, or free it if the cache is full */
+	if (cache->nr_cached_changes < max_cached_changes)
+	{
+		cache->nr_cached_changes++;
+		dlist_push_head(&cache->cached_changes, &change->node);
+	}
+	else
+	{
+		free(change);
+	}
+}
+
+
+/*
+ * Get a unused, possibly preallocated, ReorderBufferTupleBuf
+ *
+ * Note: unlike ReorderBufferGetChange, the buffer contents are NOT zeroed;
+ * the caller is expected to overwrite them completely.
+ */
+ReorderBufferTupleBuf *
+ReorderBufferGetTupleBuf(ReorderBuffer *cache)
+{
+	ReorderBufferTupleBuf *tuple;
+
+	/* reuse a cached entry if available, otherwise allocate */
+	if (cache->nr_cached_tuplebufs)
+	{
+		cache->nr_cached_tuplebufs--;
+		tuple = slist_container(ReorderBufferTupleBuf, node,
+		                        slist_pop_head_node(&cache->cached_tuplebufs));
+	}
+	else
+	{
+		tuple =
+			(ReorderBufferTupleBuf *) malloc(sizeof(ReorderBufferTupleBuf));
+
+		if (!tuple)
+			elog(ERROR, "Could not allocate a ReorderBufferTupleBuf struct");
+	}
+
+	return tuple;
+}
+
+/*
+ * Free an ReorderBufferTupleBuf. Deallocation might be delayed for efficiency
+ * purposes.
+ */
+void
+ReorderBufferReturnTupleBuf(ReorderBuffer *cache, ReorderBufferTupleBuf *tuple)
+{
+	/* stash the buffer for reuse, or free it if the cache is full */
+	if (cache->nr_cached_tuplebufs < max_cached_tuplebufs)
+	{
+		cache->nr_cached_tuplebufs++;
+		slist_push_head(&cache->cached_tuplebufs, &tuple->node);
+	}
+	else
+	{
+		free(tuple);
+	}
+}
+
+/*
+ * Access the transactions being processed via xid. Optionally create a new
+ * entry.
+ *
+ * Returns NULL iff the xid is unknown and 'create' is false. If 'is_new' is
+ * non-NULL it is set to whether the entry was created by this call.
+ */
+static
+ReorderBufferTXN *
+ReorderBufferTXNByXid(ReorderBuffer *cache, TransactionId xid, bool create, bool *is_new)
+{
+	ReorderBufferTXNByIdEnt *ent;
+	bool found;
+
+	/* FIXME: add one entry fast-path cache */
+
+	ent = (ReorderBufferTXNByIdEnt *)
+		hash_search(cache->by_txn,
+		            (void *)&xid,
+		            (create ? HASH_ENTER : HASH_FIND),
+		            &found);
+
+	if (found)
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "found cache entry for %u at %p", xid, ent);
+#endif
+	}
+	else
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "didn't find cache entry for %u in %p at %p, creating %u",
+		     xid, cache, ent, create);
+#endif
+	}
+
+	if (!found && !create)
+		return NULL;
+
+	/* newly entered hash entry: attach a fresh ReorderBufferTXN to it */
+	if (!found)
+	{
+		ent->txn = ReorderBufferGetTXN(cache);
+		ent->txn->xid = xid;
+	}
+
+	if (is_new)
+		*is_new = !found;
+
+	return ent->txn;
+}
+
+/*
+ * Queue a change into a transaction so it can be replayed upon commit.
+ *
+ * Creates the per-xid state if this is the xid's first change; the txn's lsn
+ * is updated to that of the most recently added change.
+ */
+void
+ReorderBufferAddChange(ReorderBuffer *cache, TransactionId xid, XLogRecPtr lsn,
+                    ReorderBufferChange *change)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+	txn->lsn = lsn;
+	dlist_push_tail(&txn->changes, &change->node);
+}
+
+
+/*
+ * Associate a subtransaction with its toplevel transaction.
+ */
+void
+ReorderBufferCommitChild(ReorderBuffer *cache, TransactionId xid,
+						 TransactionId subxid, XLogRecPtr lsn)
+{
+	ReorderBufferTXN *txn;
+	ReorderBufferTXN *subtxn;
+
+	/* only look up, don't create: a change-less subtxn has no state here */
+	subtxn = ReorderBufferTXNByXid(cache, subxid, false, NULL);
+
+	/*
+	 * No need to do anything if that subtxn didn't contain any changes
+	 */
+	if (!subtxn)
+		return;
+
+	subtxn->lsn = lsn;
+
+	/* the toplevel txn may not have queued changes itself yet; create it */
+	txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+
+	dlist_push_tail(&txn->subtxns, &subtxn->node);
+	txn->nsubtxns++;
+}
+
+
+/*
+ * Support for efficiently iterating over a transaction's and its
+ * subtransactions' changes.
+ *
+ * We do this by performing a k-way merge between the transaction and its
+ * subtransactions. For that we model the current heads of the different
+ * transactions as a binary heap so we easily know which (sub-)transaction
+ * has the change with the smallest lsn next.
+ * Luckily the changes in individual transactions are already sorted by LSN.
+ */
+
+/*
+ * Binary heap comparison function.
+ *
+ * Orders heap entries (whose keys are ReorderBufferChange list nodes) by the
+ * change's lsn, ascending.
+ */
+static int
+ReorderBufferIterCompare(simpleheap_kv *a, simpleheap_kv *b)
+{
+	ReorderBufferChange *change_a = dlist_container(ReorderBufferChange, node,
+													(dlist_node*)a->key);
+	ReorderBufferChange *change_b = dlist_container(ReorderBufferChange, node,
+													(dlist_node*)b->key);
+
+	if (change_a->lsn < change_b->lsn)
+		return -1;
+
+	else if (change_a->lsn == change_b->lsn)
+		return 0;
+
+	return 1;
+}
+
+/*
+ * Initialize an iterator which iterates in lsn order over a transaction and
+ * all its subtransactions.
+ */
+static ReorderBufferIterTXNState *
+ReorderBufferIterTXNInit(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	size_t nr_txns = 0;
+	ReorderBufferIterTXNState *state;
+	dlist_iter cur_txn_i;
+	ReorderBufferTXN *cur_txn;
+	ReorderBufferChange *cur_change;
+
+	/* count how many (sub-)transactions actually carry changes */
+	if (!dlist_is_empty(&txn->changes))
+		nr_txns++;
+
+	dlist_foreach(cur_txn_i, &txn->subtxns)
+	{
+		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
+
+		if (!dlist_is_empty(&cur_txn->changes))
+			nr_txns++;
+	}
+
+	/*
+	 * FIXME: Add fastpath for the rather common nr_txns=1 case, no need to
+	 * allocate/build a heap in that case.
+	 */
+
+	/* allocate iteration state; check the result like other allocations here */
+	state = calloc(1, sizeof(ReorderBufferIterTXNState));
+	if (!state)
+		elog(ERROR, "Could not allocate the ReorderBufferIterTXNState struct");
+
+	/* allocate heap */
+	state->heap = simpleheap_allocate(nr_txns);
+	state->heap->compare = ReorderBufferIterCompare;
+
+	/*
+	 * fill array with elements, heap condition not yet fulfilled. Properly
+	 * building the heap afterwards is more efficient.
+	 */
+
+	/* add toplevel transaction if it contains changes */
+	if (!dlist_is_empty(&txn->changes))
+	{
+		cur_change = dlist_head_element(ReorderBufferChange, node, &txn->changes);
+
+		simpleheap_add_unordered(state->heap, &cur_change->node, txn);
+	}
+
+	/* add subtransactions if they contain changes */
+	dlist_foreach(cur_txn_i, &txn->subtxns)
+	{
+		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
+
+		if (!dlist_is_empty(&cur_txn->changes))
+		{
+			cur_change = dlist_head_element(ReorderBufferChange, node,
+											&cur_txn->changes);
+
+			/*
+			 * BUGFIX: the heap value must be the subtransaction owning this
+			 * change list (cur_txn), not the toplevel txn; otherwise
+			 * ReorderBufferIterTXNNext checks/advances within the wrong
+			 * ->changes list when consuming a subtransaction's changes.
+			 */
+			simpleheap_add_unordered(state->heap, &cur_change->node, cur_txn);
+		}
+	}
+
+	/* make the array fulfill the heap property */
+	simpleheap_build(state->heap);
+	return state;
+}
+
+/*
+ * Return the next change when iterating over a transaction and its
+ * subtransaction.
+ *
+ * Returns NULL once all changes have been consumed.
+ */
+static ReorderBufferChange *
+ReorderBufferIterTXNNext(ReorderBuffer *cache, ReorderBufferIterTXNState *state)
+{
+	ReorderBufferTXN *txn = NULL;
+	ReorderBufferChange *change;
+	simpleheap_kv *kv;
+
+	/* nothing there anymore */
+	if (state->heap->size == 0)
+		return NULL;
+
+	/* peek at the change with the smallest lsn */
+	kv = simpleheap_first(state->heap);
+
+	change = dlist_container(ReorderBufferChange, node, (dlist_node*)kv->key);
+
+	/* the (sub-)transaction whose ->changes list this change came from */
+	txn = (ReorderBufferTXN *) kv->value;
+
+	if (!dlist_has_next(&txn->changes, &change->node))
+	{
+		/* that txn is exhausted, drop it from the merge */
+		simpleheap_remove_first(state->heap);
+	}
+	else
+	{
+		/* advance this txn's head to its next change and re-heapify */
+		simpleheap_change_key(state->heap, change->node.next);
+	}
+	return change;
+}
+
+/*
+ * Deallocate the iterator
+ */
+static void
+ReorderBufferIterTXNFinish(ReorderBuffer *cache, ReorderBufferIterTXNState *state)
+{
+	simpleheap_free(state->heap);
+	free(state);
+}
+
+
+/*
+ * Cleanup the contents of a transaction, usually after the transaction
+ * committed or aborted: release all queued changes, all subtransaction
+ * state, the tuplecid entries, and the by-xid hash entries.
+ */
+static void
+ReorderBufferCleanupTXN(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	bool found;
+	dlist_mutable_iter cur_change;
+	dlist_mutable_iter cur_txn;
+
+	/* cleanup subtransactions & their changes */
+	dlist_foreach_modify(cur_txn, &txn->subtxns)
+	{
+		ReorderBufferTXN *subtxn = dlist_container(ReorderBufferTXN, node, cur_txn.cur);
+
+		dlist_foreach_modify(cur_change, &subtxn->changes)
+		{
+			ReorderBufferChange *change =
+				dlist_container(ReorderBufferChange, node, cur_change.cur);
+
+			ReorderBufferReturnChange(cache, change);
+		}
+
+		/*
+		 * BUGFIX: the subtransaction was entered into ->by_txn when its
+		 * changes were queued; remove that entry before freeing/recycling
+		 * the struct, or the hash retains a dangling pointer.
+		 */
+		hash_search(cache->by_txn,
+		            (void *)&subtxn->xid,
+		            HASH_REMOVE,
+		            &found);
+		Assert(found);
+
+		ReorderBufferReturnTXN(cache, subtxn);
+	}
+
+	/* cleanup changes in the toplevel txn */
+	dlist_foreach_modify(cur_change, &txn->changes)
+	{
+		ReorderBufferChange *change =
+			dlist_container(ReorderBufferChange, node, cur_change.cur);
+
+		ReorderBufferReturnChange(cache, change);
+	}
+
+	/*
+	 * cleanup the tuplecids we stored for timetravel access. They are always
+	 * stored in the toplevel transaction.
+	 */
+	dlist_foreach_modify(cur_change, &txn->tuplecids)
+	{
+		ReorderBufferChange *change =
+			dlist_container(ReorderBufferChange, node, cur_change.cur);
+		Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+		ReorderBufferReturnChange(cache, change);
+	}
+
+	/* now remove reference from cache */
+	hash_search(cache->by_txn,
+	            (void *)&txn->xid,
+	            HASH_REMOVE,
+	            &found);
+	Assert(found);
+
+	ReorderBufferReturnTXN(cache, txn);
+}
+
+/*
+ * Build a hash with a (relfilenode, itempoint) -> (cmin, cmax) mapping for use
+ * by tqual.c's HeapTupleSatisfiesMVCCDuringDecoding.
+ */
+static void
+ReorderBufferBuildTupleCidHash(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	dlist_iter cur_change;
+	HASHCTL hash_ctl;
+
+	/* only needed for transactions that touched the catalog */
+	if (!txn->does_timetravel || dlist_is_empty(&txn->tuplecids))
+		return;
+
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
+	hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
+	hash_ctl.hash = tag_hash;
+	hash_ctl.hcxt = cache->context;
+
+	/*
+	 * create the hash with the exact number of to-be-stored tuplecids from the
+	 * start
+	 */
+	txn->tuplecid_hash =
+		hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
+					HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	dlist_foreach(cur_change, &txn->tuplecids)
+	{
+		ReorderBufferTupleCidKey key;
+		ReorderBufferTupleCidEnt *ent;
+		bool found;
+		ReorderBufferChange *change =
+			dlist_container(ReorderBufferChange, node, cur_change.cur);
+
+		Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
+		/* be careful about padding */
+		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
+
+		key.relnode = change->tuplecid.node;
+
+		ItemPointerCopy(&change->tuplecid.tid,
+						&key.tid);
+
+		/*
+		 * BUGFIX: HASHACTION is an enum, not a bitmask; HASH_ENTER|HASH_FIND
+		 * only worked by accident of the enum values. HASH_ENTER alone both
+		 * finds an existing entry and creates a missing one.
+		 */
+		ent = (ReorderBufferTupleCidEnt *)
+			hash_search(txn->tuplecid_hash,
+						(void *)&key,
+						HASH_ENTER,
+						&found);
+		if (!found)
+		{
+			ent->cmin = change->tuplecid.cmin;
+			ent->cmax = change->tuplecid.cmax;
+			ent->combocid = change->tuplecid.combocid;
+		}
+		else
+		{
+			Assert(ent->cmin == change->tuplecid.cmin);
+			Assert(ent->cmax == InvalidCommandId ||
+				   ent->cmax == change->tuplecid.cmax);
+			/*
+			 * if the tuple got valid in this transaction and now got deleted
+			 * we already have a valid cmin stored. The cmax will be
+			 * InvalidCommandId though.
+			 */
+			ent->cmax = change->tuplecid.cmax;
+		}
+	}
+}
+
+/*
+ * Copy a provided snapshot so we can modify it privately. This is needed so
+ * that catalog modifying transactions can look into intermediate catalog
+ * states.
+ */
+static Snapshot
+ReorderBufferCopySnap(ReorderBuffer *cache, Snapshot orig_snap,
+					  ReorderBufferTXN *txn, CommandId cid)
+{
+	Snapshot snap;
+	dlist_iter sub_txn_i;
+	ReorderBufferTXN *sub_txn;
+	int i = 0;
+	/* single allocation: SnapshotData, then xip[], then subxip[] */
+	Size size = sizeof(SnapshotData) +
+		sizeof(TransactionId) * orig_snap->xcnt +
+		sizeof(TransactionId) * (txn->nsubtxns + 1)
+		;
+
+	elog(DEBUG1, "copying a non-transaction-specific snapshot into timetravel tx %u", txn->xid);
+
+	/* we only want to start with snapshots as provided by snapbuild.c */
+	Assert(!orig_snap->subxip);
+	Assert(!orig_snap->copied);
+
+	/* check the allocation, consistently with the rest of this file */
+	snap = calloc(1, size);
+	if (!snap)
+		elog(ERROR, "Could not allocate a copied SnapshotData struct");
+	memcpy(snap, orig_snap, sizeof(SnapshotData));
+
+	snap->copied = true;
+	snap->active_count = 0;
+	snap->regd_count = 0;
+	snap->xip = (TransactionId *) (snap + 1);
+
+	memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
+
+	/*
+	 * ->subxip contains all txids that belong to our transaction which we need
+	 * to check via cmin/cmax. That's why we store the toplevel transaction in
+	 * there as well.
+	 */
+	snap->subxip = snap->xip + snap->xcnt;
+	snap->subxip[i++] = txn->xid;
+	snap->subxcnt = txn->nsubtxns + 1;
+
+	dlist_foreach(sub_txn_i, &txn->subtxns)
+	{
+		sub_txn = dlist_container(ReorderBufferTXN, node, sub_txn_i.cur);
+		snap->subxip[i++] = sub_txn->xid;
+	}
+
+	/* bsearch()ability */
+	qsort(snap->subxip, snap->subxcnt,
+		  sizeof(TransactionId), xidComparator);
+
+	/*
+	 * store the specified current CommandId
+	 */
+	snap->curcid = cid;
+
+	return snap;
+}
+
+/*
+ * Free a previously ReorderBufferCopySnap'ed snapshot
+ */
+static void
+ReorderBufferFreeSnap(ReorderBuffer *cache, Snapshot snap)
+{
+	/* snapshots not created by ReorderBufferCopySnap are not ours to free */
+	Assert(snap->copied);
+	free(snap);
+}
+
+/*
+ * Commit a transaction and replay all actions that previously have been
+ * ReorderBufferAddChange'd in the toplevel TX or any of the subtransactions
+ * assigned via ReorderBufferCommitChild.
+ */
+void
+ReorderBufferCommit(ReorderBuffer *cache, TransactionId xid, XLogRecPtr lsn)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, false, NULL);
+	ReorderBufferIterTXNState *iterstate = NULL;
+	ReorderBufferChange *change;
+	CommandId command_id = FirstCommandId;
+	Snapshot snapshot_mvcc = NULL;	/* first snapshot seen in the txn */
+	Snapshot snapshot_now = NULL;	/* snapshot currently set up for decoding */
+	bool snapshot_copied = false;	/* snapshot_now is a private copy we own */
+
+	/* no changes were queued for this xid, nothing to replay */
+	if (!txn)
+		return;
+
+	txn->lsn = lsn;
+
+	cache->begin(cache, txn);
+
+	PG_TRY();
+	{
+		ReorderBufferBuildTupleCidHash(cache, txn);
+
+		/* replay all changes of the txn and its subtxns in lsn order */
+		iterstate = ReorderBufferIterTXNInit(cache, txn);
+		while ((change = ReorderBufferIterTXNNext(cache, iterstate)))
+		{
+			switch ((enum ReorderBufferChangeTypeInternal)change->action_internal)
+			{
+				case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+				case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
+				case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
+					/* a base snapshot must have been queued before any change */
+					Assert(snapshot_mvcc != NULL);
+					if (!SnapBuildHasCatalogChanges(NULL, xid, &change->relnode))
+						cache->apply_change(cache, txn, change);
+					break;
+				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
+					/* XXX: we could skip snapshots in non toplevel txns */
+					if (snapshot_copied)
+					{
+						/* replace our private copy with one of the new snapshot */
+						ReorderBufferFreeSnap(cache, snapshot_now);
+						snapshot_now = ReorderBufferCopySnap(cache, change->snapshot,
+															 txn, command_id);
+					}
+					else
+					{
+						snapshot_now = change->snapshot;
+					}
+
+					/*
+					 * the first snapshot seen in a transaction is its mvcc
+					 * snapshot
+					 */
+					if (!snapshot_mvcc)
+						snapshot_mvcc = snapshot_now;
+					else
+						RevertFromDecodingSnapshots();
+
+					SetupDecodingSnapshots(snapshot_now, txn->tuplecid_hash);
+					break;
+
+				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
+					if (!snapshot_copied && snapshot_now)
+					{
+						/* we don't use the global one anymore */
+						snapshot_copied = true;
+						snapshot_now = ReorderBufferCopySnap(cache, snapshot_now,
+														  txn, command_id);
+					}
+
+					/* command ids may arrive out of order; never go backwards */
+					command_id = Max(command_id, change->command_id);
+
+					if (snapshot_now && command_id != InvalidCommandId)
+					{
+						snapshot_now->curcid = command_id;
+
+						/* reinstall the snapshot with the updated curcid */
+						RevertFromDecodingSnapshots();
+						SetupDecodingSnapshots(snapshot_now, txn->tuplecid_hash);
+					}
+
+					/*
+					 * everytime the CommandId is incremented, we could see new
+					 * catalog contents
+					 */
+					ReorderBufferProcessInvalidations(cache, txn);
+
+					break;
+
+				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+					/* tuplecids live in txn->tuplecids, never in ->changes */
+					elog(ERROR, "tuplecid value in normal queue");
+					break;
+			}
+		}
+
+		ReorderBufferIterTXNFinish(cache, iterstate);
+
+		cache->commit(cache, txn, lsn);
+
+		RevertFromDecodingSnapshots();
+		ReorderBufferProcessInvalidations(cache, txn);
+
+		ReorderBufferCleanupTXN(cache, txn);
+
+
+		if (snapshot_copied)
+		{
+			ReorderBufferFreeSnap(cache, snapshot_now);
+		}
+	}
+	PG_CATCH();
+	{
+		if (iterstate)
+			ReorderBufferIterTXNFinish(cache, iterstate);
+
+		/*
+		 * XXX: do we want to do this here?
+		 * ReorderBufferCleanupTXN(cache, txn);
+		 */
+
+		RevertFromDecodingSnapshots();
+		ReorderBufferProcessInvalidations(cache, txn);
+
+		if (snapshot_copied)
+		{
+			ReorderBufferFreeSnap(cache, snapshot_now);
+		}
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+/*
+ * Abort a transaction that possibly has previous changes. Needs to be done
+ * independently for toplevel and subtransactions.
+ */
+void
+ReorderBufferAbort(ReorderBuffer *cache, TransactionId xid, XLogRecPtr lsn)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, false, NULL);
+
+	/* no changes in this transaction, nothing to discard */
+	if (!txn)
+		return;
+
+	ReorderBufferCleanupTXN(cache, txn);
+}
+
+/*
+ * Check whether a transaction is already known in this module
+ */
+bool
+ReorderBufferIsXidKnown(ReorderBuffer *cache, TransactionId xid)
+{
+	bool is_new;
+	/*
+	 * FIXME: for efficiency reasons we create the xid here, that doesn't seem
+	 * like a good idea though
+	 */
+	ReorderBufferTXNByXid(cache, xid, true, &is_new);
+
+	/* no changes in this commit */
+	return !is_new;
+}
+
+/*
+ * Add a new snapshot to this transaction which is the "base" of snapshots we
+ * modify if this is a catalog modifying transaction.
+ */
+void
+ReorderBufferAddBaseSnapshot(ReorderBuffer *cache, TransactionId xid,
+							 XLogRecPtr lsn, Snapshot snap)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+	ReorderBufferChange *change = ReorderBufferGetChange(cache);
+
+	change->snapshot = snap;
+	change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
+
+	/*
+	 * NOTE(review): has_base_snapshot is only set for an invalid lsn -- looks
+	 * intentional (lsn-less snapshots mark the txn's base), but confirm; one
+	 * might expect it to be set unconditionally here.
+	 */
+	if (lsn == InvalidXLogRecPtr)
+		txn->has_base_snapshot = true;
+
+	ReorderBufferAddChange(cache, xid, lsn, change);
+}
+
+/*
+ * Access the catalog with this CommandId at this point in the changestream.
+ *
+ * Queues a REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID change so that, during
+ * replay, the decoding snapshot's curcid can be advanced to 'cid'.
+ */
+void
+ReorderBufferAddNewCommandId(ReorderBuffer *cache, TransactionId xid,
+							 XLogRecPtr lsn, CommandId cid)
+{
+	ReorderBufferChange *change = ReorderBufferGetChange(cache);
+
+	change->command_id = cid;
+	change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
+
+	ReorderBufferAddChange(cache, xid, lsn, change);
+}
+
+
+/*
+ * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
+ *
+ * Note that, unlike the other Add* routines, these changes are *not* queued
+ * via ReorderBufferAddChange into the ordinary change stream; they are kept
+ * on the transaction's separate ->tuplecids list (used to build the
+ * tuplecid hash consulted by ResolveCminCmaxDuringDecoding).
+ */
+void
+ReorderBufferAddNewTupleCids(ReorderBuffer *cache, TransactionId xid, XLogRecPtr lsn,
+							 RelFileNode node, ItemPointerData tid,
+							 CommandId cmin, CommandId cmax, CommandId combocid)
+{
+	ReorderBufferChange *change = ReorderBufferGetChange(cache);
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+
+	change->tuplecid.node = node;
+	change->tuplecid.tid = tid;
+	change->tuplecid.cmin = cmin;
+	change->tuplecid.cmax = cmax;
+	change->tuplecid.combocid = combocid;
+	change->lsn = lsn;
+	change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
+	dlist_push_tail(&txn->tuplecids, &change->node);
+	txn->ntuplecids++;
+}
+
+/*
+ * Setup the invalidation of the toplevel transaction.
+ *
+ * This needs to be done before ReorderBufferCommit is called!
+ *
+ * May only be called once per transaction; 'msgs' is copied into
+ * malloc'ed storage owned by the ReorderBufferTXN.
+ */
+void
+ReorderBufferAddInvalidations(ReorderBuffer *cache, TransactionId xid, XLogRecPtr lsn,
+                                Size nmsgs, SharedInvalidationMessage* msgs)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+
+	if (txn->ninvalidations)
+		elog(ERROR, "only ever add one set of invalidations");
+	/* FIXME: free */
+	txn->invalidations = malloc(sizeof(SharedInvalidationMessage) * nmsgs);
+
+	if (!txn->invalidations)
+		elog(ERROR, "could not allocate memory for invalidations");
+
+	memcpy(txn->invalidations, msgs, sizeof(SharedInvalidationMessage) * nmsgs);
+	txn->ninvalidations = nmsgs;
+}
+
+/*
+ * Apply all invalidations we know. Possibly we only need parts at this point
+ * in the changestream but we don't know which those are.
+ *
+ * Executes every message stored by ReorderBufferAddInvalidations against the
+ * local caches.
+ */
+static void
+ReorderBufferProcessInvalidations(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	int i;
+	for (i = 0; i < txn->ninvalidations; i++)
+	{
+		LocalExecuteInvalidationMessage(&txn->invalidations[i]);
+	}
+}
+
+/*
+ * Mark a transaction as doing timetravel (i.e. it modified the catalog and
+ * thus has to be part of decoding snapshots).
+ *
+ * Creates the ReorderBufferTXN entry if it doesn't exist yet.
+ */
+void
+ReorderBufferXidSetTimetravel(ReorderBuffer *cache, TransactionId xid)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+	txn->does_timetravel = true;
+}
+
+/*
+ * Query whether a transaction is *known* to be doing timetravel. This can be
+ * wrong until directly before the commit!
+ *
+ * NOTE(review): creates the ReorderBufferTXN entry as a side effect
+ * (create = true), like ReorderBufferIsXidKnown.
+ */
+bool
+ReorderBufferXidDoesTimetravel(ReorderBuffer *cache, TransactionId xid)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+	return txn->does_timetravel;
+}
+
+/*
+ * Have we already added the first snapshot?
+ *
+ * NOTE(review): creates the ReorderBufferTXN entry as a side effect
+ * (create = true), like ReorderBufferIsXidKnown.
+ */
+bool
+ReorderBufferXidHasBaseSnapshot(ReorderBuffer *cache, TransactionId xid)
+{
+	ReorderBufferTXN *txn = ReorderBufferTXNByXid(cache, xid, true, NULL);
+	return txn->has_base_snapshot;
+}
+
+/*
+ * Visibility support routines
+ */
+
+/*-------------------------------------------------------------------------
+ * Lookup actual cmin/cmax values during timetravel access. We can't always
+ * rely on stored cmin/cmax values because of two scenarios:
+ *
+ * * A tuple got changed multiple times during a single transaction and thus
+ *   has got a combocid. Combocid's are only valid for the duration of a single
+ *   transaction.
+ * * A tuple with a cmin but no cmax (and thus no combocid) got deleted/updated
+ *   in another transaction than the one which created it which we are looking
+ *   at right now. As only one of cmin, cmax or combocid is actually stored in
+ *   the heap we don't have access to the value we need anymore.
+ *
+ * To resolve those problems we have a per-transaction hash of (cmin, cmax)
+ * tuples keyed by (relfilenode, ctid) which contains the actual (cmin, cmax)
+ * values. That also takes care of combocids by simply not caring about them at
+ * all. As we have the real cmin/cmax values that's enough.
+ *
+ * As we only care about catalog tuples here the overhead of this hashtable
+ * should be acceptable.
+ *
+ * Returns true and fills *cmin / *cmax (either pointer may be NULL) if the
+ * tuple has an entry in tuplecid_data; returns false otherwise.
+ * -------------------------------------------------------------------------
+ */
+extern bool
+ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
+							  HeapTuple htup, Buffer buffer,
+							  CommandId *cmin, CommandId *cmax)
+{
+	ReorderBufferTupleCidKey key;
+	ReorderBufferTupleCidEnt* ent;
+	ForkNumber forkno;
+	BlockNumber blockno;
+
+	/* be careful about padding */
+	memset(&key, 0, sizeof(key));
+
+	Assert(!BufferIsLocal(buffer));
+
+	/*
+	 * get relfilenode from the buffer, no convenient way to access it other
+	 * than that.
+	 */
+	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
+
+	/* tuples can only be in the main fork */
+	Assert(forkno == MAIN_FORKNUM);
+	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
+
+	ItemPointerCopy(&htup->t_self,
+					&key.tid);
+
+	ent = (ReorderBufferTupleCidEnt *)
+		hash_search(tuplecid_data,
+					(void *)&key,
+					HASH_FIND,
+					NULL);
+
+	/* no entry for this tuple -- caller has to fall back to stored values */
+	if (ent == NULL)
+		return false;
+
+	if (cmin)
+		*cmin = ent->cmin;
+	if (cmax)
+		*cmax = ent->cmax;
+	return true;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
new file mode 100644
index 0000000..df24b33
--- /dev/null
+++ b/src/backend/replication/logical/snapbuild.c
@@ -0,0 +1,1144 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapbuild.c
+ *
+ *	  Support for building timetravel snapshots based on the contents of the
+ *	  wal
+ *
+ * NOTES:
+ *
+ * We build snapshots which can *only* be used to read catalog contents by
+ * reading the wal stream. The aim is to provide mvcc and SnapshotNow snapshots
+ * that behave the same as their respective counterparts would have at the time
+ * the XLogRecord was generated. This is done to provide a reliable environment
+ * for decoding those records into every format that pleases the author of an
+ * output plugin.
+ *
+ * To build the snapshots we reuse the infrastructure built for hot
+ * standby. The snapshots we build look different than HS' because we have
+ * different needs. To successfully decode data from the WAL we only need to
+ * access catalogs/(sys|rel|cat)cache, not the actual user tables. And we need
+ * to build multiple, vastly different, ones, without being able to fully rely
+ * on the clog for information about committed transactions because they might
+ * commit in the future from the POV of the wal entry we're currently decoding.
+
+ * As the percentage of transactions modifying the catalog normally is fairly
+ * small, instead of keeping track of all running transactions and treating
+ * everything inside (xmin, xmax) that's not known to be running as committed we
+ * do the contrary. That is we keep a list of transactions between
+ * snapshot->(xmin, xmax) that we consider committed, everything else is
+ * considered aborted/in progress.
+ * That also allows us not to care about subtransactions before they have
+ * committed.
+ *
+ * Classic SnapshotNow behaviour - which is mainly used for efficiency, not for
+ * correctness - is not actually required by any of the routines that we need
+ * during decoding and is hard to emulate fully. Instead we build snapshots
+ * with MVCC behaviour that are updated whenever another transaction commits.
+ *
+ * One additional complexity of doing this is that to handle mixed DDL/DML
+ * transactions we need Snapshots that see intermediate states in a
+ * transaction. In normal operation this is achieved by using
+ * CommandIds/cmin/cmax. The problem with this however is that for space
+ * efficiency reasons only one value of that is stored (cf. combocid.c). To get
+ * around that we log additional information which allows us to get the
+ * original (cmin, cmax) pair during visibility checks.
+ *
+ * To facilitate all this we need our own visibility routine, as the normal
+ * ones are optimized for different usecases. We also need the code to use our
+ * special snapshots automatically whenever SnapshotNow behaviour is expected
+ * (specifying our snapshot everywhere would be far too invasive).
+ *
+ * To replace the normal SnapshotNows snapshots use the SetupDecodingSnapshots
+ * RevertFromDecodingSnapshots functions.
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/snapbuild.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogreader.h"
+
+#include "catalog/catalog.h"
+#include "catalog/pg_control.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_tablespace.h"
+
+#include "miscadmin.h"
+
+#include "replication/reorderbuffer.h"
+#include "replication/snapbuild.h"
+#include "replication/walsender_private.h"
+
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/relmapper.h"
+#include "utils/snapshot.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+#include "storage/block.h" /* debugging output */
+#include "storage/standby.h"
+#include "storage/sinval.h"
+
+/* transaction state manipulation functions */
+static void SnapBuildEndTxn(Snapstate *snapstate, TransactionId xid);
+
+static void SnapBuildAbortTxn(Snapstate *state, TransactionId xid, int nsubxacts,
+							  TransactionId *subxacts);
+
+static void SnapBuildCommitTxn(Snapstate *snapstate, ReorderBuffer *reorder,
+							   XLogRecPtr lsn, TransactionId xid,
+							   int nsubxacts, TransactionId *subxacts);
+
+/* ->running manipulation */
+static bool SnapBuildTxnIsRunning(Snapstate *snapstate, TransactionId xid);
+
+/* ->committed manipulation */
+static void SnapBuildPurgeCommittedTxn(Snapstate *snapstate);
+
+/* snapshot building/manipulation/distribution functions */
+/* XXX */
+static Snapshot SnapBuildBuildSnapshot(Snapstate *snapstate, TransactionId xid);
+
+static void	SnapBuildFreeSnapshot(Snapshot snap);
+
+static void SnapBuildSnapIncRefcount(Snapshot snap);
+
+static void SnapBuildDistributeSnapshotNow(Snapstate *snapstate, ReorderBuffer *reorder, XLogRecPtr lsn);
+
+/*
+ * Lookup a table via its current relfilenode.
+ *
+ * This requires that some snapshot in which that relfilenode is actually
+ * visible to be set up.
+ *
+ * The result of this function needs to be released from the syscache.
+ */
+HeapTuple
+LookupTableByRelFileNode(RelFileNode *relfilenode)
+{
+	Oid spc;
+	HeapTuple tuple;
+	Oid heaprel;
+
+	/*
+	 * relations in the default tablespace are stored with a reltablespace = 0
+	 * for some reason.
+	 */
+	spc = relfilenode->spcNode == DEFAULTTABLESPACE_OID ?
+		InvalidOid : relfilenode->spcNode;
+
+	tuple = SearchSysCache2(RELFILENODE,
+							spc,
+							relfilenode->relNode);
+
+	/*
+	 * Not found in pg_class by relfilenode: the relation may be a mapped
+	 * one, so fall back to the relmapper (second argument presumably means
+	 * "shared" -- shared relations live in the global tablespace; confirm
+	 * against relmapper.h).
+	 */
+	if (!HeapTupleIsValid(tuple))
+	{
+		if (relfilenode->spcNode == GLOBALTABLESPACE_OID)
+		{
+			heaprel = RelationMapFilenodeToOid(relfilenode->relNode, true);
+		}
+		else
+		{
+			heaprel = RelationMapFilenodeToOid(relfilenode->relNode, false);
+		}
+
+		if (heaprel != InvalidOid)
+		{
+			tuple = SearchSysCache1(RELOID,
+									heaprel);
+		}
+	}
+	return tuple;
+}
+
+/*
+ * Does this relation carry catalog information? Important for knowing whether
+ * a transaction made changes to the catalog, in which case it need to be
+ * included in snapshots.
+ *
+ * Requires that an appropriate timetravel snapshot is set up!
+ *
+ * 'snapstate' and 'xid' are currently unused here.
+ */
+bool
+SnapBuildHasCatalogChanges(Snapstate *snapstate, TransactionId xid, RelFileNode *relfilenode)
+{
+	HeapTuple table;
+	Form_pg_class class_form;
+	bool ret;
+
+	/* everything in the global (shared) tablespace is catalog */
+	if (relfilenode->spcNode == GLOBALTABLESPACE_OID)
+		return true;
+
+
+	table = LookupTableByRelFileNode(relfilenode);
+
+	/*
+	 * The lookup (which already accounts for tables in the default
+	 * tablespace being stored in pg_class with reltablespace = 0) failed;
+	 * that shouldn't happen with a correct snapshot set up.
+	 */
+	if (!HeapTupleIsValid(table))
+	{
+		elog(FATAL, "failed pg_class lookup for %u:%u",
+			 relfilenode->spcNode, relfilenode->relNode);
+		/* not reached; keeps the compiler quiet */
+		return false;
+	}
+
+	class_form = (Form_pg_class) GETSTRUCT(table);
+	ret = IsSystemClass(class_form);
+
+	ReleaseSysCache(table);
+	return ret;
+}
+
+/*
+ * Allocate a new snapshot builder.
+ *
+ * Storage is malloc'ed (not palloc'ed); the counterpart is
+ * FreeSnapshotBuilder which uses free().
+ *
+ * 'reorder' is currently unused.
+ */
+Snapstate *
+AllocateSnapshotBuilder(ReorderBuffer *reorder)
+{
+	Snapstate *snapstate = malloc(sizeof(Snapstate));
+
+	/* previously dereferenced without a check; fail cleanly on OOM instead */
+	if (!snapstate)
+		elog(ERROR, "could not allocate memory for snapstate");
+
+	snapstate->state = SNAPBUILD_START;
+
+	snapstate->nrrunning = 0;
+	snapstate->nrrunning_initial = 0;
+	snapstate->running = NULL;
+
+	snapstate->nrcommitted = 0;
+	snapstate->nrcommitted_space = 128; /* arbitrary number */
+	snapstate->committed = malloc(snapstate->nrcommitted_space * sizeof(TransactionId));
+	snapstate->transactions_after = InvalidXLogRecPtr;
+
+	if (!snapstate->committed)
+		elog(ERROR, "could not allocate memory for snapstate->committed");
+
+	snapstate->snapshot = NULL;
+
+	return snapstate;
+}
+
+/*
+ * Free a snapshot builder allocated by AllocateSnapshotBuilder.
+ *
+ * The builder holds its own refcount on ->snapshot (it IncRefcounts when
+ * storing it, cf. SnapBuildProcessChange), so we must *release* that
+ * reference rather than free the snapshot outright: SnapBuildFreeSnapshot
+ * errors out on a snapshot with a non-zero active_count, and other
+ * transactions may still reference it.
+ */
+void
+FreeSnapshotBuilder(Snapstate *snapstate)
+{
+	if (snapstate->snapshot)
+		SnapBuildSnapDecRefcount(snapstate->snapshot);
+
+	/* free(NULL) is a no-op, no guards needed */
+	free(snapstate->committed);
+	free(snapstate->running);
+
+	free(snapstate);
+}
+
+/*
+ * Free an unreferenced snapshot that has previously been built by us.
+ *
+ * May only be called on snapshots with active_count == 0 that were built by
+ * SnapBuildBuildSnapshot; use SnapBuildSnapDecRefcount for refcounted access.
+ */
+static void
+SnapBuildFreeSnapshot(Snapshot snap)
+{
+	/* make sure we don't get passed an external snapshot */
+	Assert(snap->satisfies == HeapTupleSatisfiesMVCCDuringDecoding);
+
+	/* make sure nobody modified our snapshot */
+	Assert(snap->curcid == FirstCommandId);
+	Assert(!snap->suboverflowed);
+	Assert(!snap->takenDuringRecovery);
+	Assert(!snap->regd_count);
+
+	/* slightly more likely, so its checked even without casserts */
+	if (snap->copied)
+		elog(ERROR, "we don't deal with copied snapshots here.");
+
+	if (snap->active_count)
+		elog(ERROR, "freeing active snapshot");
+
+	/* snapshot and its xip array are one malloc'ed chunk, cf. BuildSnapshot */
+	free(snap);
+}
+
+/*
+ * Increase refcount of a snapshot.
+ *
+ * This is used when handing out a snapshot to some external resource or when
+ * adding a Snapshot as snapstate->snapshot.
+ */
+static void
+SnapBuildSnapIncRefcount(Snapshot snap)
+{
+	snap->active_count++;
+}
+
+/*
+ * Decrease refcount of a snapshot and free if the refcount reaches zero.
+ *
+ * Externally visible so external resources that have been handed an IncRef'ed
+ * Snapshot can free it easily.
+ */
+void
+SnapBuildSnapDecRefcount(Snapshot snap)
+{
+	/* make sure we don't get passed an external snapshot */
+	Assert(snap->satisfies == HeapTupleSatisfiesMVCCDuringDecoding);
+
+	/* make sure nobody modified our snapshot */
+	Assert(snap->curcid == FirstCommandId);
+	Assert(!snap->suboverflowed);
+	Assert(!snap->takenDuringRecovery);
+	Assert(!snap->regd_count);
+
+	/* decrementing a zero refcount would be a caller bug */
+	Assert(snap->active_count);
+
+	/* slightly more likely, so its checked even without casserts */
+	if (snap->copied)
+		elog(ERROR, "we don't deal with copied snapshots here.");
+
+	snap->active_count--;
+	if (!snap->active_count)
+		SnapBuildFreeSnapshot(snap);
+}
+
+/*
+ * Build a new snapshot, based on currently committed, catalog modifying
+ * transactions.
+ *
+ * In-Progress transaction with catalog access are *not* allowed to modify
+ * these snapshots, they have to copy them and fill in appropriate ->curcid and
+ * ->subxip/subxcnt values.
+ *
+ * The SnapshotData and its xip array are allocated as one malloc'ed chunk;
+ * SnapBuildFreeSnapshot frees it with a single free().  The returned
+ * snapshot starts with active_count == 0; callers take references via
+ * SnapBuildSnapIncRefcount.
+ */
+static Snapshot
+SnapBuildBuildSnapshot(Snapstate *snapstate, TransactionId xid)
+{
+	Snapshot snapshot = malloc(sizeof(SnapshotData) +
+							   sizeof(TransactionId) * snapstate->nrcommitted +
+							   sizeof(TransactionId) * 1 /* toplevel xid */);
+
+	/* previously dereferenced without a check; fail cleanly on OOM instead */
+	if (!snapshot)
+		elog(ERROR, "could not allocate memory for snapshot");
+
+	snapshot->satisfies = HeapTupleSatisfiesMVCCDuringDecoding;
+	/*
+	 * we copy all currently in progress transaction to ->xip, all
+	 * transactions added to the transaction that committed during running -
+	 * which thus need to be considered visible in SnapshotNow semantics - get
+	 * copied to ->subxip.
+	 *
+	 * XXX: Do we want extra fields for those two instead?
+	 */
+	snapshot->xmin = snapstate->xmin;
+	snapshot->xmax = snapstate->xmax;
+
+	/* store all transaction to be treated as committed */
+	snapshot->xip = (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
+	snapshot->xcnt = snapstate->nrcommitted;
+	memcpy(snapshot->xip, snapstate->committed,
+	       snapstate->nrcommitted * sizeof(TransactionId));
+	/* sort so we can bsearch() */
+	qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
+
+
+	snapshot->subxcnt = 0;
+	snapshot->subxip = NULL;
+
+	snapshot->suboverflowed = false;
+	snapshot->takenDuringRecovery = false;
+	snapshot->copied = false;
+	snapshot->curcid = FirstCommandId;
+	snapshot->active_count = 0;
+	snapshot->regd_count = 0;
+
+	return snapshot;
+}
+
+/*
+ * Handle the effects of a single heap change, appropriate to the current state
+ * of the snapshot builder.
+ *
+ * Returns SNAPBUILD_DECODE if the caller should decode/queue the change,
+ * SNAPBUILD_SKIP otherwise.  'buf' and 'relfilenode' are currently unused
+ * in the body -- presumably kept for future per-relation filtering; confirm.
+ */
+static SnapBuildAction
+SnapBuildProcessChange(ReorderBuffer *reorder, Snapstate *snapstate,
+					   TransactionId xid, XLogRecordBuffer *buf,
+					   RelFileNode *relfilenode)
+{
+	SnapBuildAction ret = SNAPBUILD_SKIP;
+
+	/*
+	 * We can't handle data in transactions if we haven't built a snapshot yet,
+	 * so don't store them.
+	 */
+	if (snapstate->state < SNAPBUILD_FULL_SNAPSHOT)
+		;
+	/*
+	 * No point in keeping track of changes in transactions that we don't have
+	 * enough information about to decode.
+	 */
+	else if (snapstate->state < SNAPBUILD_CONSISTENT &&
+			 SnapBuildTxnIsRunning(snapstate, xid))
+		;
+	else
+	{
+		bool old_tx = ReorderBufferIsXidKnown(reorder, xid);
+
+		ret = SNAPBUILD_DECODE;
+
+		/* first change of this txn: hand it a base snapshot to decode with */
+		if (!old_tx || !ReorderBufferXidHasBaseSnapshot(reorder, xid))
+		{
+			/* build the shared snapshot lazily */
+			if (!snapstate->snapshot) {
+				snapstate->snapshot = SnapBuildBuildSnapshot(snapstate, xid);
+				/* refcount of the snapshot builder */
+				SnapBuildSnapIncRefcount(snapstate->snapshot);
+			}
+
+			/* refcount of the transaction */
+			SnapBuildSnapIncRefcount(snapstate->snapshot);
+			ReorderBufferAddBaseSnapshot(reorder, xid,
+									  InvalidXLogRecPtr,
+									  snapstate->snapshot);
+		}
+	}
+
+	return ret;
+}
+
+/*
+ * Process a single xlog record.
+ *
+ * Feeds the snapshot builder's state machine (SNAPBUILD_START ->
+ * SNAPBUILD_FULL_SNAPSHOT -> SNAPBUILD_CONSISTENT) and tells the caller via
+ * the return value whether the record should be decoded (SNAPBUILD_DECODE)
+ * or skipped (SNAPBUILD_SKIP).
+ */
+SnapBuildAction
+SnapBuildDecodeCallback(ReorderBuffer *reorder, Snapstate *snapstate,
+						XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	uint8 info = r->xl_info & ~XLR_INFO_MASK;
+	TransactionId xid = buf->record.xl_xid;
+
+	SnapBuildAction ret = SNAPBUILD_SKIP;
+
+	/* optionally log every record's rmgr description for debugging */
+#if DEBUG_ME_LOUDLY
+	{
+		StringInfoData s;
+
+		initStringInfo(&s);
+		RmgrTable[r->xl_rmid].rm_desc(&s,
+									  r->xl_info,
+									  buf->record_data);
+
+		/* don't bother emitting empty description */
+		if (s.len > 0)
+			elog(LOG, "xlog redo %u: %s", xid, s.data);
+	}
+#endif
+
+	/*
+	 * Only search for an initial starting point if we haven't built a full
+	 * snapshot yet
+	 */
+	if (snapstate->state <= SNAPBUILD_CONSISTENT)
+	{
+		/*
+		 * Build snapshot incrementally using information about the currently
+		 * running transactions. As soon as all of those have finished
+		 */
+		if (r->xl_rmid == RM_STANDBY_ID &&
+			info == XLOG_RUNNING_XACTS)
+		{
+			xl_running_xacts *running = (xl_running_xacts *) buf->record_data;
+
+			snapstate->state = SNAPBUILD_FULL_SNAPSHOT;
+
+			/*
+			 * increase shared memory state, so vacuum can work
+			 * on tuples we prevent from being purged.
+			 */
+			IncreaseLogicalXminForSlot(buf->origptr, running->oldestRunningXid);
+
+			if (running->xcnt == 0)
+			{
+				/*
+				 * might have already started to incrementally assemble
+				 * transactions.
+				 */
+				snapstate->transactions_after = buf->origptr;
+
+				snapstate->xmin_running = InvalidTransactionId;
+				snapstate->xmax_running = InvalidTransactionId;
+
+				/*
+				 * FIXME: abort everything we have stored about running
+				 * transactions, relevant e.g. after a crash.
+				 */
+				snapstate->state = SNAPBUILD_CONSISTENT;
+			}
+			/* first encounter of a xl_running_xacts record */
+			else if (!snapstate->nrrunning)
+			{
+				/*
+				 * We only care about toplevel xids as those are the ones we
+				 * definitely see in the wal stream. As snapbuild.c tracks
+				 * committed instead of running transactions we don't need to
+				 * know anything about uncommitted subtransactions.
+				 */
+				snapstate->xmin = running->oldestRunningXid;
+				TransactionIdRetreat(snapstate->xmin);
+				snapstate->xmax = running->latestCompletedXid;
+				TransactionIdAdvance(snapstate->xmax);
+
+				snapstate->nrrunning = running->xcnt;
+				snapstate->nrrunning_initial = running->xcnt;
+
+				/*
+				 * NOTE(review): malloc result is not checked here, unlike
+				 * other allocations in this file -- TODO add a check.
+				 */
+				snapstate->running = malloc(snapstate->nrrunning
+											* sizeof(TransactionId));
+
+				memcpy(snapstate->running, running->xids,
+					   snapstate->nrrunning_initial * sizeof(TransactionId));
+
+				/* sort so we can do a binary search */
+				qsort(snapstate->running, snapstate->nrrunning_initial,
+					  sizeof(TransactionId), xidComparator);
+
+				snapstate->xmin_running = snapstate->running[0];
+				snapstate->xmax_running = snapstate->running[running->xcnt - 1];
+
+				/* makes comparisons cheaper later */
+				TransactionIdRetreat(snapstate->xmin_running);
+				TransactionIdAdvance(snapstate->xmax_running);
+
+				snapstate->state = SNAPBUILD_FULL_SNAPSHOT;
+			}
+
+			elog(LOG, "found initial snapshot (via running xacts). Done: %i",
+				 snapstate->state == SNAPBUILD_CONSISTENT);
+		}
+		else if (r->xl_rmid == RM_XLOG_ID &&
+				 (info == XLOG_CHECKPOINT_SHUTDOWN || info == XLOG_CHECKPOINT_ONLINE))
+		{
+			/* FIXME: Check whether there is a valid state dumped to disk */
+		}
+	}
+
+	/* nothing else can be done before we have an initial snapshot state */
+	if (snapstate->state == SNAPBUILD_START)
+		return SNAPBUILD_SKIP;
+
+	/*
+	 * This switch is - partially due to PGs indentation rules - rather deep
+	 * and large. Maybe break it into separate functions?
+	 */
+	switch (r->xl_rmid)
+	{
+		case RM_XLOG_ID:
+			{
+				switch (info)
+				{
+					case XLOG_CHECKPOINT_SHUTDOWN:
+#ifdef NOT_YET
+						{
+							/*
+							 * FIXME: abort everything but prepared xacts, we
+							 * don't track prepared xacts though so far.  It
+							 * might be neccesary to do this to handle subtxn
+							 * ids that haven't been assigned to a toplevel xid
+							 * after a crash.
+							 */
+							for ( /* FIXME */ )
+							{
+							}
+						}
+#endif
+					/* fall through */
+					case XLOG_CHECKPOINT_ONLINE:
+						{
+							/*
+							 * FIXME: dump state to disk so we can restart
+							 * from here later
+							 */
+							break;
+						}
+				}
+				break;
+			}
+		case RM_STANDBY_ID:
+			{
+				switch (info)
+				{
+					case XLOG_RUNNING_XACTS:
+						{
+							xl_running_xacts *running =
+								(xl_running_xacts *) buf->record_data;
+
+							/*
+							 * update range of interesting xids. We don't
+							 * increase ->xmax because once we are in a
+							 * consistent state we can do that ourselves and
+							 * much more efficiently so because we only need to
+							 * do it for catalog transactions.
+							 */
+							snapstate->xmin = running->oldestRunningXid;
+							TransactionIdRetreat(snapstate->xmin);
+
+							/*
+							 * Remove transactions we don't need to keep track
+							 * off anymore.
+							 */
+							SnapBuildPurgeCommittedTxn(snapstate);
+
+							/*
+							 * increase shared memory state, so vacuum can work
+							 * on tuples we prevent from being purged.
+							 */
+							IncreaseLogicalXminForSlot(buf->origptr, running->oldestRunningXid);
+
+							break;
+						}
+					case XLOG_STANDBY_LOCK:
+						break;
+				}
+				break;
+			}
+		case RM_XACT_ID:
+			{
+				switch (info)
+				{
+					case XLOG_XACT_COMMIT:
+						{
+							xl_xact_commit *xlrec =
+								(xl_xact_commit *) buf->record_data;
+
+							ret = SNAPBUILD_DECODE;
+
+							/*
+							 * Queue cache invalidation messages.
+							 */
+							if (xlrec->nmsgs)
+							{
+								TransactionId *subxacts;
+								SharedInvalidationMessage *inval_msgs;
+
+								/* subxid array follows relfilenodes */
+								subxacts = (TransactionId *)
+									&(xlrec->xnodes[xlrec->nrels]);
+								/* invalidation messages follow subxids */
+								inval_msgs = (SharedInvalidationMessage *)
+									&(subxacts[xlrec->nsubxacts]);
+
+								/*
+								 * no need to check
+								 * XactCompletionRelcacheInitFileInval, we will
+								 * process the sinval messages that the
+								 * relmapper change has generated.
+								 */
+								ReorderBufferAddInvalidations(reorder, xid,
+														   InvalidXLogRecPtr,
+								                           xlrec->nmsgs,
+														   inval_msgs);
+
+								/*
+								 * Let everyone know that this transaction
+								 * modified the catalog. We need this at commit
+								 * time.
+								 */
+								ReorderBufferXidSetTimetravel(reorder, xid);
+
+							}
+
+							SnapBuildCommitTxn(snapstate, reorder,
+											   buf->origptr, xid,
+											   xlrec->nsubxacts,
+											   (TransactionId *) &xlrec->xnodes);
+							break;
+						}
+					case XLOG_XACT_COMMIT_COMPACT:
+						{
+							xl_xact_commit_compact *xlrec =
+								(xl_xact_commit_compact *) buf->record_data;
+
+							ret = SNAPBUILD_DECODE;
+
+							/* compact commits carry no invalidations/rels */
+							SnapBuildCommitTxn(snapstate, reorder,
+											   buf->origptr, xid,
+											   xlrec->nsubxacts,
+											   xlrec->subxacts);
+							break;
+						}
+					case XLOG_XACT_COMMIT_PREPARED:
+						{
+							xl_xact_commit_prepared *xlrec =
+								(xl_xact_commit_prepared *) buf->record_data;
+
+							/* FIXME: check for invalidation messages! */
+
+							SnapBuildCommitTxn(snapstate, reorder,
+											   buf->origptr, xlrec->xid,
+											   xlrec->crec.nsubxacts,
+											   (TransactionId *) &xlrec->crec.xnodes);
+
+							ret = SNAPBUILD_DECODE;
+							break;
+						}
+					case XLOG_XACT_ABORT:
+						{
+							xl_xact_abort *xlrec =
+								(xl_xact_abort *) buf->record_data;
+
+							/* subxid array follows the relfilenode array */
+							SnapBuildAbortTxn(snapstate, xid, xlrec->nsubxacts,
+											  (TransactionId *) &(xlrec->xnodes[xlrec->nrels]));
+							ret = SNAPBUILD_DECODE;
+							break;
+						}
+					case XLOG_XACT_ABORT_PREPARED:
+						{
+							xl_xact_abort_prepared *xlrec =
+								(xl_xact_abort_prepared *) buf->record_data;
+							xl_xact_abort *arec = &xlrec->arec;
+
+							SnapBuildAbortTxn(snapstate, xlrec->xid,
+											  arec->nsubxacts,
+											  (TransactionId *) &(arec->xnodes[arec->nrels]));
+							ret = SNAPBUILD_DECODE;
+							break;
+						}
+					case XLOG_XACT_ASSIGNMENT:
+						break;
+					case XLOG_XACT_PREPARE:
+						/*
+						 * XXX: We could take note of all in-progress prepared
+						 * xacts so we can use shutdown checkpoints to abort
+						 * in-progress transactions...
+						 */
+					default:
+						break;
+						;
+				}
+				break;
+			}
+		case RM_HEAP_ID:
+			{
+				switch (info & XLOG_HEAP_OPMASK)
+				{
+					case XLOG_HEAP_INPLACE:
+						{
+							xl_heap_inplace *xlrec =
+								(xl_heap_inplace *) buf->record_data;
+
+							ret = SnapBuildProcessChange(reorder, snapstate,
+														 xid, buf,
+														 &xlrec->target.node);
+
+							/*
+							 * inplace records happen in catalog modifying
+							 * txn's
+							 */
+							ReorderBufferXidSetTimetravel(reorder, xid);
+
+							break;
+						}
+					/*
+					 * we only ever read changes, so row level locks
+					 * aren't interesting
+					 */
+					case XLOG_HEAP_LOCK:
+						break;
+
+					case XLOG_HEAP_INSERT:
+						{
+							xl_heap_insert *xlrec =
+								(xl_heap_insert *) buf->record_data;
+
+							ret = SnapBuildProcessChange(reorder, snapstate,
+														 xid, buf,
+														 &xlrec->target.node);
+							break;
+						}
+					/* HEAP(_HOT)?_UPDATE use the same data layout */
+					case XLOG_HEAP_UPDATE:
+					case XLOG_HEAP_HOT_UPDATE:
+						{
+							xl_heap_update *xlrec =
+								(xl_heap_update *) buf->record_data;
+
+							ret = SnapBuildProcessChange(reorder, snapstate,
+														 xid, buf,
+														 &xlrec->target.node);
+							break;
+						}
+					case XLOG_HEAP_DELETE:
+						{
+							xl_heap_delete *xlrec =
+								(xl_heap_delete *) buf->record_data;
+
+							ret = SnapBuildProcessChange(reorder, snapstate,
+														 xid, buf,
+														 &xlrec->target.node);
+							break;
+						}
+					default:
+						;
+				}
+				break;
+			}
+		case RM_HEAP2_ID:
+			{
+				switch (info)
+				{
+					case XLOG_HEAP2_MULTI_INSERT:
+						{
+							xl_heap_multi_insert *xlrec =
+								(xl_heap_multi_insert *) buf->record_data;
+
+							ret = SnapBuildProcessChange(reorder, snapstate, xid,
+														 buf, &xlrec->node);
+							break;
+						}
+					case XLOG_HEAP2_NEW_CID:
+						{
+							CommandId cid;
+
+							xl_heap_new_cid *xlrec =
+								(xl_heap_new_cid *) buf->record_data;
+#if 0
+							elog(WARNING, "found new cid in xid %u: relfilenode %u/%u/%u: tid: (%u, %u) cmin: %u, cmax: %u, combo: %u",
+								 xlrec->top_xid,
+								 xlrec->target.node.dbNode, xlrec->target.node.spcNode,	xlrec->target.node.relNode,
+								 BlockIdGetBlockNumber(&xlrec->target.tid.ip_blkid), xlrec->target.tid.ip_posid,
+								 xlrec->cmin, xlrec->cmax, xlrec->combocid);
+#endif
+							/* we only log new_cid's if a catalog tuple was modified */
+							ReorderBufferXidSetTimetravel(reorder, xid);
+
+							/* same lazy base-snapshot setup as SnapBuildProcessChange */
+							if (!ReorderBufferXidHasBaseSnapshot(reorder, xid))
+							{
+								if (!snapstate->snapshot) {
+									snapstate->snapshot = SnapBuildBuildSnapshot(snapstate, xid);
+									/* refcount of the snapshot builder */
+									SnapBuildSnapIncRefcount(snapstate->snapshot);
+								}
+
+								/* refcount of the transaction */
+								SnapBuildSnapIncRefcount(snapstate->snapshot);
+
+								ReorderBufferAddBaseSnapshot(reorder, xid,
+														  InvalidXLogRecPtr,
+														  snapstate->snapshot);
+							}
+
+							ReorderBufferAddNewTupleCids(reorder, xlrec->top_xid, buf->origptr,
+													  xlrec->target.node, xlrec->target.tid,
+													  xlrec->cmin, xlrec->cmax, xlrec->combocid);
+
+							/* figure out new command id */
+							if (xlrec->cmin != InvalidCommandId && xlrec->cmax != InvalidCommandId)
+								cid = Max(xlrec->cmin, xlrec->cmax);
+							else if (xlrec->cmax != InvalidCommandId)
+								cid = xlrec->cmax;
+							else if (xlrec->cmin != InvalidCommandId)
+								cid = xlrec->cmin;
+							else
+							{
+								cid = InvalidCommandId; /* silence compiler */
+								elog(ERROR, "broken arrow, no cid?");
+							}
+							/*
+							 * FIXME: potential race condition here: if
+							 * multiple snapshots were running & generating
+							 * changes in the same transaction on the source
+							 * side this could be problematic.  But this cannot
+							 * happen for system catalogs, right?
+							 */
+							ReorderBufferAddNewCommandId(reorder, xid, buf->origptr,
+													  cid + 1);
+						}
+					/*
+					 * NOTE(review): no break at the end of the NEW_CID case;
+					 * it falls through into the no-op default. Harmless today
+					 * but fragile -- a break seems to be missing here.
+					 */
+					default:
+						;
+				}
+			}
+			break;
+	}
+
+	return ret;
+}
+
+
+/*
+ * check whether `xid` is currently running
+ *
+ * Uses the sorted array of toplevel xids captured from the first
+ * xl_running_xacts record; the (xmin_running, xmax_running) pre-check makes
+ * the common miss cheap.  Note the bsearch covers nrrunning_initial entries
+ * even though nrrunning counts the still-running ones -- presumably finished
+ * entries are invalidated elsewhere (SnapBuildEndTxn); confirm.
+ */
+static bool
+SnapBuildTxnIsRunning(Snapstate *snapstate, TransactionId xid)
+{
+	if (snapstate->nrrunning &&
+		NormalTransactionIdFollows(xid, snapstate->xmin_running) &&
+		NormalTransactionIdPrecedes(xid, snapstate->xmax_running))
+	{
+		TransactionId *search =
+			bsearch(&xid, snapstate->running, snapstate->nrrunning_initial,
+					sizeof(TransactionId), xidComparator);
+
+		if (search != NULL)
+		{
+			Assert(*search == xid);
+			return true;
+		}
+	}
+
+	return false;
+}
+
+/*
+ * FIXME: Analogous struct to the private one in reorderbuffer.c.
+ *
+ * Maybe introduce reorderbuffer_internal.h?
+ */
+typedef struct ReorderBufferTXNByIdEnt
+{
+	TransactionId xid;
+	ReorderBufferTXN *txn;
+}  ReorderBufferTXNByIdEnt;
+
+/*
+ * Add a new SnapshotNow to all transactions we're decoding that currently are
+ * in-progress so they can see new catalog contents made by the transaction
+ * that just committed.
+ */
+static void
+SnapBuildDistributeSnapshotNow(Snapstate *snapstate, ReorderBuffer *reorder, XLogRecPtr lsn)
+{
+	HASH_SEQ_STATUS status;
+	ReorderBufferTXNByIdEnt* ent;
+	elog(DEBUG1, "distributing snapshots to all running transactions");
+
+	hash_seq_init(&status, reorder->by_txn);
+
+	/*
+	 * FIXME: we're providing snapshots to the txn that committed just now...
+	 *
+	 * XXX: If we handled XLOG_ASSIGNMENT records we could avoid handing
+	 * out snapshots to transactions that we recognize as being subtransactions.
+	 */
+	while ((ent = (ReorderBufferTXNByIdEnt*) hash_seq_search(&status)) != NULL)
+	{
+		if (ReorderBufferXidHasBaseSnapshot(reorder, ent->xid))
+		{
+			SnapBuildSnapIncRefcount(snapstate->snapshot);
+			ReorderBufferAddBaseSnapshot(reorder, ent->xid, lsn, snapstate->snapshot);
+		}
+	}
+}
+
+/*
+ * Keep track of a new catalog changing transaction that has committed
+ */
+static void
+SnapBuildAddCommittedTxn(Snapstate *snapstate, TransactionId xid)
+{
+	if (snapstate->nrcommitted == snapstate->nrcommitted_space)
+	{
+		elog(WARNING, "increasing space for committed transactions");
+
+		snapstate->nrcommitted_space *= 2;
+		snapstate->committed = realloc(snapstate->committed,
+									   snapstate->nrcommitted_space * sizeof(TransactionId));
+		if (!snapstate->committed)
+			elog(ERROR, "couldn't enlarge space for committed transactions");
+	}
+	snapstate->committed[snapstate->nrcommitted++] = xid;
+}
+
+/*
+ * Remove all transactions we treat as committed that are smaller than
+ * ->xmin. Those won't ever get checked via the ->committed array anyway.
+ */
+static void
+SnapBuildPurgeCommittedTxn(Snapstate *snapstate)
+{
+	int off;
+	TransactionId *workspace;
+	int surviving_xids = 0;
+
+	/* XXX: Neater algorithm? */
+	workspace = malloc(snapstate->nrcommitted * sizeof(TransactionId));
+
+	if (!workspace)
+		elog(ERROR, "could not allocate memory for workspace during xmin purging");
+
+	for (off = 0; off < snapstate->nrcommitted; off++)
+	{
+		if (snapstate->committed[off] > snapstate->xmin)
+			workspace[surviving_xids++] = snapstate->committed[off];
+	}
+
+	memcpy(snapstate->committed, workspace,
+		   surviving_xids * sizeof(TransactionId));
+
+	snapstate->nrcommitted = surviving_xids;
+	free(workspace);
+}
+
+/*
+ * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
+ * keeping track of the amount of running transactions.
+ */
+static void
+SnapBuildEndTxn(Snapstate *snapstate, TransactionId xid)
+{
+	if (snapstate->state == SNAPBUILD_CONSISTENT)
+		return;
+
+	if (SnapBuildTxnIsRunning(snapstate, xid))
+	{
+		if (!--snapstate->nrrunning)
+		{
+			/*
+			 * none of the originally running transactions are running
+			 * anymore. Due to that our incrementally built snapshot now is
+			 * complete.
+			 */
+			elog(LOG, "found consistent point due to SnapBuildEndTxn + running: %u", xid);
+			snapstate->state = SNAPBUILD_CONSISTENT;
+		}
+	}
+}
+
+/* Abort a transaction, throw away all state we kept */
+static void
+SnapBuildAbortTxn(Snapstate *snapstate, TransactionId xid, int nsubxacts, TransactionId *subxacts)
+{
+	int i;
+
+	for (i = 0; i < nsubxacts; i++)
+	{
+		TransactionId subxid = subxacts[i];
+		SnapBuildEndTxn(snapstate, subxid);
+	}
+
+	SnapBuildEndTxn(snapstate, xid);
+}
+
+/* Handle everything that needs to be done when a transaction commits */
+static void
+SnapBuildCommitTxn(Snapstate *snapstate, ReorderBuffer *reorder,
+				   XLogRecPtr lsn, TransactionId xid,
+				   int nsubxacts, TransactionId *subxacts)
+{
+	int nxact;
+
+	bool forced_timetravel = false;
+	bool sub_does_timetravel = false;
+	bool top_does_timetravel = false;
+
+	/*
+	 * If we couldn't observe every change of a transaction because it was
+	 * already running at the point we started to observe, we have to assume it
+	 * made catalog changes.
+	 *
+	 * This has the positive benefit that we afterwards have enough information
+	 * to build an exportable snapshot that's usable by pg_dump et al.
+	 */
+	if (snapstate->state < SNAPBUILD_CONSISTENT)
+	{
+		if (XLByteLT(snapstate->transactions_after, lsn))
+			snapstate->transactions_after = lsn;
+
+		if (SnapBuildTxnIsRunning(snapstate, xid))
+		{
+			elog(LOG, "forced to assume catalog changes for xid %u because it was running to early", xid);
+			SnapBuildAddCommittedTxn(snapstate, xid);
+			forced_timetravel = true;
+		}
+	}
+
+	for (nxact = 0; nxact < nsubxacts; nxact++)
+	{
+		TransactionId subxid = subxacts[nxact];
+
+		/*
+		 * make sure txn is not tracked in running txn's anymore, switch
+		 * state
+		 */
+		SnapBuildEndTxn(snapstate, subxid);
+
+		if (forced_timetravel)
+		{
+			SnapBuildAddCommittedTxn(snapstate, subxid);
+		}
+		/*
+		 * add subtransaction to base snapshot, we don't distinguish after
+		 * that
+		 */
+		else if (ReorderBufferXidDoesTimetravel(reorder, subxid))
+		{
+			sub_does_timetravel = true;
+
+			elog(DEBUG1, "found subtransaction %u:%u with catalog changes.",
+				 xid, subxid);
+
+			SnapBuildAddCommittedTxn(snapstate, subxid);
+		}
+
+
+		if (forced_timetravel && sub_does_timetravel &&
+			NormalTransactionIdFollows(subxid, snapstate->xmax))
+		{
+			snapstate->xmax = subxid;
+			TransactionIdAdvance(snapstate->xmax);
+		}
+	}
+
+	/*
+	 * make sure txn is not tracked in running txn's anymore, switch state
+	 */
+	SnapBuildEndTxn(snapstate, xid);
+
+	if (forced_timetravel)
+	{
+		elog(DEBUG1, "forced transaction %u to do timetravel.", xid);
+
+		SnapBuildAddCommittedTxn(snapstate, xid);
+	}
+	/* add toplevel transaction to base snapshot */
+	else if (ReorderBufferXidDoesTimetravel(reorder, xid))
+	{
+		elog(DEBUG1, "found top level transaction %u, with catalog changes!", xid);
+
+		top_does_timetravel = true;
+		SnapBuildAddCommittedTxn(snapstate, xid);
+	}
+	else if (sub_does_timetravel)
+	{
+		/* mark toplevel txn as timetravel as well */
+		SnapBuildAddCommittedTxn(snapstate, xid);
+	}
+
+	if (forced_timetravel || top_does_timetravel || sub_does_timetravel)
+	{
+		if (!TransactionIdIsValid(snapstate->xmax) ||
+			NormalTransactionIdFollows(xid, snapstate->xmax))
+		{
+			snapstate->xmax = xid;
+			TransactionIdAdvance(snapstate->xmax);
+		}
+
+		if (snapstate->state < SNAPBUILD_FULL_SNAPSHOT)
+			return;
+
+		if (snapstate->snapshot) {
+			/* refcount of the transaction */
+			SnapBuildSnapDecRefcount(snapstate->snapshot);
+		}
+
+		snapstate->snapshot = SnapBuildBuildSnapshot(snapstate, xid);
+
+		/* refcount of the snapshot builder */
+		SnapBuildSnapIncRefcount(snapstate->snapshot);
+
+		/* add a new SnapshotNow to all currently running transactions */
+		SnapBuildDistributeSnapshotNow(snapstate, reorder, lsn);
+	}
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index b6cfdac..07f1cdb 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,9 +76,11 @@ Node *replication_parse_result;
 %token K_NOWAIT
 %token K_WAL
 %token K_START_REPLICATION
+%token K_INIT_LOGICAL_REPLICATION
+%token K_START_LOGICAL_REPLICATION
 
 %type <node>	command
-%type <node>	base_backup start_replication identify_system
+%type <node>	base_backup start_replication identify_system start_logical_replication init_logical_replication
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %%
@@ -97,6 +99,8 @@ command:
 			identify_system
 			| base_backup
 			| start_replication
+			| init_logical_replication
+			| start_logical_replication
 			;
 
 /*
@@ -166,6 +170,32 @@ start_replication:
 					$$ = (Node *) cmd;
 				}
 			;
+
+/* FIXME: don't use SCONST */
+init_logical_replication:
+			K_INIT_LOGICAL_REPLICATION SCONST
+				{
+					InitLogicalReplicationCmd *cmd;
+					cmd = makeNode(InitLogicalReplicationCmd);
+					cmd->plugin = $2;
+
+					$$ = (Node *) cmd;
+				}
+			;
+
+/* FIXME: don't use SCONST */
+start_logical_replication:
+			K_START_LOGICAL_REPLICATION SCONST RECPTR
+				{
+					StartLogicalReplicationCmd *cmd;
+					cmd = makeNode(StartLogicalReplicationCmd);
+					cmd->name = $2;
+					cmd->startpoint = $3;
+
+					$$ = (Node *) cmd;
+				}
+			;
+
 %%
 
 #include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 51f381d..58f7972 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -64,6 +64,8 @@ NOWAIT			{ return K_NOWAIT; }
 PROGRESS			{ return K_PROGRESS; }
 WAL			{ return K_WAL; }
 START_REPLICATION	{ return K_START_REPLICATION; }
+INIT_LOGICAL_REPLICATION	{ return K_INIT_LOGICAL_REPLICATION; }
+START_LOGICAL_REPLICATION	{ return K_START_LOGICAL_REPLICATION; }
 ","				{ return ','; }
 ";"				{ return ';'; }
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 6452c34..4713847 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -52,6 +52,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/walsender_private.h"
+#include "replication/logicalfuncs.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
@@ -83,6 +84,9 @@ WalSndCtlData *WalSndCtl = NULL;
 /* My slot in the shared memory array */
 WalSnd	   *MyWalSnd = NULL;
 
+/* My slot for logical rep in the shared memory array */
+LogicalWalSnd *MyLogicalWalSnd = NULL;
+
 /* Global state */
 bool		am_walsender = false;		/* Am I a walsender process ? */
 bool		am_cascading_walsender = false;		/* Am I cascading WAL to
@@ -92,6 +96,7 @@ static bool	replication_started = false; /* Started streaming yet? */
 
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
+int			max_logical_slots = 0;	/* the maximum number of logical slots */
 int			wal_sender_timeout = 60 * 1000;	/* maximum time to send one
 												 * WAL data message */
 /*
@@ -129,18 +134,30 @@ static bool	ping_sent = false;
 static volatile sig_atomic_t got_SIGHUP = false;
 volatile sig_atomic_t walsender_ready_to_stop = false;
 
+/* XXX reader */
+static MemoryContext decoding_ctx = NULL;
+static MemoryContext old_decoding_ctx = NULL;
+
+static XLogReaderState *logical_reader = NULL;
+
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void WalSndLoop(void) __attribute__((noreturn));
+typedef void (*WalSndSendData)(bool *);
+static void WalSndLoop(WalSndSendData send_data) __attribute__((noreturn));
 static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
-static void XLogSend(bool *caughtup);
+static void XLogSendPhysical(bool *caughtup);
+static void XLogSendLogical(bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd *cmd);
+static void InitLogicalReplication(InitLogicalReplicationCmd *cmd);
+static void StartLogicalReplication(StartLogicalReplicationCmd *cmd);
+static void ComputeLogicalXmin(void);
 static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
@@ -192,6 +209,58 @@ WalSndErrorCleanup()
 		proc_exit(0);
 }
 
+extern void
+IncreaseLogicalXminForSlot(XLogRecPtr lsn, TransactionId xmin)
+{
+	Assert(MyLogicalWalSnd != NULL);
+
+	SpinLockAcquire(&MyLogicalWalSnd->mutex);
+	/*
+	 * Only increase if the previous value has been applied...
+	 */
+	if (!TransactionIdIsValid(MyLogicalWalSnd->candidate_xmin))
+	{
+		MyLogicalWalSnd->candidate_xmin_after = lsn;
+		MyLogicalWalSnd->candidate_xmin = xmin;
+		elog(LOG, "got new xmin %u at %lu", xmin, lsn);
+	}
+	SpinLockRelease(&MyLogicalWalSnd->mutex);
+}
+
+static void
+ComputeLogicalXmin(void)
+{
+	int slot;
+	TransactionId xmin = InvalidTransactionId;
+	LogicalWalSnd *logical_base;
+	LogicalWalSnd *walsnd;
+
+	logical_base = (LogicalWalSnd*)&WalSndCtl->walsnds[max_wal_senders];
+
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+	for (slot = 0; slot < max_logical_slots; slot++)
+	{
+		walsnd = &logical_base[slot];
+
+		SpinLockAcquire(&walsnd->mutex);
+		if (walsnd->in_use &&
+			TransactionIdIsValid(walsnd->xmin) && (
+				!TransactionIdIsValid(xmin) ||
+				TransactionIdPrecedes(walsnd->xmin, xmin))
+			)
+		{
+			xmin = walsnd->xmin;
+		}
+		SpinLockRelease(&walsnd->mutex);
+	}
+	WalSndCtl->logical_xmin = xmin;
+	LWLockRelease(ProcArrayLock);
+
+	elog(LOG, "computed new xmin: %u", xmin);
+
+}
+
 /*
  * IDENTIFY_SYSTEM
  */
@@ -376,7 +445,362 @@ StartReplication(StartReplicationCmd *cmd)
 	SyncRepInitConfig();
 
 	/* Main loop of walsender */
-	WalSndLoop();
+	WalSndLoop(XLogSendPhysical);
+}
+
+static void
+InitLogicalReplication(InitLogicalReplicationCmd *cmd)
+{
+	int slot;
+	LogicalWalSnd *logical_base;
+	LogicalWalSnd *walsnd;
+	char *slot_name;
+	StringInfoData buf;
+	char		xpos[MAXFNAMELEN];
+
+	elog(WARNING, "Initiating logical rep");
+
+	logical_base = (LogicalWalSnd*)&WalSndCtl->walsnds[max_wal_senders];
+
+	Assert(!MyLogicalWalSnd);
+
+	for (slot = 0; slot < max_logical_slots; slot++)
+	{
+		walsnd = &logical_base[slot];
+
+		SpinLockAcquire(&walsnd->mutex);
+		if (!walsnd->in_use)
+		{
+			Assert(!walsnd->active);
+			/* NOT releasing the lock yet */
+			break;
+		}
+		SpinLockRelease(&walsnd->mutex);
+		walsnd = NULL;
+	}
+
+	if (!walsnd)
+	{
+		elog(ERROR, "couldn't find free logical slot. free one or increase max_logical_slots");
+	}
+
+	/* so we get reset on exit/failure */
+	MyLogicalWalSnd = walsnd;
+	MyLogicalWalSnd->last_required_checkpoint = GetRedoRecPtr();
+
+	walsnd->in_use = true;
+	/* mark slot as active till we build the base snapshot */
+	walsnd->active = true;
+	walsnd->xmin = InvalidTransactionId;
+
+	strcpy(NameStr(walsnd->plugin), cmd->plugin);
+
+	/* FIXME: permanent name allocation scheme */
+	slot_name = NameStr(walsnd->name);
+	sprintf(slot_name, "id-%d", slot);
+
+	/* release spinlock, so this slot can be examined  */
+	SpinLockRelease(&walsnd->mutex);
+
+	/*
+	 * FIXME: think about race conditions here...
+	 *
+	 * We need to do this *after* releasing the spinlock, otherwise
+	 * GetOldestXmin will deadlock with ourselves.
+	 */
+	walsnd->xmin = GetOldestXmin(true, true);
+	ComputeLogicalXmin();
+
+	decoding_ctx = AllocSetContextCreate(TopMemoryContext,
+										 "decoding context",
+										 ALLOCSET_DEFAULT_MINSIZE,
+										 ALLOCSET_DEFAULT_INITSIZE,
+										 ALLOCSET_DEFAULT_MAXSIZE);
+	old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+	TopTransactionContext = decoding_ctx;
+
+	logical_reader = initial_snapshot_reader();
+
+	logical_reader->startptr = MyLogicalWalSnd->last_required_checkpoint;
+
+	for (;;)
+	{
+		ResetLatch(&MyWalSnd->latch);
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (!PostmasterIsAlive())
+			exit(1);
+
+		/* Process any requests or signals received recently */
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		logical_reader->endptr = GetFlushRecPtr();
+
+		/* continue building initial snapshot */
+		XLogReaderRead(logical_reader);
+
+		if (logical_reader->needs_input || !initial_snapshot_ready(logical_reader))
+		{
+			long		sleeptime = 10000;		/* 10 s */
+			int			wakeEvents;
+
+			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;
+
+			/* Sleep until something happens or we time out */
+			ImmediateInterruptOK = true;
+			CHECK_FOR_INTERRUPTS();
+			WaitLatch(&MyWalSnd->latch, wakeEvents,
+					  sleeptime);
+			ImmediateInterruptOK = false;
+		}
+		else
+			break;
+	}
+
+	walsnd->confirmed_flush = logical_reader->curptr;
+
+	snprintf(xpos, sizeof(xpos), "%X/%X",
+			 (uint32) (logical_reader->curptr >> 32), (uint32) logical_reader->curptr);
+
+	pq_beginmessage(&buf, 'T');
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
+
+	/* first field */
+	pq_sendstring(&buf, "replication_id");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	pq_sendstring(&buf, "consistent_point");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	pq_sendstring(&buf, "snapshot_name");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	pq_sendstring(&buf, "plugin");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	pq_endmessage(&buf);
+
+	/* Send a DataRow message */
+	pq_beginmessage(&buf, 'D');
+	pq_sendint(&buf, 4, 2);		/* # of columns */
+
+	pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+	pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+	pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+	pq_sendbytes(&buf, xpos, strlen(xpos));
+
+	pq_sendint(&buf, strlen("0xDEADBEEF"), 4); /* col3 len */
+	pq_sendbytes(&buf, "0xDEADBEEF", strlen("0xDEADBEEF"));
+
+	pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+	pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+
+	pq_endmessage(&buf);
+
+	SpinLockAcquire(&walsnd->mutex);
+	walsnd->active = false;
+	MyLogicalWalSnd = NULL;
+	SpinLockRelease(&walsnd->mutex);
+
+	MemoryContextSwitchTo(old_decoding_ctx);
+	TopTransactionContext = NULL;
+}
+
+
+
+static void
+StartLogicalReplication(StartLogicalReplicationCmd *cmd)
+{
+	StringInfoData buf;
+
+	int slot;
+	LogicalWalSnd *logical_base;
+	logical_base = (LogicalWalSnd*)&WalSndCtl->walsnds[max_wal_senders];
+
+	elog(WARNING, "Starting logical replication");
+
+	Assert(!MyLogicalWalSnd);
+
+	for (slot = 0; slot < max_logical_slots; slot++)
+	{
+		LogicalWalSnd   *walsnd = &logical_base[slot];
+
+		SpinLockAcquire(&walsnd->mutex);
+		if (walsnd->in_use && strcmp(cmd->name, NameStr(walsnd->name)) == 0)
+		{
+			MyLogicalWalSnd = walsnd;
+			/* NOT releasing the lock yet */
+			break;
+		}
+		SpinLockRelease(&walsnd->mutex);
+	}
+
+	if (!MyLogicalWalSnd)
+		elog(ERROR, "couldn't find logical slot for \"%s\"",
+		     cmd->name);
+
+	if (MyLogicalWalSnd->active)
+	{
+		SpinLockRelease(&MyLogicalWalSnd->mutex);
+		elog(ERROR, "slot already active");
+	}
+
+	MyLogicalWalSnd->active = true;
+	SpinLockRelease(&MyLogicalWalSnd->mutex);
+
+	MarkPostmasterChildWalSender();
+	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+	replication_started = true;
+
+	if (am_cascading_walsender && !RecoveryInProgress())
+	{
+		ereport(LOG,
+		   (errmsg("terminating walsender process to force cascaded standby "
+				   "to update timeline and reconnect")));
+		walsender_ready_to_stop = true;
+	}
+
+	WalSndSetState(WALSNDSTATE_CATCHUP);
+
+	/* Send a CopyBothResponse message, and start streaming */
+	pq_beginmessage(&buf, 'W');
+	pq_sendbyte(&buf, 0);
+	pq_sendint(&buf, 0, 2);
+	pq_endmessage(&buf);
+	pq_flush();
+
+	/*
+	 * Initialize position to the received one, then the xlog records begin to
+	 * be shipped from that position
+	 */
+	sentPtr = MyLogicalWalSnd->last_required_checkpoint;
+
+	/* Also update the start position status in shared memory */
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->sentPtr = MyLogicalWalSnd->last_required_checkpoint;
+		SpinLockRelease(&walsnd->mutex);
+	}
+
+	SyncRepInitConfig();
+
+	logical_reader = normal_snapshot_reader(NameStr(MyLogicalWalSnd->plugin),
+											cmd->startpoint);
+
+	logical_reader->startptr = MyLogicalWalSnd->last_required_checkpoint;
+	logical_reader->curptr = logical_reader->startptr;
+	logical_reader->endptr = GetFlushRecPtr();
+
+	/* Main loop of walsender */
+	WalSndLoop(XLogSendLogical);
+}
+
+/*
+ * Prepare a write into a StringInfo.
+ *
+ * Don't do anything lasting in here, it's quite possible that nothing will be done
+ * with the data.
+ */
+void
+WalSndPrepareWrite(StringInfo out, XLogRecPtr lsn)
+{
+	pq_sendbyte(out, 'w');
+	pq_sendint64(out, lsn);	/* dataStart */
+	pq_sendint64(out, lsn);	/* walEnd */
+	/* XXX: gather that value later just as its done in XLogSendPhysical */
+	pq_sendint64(out, 0 /*GetCurrentIntegerTimestamp() */);/* sendtime */
+}
+
+/*
+ * Actually write out data previously prepared by WalSndPrepareWrite out to the
+ * network, take as long as needed but process replies from the other side
+ * during that.
+ */
+void
+WalSndWriteData(StringInfo data)
+{
+	long		sleeptime = 10000;		/* 10 s */
+	int			wakeEvents;
+
+	pq_putmessage_noblock('d', data->data, data->len);
+
+	for (;;)
+	{
+		if (!pq_is_send_pending())
+			return;
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (!PostmasterIsAlive())
+			exit(1);
+
+		/* Process any requests or signals received recently */
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+			SyncRepInitConfig();
+		}
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* Clear any already-pending wakeups */
+		ResetLatch(&MyWalSnd->latch);
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			break;
+
+		if (!pq_is_send_pending())
+			return;
+
+		/* FIXME: wal_sender_timeout integration */
+		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+		ImmediateInterruptOK = true;
+		CHECK_FOR_INTERRUPTS();
+		WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+						  MyProcPort->sock, sleeptime);
+		ImmediateInterruptOK = false;
+	}
+	SetLatch(&MyWalSnd->latch);
 }
 
 /*
@@ -421,6 +845,14 @@ exec_replication_command(const char *cmd_string)
 			StartReplication((StartReplicationCmd *) cmd_node);
 			break;
 
+		case T_InitLogicalReplicationCmd:
+			InitLogicalReplication((InitLogicalReplicationCmd *) cmd_node);
+			break;
+
+		case T_StartLogicalReplicationCmd:
+			StartLogicalReplication((StartLogicalReplicationCmd *) cmd_node);
+			break;
+
 		case T_BaseBackupCmd:
 			SendBaseBackup((BaseBackupCmd *) cmd_node);
 			break;
@@ -588,6 +1020,37 @@ ProcessStandbyReplyMessage(void)
 		SpinLockRelease(&walsnd->mutex);
 	}
 
+	/*
+	 * Do an unlocked check for candidate_xmin first.
+	 */
+	if (MyLogicalWalSnd &&
+		TransactionIdIsValid(MyLogicalWalSnd->candidate_xmin))
+	{
+		bool updated_xmin = false;
+
+		/* use volatile pointer to prevent code rearrangement */
+		volatile LogicalWalSnd *walsnd = MyLogicalWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+
+		/* if we're past the location required for bumping xmin, do so */
+		if (TransactionIdIsValid(walsnd->candidate_xmin) &&
+			flushPtr != InvalidXLogRecPtr &&
+			XLByteLE(walsnd->candidate_xmin_after, flushPtr)
+			)
+		{
+			walsnd->xmin = walsnd->candidate_xmin;
+			walsnd->candidate_xmin = InvalidTransactionId;
+			walsnd->candidate_xmin_after = InvalidXLogRecPtr;
+			updated_xmin = true;
+		}
+
+		SpinLockRelease(&walsnd->mutex);
+
+		if (updated_xmin)
+			ComputeLogicalXmin();
+	}
+
 	if (!am_cascading_walsender)
 		SyncRepReleaseWaiters();
 }
@@ -669,7 +1132,7 @@ ProcessStandbyHSFeedbackMessage(void)
 
 /* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
-WalSndLoop(void)
+WalSndLoop(WalSndSendData send_data)
 {
 	bool		caughtup = false;
 
@@ -713,12 +1176,12 @@ WalSndLoop(void)
 
 		/*
 		 * If we don't have any pending data in the output buffer, try to send
-		 * some more.  If there is some, we don't bother to call XLogSend
+		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
 		 */
 		if (!pq_is_send_pending())
-			XLogSend(&caughtup);
+			send_data(&caughtup);
 		else
 			caughtup = false;
 
@@ -754,7 +1217,7 @@ WalSndLoop(void)
 			if (walsender_ready_to_stop)
 			{
 				/* ... let's just be real sure we're caught up ... */
-				XLogSend(&caughtup);
+				send_data(&caughtup);
 				if (caughtup && !pq_is_send_pending())
 				{
 					/* Inform the standby that XLOG streaming is done */
@@ -769,7 +1232,7 @@ WalSndLoop(void)
 		/*
 		 * We don't block if not caught up, unless there is unsent data
 		 * pending in which case we'd better block until the socket is
-		 * write-ready.  This test is only needed for the case where XLogSend
+		 * write-ready.  This test is only needed for the case where send_data
 		 * loaded a subset of the available data but then pq_flush_if_writable
 		 * flushed it all --- we should immediately try to send more.
 		 */
@@ -917,6 +1380,13 @@ WalSndKill(int code, Datum arg)
 	 * for this.
 	 */
 	MyWalSnd->pid = 0;
+
+	/* LOCK? */
+	if(MyLogicalWalSnd && MyLogicalWalSnd->active)
+	{
+		MyLogicalWalSnd->active = false;
+	}
+
 	DisownLatch(&MyWalSnd->latch);
 
 	/* WalSnd struct isn't mine anymore */
@@ -1068,6 +1538,8 @@ retry:
 }
 
 /*
+ * Send out the WAL in its normal physical/stored form.
+ *
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  * but not yet sent to the client, and buffer it in the libpq output
  * buffer.
@@ -1076,7 +1548,7 @@ retry:
  * *caughtup is set to false.
  */
 static void
-XLogSend(bool *caughtup)
+XLogSendPhysical(bool *caughtup)
 {
 	XLogRecPtr	SendRqstPtr;
 	XLogRecPtr	startptr;
@@ -1210,6 +1682,65 @@ XLogSend(bool *caughtup)
 }
 
 /*
+ * Send out the WAL after it has been decoded into a logical format by the
+ * output plugin specified in INIT_LOGICAL_REPLICATION
+ */
+static void
+XLogSendLogical(bool *caughtup)
+{
+	XLogRecPtr	endptr;
+	XLogRecPtr	curptr;
+
+	if (decoding_ctx == NULL)
+	{
+		decoding_ctx = AllocSetContextCreate(TopMemoryContext,
+											 "decoding context",
+											 ALLOCSET_DEFAULT_MINSIZE,
+											 ALLOCSET_DEFAULT_INITSIZE,
+											 ALLOCSET_DEFAULT_MAXSIZE);
+	}
+
+	logical_reader->endptr = logical_reader->curptr;
+	curptr = logical_reader->curptr;
+
+	/*
+	 * Read at most MAX_SEND_SIZE of WAL. We chunk the reading only to allow
+	 * reading keepalives and such in between.
+	 */
+	XLByteAdvance(logical_reader->endptr, MAX_SEND_SIZE);
+
+	/* only read up to already flushed wal */
+	endptr = GetFlushRecPtr();
+	if (XLByteLT(endptr, logical_reader->endptr))
+		logical_reader->endptr = endptr;
+
+	old_decoding_ctx = MemoryContextSwitchTo(decoding_ctx);
+	TopTransactionContext = decoding_ctx;
+
+	/* read and decode the next chunk of WAL */
+	XLogReaderRead(logical_reader);
+
+	MemoryContextSwitchTo(old_decoding_ctx);
+	TopTransactionContext = NULL;
+
+	if (curptr == logical_reader->curptr ||
+		logical_reader->curptr == endptr)
+		*caughtup = true;
+	else
+		*caughtup = false;
+
+	/* Update shared memory status */
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->sentPtr = logical_reader->curptr;
+		SpinLockRelease(&walsnd->mutex);
+	}
+}
+
+/*
  * Request walsenders to reload the currently-open WAL file
  */
 void
@@ -1301,6 +1832,8 @@ WalSndShmemSize(void)
 	size = offsetof(WalSndCtlData, walsnds);
 	size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
 
+	size = add_size(size, mul_size(max_logical_slots, sizeof(LogicalWalSnd)));
+
 	return size;
 }
 
@@ -1329,6 +1862,21 @@ WalSndShmemInit(void)
 			SpinLockInit(&walsnd->mutex);
 			InitSharedLatch(&walsnd->latch);
 		}
+
+		WalSndCtl->logical_xmin = InvalidTransactionId;
+
+		if (max_logical_slots > 0)
+		{
+			LogicalWalSnd *logical_base;
+			logical_base = (LogicalWalSnd*)&WalSndCtl->walsnds[max_wal_senders];
+
+			for (i = 0; i < max_logical_slots; i++)
+			{
+				LogicalWalSnd   *walsnd = &logical_base[i];
+				walsnd->xmin = InvalidTransactionId;
+				SpinLockInit(&walsnd->mutex);
+			}
+		}
 	}
 }
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index a98358d..da9e7e5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -51,6 +51,8 @@
 #include "access/xact.h"
 #include "access/twophase.h"
 #include "miscadmin.h"
+#include "replication/walsender.h"
+#include "replication/walsender_private.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/spin.h"
@@ -1156,6 +1158,14 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
 		}
 	}
 
+	if (max_logical_slots > 0 &&
+		TransactionIdIsValid(WalSndCtl->logical_xmin) &&
+		TransactionIdPrecedes(WalSndCtl->logical_xmin, result))
+	{
+		result = WalSndCtl->logical_xmin;
+	}
+
+
 	if (RecoveryInProgress())
 	{
 		/*
@@ -1442,10 +1452,17 @@ GetSnapshotData(Snapshot snapshot)
 			suboverflowed = true;
 	}
 
+	/* FIXME: comment & concurrency */
+	if (TransactionIdIsValid(WalSndCtl->logical_xmin) &&
+		TransactionIdPrecedes(WalSndCtl->logical_xmin, xmin))
+		xmin = WalSndCtl->logical_xmin;
+
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
+
 	LWLockRelease(ProcArrayLock);
 
+
 	/*
 	 * Update globalxmin to include actual process xids.  This is a slightly
 	 * different way of computing it than GetOldestXmin uses, but should give
@@ -1695,6 +1712,12 @@ GetRunningTransactionData(void)
 		}
 	}
 
+	/*
+	 * Its important *not* to track decoding tasks here because snapbuild.c
+	 * uses ->oldestRunningXid to manage its xmin. If it were to be included
+	 * here the initial value could never increase.
+	 */
+
 	CurrentRunningXacts->xcnt = count - subcount;
 	CurrentRunningXacts->subxcnt = subcount;
 	CurrentRunningXacts->subxid_overflow = suboverflowed;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 0cab243..6cc112d 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -804,7 +804,13 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 			appendStringInfo(buf, " %u", xlrec->xids[i]);
 	}
 
-	if (xlrec->subxid_overflow)
+	if (xlrec->subxcnt > 0)
+	{
+		appendStringInfo(buf, "; %d subxacts:", xlrec->subxcnt);
+		for (i = 0; i < xlrec->subxcnt; i++)
+			appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
+	}
+	else if (xlrec->subxid_overflow)
 		appendStringInfo(buf, "; subxid ovf");
 }
 
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index e26bf0b..7839a14 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -475,7 +475,7 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
  * Only the local caches are flushed; this does not transmit the message
  * to other backends.
  */
-static void
+void
 LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 {
 	if (msg->id >= 0)
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 8c9ebe0..7bd2c27 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -506,9 +506,10 @@ RelationBuildTupleDesc(Relation relation)
 	heap_close(pg_attribute_desc, AccessShareLock);
 
 	if (need != 0)
+	{
 		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
 			 need, RelationGetRelid(relation));
-
+	}
 	/*
 	 * The attcacheoff values we read from pg_attribute should all be -1
 	 * ("unknown").  Verify this if assert checking is on.	They will be
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 745e7be..19a03cef 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2030,6 +2030,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		/* see max_connections */
+		{"max_logical_slots", PGC_POSTMASTER, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum number of simultaneously defined WAL decoding slots."),
+			NULL
+		},
+		&max_logical_slots,
+		0, 0, MAX_BACKENDS /*?*/,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
 			NULL,
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index f64d52d..e24c712 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -64,6 +64,8 @@
 #include "access/xact.h"
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/combocid.h"
 #include "utils/tqual.h"
 
 
@@ -73,6 +75,8 @@ SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
 SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
 SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
 
+static Snapshot SnapshotNowDecoding;
+
 /* local functions */
 static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
 
@@ -1407,3 +1411,248 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 
 	return false;
 }
+
+static bool
+TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
+{
+	return bsearch(&xid, xip, num,
+	               sizeof(TransactionId), xidComparator) != NULL;
+}
+
+static HTAB *tuplecid_data = NULL;
+
+/*
+ * See the comments for HeapTupleSatisfiesMVCC for the semantics this function
+ * obeys.
+ *
+ * Only usable on tuples from catalog tables!
+ *
+ * We don't need to support HEAP_MOVED_(IN|OFF) for now because we only support
+ * reading catalog pages which couldn't have been created in an older version.
+ *
+ * We don't set any hint bits in here as it seems unlikely to be beneficial as
+ * those should already be set by normal access and it seems to be too
+ * dangerous to do so as the semantics of doing so during timetravel are more
+ * complicated than when dealing "only" with the present.
+ */
+bool
+HeapTupleSatisfiesMVCCDuringDecoding(HeapTuple htup, Snapshot snapshot,
+                                     Buffer buffer)
+{
+	HeapTupleHeader tuple = htup->t_data;
+/*#define DEBUG_ME*/
+	TransactionId xmin = HeapTupleHeaderGetXmin(tuple);
+	TransactionId xmax = HeapTupleHeaderGetXmax(tuple);
+
+	Assert(ItemPointerIsValid(&htup->t_self));
+	Assert(htup->t_tableOid != InvalidOid);
+
+	/* transaction aborted */
+	if (tuple->t_infomask & HEAP_XMIN_INVALID)
+	{
+		Assert(!TransactionIdDidCommit(xmin));
+		goto invisible;
+	}
+	/* check if it's one of our txids; the toplevel xid is also in there */
+	else if (TransactionIdInArray(xmin, snapshot->subxip, snapshot->subxcnt))
+	{
+		CommandId cmin = HeapTupleHeaderGetRawCommandId(tuple);
+		CommandId cmax = InvalidCommandId;
+
+		/*
+		 * if another transaction deleted this tuple or if cmin/cmax is stored
+		 * in a combocid we need to look up the actual values externally.
+		 */
+		if ((!(tuple->t_infomask & HEAP_XMAX_INVALID) &&
+			 !TransactionIdInArray(xmax, snapshot->subxip, snapshot->subxcnt)) ||
+			tuple->t_infomask & HEAP_COMBOCID
+			)
+		{
+			bool resolved;
+
+			resolved = ResolveCminCmaxDuringDecoding(tuplecid_data, htup,
+													 buffer, &cmin, &cmax);
+
+			if (!resolved)
+				elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+		}
+
+#ifdef DEBUG_ME
+		elog(LOG, "curcid: %u cmin: %u; invisible: %u", snapshot->curcid, cmin,
+			 cmin >= snapshot->curcid);
+#endif
+		if (cmin >= snapshot->curcid)
+			goto invisible;	/* inserted after scan started */
+	}
+	/* normal transaction state counts */
+	else if (TransactionIdPrecedes(xmin, snapshot->xmin))
+	{
+		Assert(!(tuple->t_infomask & HEAP_XMIN_COMMITTED &&
+				 !TransactionIdDidCommit(xmin)));
+
+		if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED) &&
+			!TransactionIdDidCommit(xmin))
+			goto invisible;
+	}
+	/* beyond our xmax horizon, i.e. invisible */
+	else if (TransactionIdFollows(xmin, snapshot->xmax))
+	{
+		goto invisible;
+	}
+	/* check if we know the transaction has committed */
+	else if(TransactionIdInArray(xmin, snapshot->xip, snapshot->xcnt))
+	{
+	}
+	else
+	{
+		goto invisible;
+	}
+
+	/* at this point we know xmin is visible */
+
+	/* a multixact xmax is not expected on catalog tuples */
+	Assert(!(tuple->t_infomask & HEAP_XMAX_IS_MULTI));
+
+	/* xid invalid or aborted */
+	if (tuple->t_infomask & HEAP_XMAX_INVALID)
+		goto visible;
+	/* locked tuples are always visible */
+	else if (tuple->t_infomask & HEAP_IS_LOCKED)
+		goto visible;
+	/* check if it's one of our txids; the toplevel xid is also in there */
+	else if (TransactionIdInArray(xmax, snapshot->subxip, snapshot->subxcnt))
+	{
+		CommandId cmin;
+		CommandId cmax = HeapTupleHeaderGetRawCommandId(tuple);
+
+		/* Lookup actual cmin/cmax values */
+		if (tuple->t_infomask & HEAP_COMBOCID){
+#ifdef DEBUG_ME
+			CommandId combocid = cmax;
+#endif
+			bool resolved;
+
+			resolved = ResolveCminCmaxDuringDecoding(tuplecid_data, htup,
+													 buffer, &cmin, &cmax);
+
+			if (!resolved)
+			{
+				elog(FATAL, "could not resolve combocid to cmax");
+				goto invisible;
+			}
+
+
+#ifdef DEBUG_ME
+			elog(LOG, "converting combocid %u to cmax %u (cmin %u)",
+				 combocid, cmax, cmin);
+#endif
+		}
+#ifdef DEBUG_ME
+		elog(LOG, "curcid: %u, cmax %u, visible: %u", snapshot->curcid, cmax,
+			 cmax >= snapshot->curcid);
+#endif
+		if (cmax >= snapshot->curcid)
+			goto visible;	/* deleted after scan started */
+		else
+			goto invisible;	/* deleted before scan started */
+	}
+	/* we cannot possibly see the deleting transaction */
+	else if (TransactionIdFollows(xmax, snapshot->xmax))
+	{
+		goto visible;
+	}
+	/* normal transaction state is valid */
+	else if (TransactionIdPrecedes(xmax, snapshot->xmin))
+	{
+		Assert(!(tuple->t_infomask & HEAP_XMAX_COMMITTED &&
+				 !TransactionIdDidCommit(xmax)));
+
+		if (tuple->t_infomask & HEAP_XMAX_COMMITTED)
+			goto invisible;
+
+		if (TransactionIdDidCommit(xmax))
+			goto invisible;
+		else
+			goto visible;
+	}
+	/* do we know that the deleting txn is valid? */
+	else if (TransactionIdInArray(xmax, snapshot->xip, snapshot->xcnt))
+	{
+		goto invisible;
+	}
+	else
+	{
+		goto visible;
+	}
+visible:
+#ifdef DEBUG_ME
+	if (xmin > FirstNormalTransactionId)
+		elog(DEBUG1, "visible tuple with xmin: %u, xmax: %u, cmin %u, snapmin: %u, snapmax: %u, snapcid: %u, owncnt: %u top: %u, combo: %u, (%u, %u)",
+			 xmin, xmax, HeapTupleHeaderGetRawCommandId(tuple),
+			 snapshot->xmin, snapshot->xmax, snapshot->curcid,
+			 snapshot->subxcnt, snapshot->subxcnt ? snapshot->subxip[0] : 0,
+			 !!(tuple->t_infomask & HEAP_COMBOCID),
+			 BlockIdGetBlockNumber(&htup->t_self.ip_blkid), htup->t_self.ip_posid);
+#endif
+	return true;
+
+invisible:
+#ifdef DEBUG_ME
+	if (xmin > FirstNormalTransactionId)
+		elog(DEBUG1, "invisible tuple with xmin: %u, xmax: %u, cmin %u, snapmin: %u, snapmax: %u, snapcid: %u, owncnt: %u top: %u, combo: %u, (%u, %u)",
+			 xmin, xmax, HeapTupleHeaderGetRawCommandId(tuple),
+			 snapshot->xmin, snapshot->xmax, snapshot->curcid,
+			 snapshot->subxcnt, snapshot->subxcnt ? snapshot->subxip[0] : 0,
+			 !!(tuple->t_infomask & HEAP_COMBOCID),
+			 BlockIdGetBlockNumber(&htup->t_self.ip_blkid), htup->t_self.ip_posid);
+#endif
+	return false;
+}
+
+static bool
+FailsSatisfies(HeapTuple htup, Snapshot snapshot, Buffer buffer)
+{
+	elog(ERROR, "Normal snapshots cannot be used during timetravel access.");
+	return false;
+}
+
+static bool
+RedirectSatisfiesNow(HeapTuple htup, Snapshot snapshot, Buffer buffer)
+{
+	Assert(SnapshotNowDecoding != NULL);
+	return HeapTupleSatisfiesMVCCDuringDecoding(htup, SnapshotNowDecoding,
+	                                            buffer);
+}
+
+void
+SetupDecodingSnapshots(Snapshot snapshot_now, HTAB *tuplecids)
+{
+	/* prevent recursively setting up decoding snapshots */
+	Assert(SnapshotNowData.satisfies != RedirectSatisfiesNow);
+
+	SnapshotNowData.satisfies = RedirectSatisfiesNow;
+	/* make sure normal snapshots aren't used */
+	SnapshotSelfData.satisfies = FailsSatisfies;
+	SnapshotAnyData.satisfies = FailsSatisfies;
+	SnapshotToastData.satisfies = FailsSatisfies;
+
+	/* setup the timetravel snapshot */
+	SnapshotNowDecoding = snapshot_now;
+
+	/* setup (cmin, cmax) lookup hash */
+	tuplecid_data = tuplecids;
+}
+
+
+void
+RevertFromDecodingSnapshots(void)
+{
+	SnapshotNowDecoding = NULL;
+	tuplecid_data = NULL;
+
+	/* restore the normal snapshot routines */
+	SnapshotNowData.satisfies = HeapTupleSatisfiesNow;
+	SnapshotSelfData.satisfies = HeapTupleSatisfiesSelf;
+	SnapshotAnyData.satisfies = HeapTupleSatisfiesAny;
+	SnapshotToastData.satisfies = HeapTupleSatisfiesToast;
+}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 129c4d0..10080d0 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -77,6 +77,8 @@ wal_level_str(WalLevel wal_level)
 			return "archive";
 		case WAL_LEVEL_HOT_STANDBY:
 			return "hot_standby";
+		case WAL_LEVEL_LOGICAL:
+			return "logical";
 	}
 	return _("unrecognized wal_level");
 }
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 8ec710e..1405317 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -54,6 +54,7 @@
 #define XLOG_HEAP2_CLEANUP_INFO 0x30
 #define XLOG_HEAP2_VISIBLE		0x40
 #define XLOG_HEAP2_MULTI_INSERT 0x50
+#define XLOG_HEAP2_NEW_CID 0x60
 
 /*
  * All what we need to find changed tuple
@@ -238,6 +239,28 @@ typedef struct xl_heap_visible
 
 #define SizeOfHeapVisible (offsetof(xl_heap_visible, cutoff_xid) + sizeof(TransactionId))
 
+typedef struct xl_heap_new_cid
+{
+	/*
+	 * store toplevel xid so we don't have to merge cids from different
+	 * transactions
+	 */
+	TransactionId top_xid;
+	CommandId cmin;
+	CommandId cmax;
+	/*
+	 * don't really need the combocid but the padding makes it free and it's
+	 * useful for debugging.
+	 */
+	CommandId combocid;
+	/*
+	 * Store the relfilenode/ctid pair to facilitate lookups.
+	 */
+	xl_heaptid target;
+} xl_heap_new_cid;
+
+#define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid)
+
 extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 									   TransactionId *latestRemovedXid);
 
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 228f6a1..915b2cd 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -63,6 +63,11 @@
 	(AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \
 	(int32) ((id1) - (id2)) < 0)
 
+/* compare two XIDs already known to be normal; this is a macro for speed */
+#define NormalTransactionIdFollows(id1, id2) \
+	(AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \
+	(int32) ((id1) - (id2)) > 0)
+
 /* ----------
  *		Object ID (OID) zero is InvalidOid.
  *
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 32c2e40..ae4f849 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -196,7 +196,8 @@ typedef enum WalLevel
 {
 	WAL_LEVEL_MINIMAL = 0,
 	WAL_LEVEL_ARCHIVE,
-	WAL_LEVEL_HOT_STANDBY
+	WAL_LEVEL_HOT_STANDBY,
+	WAL_LEVEL_LOGICAL
 } WalLevel;
 extern int	wal_level;
 
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 298641b..8848fd2 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -103,4 +103,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
 extern bool ReindexIsProcessingIndex(Oid indexOid);
 extern Oid	IndexGetRelation(Oid indexId, bool missing_ok);
 
+extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                                   int16 *nratts, int16 *attnums, Oid *atttypids,
+                                   Oid *opclasses);
+
 #endif   /* INDEX_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 438a1d9..223849b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -407,6 +407,8 @@ typedef enum NodeTag
 	T_IdentifySystemCmd,
 	T_BaseBackupCmd,
 	T_StartReplicationCmd,
+	T_InitLogicalReplicationCmd,
+	T_StartLogicalReplicationCmd,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 236a36d..91a4986 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -49,4 +49,26 @@ typedef struct StartReplicationCmd
 	XLogRecPtr	startpoint;
 } StartReplicationCmd;
 
+
+/* ----------------------
+ *		INIT_LOGICAL_REPLICATION command
+ * ----------------------
+ */
+typedef struct InitLogicalReplicationCmd
+{
+	NodeTag		type;
+	char       *plugin;
+} InitLogicalReplicationCmd;
+
+/* ----------------------
+ *		START_LOGICAL_REPLICATION command
+ * ----------------------
+ */
+typedef struct StartLogicalReplicationCmd
+{
+	NodeTag		type;
+	char       *name;
+	XLogRecPtr	startpoint;
+} StartLogicalReplicationCmd;
+
 #endif   /* REPLNODES_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..1caa98d
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,21 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ *     PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/reorderbuffer.h"
+#include "replication/logicalfuncs.h"
+
+void DecodeRecordIntoReorderBuffer(XLogReaderState *reader,
+								   ReaderApplyState* state,
+								   XLogRecordBuffer* buf);
+
+#endif
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
new file mode 100644
index 0000000..db78797
--- /dev/null
+++ b/src/include/replication/logicalfuncs.h
@@ -0,0 +1,44 @@
+/*-------------------------------------------------------------------------
+ * logicalfuncs.h
+ *     PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALFUNCS_H
+#define LOGICALFUNCS_H
+
+#include "access/xlogreader.h"
+#include "replication/reorderbuffer.h"
+#include "replication/output_plugin.h"
+
+typedef struct ReaderApplyState
+{
+	struct ReorderBuffer *reorderbuffer;
+	bool stop_after_consistent;
+	struct Snapstate *snapstate;
+
+	LogicalDecodeInitCB init_cb;
+	LogicalDecodeBeginCB begin_cb;
+	LogicalDecodeChangeCB change_cb;
+	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeCleanupCB cleanup_cb;
+
+	StringInfo out;
+	void *user_private;
+
+
+} ReaderApplyState;
+
+XLogReaderState *
+initial_snapshot_reader(void);
+
+XLogReaderState *
+normal_snapshot_reader(char *plugin, XLogRecPtr valid_after);
+
+bool
+initial_snapshot_ready(XLogReaderState *);
+
+#endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
new file mode 100644
index 0000000..27d5982
--- /dev/null
+++ b/src/include/replication/output_plugin.h
@@ -0,0 +1,76 @@
+/*-------------------------------------------------------------------------
+ * output_plugin.h
+ *     PostgreSQL Logical Decode Plugin Interface
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef OUTPUT_PLUGIN_H
+#define OUTPUT_PLUGIN_H
+
+#include "lib/stringinfo.h"
+
+#include "replication/reorderbuffer.h"
+
+/*
+ * Callback that gets called in a user-defined plugin.
+ * 'private' can be set to some private data.
+ *
+ * Gets looked up in the library symbol pg_decode_init.
+ */
+typedef void (*LogicalDecodeInitCB) (
+	void **private
+	);
+
+/*
+ * Gets called for every BEGIN of a successful transaction.
+ *
+ * Return "true" if the message in "out" should get sent, false otherwise.
+ *
+ * Gets looked up in the library symbol pg_decode_begin_txn.
+ */
+typedef bool (*LogicalDecodeBeginCB) (
+	void *private,
+	StringInfo out,
+	ReorderBufferTXN *txn);
+
+/*
+ * Gets called for every change in a successful transaction.
+ *
+ * Return "true" if the message in "out" should get sent, false otherwise.
+ *
+ * Gets looked up in the library symbol pg_decode_change.
+ */
+typedef bool (*LogicalDecodeChangeCB) (
+	void *private,
+	StringInfo out,
+	ReorderBufferTXN *txn,
+	Oid tableoid,
+	ReorderBufferChange *change
+	);
+
+/*
+ * Gets called for every COMMIT of a successful transaction.
+ *
+ * Return "true" if the message in "out" should get sent, false otherwise.
+ *
+ * Gets looked up in the library symbol pg_decode_commit_txn.
+ */
+typedef bool (*LogicalDecodeCommitCB) (
+	void *private,
+	StringInfo out,
+	ReorderBufferTXN *txn,
+	XLogRecPtr commit_lsn);
+
+/*
+ * Gets called to cleanup the state of an output plugin
+ *
+ * Gets looked up in the library symbol pg_decode_cleanup.
+ */
+typedef void (*LogicalDecodeCleanupCB) (
+	void *private
+	);
+
+#endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
new file mode 100644
index 0000000..a79dd79
--- /dev/null
+++ b/src/include/replication/reorderbuffer.h
@@ -0,0 +1,284 @@
+/*
+ * reorderbuffer.h
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/reorderbuffer.h
+ */
+#ifndef REORDERBUFFER_H
+#define REORDERBUFFER_H
+
+#include "access/htup_details.h"
+#include "utils/hsearch.h"
+
+#include "lib/ilist.h"
+
+#include "storage/sinval.h"
+
+#include "utils/snapshot.h"
+
+
+typedef struct ReorderBuffer ReorderBuffer;
+
+enum ReorderBufferChangeType
+{
+	REORDER_BUFFER_CHANGE_INSERT,
+	REORDER_BUFFER_CHANGE_UPDATE,
+	REORDER_BUFFER_CHANGE_DELETE
+};
+
+typedef struct ReorderBufferTupleBuf
+{
+	/* position in preallocated list */
+	slist_node node;
+
+	HeapTupleData tuple;
+	HeapTupleHeaderData header;
+	char		data[MaxHeapTupleSize];
+}	ReorderBufferTupleBuf;
+
+typedef struct ReorderBufferChange
+{
+	XLogRecPtr	lsn;
+
+	union {
+		enum ReorderBufferChangeType action;
+		/* do not leak internal enum values to the outside */
+		int action_internal;
+	};
+
+	RelFileNode relnode;
+
+	union
+	{
+		ReorderBufferTupleBuf *newtuple;
+		Snapshot	snapshot;
+		CommandId	command_id;
+		struct {
+			RelFileNode node;
+			ItemPointerData tid;
+			CommandId	cmin;
+			CommandId	cmax;
+			CommandId	combocid;
+		} tuplecid;
+	};
+
+	ReorderBufferTupleBuf *oldtuple;
+
+	/*
+	 * While in use this is how a change is linked into a transactions,
+	 * otherwise its the preallocated list.
+	 */
+	dlist_node node;
+} ReorderBufferChange;
+
+typedef struct ReorderBufferTXN
+{
+	TransactionId xid;
+
+	XLogRecPtr	lsn;
+
+	/* did the TX have catalog changes */
+	bool does_timetravel;
+
+	bool has_base_snapshot;
+
+	/*
+	 * How many ReorderBufferChange's do we have in this txn.
+	 *
+	 * Subtransactions are *not* included.
+	 */
+	Size		nentries;
+
+	/*
+	 * How many of the above entries are stored in memory in contrast to being
+	 * spilled to disk.
+	 */
+	Size		nentries_mem;
+
+	/*
+	 * List of actual changes, those include new Snapshots and new CommandIds
+	 */
+	dlist_head changes;
+
+	/*
+	 * List of cmin/cmax pairs for catalog tuples
+	 */
+	dlist_head tuplecids;
+
+	/*
+	 * Number of stored cmin/cmax pairs. Used to create the tuplecid_hash with
+	 * the correct size.
+	 */
+	Size       ntuplecids;
+
+	/*
+	 * On-demand built hash for looking up the above values.
+	 */
+	HTAB	   *tuplecid_hash;
+
+	/*
+	 * non-hierarchical list of subtransactions that are *not* aborted
+	 */
+	dlist_head subtxns;
+	Size nsubtxns;
+
+	/*
+	 * our position in a list of subtransactions while the TXN is in use.
+	 * Otherwise its the position in the list of preallocated transactions.
+	 */
+	dlist_node node;
+
+	/*
+	 * Number of stored cache invalidations.
+	 */
+	Size ninvalidations;
+
+	/*
+	 * Stored cache invalidations. This is not a linked list because we get all
+	 * the invalidations at once.
+	 */
+	SharedInvalidationMessage *invalidations;
+
+} ReorderBufferTXN;
+
+
+/* XXX: we're currently passing the originating subtxn. Not sure that's necessary */
+typedef void (*ReorderBufferApplyChangeCB) (
+	ReorderBuffer *cache,
+	ReorderBufferTXN *txn,
+	ReorderBufferChange *change);
+
+typedef void (*ReorderBufferBeginCB) (
+	ReorderBuffer *cache,
+	ReorderBufferTXN *txn);
+
+typedef void (*ReorderBufferCommitCB) (
+	ReorderBuffer *cache,
+	ReorderBufferTXN *txn,
+	XLogRecPtr commit_lsn);
+
+/*
+ * max number of concurrent top-level transactions or transactions where we
+ * don't know if they are top-level can be calculated by:
+ * (max_connections + max_prepared_xacts + ?)  * PGPROC_MAX_CACHED_SUBXIDS
+ */
+struct ReorderBuffer
+{
+	/*
+	 * Should snapshots for decoding be collected. If many catalog changes
+	 * happen this can be considerably expensive.
+	 */
+	bool		build_snapshots;
+
+	TransactionId last_txn;
+	ReorderBufferTXN *last_txn_cache;
+	HTAB	   *by_txn;
+
+	ReorderBufferBeginCB begin;
+	ReorderBufferApplyChangeCB apply_change;
+	ReorderBufferCommitCB commit;
+
+	void	   *private_data;
+
+	MemoryContext context;
+
+	/*
+	 * we don't want to repeatedly (de-)allocate those structs, so cache them
+	 * for reuse.
+	 */
+	dlist_head cached_transactions;
+	Size		nr_cached_transactions;
+
+	dlist_head cached_changes;
+	Size		nr_cached_changes;
+
+	slist_head cached_tuplebufs;
+	Size		nr_cached_tuplebufs;
+};
+
+
+ReorderBuffer *ReorderBufferAllocate(void);
+
+void ReorderBufferFree(ReorderBuffer *);
+
+ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
+
+void ReorderBufferReturnTupleBuf(ReorderBuffer *cache, ReorderBufferTupleBuf * tuple);
+
+/*
+ * Returns a (potentially preallocated) change struct. Its lifetime is managed
+ * by the reorderbuffer module.
+ *
+ * If not added to a transaction with ReorderBufferAddChange it needs to be
+ * returned via ReorderBufferReturnChange
+ *
+ * FIXME: better name
+ */
+ReorderBufferChange *
+			ReorderBufferGetChange(ReorderBuffer *);
+
+/*
+ * Return an unused ReorderBufferChange struct
+ */
+void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
+
+
+/*
+ * record the transaction as in-progress if not already done, add the current
+ * change.
+ *
+ * We have a one-entry cache for looking up the current ReorderBufferTXN so we
+ * don't need to do a full hash-lookup if the same xid is used
+ * sequentially. The same xid being used repeatedly that way is rather frequent.
+ */
+void ReorderBufferAddChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+
+/*
+ *
+ */
+void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+
+void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr lsn);
+
+void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+
+/*
+ * if lsn == InvalidXLogRecPtr this is the first snap for the transaction
+ *
+ * most callers don't need snapshot.h, so we use struct SnapshotData instead
+ */
+void ReorderBufferAddBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
+
+/*
+ * Will only be called for command ids > 1
+ */
+void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+								  CommandId cid);
+
+void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+								  RelFileNode node, ItemPointerData pt,
+								  CommandId cmin, CommandId cmax, CommandId combocid);
+
+void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+								   Size nmsgs, SharedInvalidationMessage* msgs);
+
+bool ReorderBufferIsXidKnown(ReorderBuffer *cache, TransactionId xid);
+
+/*
+ * Announce that tx does timetravel. Relevant for the whole toplevel/subtxn
+ * tree.
+ */
+void ReorderBufferXidSetTimetravel(ReorderBuffer *cache, TransactionId xid);
+
+/*
+ * Does the transaction indicated by 'xid' do timetravel?
+ */
+bool ReorderBufferXidDoesTimetravel(ReorderBuffer *cache, TransactionId xid);
+
+bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *cache, TransactionId xid);
+
+#endif
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
new file mode 100644
index 0000000..e8c5fcb
--- /dev/null
+++ b/src/include/replication/snapbuild.h
@@ -0,0 +1,128 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapbuild.h
+ *	  Exports from replication/logical/snapbuild.c.
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/replication/snapbuild.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SNAPBUILD_H
+#define SNAPBUILD_H
+
+#include "replication/reorderbuffer.h"
+
+#include "utils/hsearch.h"
+#include "utils/snapshot.h"
+#include "access/htup.h"
+
+typedef enum
+{
+	/*
+	 * Initial state, we can't do much yet.
+	 */
+	SNAPBUILD_START,
+
+	/*
+	 * We have collected enough information to decode tuples in transactions
+	 * that started after this.
+	 *
+	 * Once we reached this we start to collect changes. We cannot apply them
+	 * yet because the might be based on transactions that were still running
+	 * when we reached them yet.
+	 */
+	SNAPBUILD_FULL_SNAPSHOT,
+
+	/*
+	 * Found a point after hitting built_full_snapshot where all transactions
+	 * that were running at that point finished. Till we reach that we hold
+	 * off calling any commit callbacks.
+	 */
+	SNAPBUILD_CONSISTENT
+}	SnapBuildState;
+
+typedef enum
+{
+	SNAPBUILD_SKIP,
+	SNAPBUILD_DECODE
+}	SnapBuildAction;
+
+typedef struct Snapstate
+{
+	SnapBuildState state;
+
+	/* all transactions smaller than this have committed/aborted */
+	TransactionId xmin;
+
+	/* all transactions bigger than this are uncommitted */
+	TransactionId xmax;
+
+	/*
+	 * All transactions in this window have to be checked via the running
+	 * array. This will only be used initially till we are past xmax_running.
+	 *
+	 * Note that we initially treat already running transactions as
+	 * having catalog modifications because we don't have enough information
+	 * about them to properly judge that.
+	 */
+	TransactionId xmin_running;
+	TransactionId xmax_running;
+
+	/*
+	 * array of running transactions.
+	 *
+	 * Kept in xidComparator order so it can be searched with bsearch().
+	 */
+	TransactionId *running;
+	/* how many transactions are still running */
+	size_t		nrrunning;
+
+	/*
+	 * we need to keep track of the number of tracked transactions separately
+	 * from nrrunning_space as nrrunning_initial gives the range of valid xids
+	 * in the array so bsearch() can work.
+	 */
+	size_t		nrrunning_initial;
+
+	XLogRecPtr transactions_after;
+
+	/*
+	 * Transactions which could have catalog changes that committed between
+	 * xmin and xmax
+	 */
+	size_t		nrcommitted;
+	size_t		nrcommitted_space;
+	/*
+	 * Array of committed transactions that have modified the catalog.
+	 *
+	 * As this array is frequently modified we do *not* keep it in
+	 * xidComparator order. Instead we sort the array when building &
+	 * distributing a snapshot.
+	 */
+	TransactionId *committed;
+
+	/*
+	 * Snapshot that's valid to see all committed transactions that see catalog
+	 * modifications.
+	 */
+	Snapshot snapshot;
+}	Snapstate;
+
+extern Snapstate *AllocateSnapshotBuilder(ReorderBuffer *cache);
+
+extern void	FreeSnapshotBuilder(Snapstate *cache);
+
+struct XLogRecordBuffer;
+
+extern SnapBuildAction SnapBuildDecodeCallback(ReorderBuffer *cache, Snapstate *snapstate, struct XLogRecordBuffer *buf);
+
+extern HeapTuple LookupTableByRelFileNode(RelFileNode *r);
+
+extern bool SnapBuildHasCatalogChanges(Snapstate *snapstate, TransactionId xid,
+                                       RelFileNode *relfilenode);
+
+extern void SnapBuildSnapDecRefcount(Snapshot snap);
+
+#endif   /* SNAPBUILD_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index df8e951..b2d9434 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -24,6 +24,7 @@ extern bool wake_wal_senders;
 
 /* user-settable parameters */
 extern int	max_wal_senders;
+extern int	max_logical_slots;
 extern int	wal_sender_timeout;
 
 extern void InitWalSender(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 66234cd..c712659 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -66,6 +66,28 @@ typedef struct WalSnd
 
 extern WalSnd *MyWalSnd;
 
+typedef struct
+{
+	TransactionId xmin;
+	NameData      name;
+	NameData      plugin;
+
+	XLogRecPtr	  last_required_checkpoint;
+	XLogRecPtr	  confirmed_flush;
+
+	TransactionId candidate_xmin;
+	XLogRecPtr	  candidate_xmin_after;
+
+	/* is this slot defined */
+	bool          in_use;
+	/* is somebody streaming out changes for this slot */
+	bool          active;
+	slock_t		mutex;
+} LogicalWalSnd;
+
+extern LogicalWalSnd *MyLogicalWalSnd;
+
+
 /* There is one WalSndCtl struct for the whole database cluster */
 typedef struct
 {
@@ -88,12 +110,14 @@ typedef struct
 	 */
 	bool		sync_standbys_defined;
 
+	TransactionId logical_xmin;
+
 	WalSnd		walsnds[1];		/* VARIABLE LENGTH ARRAY */
+	LogicalWalSnd logical_walsnds[1];		/* VARIABLE LENGTH ARRAY */
 } WalSndCtlData;
 
 extern WalSndCtlData *WalSndCtl;
 
-
 extern void WalSndSetState(WalSndState state);
 extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -109,4 +133,12 @@ extern void replication_scanner_finish(void);
 
 extern Node *replication_parse_result;
 
+/* change logical xmin */
+extern void IncreaseLogicalXminForSlot(XLogRecPtr lsn, TransactionId xmin);
+
+/* logical wal sender data gathering functions */
+extern void WalSndWriteData(StringInfo data);
+extern void WalSndPrepareWrite(StringInfo out, XLogRecPtr lsn);
+
+
 #endif   /* _WALSENDER_PRIVATE_H */
diff --git a/src/include/storage/itemptr.h b/src/include/storage/itemptr.h
index 331812b..fc5f86d 100644
--- a/src/include/storage/itemptr.h
+++ b/src/include/storage/itemptr.h
@@ -116,6 +116,9 @@ typedef ItemPointerData *ItemPointer;
 /*
  * ItemPointerCopy
  *		Copies the contents of one disk item pointer to another.
+ *
+ * Should there ever be padding in an ItemPointer this would need to be handled
+ * differently as it's used in hashes.
  */
 #define ItemPointerCopy(fromPointer, toPointer) \
 ( \
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index bcf2c81..1b68bab 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -136,4 +136,6 @@ extern void ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs
 									 int nmsgs, bool RelcacheInitFileInval,
 									 Oid dbid, Oid tsid);
 
+extern void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg);
+
 #endif   /* SINVAL_H */
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index b129ae9..2e9a7d8 100644
--- a/src/include/utils/tqual.h
+++ b/src/include/utils/tqual.h
@@ -39,7 +39,8 @@ extern PGDLLIMPORT SnapshotData SnapshotToastData;
 
 /* This macro encodes the knowledge of which snapshots are MVCC-safe */
 #define IsMVCCSnapshot(snapshot)  \
-	((snapshot)->satisfies == HeapTupleSatisfiesMVCC)
+	((snapshot)->satisfies == HeapTupleSatisfiesMVCC || \
+	 (snapshot)->satisfies == HeapTupleSatisfiesMVCCDuringDecoding)
 
 /*
  * HeapTupleSatisfiesVisibility
@@ -89,4 +90,32 @@ extern bool HeapTupleIsSurelyDead(HeapTuple htup,
 extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
 					 uint16 infomask, TransactionId xid);
 
+/*
+ * Special "satisfies" routines used when decoding xlog from an LSN in the
+ * past. Also used for timetravel SnapshotNow access.
+ */
+extern bool HeapTupleSatisfiesMVCCDuringDecoding(HeapTuple htup,
+                                                 Snapshot snapshot, Buffer buffer);
+
+/*
+ * Install the 'snapshot_now' snapshot as a timetravelling snapshot, replacing
+ * the normal SnapshotNow behaviour. This snapshot needs to have been created
+ * by snapbuild.c; otherwise you will see crashes!
+ *
+ * FIXME: We need something resembling the real SnapshotNow to handle things
+ * like enum lookups from indices correctly.
+ */
+extern void SetupDecodingSnapshots(Snapshot snapshot_now, HTAB *tuplecids);
+extern void RevertFromDecodingSnapshots(void);
+
+/*
+ * resolve combocids and overwritten cmin values
+ *
+ * To avoid leaking too much knowledge about the reorderbuffer, this is
+ * implemented in reorderbuffer.c, not tqual.c.
+ */
+extern bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, HeapTuple htup,
+										  Buffer buffer,
+										  CommandId *cmin, CommandId *cmax);
+
 #endif   /* TQUAL_H */
