diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 700cfd8..eb6cfc5 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
 	timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
-	xlogutils.o
+	xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
new file mode 100644
index 0000000..71e7d52
--- /dev/null
+++ b/src/backend/access/transam/xlogreader.c
@@ -0,0 +1,1032 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *		Generic xlog reading facility
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/access/transam/xlogreader.c
+ *
+ * NOTES
+ *		Documentation about how to use this interface can be found in
+ *		xlogreader.h, more specifically in the definition of the
+ *		XLogReaderState struct where all parameters are documented.
+ *
+ * TODO:
+ * * more extensive validation of read records
+ * * separation of reader/writer
+ * * customizable error response
+ * * usable without backend code around
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/transam.h"
+#include "catalog/pg_control.h"
+#include "access/xlogreader.h"
+
+/* If (very) verbose debugging is needed:
+ * #define VERBOSE_DEBUG
+ */
+
+/*
+ * Allocate an XLogReaderState including its record and backup block buffers.
+ * All callbacks, including read_page, start out as NULL and have to be set by
+ * the caller as needed. Raises an ERROR on out of memory after releasing what
+ * was allocated so far.
+ */
+XLogReaderState*
+XLogReaderAllocate(void)
+{
+	XLogReaderState* state;
+	int i;
+
+	/* zeroed, so callbacks and not yet allocated buffers start out as NULL */
+	state = (XLogReaderState*)calloc(1, sizeof(XLogReaderState));
+	if (!state)
+		goto oom;
+
+	state->buf.record_data_size = XLOG_BLCKSZ*8;
+	state->buf.record_data = calloc(1, state->buf.record_data_size);
+	if (!state->buf.record_data)
+		goto oom;
+
+	state->buf.origptr = InvalidXLogRecPtr;
+	state->output_buffer_size = 0;
+
+	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+	{
+		state->buf.bkp_block_data[i] = malloc(BLCKSZ);
+		if (!state->buf.bkp_block_data[i])
+			goto oom;
+	}
+
+	XLogReaderReset(state);
+	return state;
+
+oom:
+	/* ereport(ERROR) does not return, so release partial allocations first */
+	if (state)
+		XLogReaderFree(state);
+	ereport(ERROR,
+	        (errcode(ERRCODE_OUT_OF_MEMORY),
+	         errmsg("out of memory"),
+	         errdetail("failed while allocating an XLogReader")));
+	return NULL;
+}
+
+/* Free a reader and all of its buffers; like free(), NULL is a no-op. */
+void
+XLogReaderFree(XLogReaderState* state)
+{
+	int i;
+
+	if (!state)
+		return;
+
+	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+		free(state->buf.bkp_block_data[i]);
+	free(state->buf.record_data);
+	free(state);
+}
+
+/* Put the reader's parsing state and status flags back to their defaults. */
+void
+XLogReaderReset(XLogReaderState* state)
+{
+	/* not inside any record, record header, backup block or skip */
+	state->in_record = state->in_record_header = false;
+	state->in_bkp_block_header = state->in_skip = false;
+	state->do_reassemble_record = false;
+	state->in_bkp_blocks = 0;
+	state->remaining_size = 0;
+	state->already_written_size = 0;
+	/* clearing initialized makes the next read set up curptr again */
+	state->incomplete = state->initialized = false;
+	state->needs_input = state->needs_output = false;
+	state->stop_at_record_boundary = false;
+}
+
+/* True iff curptr advanced by size still lies strictly before endptr. */
+static inline bool
+XLogReaderHasInput(XLogReaderState* state, Size size)
+{
+	XLogRecPtr wanted_end = state->curptr;
+
+	XLByteAdvance(wanted_end, size);
+	return !XLByteLE(state->endptr, wanted_end);
+}
+
+/* Can size more bytes be written without exceeding output_buffer_size? */
+static inline bool
+XLogReaderHasOutput(XLogReaderState* state, Size size)
+{
+	/* without a writeout callback or a size limit anything fits */
+	if (state->writeout_data == NULL || state->output_buffer_size == 0)
+		return true;
+
+	/* otherwise the bytes have to fit into what is left of the buffer */
+	return state->already_written_size + size <= state->output_buffer_size;
+}
+
+/*
+ * Check both constraints at once: enough input has to be available and the
+ * output has to be able to take it. The input is checked first.
+ */
+static inline bool
+XLogReaderHasSpace(XLogReaderState* state, Size size)
+{
+	bool enough_input = XLogReaderHasInput(state, size);
+
+	return enough_input && XLogReaderHasOutput(state, size);
+}
+
+/* ----------------------------------------------------------------------------
+ * Hand data over to the writeout_data callback iff
+ * 1. we have a writeout_data callback
+ * 2. curptr is not before startptr
+ *
+ * The 2nd condition requires that we will never start a write before startptr
+ * and finish after it. The code needs to guarantee this.
+ *
+ * data may be NULL, it is passed on to the callback unchanged.
+ * ----------------------------------------------------------------------------
+ */
+static void
+XLogReaderInternalWrite(XLogReaderState* state, char* data, Size size)
+{
+	/* the callback is tested first, without it nothing else matters */
+	bool do_write = state->writeout_data != NULL &&
+		!XLByteLT(state->curptr, state->startptr);
+
+	if (do_write)
+		state->writeout_data(state, data, size);
+}
+
+/*
+ * Advance to the next backup block flagged in the record's xl_info, if any.
+ *
+ * Blocks whose XLR_BKP_BLOCK bit is not set are skipped by decrementing
+ * in_bkp_blocks. If a flagged block is found the state is set up to read its
+ * BkpBlock header and true is returned. If none is left in_bkp_blocks ends up
+ * as 0 and false is returned so that the caller can consider the record
+ * finished.
+ */
+static bool
+XLogReaderInternalNextBkpBlock(XLogReaderState* state)
+{
+	Assert(state->in_record);
+	Assert(state->remaining_size == 0);
+
+	for (; state->in_bkp_blocks; state->in_bkp_blocks--)
+	{
+		int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+
+		/* this block is not part of the record, look at the next one */
+		if (!(state->buf.record.xl_info & XLR_BKP_BLOCK(blockno)))
+			continue;
+
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "reading bkp block %u", blockno);
+#endif
+
+		/* bkp blocks are stored without regard for alignment */
+		state->in_bkp_block_header = true;
+		state->remaining_size = sizeof(BkpBlock);
+		return true;
+	}
+
+	/* no flagged backup block is left */
+	return false;
+}
+
+/*
+ * Read and process xlog from state->startptr up to state->endptr.
+ *
+ * Pages are fetched via the read_page callback, which has to be set. Data at
+ * or after startptr is passed to writeout_data (if set). Records that start
+ * there and get reassembled are handed to finished_record (if set).
+ *
+ * Returns when curptr reached endptr, after a reassembled record was finished
+ * if stop_at_record_boundary is set, or with needs_input/needs_output set if
+ * input or output space ran out. state->incomplete tells whether reading
+ * stopped inside a record.
+ */
+void
+XLogReaderRead(XLogReaderState* state)
+{
+	state->needs_input = false;
+	state->needs_output = false;
+
+	/*
+	 * Do some basic sanity checking and setup if we're starting anew.
+	 */
+	if (!state->initialized)
+	{
+		if (!state->read_page)
+			elog(ERROR, "The read_page callback needs to be set");
+
+		state->initialized = true;
+		/*
+		 * we need to start reading at the beginning of the page to understand
+		 * what we are currently reading. We will skip over that because we
+		 * check curptr < startptr later.
+		 */
+		state->curptr = state->startptr;
+		state->curptr -= state->startptr % XLOG_BLCKSZ;
+
+		Assert(state->curptr % XLOG_BLCKSZ == 0);
+
+		elog(LOG, "start reading from %X/%X, scrolled back to %X/%X",
+		     (uint32) (state->startptr >> 32), (uint32) state->startptr,
+		     (uint32) (state->curptr >> 32), (uint32) state->curptr);
+	}
+	else
+	{
+		/*
+		 * We didn't finish reading the last time round. Since then new data
+		 * could have been appended to the current page. So we need to update
+		 * our copy of that.
+		 *
+		 * XXX: We could tie that to state->needs_input but that doesn't seem
+		 * worth the complication atm.
+		 */
+		XLogRecPtr rereadptr = state->curptr;
+		rereadptr -= rereadptr % XLOG_BLCKSZ;
+
+		XLByteAdvance(rereadptr, SizeOfXLogShortPHD);
+
+		if(!XLByteLE(rereadptr, state->endptr))
+			goto not_enough_input;
+
+		rereadptr -= rereadptr % XLOG_BLCKSZ;
+
+		state->read_page(state, state->cur_page, rereadptr);
+
+		/*
+		 * we will only rely on this data being valid if we are allowed to read
+		 * that far, so it's safe to just always read the header. read_page
+		 * always returns a complete page even though its contents may be
+		 * invalid.
+		 */
+		state->page_header = (XLogPageHeader)state->cur_page;
+		state->page_header_size = XLogPageHeaderSize(state->page_header);
+	}
+
+#ifdef VERBOSE_DEBUG
+	elog(LOG, "starting reading for %X/%X from %X/%X",
+	     (uint32)(state->startptr >> 32), (uint32) state->startptr,
+	     (uint32)(state->curptr >> 32), (uint32) state->curptr);
+#endif
+	/*
+	 * Iterate over the data and reassemble it until we reached the end of the
+	 * data. As we advance curptr inside the loop we need to recheck whether we
+	 * have space inside as well.
+	 */
+	while (XLByteLT(state->curptr, state->endptr))
+	{
+		/* how much space is left in the current block */
+		uint32 len_in_block;
+
+		/*
+		 * did we read a partial xlog record due to input/output constraints?
+		 * If yes, we need to signal that to the caller so it can be handled
+		 * sensibly there. E.g. by waiting on a latch till more xlog is
+		 * available.
+		 */
+		bool partial_read = false;
+		bool partial_write = false;
+
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "one loop start: record: %u header %u, skip: %u bkb_block: %d in_bkp_header: %u curptr: %X/%X remaining: %u, off: %u",
+		     state->in_record, state->in_record_header, state->in_skip,
+		     state->in_bkp_blocks, state->in_bkp_block_header,
+		     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+		     state->remaining_size,
+		     (uint32)(state->curptr % XLOG_BLCKSZ));
+#endif
+
+		/*
+		 * at a page boundary, read the header
+		 */
+		if (state->curptr % XLOG_BLCKSZ == 0)
+		{
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "reading page header, at %X/%X",
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr);
+#endif
+			/*
+			 * check whether we can read enough to see the short header, we
+			 * need to read the short header's xlp_info to know whether this is
+			 * a short or a long header.
+			 */
+			if (!XLogReaderHasInput(state, SizeOfXLogShortPHD))
+				goto not_enough_input;
+
+			state->read_page(state, state->cur_page, state->curptr);
+			state->page_header = (XLogPageHeader)state->cur_page;
+			state->page_header_size = XLogPageHeaderSize(state->page_header);
+
+			/* check that we have enough space to read/write the full header */
+			if (!XLogReaderHasInput(state, state->page_header_size))
+				goto not_enough_input;
+
+			if (!XLogReaderHasOutput(state, state->page_header_size))
+				goto not_enough_output;
+
+			XLogReaderInternalWrite(state, state->cur_page, state->page_header_size);
+
+			XLByteAdvance(state->curptr, state->page_header_size);
+
+			if (state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+			{
+				if (!state->in_record)
+				{
+					/*
+					 * we need to support this case for initializing a cluster
+					 * because we need to read/writeout a full page but there
+					 * may be none without records being split across.
+					 *
+					 * If we are before startptr there is nothing special about
+					 * this case. Most pages start with a contrecord.
+					 */
+					if(!XLByteLT(state->curptr, state->startptr))
+					{
+						elog(WARNING, "contrecord although we are not in a record at %X/%X, starting at %X/%X",
+						     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+						     (uint32)(state->startptr >> 32), (uint32)state->startptr);
+					}
+					state->in_record = true;
+					state->check_crc = false;
+					state->do_reassemble_record = false;
+					state->remaining_size = state->page_header->xlp_rem_len;
+					continue;
+				}
+				else
+				{
+					if (state->page_header->xlp_rem_len < state->remaining_size)
+						elog(PANIC, "remaining length is smaller than to be read data. xlp_rem_len: %u needed: %u",
+						     state->page_header->xlp_rem_len, state->remaining_size
+							);
+				}
+			}
+			else if (state->in_record)
+			{
+				elog(PANIC, "no contrecord although were in a record that continued onto the next page. info %hhu at page %X/%X",
+				     state->page_header->xlp_info,
+				     (uint32)(state->page_header->xlp_pageaddr >> 32),
+				     (uint32)state->page_header->xlp_pageaddr);
+			}
+		}
+
+		/*
+		 * If a record will start next, skip over alignment padding.
+		 */
+		if (!state->in_record)
+		{
+			/*
+			 * a record must be stored aligned. So skip as far as we need to
+			 * comply with that.
+			 */
+			Size skiplen;
+			skiplen = MAXALIGN(state->curptr) - state->curptr;
+
+			if (skiplen)
+			{
+				if (!XLogReaderHasSpace(state, skiplen))
+				{
+#ifdef VERBOSE_DEBUG
+					elog(LOG, "not aligning bc of space");
+#endif
+					/*
+					 * We don't have enough space to read/write the alignment
+					 * bytes, so fake up a skip-state
+					 */
+					state->in_record = true;
+					state->check_crc = false;
+					state->in_skip = true;
+					state->remaining_size = skiplen;
+
+					if (!XLogReaderHasInput(state, skiplen))
+						goto not_enough_input;
+					goto not_enough_output;
+				}
+#ifdef VERBOSE_DEBUG
+				elog(LOG, "aligning from %X/%X to %X/%X, skips %lu",
+				     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+				     (uint32)((state->curptr + skiplen) >> 32),
+				     (uint32)(state->curptr + skiplen),
+				     skiplen
+					);
+#endif
+				XLogReaderInternalWrite(state, NULL, skiplen);
+
+				XLByteAdvance(state->curptr, skiplen);
+
+				/*
+				 * full pages are not treated as continuations, so restart on
+				 * the beginning of the new page.
+				 */
+				if ((state->curptr % XLOG_BLCKSZ) == 0)
+					continue;
+			}
+		}
+
+		/*
+		 * --------------------------------------------------------------------
+		 * Start to read a record
+		 * --------------------------------------------------------------------
+		 */
+		if (!state->in_record)
+		{
+			state->in_record = true;
+			state->in_record_header = true;
+			state->check_crc = true;
+
+			/*
+			 * If the record starts before startptr we're not interested in its
+			 * contents. There is also no point in reassembling if we're not
+			 * analyzing the contents.
+			 *
+			 * If every record needs to be processed by finish_record restarts
+			 * need to be started after the end of the last record.
+			 *
+			 * See state->restart_ptr for that point.
+			 */
+			if ((state->finished_record == NULL &&
+			     !state->stop_at_record_boundary) ||
+				XLByteLT(state->curptr, state->startptr)){
+				state->do_reassemble_record = false;
+			}
+			else
+				state->do_reassemble_record = true;
+
+			state->remaining_size = SizeOfXLogRecord;
+
+			/*
+			 * we quickly lose the original address of a record as we can skip
+			 * records and such, so keep the original addresses.
+			 */
+			state->buf.origptr = state->curptr;
+
+			INIT_CRC32(state->next_crc);
+		}
+
+		Assert(state->in_record);
+
+		/*
+		 * Compute how much space on the current page is left and how much of
+		 * that we actually are interested in.
+		 */
+
+		/* amount of space on page */
+		if (state->curptr % XLOG_BLCKSZ == 0)
+			len_in_block = 0;
+		else
+			len_in_block = XLOG_BLCKSZ - (state->curptr % XLOG_BLCKSZ);
+
+		/* we have more data available than we need, so read only as much as needed */
+		if (len_in_block > state->remaining_size)
+			len_in_block = state->remaining_size;
+
+		/*
+		 * Handle constraints set by startptr, endptr and the size of the
+		 * output buffer.
+		 *
+		 * Normally we use XLogReaderHasSpace for that, but that's not
+		 * convenient here because we want to read data in parts. It also
+		 * doesn't handle splitting around startptr. So, open-code the logic
+		 * for that.
+		 */
+
+		/* to make sure we always writeout in the same chunks, split at startptr */
+		if (XLByteLT(state->curptr, state->startptr) &&
+		    (state->curptr + len_in_block) > state->startptr )
+		{
+#ifdef VERBOSE_DEBUG
+			Size cur_len = len_in_block;
+#endif
+			len_in_block = state->startptr - state->curptr;
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "truncating len_in_block due to startptr from %lu to %u",
+			     cur_len, len_in_block);
+#endif
+		}
+
+		/* do we have enough valid data to read the current block? */
+		if (state->curptr + len_in_block > state->endptr)
+		{
+#ifdef VERBOSE_DEBUG
+			Size cur_len = len_in_block;
+#endif
+			len_in_block = state->endptr - state->curptr;
+			partial_read = true;
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "truncating len_in_block due to endptr %X/%X %lu to %i at %X/%X",
+			     (uint32)(state->startptr >> 32), (uint32)state->startptr,
+			     cur_len, len_in_block,
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr);
+#endif
+		}
+
+		/* can we write what we read? */
+		if (state->writeout_data != NULL && state->output_buffer_size != 0
+				&& len_in_block > (state->output_buffer_size - state->already_written_size))
+		{
+#ifdef VERBOSE_DEBUG
+			Size cur_len = len_in_block;
+#endif
+			len_in_block = state->output_buffer_size - state->already_written_size;
+			partial_write = true;
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "truncating len_in_block due to output_buffer_size %lu to %i",
+			     cur_len, len_in_block);
+#endif
+		}
+
+		/* --------------------------------------------------------------------
+		 * copy data of the size determined above to whatever we are currently
+		 * reading.
+		 * --------------------------------------------------------------------
+		 */
+
+		/* nothing to do if we're skipping */
+		if (state->in_skip)
+		{
+			/* writeout zero data, original content is boring */
+			XLogReaderInternalWrite(state, NULL, len_in_block);
+
+			/*
+			 * we may not need this here because we're skipping over something
+			 * really uninteresting but keeping track of that would be
+			 * unnecessarily complicated.
+			 */
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+		}
+		/* reassemble the XLogRecord struct, quite likely in one-go */
+		else if (state->in_record_header)
+		{
+			/* clamp to sizeof(XLogRecord), buf.record has no room for the padding */
+			Size already_written = SizeOfXLogRecord - state->remaining_size;
+			Size struct_left = 0;
+			Size copysize = len_in_block;
+
+			/* a chunk that lies completely inside the padding contributes nothing */
+			if (already_written < sizeof(XLogRecord))
+				struct_left = sizeof(XLogRecord) - already_written;
+			if (copysize > struct_left)
+				copysize = struct_left;
+
+			memcpy((char*)&state->buf.record + already_written,
+			       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			       copysize);
+
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copied part of the record. len_in_block %u, remaining: %u",
+			     len_in_block, state->remaining_size);
+#endif
+		}
+		/*
+		 * copy data into the current backup block header so we have enough
+		 * knowledge to read the actual backup block afterwards
+		 */
+		else if (state->in_bkp_block_header)
+		{
+			int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+			BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+
+			Assert(state->in_bkp_blocks);
+
+			memcpy((char*)bkpb + sizeof(BkpBlock) - state->remaining_size,
+			       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			       len_in_block);
+
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + ((uint32)state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copying bkp header for block %d, %u bytes, complete %lu at %X/%X rem %u",
+			     blockno, len_in_block, sizeof(BkpBlock),
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+			     state->remaining_size);
+
+			if (state->remaining_size == len_in_block)
+			{
+				elog(LOG, "block off %u len %u", bkpb->hole_offset, bkpb->hole_length);
+			}
+#endif
+		}
+		/*
+		 * Reassemble the current backup block, those usually are the biggest
+		 * parts of individual XLogRecords so this might take several rounds.
+		 */
+		else if (state->in_bkp_blocks)
+		{
+			int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+			BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+			char* data = state->buf.bkp_block_data[blockno];
+
+			if (state->do_reassemble_record)
+			{
+				memcpy(data + BLCKSZ - bkpb->hole_length - state->remaining_size,
+				       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+				       len_in_block);
+			}
+
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copying %u bytes of data for bkp block %d, complete %u",
+			     len_in_block, blockno, state->remaining_size);
+#endif
+		}
+		/*
+		 * read the (rest) of the XLogRecord's data. Note that this is not the
+		 * XLogRecord struct itself!
+		 */
+		else if (state->in_record)
+		{
+			if (state->do_reassemble_record)
+			{
+				if(state->buf.record_data_size < state->buf.record.xl_len){
+					/* keep the old buffer and its size valid if realloc fails */
+					char* enlarged = realloc(state->buf.record_data, state->buf.record.xl_len);
+					if(!enlarged)
+						elog(ERROR, "could not allocate memory for contents of an xlog record");
+					state->buf.record_data = enlarged;
+					state->buf.record_data_size = state->buf.record.xl_len;
+				}
+
+				memcpy(state->buf.record_data
+				       + state->buf.record.xl_len
+				       - state->remaining_size,
+				       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+				       len_in_block);
+			}
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+
+
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copying %u bytes into a record at off %u",
+			     len_in_block, (uint32)(state->curptr % XLOG_BLCKSZ));
+#endif
+		}
+
+		/* should handle wrapping around to next page */
+		XLByteAdvance(state->curptr, len_in_block);
+
+		/* do the math of how much we need to read next round */
+		state->remaining_size -= len_in_block;
+
+		/*
+		 * --------------------------------------------------------------------
+		 * we completed whatever we were reading. So, handle going to the next
+		 * state.
+		 * --------------------------------------------------------------------
+		 */
+		if (state->remaining_size == 0)
+		{
+			/* completed reading - and potentially reassembling - the record */
+			if (state->in_record_header)
+			{
+				state->in_record_header = false;
+
+				/* ------------------------------------------------------------
+				 * normally we don't look at the content of xlog records here,
+				 * XLOG_SWITCH is a special case though, as everything left in
+				 * that segment won't be sensible content.
+				 * So skip to the next segment.
+				 * ------------------------------------------------------------
+				 */
+				if (state->buf.record.xl_rmid == RM_XLOG_ID
+				    && (state->buf.record.xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+				{
+					/*
+					 * Pretend the current data extends to end of segment
+					 */
+					elog(LOG, "XLOG_SWITCH");
+					state->curptr += XLogSegSize - 1;
+					state->curptr -= state->curptr % XLogSegSize;
+
+					state->in_record = false;
+					Assert(!state->in_bkp_blocks);
+					Assert(!state->in_skip);
+					continue;
+				}
+				else if (state->is_record_interesting == NULL ||
+				         state->is_record_interesting(state, &state->buf.record))
+				{
+					state->remaining_size = state->buf.record.xl_len;
+					Assert(state->in_bkp_blocks == 0);
+					Assert(!state->in_bkp_block_header);
+					Assert(!state->in_skip);
+#ifdef VERBOSE_DEBUG
+					elog(LOG, "found interesting record at %X/%X, prev: %X/%X, rmid %hhu, tx %u, len %u tot %u",
+					     (uint32)(state->buf.origptr >> 32), (uint32)state->buf.origptr,
+					     (uint32)(state->buf.record.xl_prev >> 32), (uint32)(state->buf.record.xl_prev),
+					     state->buf.record.xl_rmid, state->buf.record.xl_xid,
+					     state->buf.record.xl_len, state->buf.record.xl_tot_len);
+#endif
+
+				}
+				/* ------------------------------------------------------------
+				 * ok, everybody agrees, the contents of the current record are
+				 * just plain boring. So fake-up a record that replaces it with
+				 * a NOOP record.
+				 *
+				 * FIXME: we should allow "compressing" the output here. That
+				 * is write something that shows how long the record should be
+				 * if everything is decompressed again. This can radically
+				 * reduce space-usage over the wire.
+				 * It could also be very useful for traditional SR by removing
+				 * unneeded BKP blocks from being transferred.  For that we
+				 * would need to recompute CRCs though, which we currently
+				 * don't support.
+				 * ------------------------------------------------------------
+				 */
+				else
+				{
+					/*
+					 * we need to fix up a fake record with correct length that
+					 * can be written out.
+					 */
+					XLogRecord spacer;
+
+					elog(LOG, "found boring record at %X/%X, rmid %hhu, tx %u, len %u tot %u",
+					     (uint32)(state->buf.origptr >> 32), (uint32)state->buf.origptr,
+					     state->buf.record.xl_rmid, state->buf.record.xl_xid,
+					     state->buf.record.xl_len, state->buf.record.xl_tot_len);
+
+					/*
+					 * xl_tot_len contains the size of the XLogRecord itself,
+					 * we read that already though.
+					 */
+					state->remaining_size = state->buf.record.xl_tot_len
+						- SizeOfXLogRecord;
+
+					state->in_record = true;
+					state->check_crc = true;
+					state->in_bkp_blocks = 0;
+					state->in_skip = true;
+
+					spacer.xl_prev = state->buf.origptr;
+					spacer.xl_xid = InvalidTransactionId;
+					spacer.xl_tot_len = state->buf.record.xl_tot_len;
+					spacer.xl_len = state->buf.record.xl_tot_len - SizeOfXLogRecord;
+					spacer.xl_rmid = RM_XLOG_ID;
+					spacer.xl_info = XLOG_NOOP;
+
+					XLogReaderInternalWrite(state, (char*)&spacer,
+					                        sizeof(XLogRecord));
+
+					/*
+					 * write out the padding in a separate write, otherwise we
+					 * would overrun the stack
+					 */
+					XLogReaderInternalWrite(state, NULL,
+					                        SizeOfXLogRecord - sizeof(XLogRecord));
+
+				}
+			}
+			/*
+			 * in the in_skip case we already read backup blocks because we
+			 * likely read record->xl_tot_len, so everything is finished.
+			 */
+			else if (state->in_skip)
+			{
+				state->in_record = false;
+				state->in_bkp_blocks = 0;
+				state->in_skip = false;
+				/* alignment is handled when starting to read a record */
+			}
+			/*
+			 * We read the header of the current block. Start reading the
+			 * content of that now.
+			 */
+			else if (state->in_bkp_block_header)
+			{
+				BkpBlock* bkpb;
+				int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+
+				Assert(state->in_bkp_blocks);
+
+				bkpb = &state->buf.bkp_block[blockno];
+
+				if(bkpb->hole_length >= BLCKSZ)
+					elog(ERROR, "hole_length of block %u is %u but maximum is %u",
+					     blockno, bkpb->hole_length, BLCKSZ);
+				if(bkpb->hole_offset >= BLCKSZ)
+					elog(ERROR, "hole_offset of block %u is %u but maximum is %u",
+					     blockno, bkpb->hole_offset, BLCKSZ);
+
+				/* the hole has to lie completely inside the block */
+				if(bkpb->hole_offset + bkpb->hole_length > BLCKSZ)
+					elog(ERROR, "hole of block %u ends at %u but maximum is %u",
+					     blockno, bkpb->hole_offset + bkpb->hole_length, BLCKSZ);
+
+				state->remaining_size = BLCKSZ - bkpb->hole_length;
+				state->in_bkp_block_header = false;
+
+#ifdef VERBOSE_DEBUG
+				elog(LOG, "completed reading of header for %d, reading data now %u hole %u, off %u",
+				     blockno, state->remaining_size, bkpb->hole_length,
+				     bkpb->hole_offset);
+#endif
+			}
+			/*
+			 * The current backup block is finished, more could be following
+			 */
+			else if (state->in_bkp_blocks)
+			{
+				int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+				BkpBlock* bkpb;
+				char* bkpb_data;
+
+				Assert(!state->in_bkp_block_header);
+
+				bkpb = &state->buf.bkp_block[blockno];
+				bkpb_data = state->buf.bkp_block_data[blockno];
+
+				/*
+				 * reassemble block to its entirety by undoing the bkp_hole
+				 * "compression": the block was copied in without its hole
+				 */
+				if(bkpb->hole_length){
+					/* shift the data stored after the hole back behind the hole */
+					memmove(bkpb_data + bkpb->hole_offset + bkpb->hole_length,
+					        bkpb_data + bkpb->hole_offset,
+					        BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+					memset(bkpb_data + bkpb->hole_offset, 0, bkpb->hole_length);
+				}
+
+				state->in_bkp_blocks--;
+
+				state->in_skip = false;
+
+				if(!XLogReaderInternalNextBkpBlock(state))
+					goto all_bkp_finished;
+
+			}
+			/*
+			 * read a non-skipped record, start reading bkp blocks afterwards
+			 */
+			else if (state->in_record)
+			{
+				Assert(!state->in_skip);
+
+				state->in_bkp_blocks = XLR_MAX_BKP_BLOCKS;
+
+				if(!XLogReaderInternalNextBkpBlock(state))
+					goto all_bkp_finished;
+			}
+		}
+		/*
+		 * Something could only be partially read inside a single block because
+		 * of input or output space constraints..
+		 */
+		else if (partial_read)
+		{
+			partial_read = false;
+			goto not_enough_input;
+		}
+		else if (partial_write)
+		{
+			partial_write = false;
+			goto not_enough_output;
+		}
+		/*
+		 * Data continues into the next block.
+		 */
+		else
+		{
+		}
+
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "one loop end: record: %u header: %u, skip: %u bkb_block: %d in_bkp_header: %u curpos: %X/%X remaining: %u, off: %u",
+		     state->in_record, state->in_record_header, state->in_skip,
+		     state->in_bkp_blocks, state->in_bkp_block_header,
+		     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+		     state->remaining_size,
+		     (uint32)(state->curptr % XLOG_BLCKSZ));
+#endif
+		continue;
+
+		/*
+		 * we fully read a record. Process its contents if needed and start
+		 * reading the next record afterwards
+		 */
+	all_bkp_finished:
+		{
+			Assert(state->in_record);
+			Assert(!state->in_skip);
+			Assert(!state->in_bkp_block_header);
+			Assert(!state->in_bkp_blocks);
+
+			state->in_record = false;
+
+			/* compute and verify crc */
+			COMP_CRC32(state->next_crc,
+			           &state->buf.record,
+			           offsetof(XLogRecord, xl_crc));
+
+			FIN_CRC32(state->next_crc);
+
+			if (state->check_crc &&
+			    state->next_crc != state->buf.record.xl_crc) {
+				elog(ERROR, "crc mismatch: newly computed : %x, existing is %x",
+				     state->next_crc, state->buf.record.xl_crc);
+			}
+
+			/*
+			 * if we haven't reassembled the record there is no point in
+			 * calling the finished callback because we do not have any
+			 * interesting data. do_reassemble_record is false if we don't have
+			 * a finished_record callback.
+			 */
+			if (state->do_reassemble_record)
+			{
+				/* in stop_at_record_boundary that's a valid case */
+				if (state->finished_record)
+				{
+					state->finished_record(state, &state->buf);
+				}
+
+				if (state->stop_at_record_boundary)
+					goto out;
+			}
+
+			/* alignment is handled when starting to read a record */
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "finished record at %X/%X to %X/%X, already_written_size: %lu, reas = %d",
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+			     (uint32)(state->endptr >> 32), (uint32)state->endptr,
+			     state->already_written_size, state->do_reassemble_record);
+#endif
+
+		}
+	}
+out:
+	/*
+	 * we are finished, check whether we finished everything, this may be
+	 * useful for the caller.
+	 */
+	state->incomplete = (state->in_skip || state->in_record);
+	return;
+
+not_enough_input:
+	/* signal we need more xlog and finish */
+	state->needs_input = true;
+	goto out;
+
+not_enough_output:
+	/* signal we need more space to write output to */
+	state->needs_output = true;
+	goto out;
+}
+
+/*
+ * Read one record by running XLogReaderRead() with stop_at_record_boundary
+ * forced on for the duration of the call. Returns NULL if the read ended
+ * incomplete or buf.origptr is still the one seen before the call.
+ */
+XLogRecordBuffer*
+XLogReaderReadOne(XLogReaderState* state)
+{
+	const bool caller_stops = state->stop_at_record_boundary;
+	const XLogRecPtr prev_origptr = state->buf.origptr;
+
+	/* setting the flag is a no-op if the caller had it set already */
+	state->stop_at_record_boundary = true;
+	XLogReaderRead(state);
+	if (!caller_stops)
+		state->stop_at_record_boundary = false;
+
+	if (!state->incomplete && !XLByteEQ(prev_origptr, state->buf.origptr))
+		return &state->buf;
+	return NULL;
+}
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
new file mode 100644
index 0000000..f45c90b
--- /dev/null
+++ b/src/include/access/xlogreader.h
@@ -0,0 +1,264 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.h
+ *
+ *		Generic xlog reading facility.
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogreader.h
+ *
+ * NOTES
+ *		Check the definition of the XLogReaderState struct for instructions on
+ *		how to use the XLogReader infrastructure.
+ *
+ *		The basic idea is to allocate an XLogReaderState via
+ *		XLogReaderAllocate, fill out the wanted callbacks, set startptr/endptr
+ *		and call XLogReaderRead(state). That will iterate over the records as
+ *		long as it has enough input to reassemble a record, calling
+ *		is_record_interesting/finished_record for every record found.
+ *-------------------------------------------------------------------------
+ */
+#ifndef READXLOG_H
+#define READXLOG_H
+
+#include "access/xlog_internal.h"
+
+/*
+ * Holds one reassembled xlog record: header, data and backup blocks.
+ */
+typedef struct XLogRecordBuffer
+{
+	/* the record itself */
+	XLogRecord record;
+
+	/* the LSN at which that record was found */
+	XLogRecPtr origptr;
+
+	/* the data for xlog record */
+	char* record_data;
+	uint32 record_data_size;
+
+	BkpBlock bkp_block[XLR_MAX_BKP_BLOCKS];
+	char* bkp_block_data[XLR_MAX_BKP_BLOCKS];
+} XLogRecordBuffer;
+
+
+struct XLogReaderState;
+
+/*
+ * Signatures of the callbacks stored in XLogReaderState. What each callback
+ * has to do is explained in more detail inside the XLogReaderState struct.
+ */
+typedef bool (*XLogReaderStateInterestingCB)(struct XLogReaderState* state,
+                                             XLogRecord* r);
+typedef void (*XLogReaderStateWriteoutCB)(struct XLogReaderState* state,
+                                          char* data, Size length);
+typedef void (*XLogReaderStateFinishedRecordCB)(struct XLogReaderState* state,
+                                                XLogRecordBuffer* buf);
+typedef void (*XLogReaderStateReadPageCB)(struct XLogReaderState* state,
+                                          char* cur_page, XLogRecPtr at);
+
+typedef struct XLogReaderState
+{
+	/* ----------------------------------------
+	 * Public parameters
+	 * ----------------------------------------
+	 */
+
+	/* callbacks */
+
+	/*
+	 * Called to decide whether a xlog record is interesting and should be
+	 * assembled, analyzed (finished_record) and written out or skipped.
+	 *
+	 * Gets passed the current state as the first parameter and the record
+	 * *header* to decide over as the second.
+	 *
+	 * Return false to skip the record - and output a NOOP record instead - and
+	 * true to reassemble it fully.
+	 *
+	 * If set to NULL every record is considered to be interesting.
+	 */
+	XLogReaderStateInterestingCB is_record_interesting;
+
+	/*
+	 * Write out xlog data.
+	 *
+	 * The 'state' parameter is passed as the first parameter and a pointer to
+	 * the 'data' and its 'length' as second and third parameter. If the 'data'
+	 * is NULL, zeroes need to be written out.
+	 */
+	XLogReaderStateWriteoutCB writeout_data;
+
+	/*
+	 * If set to anything but NULL this callback gets called after a record,
+	 * including the backup blocks, has been fully reassembled.
+	 *
+	 * The first parameter is the current 'state'. 'buf', an XLogRecordBuffer,
+	 * gets passed as the second parameter and contains the record header, its
+	 * data, original position/lsn and backup blocks.
+	 */
+	XLogReaderStateFinishedRecordCB finished_record;
+
+	/*
+	 * Data input function.
+	 *
+	 * This callback *has* to be implemented.
+	 *
+	 * Has to read XLOG_BLCKSZ bytes that are at the location 'at' into the
+	 * memory pointed at by cur_page although everything behind endptr does not
+	 * have to be valid.
+	 */
+	XLogReaderStateReadPageCB read_page;
+
+	/*
+	 * this can be used by the caller to pass state to the callbacks without
+	 * using global variables or such ugliness. It will neither be read nor set
+	 * by anything but your code.
+	 */
+	void* private_data;
+
+
+	/* from where to where are we reading */
+
+	/* so we know where interesting data starts after scrolling back to the beginning of a page */
+	XLogRecPtr startptr;
+
+	/* continue up to here in this run */
+	XLogRecPtr endptr;
+
+	/*
+	 * size of the output buffer, if set to zero (default), there is no limit
+	 * in the output buffer size.
+	 */
+	Size output_buffer_size;
+
+	/*
+	 * Stop reading and return after every completed record.
+	 */
+	bool stop_at_record_boundary;
+
+	/* ----------------------------------------
+	 * output parameters
+	 * ----------------------------------------
+	 */
+
+	/* we need new input data - a later endptr - to continue reading */
+	bool needs_input;
+
+	/* we need new output space to continue reading */
+	bool needs_output;
+
+	/* track our progress */
+	XLogRecPtr curptr;
+
+	/*
+	 * are we in the middle of something? This is useful for the outside to
+	 * know whether to start reading anew
+	 */
+	bool incomplete;
+
+	/* ----------------------------------------
+	 * private/internal state
+	 * ----------------------------------------
+	 */
+
+	char cur_page[XLOG_BLCKSZ];
+	XLogPageHeader page_header;
+	uint32 page_header_size;
+	XLogRecordBuffer buf;
+	pg_crc32 next_crc;
+
+	/* ----------------------------------------
+	 * state machine variables
+	 * ----------------------------------------
+	 */
+
+	bool initialized;
+
+	/* are we currently reading a record? */
+	bool in_record;
+
+	/* are we currently reading a record header? */
+	bool in_record_header;
+
+	/* do we want to reassemble the record or just read/write it? */
+	bool do_reassemble_record;
+
+	/* how many bkp blocks remain to be read? */
+	int in_bkp_blocks;
+
+	/*
+	 * the header of a bkp block can be split across pages, so we need to
+	 * support reading that incrementally
+	 */
+	bool in_bkp_block_header;
+
+	/*
+	 * We are not interested in the contents of the `remaining_size` next
+	 * blocks. Don't read their contents and write out zeroes instead.
+	 */
+	bool in_skip;
+
+	/*
+	 * Should we check the crc of the currently read record? In some situations
+	 * - e.g. if we just skip till the start of a record - this doesn't make
+	 * sense.
+	 *
+	 * This needs to be separate from in_skip because we want to be able to not
+	 * write out records but still verify them. E.g. records that are "not
+	 * interesting".
+	 */
+	bool check_crc;
+
+	/* how much more to read in the current state */
+	uint32 remaining_size;
+
+	/* size of already written data */
+	Size already_written_size;
+
+} XLogReaderState;
+
+/*
+ * Get a new XLogReader
+ *
+ * At least the read_page callback, startptr and endptr have to be set before
+ * the reader can be used.
+ */
+extern XLogReaderState* XLogReaderAllocate(void);
+
+/*
+ * Free an XLogReader
+ */
+extern void XLogReaderFree(XLogReaderState*);
+
+/*
+ * Reset internal state so it can be used without continuing from the last
+ * state.
+ *
+ * The callbacks and private_data won't be reset
+ */
+extern void XLogReaderReset(XLogReaderState* state);
+
+/*
+ * Read the xlog and call the appropriate callbacks as far as possible within
+ * the constraints of input data (startptr, endptr) and output space.
+ */
+extern void XLogReaderRead(XLogReaderState* state);
+
+/*
+ * Read the next xlog record if enough input/output is available.
+ *
+ * This is a bit less efficient than XLogReaderRead.
+ *
+ * Returns a pointer to state->buf, or NULL if the next record couldn't be
+ * read. In that case check state->incomplete, ->needs_input, ->needs_output.
+ *
+ * Be careful to check that there is anything further to read when using
+ * ->endptr, otherwise it's easy to get into an endless loop.
+ */
+extern XLogRecordBuffer* XLogReaderReadOne(XLogReaderState* state);
+
+#endif /* READXLOG_H */
