diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 9d9ec87..ae7f6b1 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
 	repl_gram.o syncrep.o
 
+SUBDIRS = logical
+
 include $(top_srcdir)/src/backend/common.mk
 
 # repl_scanner is compiled as part of repl_gram
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
new file mode 100644
index 0000000..4e56769
--- /dev/null
+++ b/src/backend/replication/logical/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/logical
+#
+# IDENTIFICATION
+#    src/backend/replication/logical/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/logical
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
+
+OBJS = applycache.o decode.o snapbuild.o logicalfuncs.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/applycache.c b/src/backend/replication/logical/applycache.c
new file mode 100644
index 0000000..1e08371
--- /dev/null
+++ b/src/backend/replication/logical/applycache.c
@@ -0,0 +1,574 @@
+/*-------------------------------------------------------------------------
+ *
+ * applycache.c
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/applycache.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/xact.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+#include "replication/applycache.h"
+
+#include "lib/simpleheap.h"
+
+#include "utils/ilist.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+#include "utils/tqual.h"
+#include "utils/syscache.h"
+
+
+const Size max_memtries = 1<<16;
+
+const size_t max_cached_changes = 1024;
+const size_t max_cached_tuplebufs = 1024; /* ~8MB */
+const size_t max_cached_transactions = 512;
+
+typedef struct ApplyCacheTXNByIdEnt
+{
+	TransactionId xid;
+	ApplyCacheTXN* txn;
+} ApplyCacheTXNByIdEnt;
+
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache);
+static void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn);
+
+static ApplyCacheTXN* ApplyCacheTXNByXid(ApplyCache*, TransactionId xid,
+                                         bool create, bool* is_new);
+
+
+ApplyCache*
+ApplyCacheAllocate(void)
+{
+	ApplyCache* cache = (ApplyCache*)malloc(sizeof(ApplyCache));
+	HASHCTL         hash_ctl;
+
+	if (!cache)
+		elog(ERROR, "Could not allocate the ApplyCache");
+
+	cache->build_snapshots = true;
+
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+	cache->context = AllocSetContextCreate(TopMemoryContext,
+	                                       "ApplyCache",
+	                                       ALLOCSET_DEFAULT_MINSIZE,
+	                                       ALLOCSET_DEFAULT_INITSIZE,
+	                                       ALLOCSET_DEFAULT_MAXSIZE);
+
+	hash_ctl.keysize = sizeof(TransactionId);
+	hash_ctl.entrysize = sizeof(ApplyCacheTXNByIdEnt);
+	hash_ctl.hash = tag_hash;
+	hash_ctl.hcxt = cache->context;
+
+	cache->by_txn = hash_create("ApplyCacheByXid", 1000, &hash_ctl,
+	                            HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	cache->nr_cached_transactions = 0;
+	cache->nr_cached_changes = 0;
+	cache->nr_cached_tuplebufs = 0;
+
+	ilist_d_init(&cache->cached_transactions);
+	ilist_d_init(&cache->cached_changes);
+	ilist_s_init(&cache->cached_tuplebufs);
+
+	return cache;
+}
+
+void ApplyCacheFree(ApplyCache* cache)
+{
+	/* FIXME: check for in-progress transactions */
+	/* FIXME: clean up cached transaction */
+	/* FIXME: clean up cached changes */
+	/* FIXME: clean up cached tuplebufs */
+	hash_destroy(cache->by_txn);
+	free(cache);
+}
+
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache)
+{
+	ApplyCacheTXN* txn;
+
+	if (cache->nr_cached_transactions)
+	{
+		cache->nr_cached_transactions--;
+		txn = ilist_container(ApplyCacheTXN, node,
+		                      ilist_d_pop_front(&cache->cached_transactions));
+	}
+	else
+	{
+		txn = (ApplyCacheTXN*)
+			malloc(sizeof(ApplyCacheTXN));
+
+		if (!txn)
+			elog(ERROR, "Could not allocate a ApplyCacheTXN struct");
+	}
+
+	memset(txn, 0, sizeof(ApplyCacheTXN));
+	ilist_d_init(&txn->changes);
+	ilist_d_init(&txn->subtxns);
+	ilist_d_init(&txn->snapshots);
+	ilist_d_init(&txn->commandids);
+
+	return txn;
+}
+
+void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn)
+{
+	if(cache->nr_cached_transactions < max_cached_transactions){
+		cache->nr_cached_transactions++;
+		ilist_d_push_front(&cache->cached_transactions, &txn->node);
+	}
+	else{
+		free(txn);
+	}
+}
+
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache* cache)
+{
+	ApplyCacheChange* change;
+
+	if (cache->nr_cached_changes)
+	{
+		cache->nr_cached_changes--;
+		change = ilist_container(ApplyCacheChange, node,
+		                         ilist_d_pop_front(&cache->cached_changes));
+	}
+	else
+	{
+		change = (ApplyCacheChange*)malloc(sizeof(ApplyCacheChange));
+
+		if (!change)
+			elog(ERROR, "Could not allocate a ApplyCacheChange struct");
+	}
+
+
+	memset(change, 0, sizeof(ApplyCacheChange));
+	return change;
+}
+
+void
+ApplyCacheReturnChange(ApplyCache* cache, ApplyCacheChange* change)
+{
+	switch(change->action){
+		case APPLY_CACHE_CHANGE_INSERT:
+		case APPLY_CACHE_CHANGE_UPDATE:
+		case APPLY_CACHE_CHANGE_DELETE:
+			if (change->newtuple)
+			{
+				ApplyCacheReturnTupleBuf(cache, change->newtuple);
+				change->newtuple = NULL;
+			}
+
+			if (change->oldtuple)
+			{
+				ApplyCacheReturnTupleBuf(cache, change->oldtuple);
+				change->oldtuple = NULL;
+			}
+
+			if (change->table)
+			{
+				heap_freetuple(change->table);
+				change->table = NULL;
+			}
+			break;
+		case APPLY_CACHE_CHANGE_SNAPSHOT:
+			if (change->snapshot)
+			{
+				/* FIXME: free snapshot */
+				change->snapshot = NULL;
+			}
+		case APPLY_CACHE_CHANGE_COMMAND_ID:
+			break;
+	}
+
+	if(cache->nr_cached_changes < max_cached_changes){
+		cache->nr_cached_changes++;
+		ilist_d_push_front(&cache->cached_changes, &change->node);
+	}
+	else{
+		free(change);
+	}
+}
+
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache* cache)
+{
+	ApplyCacheTupleBuf* tuple;
+
+	if (cache->nr_cached_tuplebufs)
+	{
+		cache->nr_cached_tuplebufs--;
+		tuple = ilist_container(ApplyCacheTupleBuf, node,
+		                        ilist_s_pop_front(&cache->cached_tuplebufs));
+	}
+	else
+	{
+		tuple =
+			(ApplyCacheTupleBuf*)malloc(sizeof(ApplyCacheTupleBuf));
+
+		if (!tuple)
+			elog(ERROR, "Could not allocate a ApplyCacheTupleBuf struct");
+	}
+
+	return tuple;
+}
+
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple)
+{
+	if(cache->nr_cached_tuplebufs < max_cached_tuplebufs){
+		cache->nr_cached_tuplebufs++;
+		ilist_s_push_front(&cache->cached_tuplebufs, &tuple->node);
+	}
+	else{
+		free(tuple);
+	}
+}
+
+
+static
+ApplyCacheTXN*
+ApplyCacheTXNByXid(ApplyCache* cache, TransactionId xid, bool create, bool* is_new)
+{
+	ApplyCacheTXNByIdEnt* ent;
+	bool found;
+
+	/* FIXME: add one entry fast-path cache */
+
+	ent = (ApplyCacheTXNByIdEnt*)
+		hash_search(cache->by_txn,
+		            (void *)&xid,
+		            (create ? HASH_ENTER : HASH_FIND),
+		            &found);
+
+	if (found)
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "found cache entry for %u at %p", xid, ent);
+#endif
+	}
+	else
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "didn't find cache entry for %u in %p at %p, creating %u",
+		     xid, cache, ent, create);
+#endif
+	}
+
+	if (!found && !create)
+		return NULL;
+
+	if (!found)
+	{
+		ent->txn = ApplyCacheGetTXN(cache);
+		ent->txn->xid = xid;
+	}
+
+	if (is_new)
+		*is_new = !found;
+
+	return ent->txn;
+}
+
+void
+ApplyCacheAddChange(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn,
+                    ApplyCacheChange* change)
+{
+	ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, true, NULL);
+	txn->lsn = lsn;
+	ilist_d_push_back(&txn->changes, &change->node);
+}
+
+
+void
+ApplyCacheCommitChild(ApplyCache* cache, TransactionId xid,
+                      TransactionId subxid, XLogRecPtr lsn)
+{
+	ApplyCacheTXN* txn;
+	ApplyCacheTXN* subtxn;
+
+	subtxn = ApplyCacheTXNByXid(cache, subxid, false, NULL);
+
+	/*
+	 * No need to do anything if that subtxn didn't contain any changes
+	 */
+	if (!subtxn)
+		return;
+
+	subtxn->lsn = lsn;
+
+	txn = ApplyCacheTXNByXid(cache, xid, true, NULL);
+
+	ilist_d_push_back(&txn->subtxns, &subtxn->node);
+}
+
+typedef struct ApplyCacheIterTXNState
+{
+	simpleheap *heap;
+} ApplyCacheIterTXNState;
+
+static int
+ApplyCacheIterCompare(simpleheap_kv* a, simpleheap_kv* b)
+{
+	ApplyCacheChange *change_a = ilist_container(ApplyCacheChange, node, a->key);
+	ApplyCacheChange *change_b = ilist_container(ApplyCacheChange, node, b->key);
+
+	if (change_a->lsn < change_b->lsn)
+		return -1;
+
+	else if (change_a->lsn == change_b->lsn)
+		return 0;
+
+	return 1;
+}
+
+static ApplyCacheIterTXNState*
+ApplyCacheIterTXNInit(ApplyCache* cache, ApplyCacheTXN* txn);
+
+static ApplyCacheChange*
+ApplyCacheIterTXNNext(ApplyCache* cache, ApplyCacheIterTXNState* state);
+
+static void
+ApplyCacheIterTXNFinish(ApplyCache* cache, ApplyCacheIterTXNState* state);
+
+
+
+static ApplyCacheIterTXNState*
+ApplyCacheIterTXNInit(ApplyCache* cache, ApplyCacheTXN* txn)
+{
+	size_t nr_txns = 0; /* number of (sub-)txns that have changes */
+	ApplyCacheIterTXNState *state;
+	ilist_d_node* cur_txn_i;
+	ApplyCacheTXN *cur_txn;
+	ApplyCacheChange *cur_change;
+
+	if (!ilist_d_is_empty(&txn->changes))
+		nr_txns++;
+
+	/* count how large our heap must be */
+	ilist_d_foreach(cur_txn_i, &txn->subtxns)
+	{
+		cur_txn = ilist_container(ApplyCacheTXN, node, cur_txn_i);
+
+		if (!ilist_d_is_empty(&cur_txn->changes))
+			nr_txns++;
+	}
+
+	/* allocate array for our heap */
+	state = palloc0(sizeof(ApplyCacheIterTXNState));
+
+	state->heap = simpleheap_allocate(nr_txns);
+	state->heap->compare = ApplyCacheIterCompare;
+
+	/* fill array with elements, heap condition not yet fulfilled */
+	if (!ilist_d_is_empty(&txn->changes))
+	{
+		cur_change = ilist_d_front_unchecked(ApplyCacheChange, node, &txn->changes);
+
+		simpleheap_add_unordered(state->heap, &cur_change->node, txn);
+	}
+
+	ilist_d_foreach(cur_txn_i, &txn->subtxns)
+	{
+		cur_txn = ilist_container(ApplyCacheTXN, node, cur_txn_i);
+
+		if (!ilist_d_is_empty(&cur_txn->changes))
+		{
+			cur_change = ilist_d_front_unchecked(ApplyCacheChange, node, &cur_txn->changes);
+
+			simpleheap_add_unordered(state->heap, &cur_change->node, cur_txn); /* cur_txn, not txn: Next() walks this txn's list */
+		}
+	}
+
+	/* make the array fulfill the heap property */
+	simpleheap_build(state->heap);
+	return state;
+}
+
+static ApplyCacheChange*
+ApplyCacheIterTXNNext(ApplyCache* cache, ApplyCacheIterTXNState* state)
+{
+	ApplyCacheTXN *txn = NULL;
+	ApplyCacheChange *change;
+	simpleheap_kv *kv;
+
+	/*
+	 * Do a k-way merge between transactions/subtransactions to extract changes
+	 * merged by the lsn of their change. For that we model the current heads
+	 * of the different transactions as a binary heap so we easily know which
+	 * (sub-)transaction has the change with the smallest lsn next.
+	 */
+
+	/* nothing there anymore */
+	if (state->heap->size == 0)
+		return NULL;
+
+	kv = simpleheap_first(state->heap);
+
+	change = ilist_container(ApplyCacheChange, node, kv->key);
+
+	txn = (ApplyCacheTXN*)kv->value;
+
+	if (!ilist_d_has_next(&txn->changes, &change->node))
+	{
+		simpleheap_remove_first(state->heap);
+	}
+	else
+	{
+		simpleheap_change_key(state->heap, change->node.next);
+	}
+	return change;
+}
+
+static void
+ApplyCacheIterTXNFinish(ApplyCache* cache, ApplyCacheIterTXNState* state)
+{
+	simpleheap_free(state->heap);
+	pfree(state);
+}
+
+
+static void
+ApplyCacheCleanupTXN(ApplyCache* cache, ApplyCacheTXN* txn)
+{
+	bool found;
+	ilist_d_node* cur_change, *next_change;
+	ilist_d_node* cur_txn, *next_txn;
+
+	/* cleanup transactions & changes */
+	ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns)
+	{
+		ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn);
+
+		ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes)
+		{
+			ApplyCacheChange* change =
+				ilist_container(ApplyCacheChange, node, cur_change);
+
+			ApplyCacheReturnChange(cache, change);
+		}
+		ApplyCacheReturnTXN(cache, subtxn);
+	}
+
+	ilist_d_foreach_modify (cur_change, next_change, &txn->changes)
+	{
+		ApplyCacheChange* change =
+			ilist_container(ApplyCacheChange, node, cur_change);
+
+		ApplyCacheReturnChange(cache, change);
+	}
+
+	/* now remove reference from cache */
+	hash_search(cache->by_txn,
+	            (void *)&txn->xid,
+	            HASH_REMOVE,
+	            &found);
+	Assert(found);
+
+	ApplyCacheReturnTXN(cache, txn);
+}
+void
+ApplyCacheCommit(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+	ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false, NULL);
+	ApplyCacheIterTXNState* iterstate;
+	ApplyCacheChange* change;
+	CommandId command_id;
+	Snapshot snapshot_mvcc = NULL;
+
+	if (!txn)
+		return;
+
+	txn->lsn = lsn;
+
+	cache->begin(cache, txn);
+
+	PG_TRY();
+	{
+		iterstate = ApplyCacheIterTXNInit(cache, txn);
+		while((change = ApplyCacheIterTXNNext(cache, iterstate)))
+		{
+			switch(change->action){
+				case APPLY_CACHE_CHANGE_INSERT:
+				case APPLY_CACHE_CHANGE_UPDATE:
+				case APPLY_CACHE_CHANGE_DELETE:
+					Assert(snapshot_mvcc != NULL);
+					cache->apply_change(cache, txn, txn /*FIXME*/, change);
+					break;
+				case APPLY_CACHE_CHANGE_SNAPSHOT:
+					/*
+					 * the first snapshot seen in a transaction is its mvcc
+					 * snapshot
+					 */
+					if (!snapshot_mvcc)
+						snapshot_mvcc = change->snapshot;
+					SetupDecodingSnapshots(change->snapshot);
+					break;
+				case APPLY_CACHE_CHANGE_COMMAND_ID:
+					/* FIXME */
+					command_id = change->command_id;
+					break;
+			}
+		}
+
+		ApplyCacheIterTXNFinish(cache, iterstate);
+
+		cache->commit(cache, txn);
+
+		ApplyCacheCleanupTXN(cache, txn);
+		RevertFromDecodingSnapshots();
+	}
+	PG_CATCH();
+	{
+		RevertFromDecodingSnapshots();
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+void
+ApplyCacheAbort(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+	ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false, NULL);
+
+	/* no changes in this aborted transaction */
+	if (!txn)
+		return;
+
+	ApplyCacheCleanupTXN(cache, txn);
+}
+
+bool
+ApplyCacheIsXidKnown(ApplyCache* cache, TransactionId xid)
+{
+	bool is_new;
+	/* FIXME: for efficiency reasons we create the xid here, that doesn't seem
+	 * like a good idea though */
+	ApplyCacheTXNByXid(cache, xid, true, &is_new);
+
+	/* the xid is known iff the hash entry already existed */
+	return !is_new;
+}
+
+void
+ApplyCacheAddBaseSnapshot(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
+{
+	ApplyCacheChange *change = ApplyCacheGetChange(cache);
+	change->snapshot = snap;
+	change->action = APPLY_CACHE_CHANGE_SNAPSHOT;
+
+	ApplyCacheAddChange(cache, xid, lsn, change);
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..244dd7b
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,366 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *
+ * Decodes wal records from an xlogreader.h callback into an applycache.
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * NOTE:
+ *
+ *    It's possible that the separation between decode.c and snapbuild.c is a
+ *    bit too strict, in the end they just about have the same struct.
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/decode.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+#include "access/heapam_xlog.h"
+
+#include "catalog/pg_control.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+#include "replication/snapbuild.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+static void DecodeXLogTuple(char* data, Size len, ApplyCacheTupleBuf* tuple);
+
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf);
+static void DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf, TransactionId xid,
+	                     TransactionId *sub_xids, int nsubxacts);
+
+
+void DecodeRecordIntoApplyCache(ReaderApplyState *state, XLogRecordBuffer* buf)
+{
+	XLogRecord* r = &buf->record;
+	uint8 info = r->xl_info & ~XLR_INFO_MASK;
+	ApplyCache *cache = state->apply_cache;
+	SnapBuildAction action;
+
+	/*
+	 * FIXME: The existence of the snapshot builder is pretty obvious to the
+	 * outside right now, that doesn't seem to be very good...
+	 */
+	if(!state->snapstate)
+	{
+		state->snapstate = AllocateSnapshotBuilder(cache);
+	}
+
+	/*
+	 * Call the snapshot builder. It needs to be called before we analyze
+	 * tuples for two reasons:
+	 *
+	 * * Only in the snapshot building logic we know whether we have enough
+	 *   information to decode a particular tuple
+     *
+     * * The Snapshot/CommandIds computed by the SnapshotBuilder need to be
+     *   added to the ApplyCache before we add tuples using them
+	 */
+	action = SnapBuildCallback(cache, state->snapstate, buf);
+
+	if (action == SNAPBUILD_SKIP)
+		return;
+
+	switch (r->xl_rmid)
+	{
+		case RM_HEAP_ID:
+		{
+			info &= XLOG_HEAP_OPMASK;
+			switch (info)
+			{
+				case XLOG_HEAP_INSERT:
+					DecodeInsert(cache, buf);
+					break;
+
+				/* no guarantee that we get an HOT update again, so handle it as a normal update*/
+				case XLOG_HEAP_HOT_UPDATE:
+				case XLOG_HEAP_UPDATE:
+					DecodeUpdate(cache, buf);
+					break;
+
+				case XLOG_HEAP_NEWPAGE:
+					DecodeNewpage(cache, buf);
+					break;
+
+				case XLOG_HEAP_DELETE:
+					DecodeDelete(cache, buf);
+					break;
+				default:
+					break;
+			}
+			break;
+		}
+		case RM_HEAP2_ID:
+		{
+			info &= XLOG_HEAP_OPMASK;
+			switch (info)
+			{
+				case XLOG_HEAP2_MULTI_INSERT:
+					DecodeMultiInsert(cache, buf);
+					break;
+				default:
+					/* everything else here is just physical stuff we're not interested in */
+					break;
+			}
+			break;
+		}
+
+		case RM_XACT_ID:
+		{
+			switch (info)
+			{
+				case XLOG_XACT_COMMIT:
+				{
+					TransactionId *sub_xids;
+					xl_xact_commit *xlrec = (xl_xact_commit*)buf->record_data;
+
+					/* FIXME: this is not really allowed if there is no subtransactions */
+					sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+					DecodeCommit(cache, buf, r->xl_xid, sub_xids, xlrec->nsubxacts);
+
+					break;
+				}
+				case XLOG_XACT_COMMIT_PREPARED:
+				{
+					TransactionId *sub_xids;
+					xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared*)buf->record_data;
+
+					sub_xids = (TransactionId *) &(xlrec->crec.xnodes[xlrec->crec.nrels]);
+
+					DecodeCommit(cache, buf, r->xl_xid, sub_xids,
+					             xlrec->crec.nsubxacts);
+
+					break;
+				}
+				case XLOG_XACT_COMMIT_COMPACT:
+				{
+					xl_xact_commit_compact *xlrec = (xl_xact_commit_compact*)buf->record_data;
+					DecodeCommit(cache, buf, r->xl_xid, xlrec->subxacts,
+					             xlrec->nsubxacts);
+					break;
+				}
+				case XLOG_XACT_ABORT:
+				case XLOG_XACT_ABORT_PREPARED:
+				{
+					TransactionId *sub_xids;
+					xl_xact_abort *xlrec = (xl_xact_abort*)buf->record_data;
+					int i;
+
+					/* FIXME: this is not really allowed if there is no subtransaction */
+					sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+					for(i = 0; i < xlrec->nsubxacts; i++)
+					{
+						ApplyCacheAbort(cache, *sub_xids, buf->origptr);
+						sub_xids += 1;
+					}
+
+					/* TODO: check that this also contains not-yet-aborted subtxns */
+					ApplyCacheAbort(cache, r->xl_xid, buf->origptr);
+
+					elog(WARNING, "ABORT %u", r->xl_xid);
+					break;
+				}
+				case XLOG_XACT_ASSIGNMENT:
+					/*
+					 * XXX: We could reassign transactions to the parent here
+					 * to save space and effort when merging transactions at
+					 * commit.
+					 */
+					break;
+				case XLOG_XACT_PREPARE:
+					/*
+					 * FIXME: we should replay the transaction and prepare it
+					 * as well.
+					 */
+					break;
+				default:
+					break;
+					;
+			}
+			break;
+		}
+		case RM_XLOG_ID:
+		{
+			switch (info)
+			{
+				/* this is also used in END_OF_RECOVERY checkpoints */
+				case XLOG_CHECKPOINT_SHUTDOWN:
+					/*
+					 * abort all transactions that still are in progress, they
+					 * aren't in progress anymore.
+					 * do not abort prepared transactions that have been
+					 * prepared for commit.
+					 * FIXME: implement.
+					 */
+					break;
+			}
+		}
+		default:
+			break;
+	}
+}
+
+static void
+DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf, TransactionId xid,
+             TransactionId *sub_xids, int nsubxacts)
+{
+	int i;
+
+	for (i = 0; i < nsubxacts; i++)
+	{
+		ApplyCacheCommitChild(cache, xid, *sub_xids, buf->origptr);
+		sub_xids++;
+	}
+
+	/* replay actions of all transaction + subtransactions in order */
+	ApplyCacheCommit(cache, xid, buf->origptr);
+}
+
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+	XLogRecord* r = &buf->record;
+	xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+
+	ApplyCacheChange* change;
+
+	if (r->xl_info & XLR_BKP_BLOCK_1
+	    && r->xl_len < (SizeOfHeapInsert + SizeOfHeapHeader)) /* insert, not update, record size */
+	{
+		elog(FATAL, "huh, no tuple data on wal_level = logical?");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_INSERT;
+
+	memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
+
+	change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+	DecodeXLogTuple((char*)xlrec + SizeOfHeapInsert,
+	                r->xl_len - SizeOfHeapInsert,
+	                change->newtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+static void
+DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+	XLogRecord* r = &buf->record;
+	xl_heap_update *xlrec = (xl_heap_update *) buf->record_data;
+
+
+	ApplyCacheChange* change;
+
+	if ((r->xl_info & XLR_BKP_BLOCK_1 || r->xl_info & XLR_BKP_BLOCK_2) &&
+	    (r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader)))
+	{
+		elog(FATAL, "huh, no tuple data on wal_level = logical?");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_UPDATE;
+
+	memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
+
+	/* FIXME: need to save the old tuple as well if we want primary key changes to work. */
+	change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+	DecodeXLogTuple((char*)xlrec + SizeOfHeapUpdate,
+	                r->xl_len - SizeOfHeapUpdate,
+	                change->newtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+	XLogRecord* r = &buf->record;
+
+	xl_heap_delete *xlrec = (xl_heap_delete *) buf->record_data;
+
+	ApplyCacheChange* change;
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_DELETE;
+
+	memcpy(&change->relnode, &xlrec->target.node, sizeof(RelFileNode));
+
+	if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+	{
+		elog(FATAL, "huh, no primary key for a delete on wal_level = logical?");
+	}
+
+	change->oldtuple = ApplyCacheGetTupleBuf(cache);
+
+	DecodeXLogTuple((char*)xlrec + SizeOfHeapDelete,
+	                r->xl_len - SizeOfHeapDelete,
+	                change->oldtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+
+static void
+DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+	elog(WARNING, "skipping XLOG_HEAP_NEWPAGE record because we are too dumb");
+}
+
+static void
+DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+	elog(WARNING, "skipping XLOG_HEAP2_MULTI_INSERT record because we are too dumb");
+}
+
+
+static void DecodeXLogTuple(char* data, Size len, ApplyCacheTupleBuf* tuple)
+{
+	xl_heap_header xlhdr;
+	int datalen = len - SizeOfHeapHeader;
+
+	Assert(datalen >= 0);
+	Assert(datalen <= MaxHeapTupleSize);
+
+	tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	tuple->tuple.t_tableOid = InvalidOid;
+	tuple->tuple.t_data = &tuple->header;
+
+	/* data is not stored aligned */
+	memcpy((char *) &xlhdr,
+	       data,
+	       SizeOfHeapHeader);
+
+	memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+	memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+	       data + SizeOfHeapHeader,
+	       datalen);
+
+	tuple->header.t_infomask = xlhdr.t_infomask;
+	tuple->header.t_infomask2 = xlhdr.t_infomask2;
+	tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
new file mode 100644
index 0000000..035c48a
--- /dev/null
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -0,0 +1,237 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalfuncs.c
+ *
+ *     Support functions for using xlog decoding
+ *
+ * NOTE:
+ *     Nothing in here should be used for anything but debugging!
+ *
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/logicalfuncs.c
+ *
+ */
+
+#include "postgres.h"
+
+#include "access/xlogreader.h"
+
+#include "catalog/pg_class.h"
+#include "catalog/pg_type.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+#include "replication/walreceiver.h"
+/*FIXME: XLogRead*/
+#include "replication/walsender_private.h"
+
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+
+
+/* We don't need no header */
+extern Datum
+decode_xlog(PG_FUNCTION_ARGS);
+
+
+static bool
+replay_record_is_interesting(XLogReaderState* state, XLogRecord* r)
+{
+	return true;
+}
+
+static void
+replay_writeout_data(XLogReaderState* state, char* data, Size len)
+{
+	return;
+}
+
+static void
+replay_finished_record(XLogReaderState* state, XLogRecordBuffer* buf)
+{
+	ReaderApplyState* apply_state = state->private_data;
+	DecodeRecordIntoApplyCache(apply_state, buf);
+}
+
+static void
+replay_read_page(XLogReaderState* state, char* cur_page, XLogRecPtr startptr)
+{
+	XLogPageHeader page_header;
+
+	Assert((startptr % XLOG_BLCKSZ) == 0);
+
+	/* FIXME: more sensible/efficient implementation */
+	XLogRead(cur_page, startptr, XLOG_BLCKSZ);
+
+	page_header = (XLogPageHeader)cur_page;
+
+	if (page_header->xlp_magic != XLOG_PAGE_MAGIC)
+	{
+		elog(FATAL, "page header magic %x, should be %x at %X/%X", page_header->xlp_magic,
+		     XLOG_PAGE_MAGIC, (uint32)(startptr >> 32), (uint32)startptr);
+	}
+}
+
+static
+void decode_begin_txn(ApplyCache* cache, ApplyCacheTXN* txn)
+{
+	elog(WARNING, "BEGIN");
+}
+
+static void
+decode_commit_txn(ApplyCache* cache, ApplyCacheTXN* txn)
+{
+	elog(WARNING, "COMMIT");
+}
+
+/* don't want to include that header */
+extern HeapTuple
+LookupTableByRelFileNode(RelFileNode* r);
+
+
+/* This is just for demonstration, don't ever use this code for anything real! */
+static void
+decode_change(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change)
+{
+	InvalidateSystemCaches();
+
+	if (change->action == APPLY_CACHE_CHANGE_INSERT)
+	{
+		StringInfoData s;
+		HeapTuple table = LookupTableByRelFileNode(&change->relnode);
+		Form_pg_class class_form;
+		HeapTuple	typeTuple;
+		Form_pg_type pt;
+		TupleDesc	tupdesc;
+		int			i;
+
+		if (!table)
+		{
+			elog(LOG, "couldn't lookup %u", change->relnode.relNode);
+			return;
+		}
+
+		class_form = (Form_pg_class) GETSTRUCT(table);
+
+		initStringInfo(&s);
+
+		tupdesc = lookup_rowtype_tupdesc(class_form->reltype, -1);
+
+		for (i = 0; i < tupdesc->natts; i++)
+		{
+			Oid			typid, typoutput;
+			bool		typisvarlena;
+			Datum		origval, val;
+			char        *outputstr;
+			bool        isnull;
+			if (tupdesc->attrs[i]->attisdropped)
+				continue;
+			if (tupdesc->attrs[i]->attnum < 0)
+				continue;
+
+			typid = tupdesc->attrs[i]->atttypid;
+
+			typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+			if (!HeapTupleIsValid(typeTuple))
+				elog(ERROR, "cache lookup failed for type %u", typid);
+			pt = (Form_pg_type) GETSTRUCT(typeTuple);
+
+			appendStringInfo(&s, " %s[%s]",
+			                 NameStr(tupdesc->attrs[i]->attname),
+			                 NameStr(pt->typname));
+
+			getTypeOutputInfo(typid,
+			                  &typoutput, &typisvarlena);
+
+			ReleaseSysCache(typeTuple);
+
+			origval = heap_getattr(&change->newtuple->tuple, i + 1, tupdesc, &isnull);
+
+			if (typisvarlena && !isnull)
+				val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+			else
+				val = origval;
+
+			outputstr = OidOutputFunctionCall(typoutput, val);
+
+			appendStringInfo(&s, ":%s", isnull ? "(null)" : outputstr);
+		}
+		ReleaseTupleDesc(tupdesc);
+
+		elog(WARNING, "tuple is:%s", s.data);
+	}
+}
+
+/* test the xlog decoding infrastructure from lsn, to lsn */
+Datum
+decode_xlog(PG_FUNCTION_ARGS)
+{
+	char* start = PG_GETARG_CSTRING(0);
+	char* end = PG_GETARG_CSTRING(1);
+
+	ApplyCache *apply_cache;
+	XLogReaderState *xlogreader_state = XLogReaderAllocate();
+	ReaderApplyState *apply_state;
+
+	XLogRecPtr startpoint;
+	XLogRecPtr endpoint;
+
+	uint32		hi,
+				lo;
+
+	if (sscanf(start, "%X/%X",
+	           &hi, &lo) != 2)
+		elog(ERROR, "unparseable xlog pos");
+	startpoint = ((uint64) hi) << 32 | lo;
+
+	elog(LOG, "starting to parse at %X/%X", hi, lo);
+
+	if (sscanf(end, "%X/%X",
+	           &hi, &lo) != 2)
+		elog(ERROR, "unparseable xlog pos");
+	endpoint = ((uint64) hi) << 32 | lo;
+
+	elog(LOG, "end parse at %X/%X", hi, lo);
+
+	xlogreader_state->is_record_interesting = replay_record_is_interesting;
+	xlogreader_state->finished_record = replay_finished_record;
+	xlogreader_state->writeout_data = replay_writeout_data;
+	xlogreader_state->read_page = replay_read_page;
+	xlogreader_state->private_data = calloc(1, sizeof(ReaderApplyState));
+
+
+	if (!xlogreader_state->private_data)
+		elog(ERROR, "Could not allocate the ReaderApplyState struct");
+
+	xlogreader_state->startptr = startpoint;
+	xlogreader_state->curptr = startpoint;
+	xlogreader_state->endptr = endpoint;
+
+	apply_state = (ReaderApplyState*)xlogreader_state->private_data;
+
+	/*
+	 * allocate an ApplyCache that will apply data using lowlevel calls
+	 * without type conversion et al. This requires binary compatibility
+	 * between both systems.
+	 * XXX: This would be the place to hook different apply methods, like
+	 * producing sql and applying it.
+	 */
+	apply_cache = ApplyCacheAllocate();
+	apply_cache->begin = decode_begin_txn;
+	apply_cache->apply_change = decode_change;
+	apply_cache->commit = decode_commit_txn;
+
+	apply_state->apply_cache = apply_cache;
+
+	XLogReaderRead(xlogreader_state);
+
+	PG_RETURN_BOOL(true);
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
new file mode 100644
index 0000000..05b176d
--- /dev/null
+++ b/src/backend/replication/logical/snapbuild.c
@@ -0,0 +1,1045 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapbuild.c
+ *
+ *     Support for building timetravel snapshots based on the contents of the
+ *     wal
+ *
+ * NOTE:
+ *     This is complex, in-progress and underdocumented.
+ *
+ *     We build snapshots which can *only* be used to read catalog contents by
+ *     reading the wal stream. The aim is to provide mvcc and SnapshotNow
+ *     snapshots that behave the same as their respective counterparts would
+ *     have at the time the XLogRecord was generated. This is done to provide a
+ *     reliable environment for decoding those records into every format that
+ *     pleases the user of an ApplyCache.
+ *
+ *     The percentage of transactions modifying the catalog should be fairly
+ *     small, so instead of keeping track of all running transactions and
+ *     treating everything inside (xmin, xmax) that's not running as committed
+ *     we do the contrary. That, and other implementation details, necessitate
+ *     using our own ->satisfies visibility routine.
+ *     In contrast to a classic SnapshotNow which doesn't need any data this
+ *     module provides something that *behaves* like a SnapshotNow would have
+ *     back then (minus some races). Minus some minor things a SnapshotNow
+ *     behaves like a SnapshotMVCC taken exactly in the moment the SnapshotNow
+ *     was used. Because of that we simply model our timetravel-SnapshotNow's
+ *     as mvcc Snapshots.
+ *
+ *     To replace the normal handling of SnapshotNow snapshots use the
+ *     SetupDecodingSnapshots/RevertFromDecodingSnapshots functions. Be careful
+ *     to handle errors properly, otherwise the rest of the session will have
+ *     very strange behaviour.
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/snapbuild.c
+ *
+ */
+
+#include "postgres.h"
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/transam.h"
+#include "access/xlogreader.h"
+#include "access/xact.h"
+
+#include "catalog/catalog.h"
+#include "catalog/pg_control.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_tablespace.h"
+
+#include "miscadmin.h"
+
+#include "replication/applycache.h"
+#include "replication/snapbuild.h"
+
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/snapshot.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+#include "storage/standby.h"
+
+/*
+ * Per-transaction entry of the ->by_txn hash table: tracks whether the
+ * transaction is known to have modified catalog contents and thus needs
+ * timetravel treatment during decoding.
+ */
+typedef struct SnapstateTxnEnt
+{
+	TransactionId xid;			/* hash key */
+	bool does_timetravel;		/* did this txn change the catalog? */
+} SnapstateTxnEnt;
+
+
+static bool
+SnapBuildHasCatalogChanges(Snapstate* snapstate, TransactionId xid, RelFileNode* relfilenode);
+
+/* transaction state manipulation functions */
+static void
+SnapBuildEndTxn(Snapstate* snapstate, TransactionId xid);
+
+static void
+SnapBuildAbortTxn(Snapstate* state, TransactionId xid, int nsubxacts,
+                  TransactionId* subxacts);
+
+static void
+SnapBuildCommitTxn(Snapstate* snapstate, TransactionId xid, int nsubxacts,
+                   TransactionId* subxacts);
+
+/* ->running manipulation */
+static bool
+SnapBuildTxnRunning(Snapstate* snapstate, TransactionId xid);
+
+static void
+SnapBuildReserveRunning(Snapstate *snapstate, Size count);
+
+static void
+SnapBuildSortRunning(Snapstate *snapstate);
+
+static void
+SnapBuildAddRunningTxn(Snapstate *snapstate, TransactionId xid);
+
+
+/* ->committed manipulation */
+static void
+SnapBuildPurgeCommittedTxn(Snapstate* snapstate);
+
+static void
+SnapBuildCommitTxn(Snapstate* snapstate, TransactionId xid, int nsubxacts,
+                   TransactionId* subxacts);
+
+
+/* snapshot building/manipulation/distribution functions */
+static void
+SnapBuildDistributeSnapshotNow(Snapstate* snapstate, TransactionId xid);
+
+static Snapshot
+SnapBuildBuildSnapshot(Snapstate *snapstate, TransactionId xid);
+
+
+/*
+ * Look up the pg_class tuple describing the relation identified by
+ * `relfilenode`, returning a palloc'd copy (or NULL if not found).
+ */
+HeapTuple
+LookupTableByRelFileNode(RelFileNode* relfilenode)
+{
+	Oid			tablespace = relfilenode->spcNode;
+
+	InvalidateSystemCaches();
+
+	/*
+	 * pg_class stores reltablespace = 0 for relations living in the
+	 * database's default tablespace, so map it back before the lookup.
+	 */
+	if (tablespace == DEFAULTTABLESPACE_OID)
+		tablespace = 0;
+
+	return SearchSysCacheCopy2(RELFILENODE, tablespace,
+	                           relfilenode->relNode);
+}
+
+/*
+ * Allocate and initialize a snapshot-building state struct.
+ *
+ * Fixes: the malloc result was dereferenced without a NULL check, and
+ * hash_ctl.hcxt was assigned without passing HASH_CONTEXT, making the
+ * assignment dead.
+ */
+Snapstate*
+AllocateSnapshotBuilder(ApplyCache *applycache)
+{
+	Snapstate *snapstate = malloc(sizeof(Snapstate));
+	HASHCTL hash_ctl;
+
+	if (!snapstate)
+		elog(ERROR, "could not allocate memory for snapstate");
+
+	snapstate->state = SNAPBUILD_START;
+	snapstate->valid_after = InvalidTransactionId;
+
+	snapstate->nrrunning = 0;
+	snapstate->nrrunning_initial = 0;
+	snapstate->nrrunning_space = 0;
+	snapstate->running = NULL;
+
+	snapstate->nrcommitted = 0;
+	snapstate->nrcommitted_space = 128;
+	snapstate->committed = malloc(snapstate->nrcommitted_space * sizeof(TransactionId));
+	if (!snapstate->committed)
+		elog(ERROR, "could not allocate memory for snapstate->committed");
+
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = sizeof(TransactionId);
+	hash_ctl.entrysize = sizeof(SnapstateTxnEnt);
+	hash_ctl.hash = tag_hash;
+	hash_ctl.hcxt = TopMemoryContext;
+
+	/* HASH_CONTEXT is required for ->hcxt to actually be honored */
+	snapstate->by_txn = hash_create("SnapstateByXid", 1000, &hash_ctl,
+	                            HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	elog(LOG, "allocating snapshotbuilder");
+	return snapstate;
+}
+
+/*
+ * Release all memory owned by a snapshot builder.
+ *
+ * Fixes: ->running and ->committed (both plain-malloc'd arrays) were leaked.
+ */
+void
+FreeSnapshotBuilder(Snapstate* snapstate)
+{
+	hash_destroy(snapstate->by_txn);
+	/* free(NULL) is a no-op, so ->running may legally still be NULL here */
+	free(snapstate->running);
+	free(snapstate->committed);
+	free(snapstate);
+}
+
+/*
+ * Process one xlog record: advance the snapshot-building state machine and
+ * decide whether the record should be handed on to the decoding machinery.
+ *
+ * Fixes: the XLOG_XACT_ABORT and XLOG_XACT_ABORT_PREPARED cases were missing
+ * break statements, so an abort record fell through and was re-interpreted
+ * through the xl_xact_abort_prepared layout; HASH_FIND|HASH_ENTER ORed two
+ * members of the HASHACTION enum (it is not a bitmask); relfilenode was
+ * uninitialized.
+ */
+SnapBuildAction
+SnapBuildCallback(ApplyCache *applycache, Snapstate* snapstate, XLogRecordBuffer* buf)
+{
+	XLogRecord* r = &buf->record;
+	uint8 info = r->xl_info & ~XLR_INFO_MASK;
+	TransactionId xid = buf->record.xl_xid;
+
+	/*  relfilenode with the table changes have happened in */
+	bool found_changes = false;
+
+	/* only valid when found_changes is set; NULL keeps that explicit */
+	RelFileNode *relfilenode = NULL;
+	SnapBuildAction ret = SNAPBUILD_SKIP;
+
+	{
+		StringInfoData s;
+
+		initStringInfo(&s);
+		RmgrTable[r->xl_rmid].rm_desc(&s,
+		                              r->xl_info,
+		                              buf->record_data);
+
+		/* don't bother emitting empty description */
+		if (s.len > 0)
+			elog(LOG,"xlog redo %u: %s", xid, s.data);
+	}
+
+	if (snapstate->state <= SNAPBUILD_FULL_SNAPSHOT)
+	{
+		if (r->xl_rmid == RM_STANDBY_ID &&
+		   info == XLOG_RUNNING_XACTS)
+		{
+			xl_running_xacts *running = (xl_running_xacts*)buf->record_data;
+
+			if (!running->subxid_overflow)
+			{
+				snapstate->state = SNAPBUILD_FULL_SNAPSHOT;
+
+
+				snapstate->xmin = running->oldestRunningXid;
+				TransactionIdRetreat(snapstate->xmin);
+				snapstate->xmax = running->latestCompletedXid;
+
+				snapstate->nrrunning = running->xcnt;
+				snapstate->nrrunning_initial = running->xcnt;
+				snapstate->nrrunning_space = running->xcnt;
+
+				SnapBuildReserveRunning(snapstate, snapstate->nrrunning_space);
+
+				memcpy(snapstate->running, running->xids,
+				       snapstate->nrrunning_initial * sizeof(TransactionId));
+
+				/* sort so we can do a binary search */
+				SnapBuildSortRunning(snapstate);
+
+				if (running->xcnt)
+				{
+					snapstate->xmin_running = snapstate->running[0];
+					snapstate->xmax_running = snapstate->running[running->xcnt - 1];
+				}
+				else
+				{
+					snapstate->xmin_running = InvalidTransactionId;
+					snapstate->xmax_running = InvalidTransactionId;
+					/* FIXME: abort everything considered running */
+					snapstate->state = SNAPBUILD_CONSISTENT;
+				}
+				elog(LOG, "built initial snapshot (via running xacts). Done: %i",
+				     snapstate->state == SNAPBUILD_CONSISTENT);
+			}
+			else if (TransactionIdIsValid(snapstate->valid_after))
+			{
+				if (NormalTransactionIdPrecedes(snapstate->valid_after, running->oldestRunningXid))
+				{
+					snapstate->state = SNAPBUILD_FULL_SNAPSHOT;
+					snapstate->xmin_running = InvalidTransactionId;
+					snapstate->xmax_running = InvalidTransactionId;
+					/* FIXME: copy all transactions we have seen starting to ->running */
+				}
+			}
+			else
+			{
+				snapstate->state = SNAPBUILD_INITIAL_POINT;
+
+				snapstate->valid_after = running->nextXid;
+				elog(INFO, "starting to build snapshot, valid_after xid: %u",
+				     snapstate->valid_after);
+			}
+		}
+		/* we know nothing has been in progress at this point... */
+		else if (r->xl_rmid == RM_XLOG_ID &&
+		        info == XLOG_CHECKPOINT_SHUTDOWN)
+		{
+			CheckPoint* checkpoint = (CheckPoint*)buf->record_data;
+
+			snapstate->xmin = checkpoint->nextXid;
+			snapstate->xmax = checkpoint->nextXid;
+
+			snapstate->nrrunning = 0;
+			snapstate->nrrunning_initial = 0;
+			snapstate->nrrunning_space = 0;
+			free(snapstate->running);
+			snapstate->running = NULL;
+
+			snapstate->state = SNAPBUILD_CONSISTENT;
+
+			elog(LOG, "built initial snapshot (via shutdown)!!!!");
+			/*FIXME: cleanup state */
+		}
+		else if(r->xl_rmid == RM_XLOG_ID &&
+		        info == XLOG_CHECKPOINT_ONLINE)
+		{
+			/* FIXME: Check whether there is a valid state dumped to disk */
+		}
+	}
+
+	if (snapstate->state == SNAPBUILD_START)
+		return SNAPBUILD_SKIP;
+
+	switch (r->xl_rmid)
+	{
+		case RM_XLOG_ID:
+		{
+			switch (info)
+			{
+				case XLOG_CHECKPOINT_SHUTDOWN:
+				{
+					CheckPoint* checkpoint = (CheckPoint*)buf->record_data;
+
+					/*
+					 * we know nothing can be running anymore, normal
+					 * transaction state is sufficient
+					 */
+
+					/* no need to have any transaction state anymore */
+#ifdef NOT_YES
+					for (/*FIXME*/)
+					{
+						SnapBuildAbortTxn(snapstate, xid);
+					}
+#endif
+					snapstate->xmin = checkpoint->nextXid;
+					TransactionIdRetreat(snapstate->xmin);
+					snapstate->xmax = checkpoint->nextXid;
+
+					free(snapstate->running);
+					snapstate->running = NULL;
+					snapstate->nrrunning = 0;
+					snapstate->nrrunning_initial = 0;
+					snapstate->nrrunning_space = 0;
+
+					/*FIXME: cleanup state */
+
+
+					ret = SNAPBUILD_DECODE;
+
+					break;
+				}
+				case XLOG_CHECKPOINT_ONLINE:
+				{
+					/* FIXME: dump state to disk so we can restart from here later */
+					break;
+				}
+			}
+			break;
+		}
+		case RM_STANDBY_ID:
+		{
+			switch (info)
+			{
+				case XLOG_RUNNING_XACTS:
+				{
+					xl_running_xacts *running = (xl_running_xacts*)buf->record_data;
+					snapstate->xmin = running->oldestRunningXid;
+					TransactionIdRetreat(snapstate->xmin);
+					snapstate->xmax = running->latestCompletedXid;
+					TransactionIdAdvance(snapstate->xmax);
+
+					SnapBuildPurgeCommittedTxn(snapstate);
+
+					break;
+				}
+				case XLOG_STANDBY_LOCK:
+					break;
+			}
+			break;
+		}
+		case RM_XACT_ID:
+		{
+			switch (info)
+			{
+				case XLOG_XACT_COMMIT:
+				{
+					xl_xact_commit* xlrec =
+						(xl_xact_commit*)buf->record_data;
+
+					SnapBuildCommitTxn(snapstate, xid, xlrec->nsubxacts,
+					                   (TransactionId*)&xlrec->xnodes);
+					ret = SNAPBUILD_DECODE;
+
+					break;
+				}
+				case XLOG_XACT_COMMIT_COMPACT:
+				{
+					xl_xact_commit_compact* xlrec =
+						(xl_xact_commit_compact*)buf->record_data;
+
+					SnapBuildCommitTxn(snapstate, xid, xlrec->nsubxacts,
+					                   xlrec->subxacts);
+					ret = SNAPBUILD_DECODE;
+					break;
+				}
+				case XLOG_XACT_COMMIT_PREPARED:
+				{
+					xl_xact_commit_prepared* xlrec =
+						(xl_xact_commit_prepared*)buf->record_data;
+
+					SnapBuildCommitTxn(snapstate, xid, xlrec->crec.nsubxacts,
+					                   (TransactionId*)&xlrec->crec.xnodes);
+					ret = SNAPBUILD_DECODE;
+					break;
+				}
+				case XLOG_XACT_ABORT:
+				{
+					xl_xact_abort* xlrec =
+						(xl_xact_abort*)buf->record_data;
+
+					SnapBuildAbortTxn(snapstate, xid, xlrec->nsubxacts,
+					                  (TransactionId*)&xlrec->xnodes);
+					ret = SNAPBUILD_DECODE;
+					/* break was missing: fell through into ABORT_PREPARED */
+					break;
+				}
+				case XLOG_XACT_ABORT_PREPARED:
+				{
+					xl_xact_abort_prepared* xlrec =
+						(xl_xact_abort_prepared*)buf->record_data;
+
+					SnapBuildAbortTxn(snapstate, xid, xlrec->arec.nsubxacts,
+					                  (TransactionId*)&xlrec->arec.xnodes);
+					ret = SNAPBUILD_DECODE;
+					break;
+				}
+				case XLOG_XACT_ASSIGNMENT:
+				case XLOG_XACT_PREPARE: /* boring? */
+				default:
+					break;
+			}
+			break;
+		}
+		case RM_HEAP_ID:
+		{
+			switch (info & XLOG_HEAP_OPMASK)
+			{
+				/* XXX: this only happens for "irrelevant" changes? Ignore for now */
+				case XLOG_HEAP_INPLACE:
+				{
+					xl_heap_inplace *xlrec = (xl_heap_inplace*)buf->record_data;
+					relfilenode = &xlrec->target.node;
+					found_changes = false; /* <----- LOOK */
+					break;
+				}
+				/*
+				 * we only ever read changes, so row level locks aren't
+				 * interesting
+				 */
+				case XLOG_HEAP_LOCK:
+					break;
+
+				case XLOG_HEAP_INSERT:
+				{
+					xl_heap_insert *xlrec = (xl_heap_insert*)buf->record_data;
+					relfilenode = &xlrec->target.node;
+					found_changes = true;
+					break;
+				}
+				case XLOG_HEAP_UPDATE:
+				case XLOG_HEAP_HOT_UPDATE:
+				{
+					xl_heap_update *xlrec = (xl_heap_update*)buf->record_data;
+					relfilenode = &xlrec->target.node;
+					found_changes = true;
+					break;
+				}
+				case XLOG_HEAP_DELETE:
+				{
+					xl_heap_delete *xlrec = (xl_heap_delete*)buf->record_data;
+					relfilenode = &xlrec->target.node;
+					found_changes = true;
+					break;
+				}
+				default:
+					;
+			}
+			break;
+		}
+		case RM_HEAP2_ID:
+		{
+			/* some HEAP2 things don't necessarily happen in a transaction? */
+			if (!TransactionIdIsValid(xid))
+				break;
+
+			switch (info)
+			{
+				case XLOG_HEAP2_MULTI_INSERT:
+				{
+					xl_heap_multi_insert *xlrec =
+						(xl_heap_multi_insert*)buf->record_data;
+
+					relfilenode = &xlrec->node;
+
+					found_changes = true;
+
+					/*
+					 * we only decode the first tuple as all the following ones
+					 * will have the same cmin (and no cmax)
+					 */
+					break;
+				}
+				default:
+					;
+			}
+		}
+		break;
+	}
+
+
+
+	if (found_changes)
+	{
+		/*
+		 * we unfortunately cannot access the catalog of other databases, so
+		 * don't think about changes in them
+		 */
+		if (relfilenode->dbNode != MyDatabaseId)
+			;
+		/*
+		 * we need to keep track of new transactions while we didn't know what
+		 * was already running. Only actual data changes are relevant, so its
+		 * fine to track them here.
+		 */
+		else if (snapstate->state < SNAPBUILD_FULL_SNAPSHOT)
+			SnapBuildAddRunningTxn(snapstate, xid);
+		/*
+		 * No point in keeping track of changes in transactions that we don't
+		 * have enough information about to decode.
+		 */
+		else if (snapstate->state < SNAPBUILD_CONSISTENT &&
+		         SnapBuildTxnRunning(snapstate, xid))
+			;
+		else
+		{
+			bool does_timetravel;
+			bool old_tx = ApplyCacheIsXidKnown(applycache, xid);
+			bool found;
+			SnapstateTxnEnt *ent;
+
+			Assert(TransactionIdIsNormal(xid));
+			Assert(!SnapBuildTxnRunning(snapstate, xid));
+
+			ent = hash_search(snapstate->by_txn,
+			                  (void *)&xid,
+			                  HASH_FIND,
+			                  &found);
+
+			/* FIXME: For now skip transactions with catalog changes entirely */
+			if (ent && ent->does_timetravel)
+				does_timetravel = true;
+			else
+				does_timetravel = SnapBuildHasCatalogChanges(snapstate, xid, relfilenode);
+
+			/*
+			 * we don't add catalog changes to the applycache, we could use
+			 * them to queue local cache inval messages for catalog tables if
+			 * the relmapper would map from relfilenode to relid with correct
+			 * visibility rules.
+			 */
+			if (!does_timetravel)
+				ret = SNAPBUILD_DECODE;
+
+			elog(LOG, "found changes in xid %u (known: %u), timetravel: %i",
+			     xid, old_tx, does_timetravel);
+
+			/*
+			 * FIXME: At this point we have might have a problem if somebody
+			 * would CLUSTER, REINDEX or similar a system table inside a
+			 * transaction and *also* does other catalog modifications because
+			 * we can only build proper snapshots to look at the catalog after
+			 * we have reached the commit record because only then we know the
+			 * subxids of a toplevel txid. Because we wouldn't notice the
+			 * changed system table relfilenodes we wouldn't see the any of
+			 * those catalog changes.
+			 *
+			 * So we need to forbid that.
+			 */
+
+			if (!old_tx)
+			{
+				/* update global snapshot information */
+				if (does_timetravel)
+				{
+					/* HASHACTION is an enum, not a flag set: plain HASH_ENTER */
+					ent = hash_search(snapstate->by_txn,
+					                  (void *)&xid,
+					                  HASH_ENTER,
+					                  &found);
+
+					elog(LOG, "found catalog change in tx %u without changes, did we know it: %u",
+					     xid, found);
+
+					ent->does_timetravel = true;
+
+				}
+				else
+				{
+					elog(LOG, "adding initial snapshot to xid %u", xid);
+				}
+
+				/* add initial snapshot*/
+				{
+					Snapshot snap = SnapBuildBuildSnapshot(snapstate, xid);
+
+					elog(LOG, "adding base snap");
+					ApplyCacheAddBaseSnapshot(applycache, xid,
+					                          InvalidXLogRecPtr,
+					                          snap);
+				}
+
+			}
+			/* update already distributed snapshots */
+			else if (does_timetravel && old_tx)
+			{
+				/*
+				 * check whether we already know the xid as a catalog modifying
+				 * one
+				 */
+				SnapstateTxnEnt *ent =
+					hash_search(snapstate->by_txn,
+					            (void *)&xid,
+					            HASH_ENTER,
+				            &found);
+
+				elog(LOG, "found catalog change in tx %u with changes, did we know it: %u",
+				     xid, found);
+
+				ent->does_timetravel = true;
+
+				/* FIXME: add a new CommandId to the applycache's ->changes queue */
+			}
+		}
+	}
+
+	return ret;
+}
+
+
+/*
+ * Does this relation carry catalog information?
+ *
+ * Fixes: the snapshot was malloc'd before the GLOBALTABLESPACE_OID
+ * early-return and was never freed on any path; the syscache tuple copy was
+ * also leaked.
+ */
+static bool
+SnapBuildHasCatalogChanges(Snapstate* snapstate, TransactionId xid, RelFileNode* relfilenode)
+{
+	HeapTuple table;
+	Form_pg_class class_form;
+	Snapshot snap;
+	bool result;
+
+	/* shared catalogs always carry catalog information */
+	if (relfilenode->spcNode == GLOBALTABLESPACE_OID)
+		return true;
+
+	/* FIXME: build snapshot for transaction */
+	snap = SnapBuildBuildSnapshot(snapstate, xid);
+
+	SetupDecodingSnapshots(snap);
+
+	InvalidateSystemCaches();
+
+	table = LookupTableByRelFileNode(relfilenode);
+
+	RevertFromDecodingSnapshots();
+	InvalidateSystemCaches();
+
+	/* the snapshot was only needed for the lookup above */
+	free(snap);
+
+	/*
+	 * tables in the default tablespace are stored in pg_class with 0 as their
+	 * reltablespace
+	 */
+	if (!HeapTupleIsValid(table))
+	{
+		if (relfilenode->relNode >= FirstNormalObjectId)
+		{
+			elog(WARNING, "failed pg_class lookup for %u:%u with a oid in >= FirstNormalObjectId",
+			     relfilenode->spcNode, relfilenode->relNode);
+		}
+		return true;
+	}
+
+	class_form = (Form_pg_class) GETSTRUCT(table);
+	result = IsSystemClass(class_form);
+
+	/* SearchSysCacheCopy2 handed back a copy we own */
+	heap_freetuple(table);
+
+	return result;
+}
+
+/*
+ * Build a new snapshot, based on currently committed transactions.
+ *
+ * The snapshot and its xip/subxip arrays live in one malloc'd chunk; the
+ * caller owns it and releases it with free().
+ *
+ * Fixes: the malloc result was dereferenced without a NULL check.
+ */
+static Snapshot
+SnapBuildBuildSnapshot(Snapstate *snapstate, TransactionId xid)
+{
+	Snapshot snapshot = malloc(sizeof(SnapshotData) +
+	                           sizeof(TransactionId) * snapstate->nrcommitted +
+	                           sizeof(TransactionId) * 1 /* toplevel xid */);
+
+	if (!snapshot)
+		elog(ERROR, "could not allocate memory for snapshot");
+
+	snapshot->satisfies = HeapTupleSatisfiesMVCCDuringDecoding;
+	/*
+	 * we copy all currently in progress transaction to ->xip, all transactions
+	 * added to the transaction that committed during running - which thus need
+	 * to be considered visible in SnapshotNow semantics - get copied to
+	 * ->subxip.
+	 * XXX: Do we want extra fields for those two instead?
+	 */
+	snapshot->xmin = snapstate->xmin;
+	snapshot->xmax = snapstate->xmax;
+
+	/* store all transaction to be treated as committed */
+	snapshot->xip = (TransactionId*)((char*)snapshot + sizeof(SnapshotData));
+
+	snapshot->xcnt = snapstate->nrcommitted;
+	memcpy(snapshot->xip, snapstate->committed,
+	       snapstate->nrcommitted * sizeof(TransactionId));
+
+	/* sort so we can bsearch() */
+	qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
+
+	/* store toplevel xid */
+	/*
+	 * FIXME: subtransaction handling currently needs to be done in
+	 * applycache. Yuck.
+	 */
+	snapshot->subxip = (TransactionId*)(
+		(char*)snapshot
+		+ sizeof(SnapshotData) /* offset to ->xip's data */
+		+ sizeof(TransactionId) * snapstate->nrcommitted /* data */
+		);
+
+	snapshot->subxcnt = 1;
+	snapshot->subxip[0] = xid;
+
+	snapshot->suboverflowed = false;
+	snapshot->takenDuringRecovery = false;
+	snapshot->copied = false;
+	snapshot->curcid = 0;
+	snapshot->active_count = 0;
+	snapshot->regd_count = 0;
+
+	return snapshot;
+}
+
+/*
+ * Check whether `xid` is in the set of transactions that were running when
+ * we started observing.
+ *
+ * Fixes: the bsearch result variable was declared as `xid`, shadowing the
+ * parameter, so `&xid` handed bsearch the address of the uninitialized
+ * pointer instead of the key.  The range pre-check also used strict
+ * comparisons, wrongly excluding xids equal to xmin_running/xmax_running
+ * even though those bounds are the smallest/largest array members.
+ */
+static bool
+SnapBuildTxnRunning(Snapstate* snapstate, TransactionId xid)
+{
+	if (snapstate->nrrunning &&
+	    !NormalTransactionIdPrecedes(xid, snapstate->xmin_running) &&
+	    !NormalTransactionIdFollows(xid, snapstate->xmax_running))
+	{
+		TransactionId* found =
+			bsearch(&xid, snapstate->running, snapstate->nrrunning_initial,
+			        sizeof(TransactionId), xidComparator);
+
+		if (found != NULL)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * add a new SnapshotNow to all transactions we're decoding that are currently
+ * in-progress so they can see new catalog contents.
+ *
+ * Not implemented yet; currently a no-op stub, both parameters are unused.
+ */
+static void
+SnapBuildDistributeSnapshotNow(Snapstate* snapstate, TransactionId xid)
+{
+	/* FIXME: implement */
+}
+
+/*
+ * Keep track of a new catalog changing transaction that has committed.
+ *
+ * Fixes: `p = realloc(p, ...)` overwrote the only pointer to the array, and
+ * nrcommitted_space was doubled before the reallocation was known to have
+ * succeeded, leaving the struct inconsistent on failure.
+ */
+static void
+SnapBuildAddCommittedTxn(Snapstate* snapstate, TransactionId xid)
+{
+	if (snapstate->nrcommitted == snapstate->nrcommitted_space)
+	{
+		TransactionId *enlarged;
+
+		elog(WARNING, "increasing space for committed transactions");
+
+		enlarged = realloc(snapstate->committed,
+		                   snapstate->nrcommitted_space * 2 * sizeof(TransactionId));
+		if (!enlarged)
+			elog(ERROR, "couldn't enlarge space for committed transactions");
+
+		/* only commit the new size once the reallocation succeeded */
+		snapstate->committed = enlarged;
+		snapstate->nrcommitted_space *= 2;
+	}
+	snapstate->committed[snapstate->nrcommitted++] = xid;
+}
+
+/*
+ * Remove all transactions we treat as committed that are smaller than
+ * ->xmin. Those won't ever get checked via the ->committed array anyway.
+ *
+ * Fixes: malloc(0) may legitimately return NULL, triggering a spurious
+ * ERROR when the array is empty; the raw `>` comparison on TransactionIds
+ * was not wraparound-aware.
+ */
+static void
+SnapBuildPurgeCommittedTxn(Snapstate* snapstate)
+{
+	int off;
+	TransactionId *workspace;
+	int surviving_xids = 0;
+
+	/* nothing to purge, and malloc(0) may validly return NULL below */
+	if (snapstate->nrcommitted == 0)
+		return;
+
+	/* FIXME: Neater algorithm? */
+	workspace = malloc(snapstate->nrcommitted * sizeof(TransactionId));
+
+	if (!workspace)
+		elog(ERROR, "could not allocate memory for workspace during xmin purging");
+
+	for (off = 0; off < snapstate->nrcommitted; off++)
+	{
+		/* wraparound-aware comparison instead of a plain `>` */
+		if (TransactionIdFollows(snapstate->committed[off], snapstate->xmin))
+			workspace[surviving_xids++] = snapstate->committed[off];
+	}
+
+	memcpy(snapstate->committed, workspace,
+	       surviving_xids * sizeof(TransactionId));
+
+	snapstate->nrcommitted = surviving_xids;
+	free(workspace);
+}
+
+/*
+ * makes sure we have enough space for at least `count` additional txn's,
+ * reallocates if necessary
+ *
+ * Fixes: the initial-allocation branch never checked the malloc result,
+ * while the realloc branch did.
+ */
+static void
+SnapBuildReserveRunning(Snapstate *snapstate, Size count)
+{
+	const Size reserve = 100;
+
+	if (snapstate->nrrunning_initial + count < snapstate->nrrunning_space)
+		return;
+
+	if (snapstate->running)
+	{
+		snapstate->nrrunning_space += count + reserve;
+		snapstate->running =
+			realloc(snapstate->running,
+			        snapstate->nrrunning_space *
+			        sizeof(TransactionId));
+		if (!snapstate->running)
+			elog(ERROR, "could not reallocate ->running");
+	}
+	else
+	{
+		snapstate->nrrunning_space = count + reserve;
+		snapstate->running = malloc(snapstate->nrrunning_space
+		                            * sizeof(TransactionId));
+		if (!snapstate->running)
+			elog(ERROR, "could not allocate ->running");
+	}
+}
+
+/*
+ * Keep ->running ordered by xidComparator so SnapBuildTxnRunning can locate
+ * entries with a binary search.
+ */
+static void
+SnapBuildSortRunning(Snapstate *snapstate)
+{
+	TransactionId *xids = snapstate->running;
+	Size		nxids = snapstate->nrrunning_initial;
+
+	qsort(xids, nxids, sizeof(TransactionId), xidComparator);
+}
+
+/*
+ * Add transaction to the set of currently running transactions.
+ *
+ * Only called while we are between the initial point and a full snapshot:
+ * data-modifying xids first seen during that window have to be tracked so
+ * we later know which transactions we could not observe from their start.
+ */
+static void
+SnapBuildAddRunningTxn(Snapstate *snapstate, TransactionId xid)
+{
+	Assert(snapstate->state == SNAPBUILD_INITIAL_POINT &&
+	       TransactionIdIsValid(snapstate->valid_after));
+
+	/*
+	 * we only need those running txn's if we're switching state due to
+	 * reaching the xmin horizon. Transactions before we reached that are not
+	 * interesting.
+	 */
+	if (NormalTransactionIdPrecedes(xid, snapstate->valid_after) )
+		return;
+
+	/* already tracked, nothing to do */
+	if (SnapBuildTxnRunning(snapstate, xid))
+		return;
+
+	Assert(!TransactionIdPrecedesOrEquals(xid, snapstate->xmin_running));
+
+	/* keep xmax_running the largest xid we have tracked so far */
+	if (TransactionIdFollowsOrEquals(xid, snapstate->xmax_running))
+		snapstate->xmax_running = xid;
+
+	SnapBuildReserveRunning(snapstate, 1);
+
+	/* FIXME: inefficient insertion logic, should at least be insertion sort */
+	snapstate->running[snapstate->nrrunning_initial++] = xid;
+	snapstate->nrrunning++;
+	SnapBuildSortRunning(snapstate);
+}
+
+/*
+ * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
+ * keeping track of the amount of running transactions.
+ *
+ * Once the count of originally-running transactions drops to zero the
+ * incrementally built snapshot is complete and we switch to CONSISTENT.
+ */
+static void
+SnapBuildEndTxn(Snapstate* snapstate, TransactionId xid)
+{
+	/* already consistent: the ->running bookkeeping is no longer needed */
+	if (snapstate->state == SNAPBUILD_CONSISTENT)
+		return;
+
+	if (SnapBuildTxnRunning(snapstate, xid))
+	{
+		if (!--snapstate->nrrunning)
+		{
+			/*
+			 * none of the originally running transaction is running
+			 * anymore. Due to that our incrementally built snapshot now is
+			 * complete.
+			 */
+			elog(LOG, "found consistent point due to SnapBuildEndTxn + running: %u", xid);
+			snapstate->state = SNAPBUILD_CONSISTENT;
+		}
+	}
+}
+
+/*
+ * Abort a transaction: drop the bookkeeping for every subtransaction first,
+ * then for the toplevel xid, throwing away all state we kept.
+ */
+static void
+SnapBuildAbortTxn(Snapstate* snapstate, TransactionId xid, int nsubxacts, TransactionId* subxacts)
+{
+	int i;
+
+	/* iterations 0..nsubxacts-1 handle the subxacts, the last one the toplevel */
+	for (i = 0; i <= nsubxacts; i++)
+	{
+		bool removed;
+		TransactionId cur = (i < nsubxacts) ? subxacts[i] : xid;
+
+		/* update the running-transactions accounting */
+		SnapBuildEndTxn(snapstate, cur);
+
+		/* forget any catalog-change tracking for this xid */
+		hash_search(snapstate->by_txn,
+		            (void *) &cur,
+		            HASH_REMOVE,
+		            &removed);
+	}
+}
+
+/*
+ * Handle everything that needs to be done when a transaction commits.
+ *
+ * Fixes: inside the subxact loop the HASH_REMOVE was issued for &xid (the
+ * toplevel transaction) instead of &subxid, removing the wrong entry and
+ * tripping the Assert on later iterations; SnapBuildEndTxn(subxid) was also
+ * called twice per subtransaction, double-decrementing ->nrrunning.
+ */
+static void
+SnapBuildCommitTxn(Snapstate* snapstate, TransactionId xid, int nsubxacts,
+                   TransactionId* subxacts)
+{
+	int off;
+	bool found;
+	bool forced_timetravel = false;
+	bool sub_does_timetravel = false;
+	SnapstateTxnEnt *ent;
+
+	/*
+	 * If we couldn't observe every change of a transaction because it was
+	 * already running at the point we started to observe we have to assume it
+	 * made catalog changes.
+	 */
+	if (snapstate->state < SNAPBUILD_CONSISTENT && SnapBuildTxnRunning(snapstate, xid))
+	{
+		elog(LOG, "forced to assume catalog changes for xid %u because it was running to early", xid);
+		SnapBuildAddCommittedTxn(snapstate, xid);
+		forced_timetravel = true;
+	}
+
+	for(off = 0; off < nsubxacts; off++)
+	{
+		TransactionId subxid = subxacts[off];
+
+		/* make sure its not tracked in running txn's anymore, switch state */
+		SnapBuildEndTxn(snapstate, subxid);
+
+		ent = hash_search(snapstate->by_txn,
+		                  (void *)&subxid,
+		                  HASH_FIND,
+		                  &found);
+
+		if (forced_timetravel)
+		{
+			SnapBuildAddCommittedTxn(snapstate, subxid);
+		}
+		/* add subtransaction to base snapshot, we don't distinguish after that */
+		else if (found && ent->does_timetravel)
+		{
+			sub_does_timetravel = true;
+
+			elog(WARNING, "found subtransaction %u:%u with catalog changes",
+			     xid, subxid);
+
+			SnapBuildAddCommittedTxn(snapstate, subxid);
+		}
+
+		if (found)
+		{
+			/* remove the subtransaction's entry, not the toplevel one */
+			hash_search(snapstate->by_txn,
+			            (void *)&subxid,
+			            HASH_REMOVE,
+			            &found);
+			Assert(found);
+		}
+
+		if (NormalTransactionIdFollows(subxid, snapstate->xmax))
+		{
+			snapstate->xmax = subxid;
+			TransactionIdAdvance(snapstate->xmax);
+		}
+	}
+
+	/* make sure its not tracked in running txn's anymore, switch state */
+	SnapBuildEndTxn(snapstate, xid);
+
+	ent =
+		hash_search(snapstate->by_txn,
+		            (void *)&xid,
+		            HASH_FIND,
+		            &found);
+
+	/* add toplevel transaction to base snapshot */
+	if (found && ent->does_timetravel)
+	{
+		elog(DEBUG1, "found top level transaction %u, with catalog changes !!!!", xid);
+		SnapBuildAddCommittedTxn(snapstate, xid);
+	}
+
+	if ((found && ent->does_timetravel) || sub_does_timetravel || forced_timetravel)
+	{
+		elog(DEBUG1, "found transaction %u, with catalog changes !!!!", xid);
+
+		/* add a new SnapshotNow to all currently running transactions */
+		SnapBuildDistributeSnapshotNow(snapstate, xid);
+	}
+
+	if (found)
+	{
+		/* now we don't need the contents anymore, remove */
+		hash_search(snapstate->by_txn,
+		            (void *)&xid,
+		            HASH_REMOVE,
+		            &found);
+		Assert(found);
+	}
+
+	if (NormalTransactionIdFollows(xid, snapstate->xmax))
+	{
+		snapstate->xmax = xid;
+		TransactionIdAdvance(snapstate->xmax);
+	}
+}
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index b531db5..25af26a 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -65,6 +65,7 @@
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
 #include "utils/tqual.h"
+#include "utils/builtins.h"
 
 
 /* Static variables representing various special snapshot semantics */
@@ -73,6 +74,8 @@ SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
 SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
 SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
 
+static Snapshot SnapshotNowDecoding;
+
 /* local functions */
 static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
 
@@ -1375,3 +1378,161 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 
 	return false;
 }
+
+static bool
+TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
+{
+	return bsearch(&xid, xip, num,
+	               sizeof(TransactionId), xidComparator) != NULL;
+}
+
+
+/*
+ * See the comments for HeapTupleSatisfiesMVCC for the semantics this function
+ * obeys.
+ *
+ * Only usable on tuples from catalog tables!
+ *
+ * We don't need to support HEAP_MOVED_(IN|OFF) for now because we only support
+ * reading catalog pages which couldn't have been created in an older version.
+ *
+ * Basically we record all transactions that are in progress when the
+ * transaction starts and treat them as in-progress for the duration of the
+ * snapshot, everything below xmin is comitted, everything above xmax is
+ * in-progress, and everything thats not in our in-progress array is committed
+ * as well.
+ */
+bool
+HeapTupleSatisfiesMVCCDuringDecoding(HeapTupleHeader tuple, Snapshot snapshot,
+                                     Buffer buffer)
+{
+	TransactionId xmin = HeapTupleHeaderGetXmin(tuple);
+	TransactionId xmax = HeapTupleHeaderGetXmax(tuple);
+
+	/*
+	 * FIXME: The not yet existing decoding infrastructure will need to force
+	 * the xmin to stay lower than what they are currently decoding.
+	 */
+	bool fixme_xmin_horizon = false;
+
+	if (fixme_xmin_horizon && tuple->t_infomask & HEAP_XMIN_INVALID)
+	{
+		return false;
+	}
+	/* normal transaction state counts */
+	else if (TransactionIdPrecedes(xmin, snapshot->xmin))
+	{
+		if (!TransactionIdDidCommit(xmin))
+			return false;
+	}
+	/* beyond our xmax horizon, i.e. invisible */
+	else if (TransactionIdFollows(xmin, snapshot->xmax))
+	{
+		return false;
+	}
+	/* check if it's one of our txids, toplevel is also in there */
+	else if (TransactionIdInArray(xmin, snapshot->subxip, snapshot->subxcnt))
+	{
+		CommandId cmin = HeapTupleHeaderGetRawCommandId(tuple);
+		/* no support for that yet */
+		if (tuple->t_infomask & HEAP_COMBOCID){
+			elog(WARNING, "combocids not yet supported");
+			return false;
+		}
+		if (cmin >= snapshot->curcid)
+			return false;	/* inserted after scan started */
+	}
+	/* check if we know the transaction has committed */
+	else if(TransactionIdInArray(xmin, snapshot->xip, snapshot->xcnt))
+	{
+	}
+	else
+	{
+		return false;
+	}
+
+	/* at this point we know xmin is visible */
+
+	/* why should those be in catalog tables? */
+	Assert(!(tuple->t_infomask & HEAP_XMAX_IS_MULTI));
+
+	if (tuple->t_infomask & HEAP_XMAX_INVALID)	/* xid invalid or aborted */
+		return true;
+
+	if (tuple->t_infomask & HEAP_IS_LOCKED)
+		return true;
+
+	/* we cannot possibly see the deleting transaction */
+	if (TransactionIdFollows(xmax, snapshot->xmax))
+	{
+		return true;
+	}
+	/* normal transaction state is valid */
+	else if (TransactionIdPrecedes(xmax, snapshot->xmin))
+	{
+		return !TransactionIdDidCommit(xmax);
+	}
+	/* check if it's one of our txids, toplevel is also in there */
+	else if (TransactionIdInArray(xmax, snapshot->subxip, snapshot->subxcnt))
+	{
+		CommandId cmax = HeapTupleHeaderGetRawCommandId(tuple);
+		/* no support for that yet */
+		if (tuple->t_infomask & HEAP_COMBOCID){
+			elog(WARNING, "combocids not yet supported");
+			return true;
+		}
+
+		if (cmax >= snapshot->curcid)
+			return true;	/* deleted after scan started */
+		else
+			return false;	/* deleted before scan started */
+	}
+	/* do we know that the deleting txn is valid? */
+	else if (TransactionIdInArray(xmax, snapshot->xip, snapshot->xcnt))
+	{
+		return false;
+	}
+	else
+	{
+		return true;
+	}
+}
+
+static bool
+FailsSatisfies(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
+{
+	elog(ERROR, "should not be called after SetupDecodingSnapshots!");
+	return false;
+}
+
+static bool
+RedirectSatisfiesNow(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
+{
+	Assert(SnapshotNowDecoding != NULL);
+	return HeapTupleSatisfiesMVCCDuringDecoding(tuple, SnapshotNowDecoding,
+	                                            buffer);
+}
+
+void
+SetupDecodingSnapshots(Snapshot snapshot_now)
+{
+	SnapshotNowData.satisfies = RedirectSatisfiesNow;
+	SnapshotSelfData.satisfies = FailsSatisfies;
+	SnapshotAnyData.satisfies = FailsSatisfies;
+	SnapshotToastData.satisfies = FailsSatisfies;
+
+	SnapshotNowDecoding = snapshot_now;
+}
+
+
+void
+RevertFromDecodingSnapshots(void)
+{
+	SnapshotNowDecoding = NULL;
+
+	SnapshotNowData.satisfies = HeapTupleSatisfiesNow;
+	SnapshotSelfData.satisfies = HeapTupleSatisfiesSelf;
+	SnapshotAnyData.satisfies = HeapTupleSatisfiesAny;
+	SnapshotToastData.satisfies = HeapTupleSatisfiesToast;
+
+}
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 228f6a1..915b2cd 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -63,6 +63,11 @@
 	(AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \
 	(int32) ((id1) - (id2)) < 0)
 
+/* compare two XIDs already known to be normal; this is a macro for speed */
+#define NormalTransactionIdFollows(id1, id2) \
+	(AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \
+	(int32) ((id1) - (id2)) > 0)
+
 /* ----------
  *		Object ID (OID) zero is InvalidOid.
  *
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index d88248a..b5b886b 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4655,6 +4655,9 @@ DESCR("SP-GiST support for suffix tree over text");
 DATA(insert OID = 4031 (  spg_text_leaf_consistent	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_  spg_text_leaf_consistent _null_ _null_ _null_ ));
 DESCR("SP-GiST support for suffix tree over text");
 
+DATA(insert OID = 4033 (  decode_xlog	PGNSP PGUID 12 1  0 0 0 f f f f t f i 2 0 16 "2275 2275" _null_ _null_ _null_ _null_ decode_xlog _null_ _null_ _null_ ));
+DESCR("decode xlog");
+
 DATA(insert OID = 3469 (  spg_range_quad_config	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_config _null_ _null_ _null_ ));
 DESCR("SP-GiST support for quad tree over range");
 DATA(insert OID = 3470 (  spg_range_quad_choose	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_choose _null_ _null_ _null_ ));
diff --git a/src/include/replication/applycache.h b/src/include/replication/applycache.h
new file mode 100644
index 0000000..f101eeb
--- /dev/null
+++ b/src/include/replication/applycache.h
@@ -0,0 +1,239 @@
+/*
+ * applycache.h
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/applycache.h
+ */
+#ifndef APPLYCACHE_H
+#define APPLYCACHE_H
+
+#include "access/htup_details.h"
+#include "utils/hsearch.h"
+#include "utils/ilist.h"
+#include "utils/snapshot.h"
+
+typedef struct ApplyCache ApplyCache;
+
+enum ApplyCacheChangeType
+{
+	APPLY_CACHE_CHANGE_INSERT,
+	APPLY_CACHE_CHANGE_UPDATE,
+	APPLY_CACHE_CHANGE_DELETE,
+	/*
+	 * for efficiency and simplicity reasons we keep those in the same list,
+	 * that's somewhat annoying because switch()es warn if those aren't
+	 * handled... Make those private values?
+	 */
+	APPLY_CACHE_CHANGE_SNAPSHOT,
+	APPLY_CACHE_CHANGE_COMMAND_ID
+};
+
+typedef struct ApplyCacheTupleBuf
+{
+	/* position in preallocated list */
+	ilist_s_node node;
+
+	HeapTupleData tuple;
+	HeapTupleHeaderData header;
+	char data[MaxHeapTupleSize];
+} ApplyCacheTupleBuf;
+
+typedef struct ApplyCacheChange
+{
+	XLogRecPtr lsn;
+	enum ApplyCacheChangeType action;
+
+	RelFileNode relnode;
+
+	union {
+		ApplyCacheTupleBuf* newtuple;
+		Snapshot snapshot;
+		CommandId command_id;
+	};
+	ApplyCacheTupleBuf* oldtuple;
+
+
+	HeapTuple table;
+
+	/*
+	 * While in use this is how a change is linked into a transaction's changes,
+	 * otherwise it's the position in the preallocated list.
+	*/
+	ilist_d_node node;
+} ApplyCacheChange;
+
+typedef struct ApplyCacheTXN
+{
+	TransactionId xid;
+
+	XLogRecPtr lsn;
+
+	/*
+	 * How many ApplyCacheChange's do we have in this txn.
+	 *
+	 * Subtransactions are *not* included.
+	 */
+	Size nentries;
+
+	/*
+	 * How many of the above entries are stored in memory in contrast to being
+	 * spilled to disk.
+	 */
+	Size nentries_mem;
+
+	/*
+	 * List of actual changes
+	 */
+	ilist_d_head changes;
+
+	/*
+	 * non-hierarchical list of subtransactions that are *not* aborted
+	 */
+	ilist_d_head subtxns;
+
+	/*
+	 * our position in a list of subtransactions while the TXN is in
+	 * use. Otherwise it's the position in the list of preallocated
+	 * transactions.
+	 */
+	ilist_d_node node;
+
+	/*
+	 * List of (lsn, command_id).
+	 *
+	 * Every time a catalog change happens this list gets appended with the
+	 * current commandid. This is used to be able to construct proper
+	 * Snapshot's for decoding.
+	 */
+	ilist_d_head commandids;
+
+	/*
+	 * List of (lsn, Snapshot) pairs.
+	 *
+	 * The first record always is the (InvalidXLogRecPtr, SnapshotAtStart)
+	 * pair. Every time *another* transaction commits this gets appended with a
+	 * new Snapshot that has enough information to make SnapshotNow lookups.
+	 */
+	ilist_d_head snapshots;
+} ApplyCacheTXN;
+
+
+/* XXX: we're currently passing the originating subtxn. Not sure that's necessary */
+typedef void (*ApplyCacheApplyChangeCB)(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change);
+typedef void (*ApplyCacheBeginCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+typedef void (*ApplyCacheCommitCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+
+/*
+ * max number of concurrent top-level transactions or transaction where we
+ * don't know if they are top-level can be calculated by:
+ * (max_connections + max_prepared_xacts + ?)  * PGPROC_MAX_CACHED_SUBXIDS
+ */
+struct ApplyCache
+{
+	/*
+	 * Should snapshots for decoding be collected. If many catalog changes
+	 * happen this can be considerably expensive.
+	 */
+	bool build_snapshots;
+
+	TransactionId last_txn;
+	ApplyCacheTXN *last_txn_cache;
+	HTAB *by_txn;
+
+	ApplyCacheBeginCB begin;
+	ApplyCacheApplyChangeCB apply_change;
+	ApplyCacheCommitCB commit;
+
+	void* private_data;
+
+	MemoryContext context;
+
+	/*
+	 * we don't want to repeatedly (de-)allocate those structs, so cache them for reuse.
+	 */
+	ilist_d_head cached_transactions;
+	size_t nr_cached_transactions;
+
+	ilist_d_head cached_changes;
+	size_t nr_cached_changes;
+
+	ilist_s_head cached_tuplebufs;
+	size_t nr_cached_tuplebufs;
+};
+
+
+ApplyCache*
+ApplyCacheAllocate(void);
+
+void
+ApplyCacheFree(ApplyCache*);
+
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache*);
+
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple);
+
+/*
+ * Returns a (potentially preallocated) change struct. Its lifetime is managed
+ * by the applycache module.
+ *
+ * If not added to a transaction with ApplyCacheAddChange it needs to be
+ * returned via ApplyCacheReturnChange
+ *
+ * FIXME: better name
+ */
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache*);
+
+/*
+ * Return an unused ApplyCacheChange struct
+ */
+void
+ApplyCacheReturnChange(ApplyCache*, ApplyCacheChange*);
+
+
+/*
+ * record the transaction as in-progress if not already done, add the current
+ * change.
+ *
+ * We have a one-entry cache for looking up the current ApplyCacheTXN so we
+ * don't need to do a full hash-lookup if the same xid is used
+ * sequentially. The same xid being used repeatedly that way is rather frequent.
+ */
+void
+ApplyCacheAddChange(ApplyCache*, TransactionId, XLogRecPtr lsn, ApplyCacheChange*);
+
+/*
+ *
+ */
+void
+ApplyCacheCommit(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+void
+ApplyCacheCommitChild(ApplyCache*, TransactionId, TransactionId, XLogRecPtr lsn);
+
+void
+ApplyCacheAbort(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+typedef struct SnapshotData* Snapshot;
+
+/*
+ * if lsn == InvalidXLogRecPtr this is the first snap for the transaction
+ */
+void
+ApplyCacheAddBaseSnapshot(ApplyCache*, TransactionId, XLogRecPtr lsn, Snapshot snap);
+
+/*
+ * Will only be called for command ids > 1
+ */
+void
+ApplyCacheAddNewCommandId(ApplyCache*, TransactionId, XLogRecPtr lsn, CommandId cid);
+
+bool
+ApplyCacheIsXidKnown(ApplyCache* cache, TransactionId xid);
+#endif
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..86312d1
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ *     PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/applycache.h"
+
+struct Snapstate;
+
+typedef struct ReaderApplyState
+{
+	ApplyCache *apply_cache;
+	struct Snapstate *snapstate;
+} ReaderApplyState;
+
+void DecodeRecordIntoApplyCache(ReaderApplyState* state, XLogRecordBuffer* buf);
+
+#endif
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
new file mode 100644
index 0000000..ed92e75
--- /dev/null
+++ b/src/include/replication/snapbuild.h
@@ -0,0 +1,119 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapbuild.h
+ *	  Exports from replication/logical/snapbuild.c.
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/replication/snapbuild.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SNAPBUILD_H
+#define SNAPBUILD_H
+
+#include "replication/applycache.h"
+
+#include "utils/hsearch.h"
+#include "utils/snapshot.h"
+#include "access/htup.h"
+
+typedef enum
+{
+	SNAPBUILD_START,
+	/*
+	 * found initial visibility information.
+	 *
+	 * Thats either: XLOG_RUNNING_XACTS or XLOG_CHECKPOINT_SHUTDOWN
+	 */
+	SNAPBUILD_INITIAL_POINT,
+	/*
+	 * We have collected enough information to decode tuples in transactions
+	 * that started after this.
+	 *
+	 * Once we reached this we start to collect changes. We cannot apply them
+	 * yet because they might be based on transactions that were still running
+	 * when we reached this point.
+	 */
+	SNAPBUILD_FULL_SNAPSHOT,
+	/*
+	 * Found a point after hitting SNAPBUILD_FULL_SNAPSHOT where all transactions
+	 * that were running at that point finished. Until we reach that we hold off
+	 * calling any commit callbacks.
+	 */
+	SNAPBUILD_CONSISTENT
+} SnapBuildState;
+
+typedef enum
+{
+	SNAPBUILD_SKIP,
+	SNAPBUILD_DECODE
+} SnapBuildAction;
+
+typedef struct Snapstate
+{
+	SnapBuildState state;
+
+	/* all transactions smaller than this have committed/aborted */
+	TransactionId xmin;
+
+	/* all transactions bigger than this are uncommitted */
+	TransactionId xmax;
+
+	/*
+	 * All transactions in this window have to be checked via the running
+	 * array. This will only be used initially till we are past xmax_running.
+	 *
+	 * Note that we initially treat already running transactions as having
+	 * catalog modifications because we don't have enough information about
+	 * them to properly judge that.
+	 */
+	TransactionId xmin_running;
+	TransactionId xmax_running;
+
+	/* sorted array of running transactions, can be searched with bsearch() */
+	TransactionId* running;
+	/* how many running transactions remain */
+	size_t nrrunning;
+	/* how much free space do we have to add more running txn's */
+	size_t nrrunning_space;
+	/*
+	 * we need to keep track of the amount of tracked transactions separately
+	 * from nrrunning_space as nrrunning_initial gives the range of valid xids
+	 * in the array so bsearch() can work.
+	 */
+	size_t nrrunning_initial;
+
+	TransactionId valid_after;
+
+	/*
+	 * Running (sub-)transactions with catalog changes. This will be used to
+	 * fill the committed array with a transaction's xid and all its subxids
+	 * at commit.
+	 */
+	HTAB *by_txn;
+
+	/*
+	 * Transactions which could have catalog changes that committed between
+	 * xmin and xmax
+	 */
+	size_t nrcommitted;
+	size_t nrcommitted_space;
+	TransactionId* committed;
+
+	/* contains all catalog modifying txns */
+} Snapstate;
+
+extern Snapstate*
+AllocateSnapshotBuilder(ApplyCache *cache);
+
+extern void
+FreeSnapshotBuilder(Snapstate *cache);
+
+extern SnapBuildAction
+SnapBuildCallback(ApplyCache *cache, Snapstate* snapstate, XLogRecordBuffer* buf);
+
+extern HeapTuple
+LookupTableByRelFileNode(RelFileNode* r);
+
+#endif /* SNAPBUILD_H */
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index ff74f86..6c9b261 100644
--- a/src/include/utils/tqual.h
+++ b/src/include/utils/tqual.h
@@ -39,7 +39,8 @@ extern PGDLLIMPORT SnapshotData SnapshotToastData;
 
 /* This macro encodes the knowledge of which snapshots are MVCC-safe */
 #define IsMVCCSnapshot(snapshot)  \
-	((snapshot)->satisfies == HeapTupleSatisfiesMVCC)
+	((snapshot)->satisfies == HeapTupleSatisfiesMVCC || \
+	 (snapshot)->satisfies == HeapTupleSatisfiesMVCCDuringDecoding)
 
 /*
  * HeapTupleSatisfiesVisibility
@@ -89,4 +90,22 @@ extern bool HeapTupleIsSurelyDead(HeapTupleHeader tuple,
 extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
 					 uint16 infomask, TransactionId xid);
 
+/*
+ * Special "satisfies" routines used during decoding xlog from a different
+ * LSN. Also used for timetravel SnapshotNow's.
+ */
+extern bool HeapTupleSatisfiesMVCCDuringDecoding(HeapTupleHeader tuple,
+                                                 Snapshot snapshot, Buffer buffer);
+
+/*
+ * install the 'snapshot_now' snapshot as a timetravelling snapshot replacing
+ * the normal SnapshotNow behaviour. This snapshot needs to have been created
+ * by snapbuild.c otherwise you will see crashes!
+ *
+ * FIXME: We need something resembling the real SnapshotNow to handle things
+ * like enum lookups from indices correctly.
+ */
+extern void SetupDecodingSnapshots(Snapshot snapshot_now);
+extern void RevertFromDecodingSnapshots(void);
+
 #endif   /* TQUAL_H */
