diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index f82f10e..660b5fc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
-	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o
+	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ff56c26..769ddea 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -31,6 +31,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
@@ -541,6 +542,8 @@ static uint32 readOff = 0;
 static uint32 readLen = 0;
 static int	readSource = 0;		/* XLOG_FROM_* code */
 
+static bool fetching_ckpt_global;
+
 /*
  * Keeps track of which sources we've tried to read the current WAL
  * record from and failed.
@@ -556,13 +559,6 @@ static int	failedSources = 0;	/* OR of XLOG_FROM_* codes */
 static TimestampTz XLogReceiptTime = 0;
 static int	XLogReceiptSource = 0;		/* XLOG_FROM_* code */
 
-/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
-static char *readBuf = NULL;
-
-/* Buffer for current ReadRecord result (expandable) */
-static char *readRecordBuf = NULL;
-static uint32 readRecordBufSize = 0;
-
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;	/* start of last record read */
 static XLogRecPtr EndRecPtr;	/* end+1 of last record read */
@@ -632,9 +628,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 			 int source, bool notexistOk);
 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
-static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-			 bool randAccess);
-static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
+static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+			 int emode, bool randAccess, char *reaBuf, void *private_data);
 static void XLogFileClose(void);
 static bool RestoreArchivedFile(char *path, const char *xlogfname,
 					const char *recovername, off_t expectedSize);
@@ -646,12 +641,10 @@ static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
-static void CheckRecoveryConsistency(void);
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, bool fetching_ckpt);
+static void CheckRecoveryConsistency(XLogRecPtr EndRecPtr);
 static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode);
-static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
-					  int emode, bool randAccess);
-static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
+static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt);
 static List *readTimeLineHistory(TimeLineID targetTLI);
 static bool existsTimeLineHistory(TimeLineID probeTLI);
 static bool rescanLatestTimeLine(void);
@@ -3703,102 +3696,6 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
 }
 
 /*
- * CRC-check an XLOG record.  We do not believe the contents of an XLOG
- * record (other than to the minimal extent of computing the amount of
- * data to read in) until we've checked the CRCs.
- *
- * We assume all of the record (that is, xl_tot_len bytes) has been read
- * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
- * record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
- */
-static bool
-RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
-{
-	pg_crc32	crc;
-	int			i;
-	uint32		len = record->xl_len;
-	BkpBlock	bkpb;
-	char	   *blk;
-	size_t		remaining = record->xl_tot_len;
-
-	/* First the rmgr data */
-	if (remaining < SizeOfXLogRecord + len)
-	{
-		/* ValidXLogRecordHeader() should've caught this already... */
-		ereport(emode_for_corrupt_record(emode, recptr),
-				(errmsg("invalid record length at %X/%X",
-						(uint32) (recptr >> 32), (uint32) recptr)));
-		return false;
-	}
-	remaining -= SizeOfXLogRecord + len;
-	INIT_CRC32(crc);
-	COMP_CRC32(crc, XLogRecGetData(record), len);
-
-	/* Add in the backup blocks, if any */
-	blk = (char *) XLogRecGetData(record) + len;
-	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-	{
-		uint32		blen;
-
-		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
-			continue;
-
-		if (remaining < sizeof(BkpBlock))
-		{
-			ereport(emode_for_corrupt_record(emode, recptr),
-					(errmsg("invalid backup block size in record at %X/%X",
-							(uint32) (recptr >> 32), (uint32) recptr)));
-			return false;
-		}
-		memcpy(&bkpb, blk, sizeof(BkpBlock));
-
-		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
-		{
-			ereport(emode_for_corrupt_record(emode, recptr),
-					(errmsg("incorrect hole size in record at %X/%X",
-							(uint32) (recptr >> 32), (uint32) recptr)));
-			return false;
-		}
-		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
-		if (remaining < blen)
-		{
-			ereport(emode_for_corrupt_record(emode, recptr),
-					(errmsg("invalid backup block size in record at %X/%X",
-							(uint32) (recptr >> 32), (uint32) recptr)));
-			return false;
-		}
-		remaining -= blen;
-		COMP_CRC32(crc, blk, blen);
-		blk += blen;
-	}
-
-	/* Check that xl_tot_len agrees with our calculation */
-	if (remaining != 0)
-	{
-		ereport(emode_for_corrupt_record(emode, recptr),
-				(errmsg("incorrect total length in record at %X/%X",
-						(uint32) (recptr >> 32), (uint32) recptr)));
-		return false;
-	}
-
-	/* Finally include the record header */
-	COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
-	FIN_CRC32(crc);
-
-	if (!EQ_CRC32(record->xl_crc, crc))
-	{
-		ereport(emode_for_corrupt_record(emode, recptr),
-		(errmsg("incorrect resource manager data checksum in record at %X/%X",
-				(uint32) (recptr >> 32), (uint32) recptr)));
-		return false;
-	}
-
-	return true;
-}
-
-/*
  * Attempt to read an XLOG record.
  *
  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
@@ -3811,290 +3708,35 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
  * the returned record pointer always points there.
  */
 static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
+ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, bool fetching_ckpt)
 {
 	XLogRecord *record;
-	XLogRecPtr	tmpRecPtr = EndRecPtr;
-	bool		randAccess = false;
-	uint32		len,
-				total_len;
-	uint32		targetRecOff;
-	uint32		pageHeaderSize;
-	bool		gotheader;
-
-	if (readBuf == NULL)
-	{
-		/*
-		 * First time through, permanently allocate readBuf.  We do it this
-		 * way, rather than just making a static array, for two reasons: (1)
-		 * no need to waste the storage in most instantiations of the backend;
-		 * (2) a static char array isn't guaranteed to have any particular
-		 * alignment, whereas malloc() will provide MAXALIGN'd storage.
-		 */
-		readBuf = (char *) malloc(XLOG_BLCKSZ);
-		Assert(readBuf != NULL);
-	}
-
-	if (RecPtr == NULL)
-	{
-		RecPtr = &tmpRecPtr;
 
-		/*
-		 * RecPtr is pointing to end+1 of the previous WAL record.  If
-		 * we're at a page boundary, no more records can fit on the current
-		 * page. We must skip over the page header, but we can't do that
-		 * until we've read in the page, since the header size is variable.
-		 */
-	}
-	else
-	{
-		/*
-		 * In this case, the passed-in record pointer should already be
-		 * pointing to a valid record starting position.
-		 */
-		if (!XRecOffIsValid(*RecPtr))
-			ereport(PANIC,
-					(errmsg("invalid record offset at %X/%X",
-							(uint32) (*RecPtr >> 32), (uint32) *RecPtr)));
-
-		/*
-		 * Since we are going to a random position in WAL, forget any prior
-		 * state about what timeline we were in, and allow it to be any
-		 * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
-		 * to go backwards (but we can't reset that variable right here, since
-		 * we might not change files at all).
-		 */
+	if (!XLogRecPtrIsInvalid(RecPtr))
 		lastPageTLI = 0;		/* see comment in ValidXLogPageHeader */
-		randAccess = true;		/* allow curFileTLI to go backwards too */
-	}
+
+	fetching_ckpt_global = fetching_ckpt;
 
 	/* This is the first try to read this page. */
 	failedSources = 0;
-retry:
-	/* Read the page containing the record */
-	if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
-		return NULL;
-
-	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-	targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
-	if (targetRecOff == 0)
+	do
 	{
-		/*
-		 * At page start, so skip over page header.  The Assert checks that
-		 * we're not scribbling on caller's record pointer; it's OK because we
-		 * can only get here in the continuing-from-prev-record case, since
-		 * XRecOffIsValid rejected the zero-page-offset case otherwise.
-		 */
-		Assert(RecPtr == &tmpRecPtr);
-		(*RecPtr) += pageHeaderSize;
-		targetRecOff = pageHeaderSize;
-	}
-	else if (targetRecOff < pageHeaderSize)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("invalid record offset at %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		goto next_record_is_invalid;
-	}
-	if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
-		targetRecOff == pageHeaderSize)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("contrecord is requested by %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		goto next_record_is_invalid;
-	}
-
-	/*
-	 * Read the record length.
-	 *
-	 * NB: Even though we use an XLogRecord pointer here, the whole record
-	 * header might not fit on this page. xl_tot_len is the first field of
-	 * the struct, so it must be on this page (the records are MAXALIGNed),
-	 * but we cannot access any other fields until we've verified that we
-	 * got the whole header.
-	 */
-	record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
-
-	/*
-	 * If the whole record header is on this page, validate it immediately.
-	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
-	 * rest of the header after reading it from the next page.  The xl_tot_len
-	 * check is necessary here to ensure that we enter the "Need to reassemble
-	 * record" code path below; otherwise we might fail to apply
-	 * ValidXLogRecordHeader at all.
-	 */
-	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
-	{
-		if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-			goto next_record_is_invalid;
-		gotheader = true;
-	}
-	else
-	{
-		if (total_len < SizeOfXLogRecord)
+		record = XLogReadRecord(xlogreader, RecPtr, emode);
+		ReadRecPtr = xlogreader->ReadRecPtr;
+		EndRecPtr = xlogreader->EndRecPtr;
+		if (record == NULL)
 		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("invalid record length at %X/%X",
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			goto next_record_is_invalid;
-		}
-		gotheader = false;
-	}
-
-	/*
-	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
-	 * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
-	 * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
-	 * enough for all "normal" records, but very large commit or abort records
-	 * might need more space.)
-	 */
-	if (total_len > readRecordBufSize)
-	{
-		uint32		newSize = total_len;
+			failedSources |= readSource;
 
-		newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
-		newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
-		if (readRecordBuf)
-			free(readRecordBuf);
-		readRecordBuf = (char *) malloc(newSize);
-		if (!readRecordBuf)
-		{
-			readRecordBufSize = 0;
-			/* We treat this as a "bogus data" condition */
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("record length %u at %X/%X too long",
-							total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			goto next_record_is_invalid;
-		}
-		readRecordBufSize = newSize;
-	}
-
-	len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ;
-	if (total_len > len)
-	{
-		/* Need to reassemble record */
-		char	   *contrecord;
-		XLogPageHeader pageHeader;
-		XLogRecPtr	pagelsn;
-		char	   *buffer;
-		uint32		gotlen;
-
-		/* Initialize pagelsn to the beginning of the page this record is on */
-		pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-
-		/* Copy the first fragment of the record from the first page. */
-		memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len);
-		buffer = readRecordBuf + len;
-		gotlen = len;
-
-		do
-		{
-			/* Calculate pointer to beginning of next page */
-			XLByteAdvance(pagelsn, XLOG_BLCKSZ);
-			/* Wait for the next page to become available */
-			if (!XLogPageRead(&pagelsn, emode, false, false))
-				return NULL;
-
-			/* Check that the continuation on next page looks valid */
-			pageHeader = (XLogPageHeader) readBuf;
-			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
-			{
-				ereport(emode_for_corrupt_record(emode, *RecPtr),
-						(errmsg("there is no contrecord flag in log segment %s, offset %u",
-								XLogFileNameP(curFileTLI, readSegNo),
-								readOff)));
-				goto next_record_is_invalid;
-			}
-			/*
-			 * Cross-check that xlp_rem_len agrees with how much of the record
-			 * we expect there to be left.
-			 */
-			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+			if (readFile >= 0)
 			{
-				ereport(emode_for_corrupt_record(emode, *RecPtr),
-						(errmsg("invalid contrecord length %u in log segment %s, offset %u",
-								pageHeader->xlp_rem_len,
-								XLogFileNameP(curFileTLI, readSegNo),
-								readOff)));
-				goto next_record_is_invalid;
+				close(readFile);
+				readFile = -1;
 			}
+		}
+	} while(StandbyMode && record == NULL);
 
-			/* Append the continuation from this page to the buffer */
-			pageHeaderSize = XLogPageHeaderSize(pageHeader);
-			contrecord = (char *) readBuf + pageHeaderSize;
-			len = XLOG_BLCKSZ - pageHeaderSize;
-			if (pageHeader->xlp_rem_len < len)
-				len = pageHeader->xlp_rem_len;
-			memcpy(buffer, (char *) contrecord, len);
-			buffer += len;
-			gotlen += len;
-
-			/* If we just reassembled the record header, validate it. */
-			if (!gotheader)
-			{
-				record = (XLogRecord *) readRecordBuf;
-				if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-					goto next_record_is_invalid;
-				gotheader = true;
-			}
-		} while (pageHeader->xlp_rem_len > len);
-
-		record = (XLogRecord *) readRecordBuf;
-		if (!RecordIsValid(record, *RecPtr, emode))
-			goto next_record_is_invalid;
-		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-		XLogSegNoOffsetToRecPtr(
-			readSegNo,
-			readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
-			EndRecPtr);
-		ReadRecPtr = *RecPtr;
-	}
-	else
-	{
-		/* Record does not cross a page boundary */
-		if (!RecordIsValid(record, *RecPtr, emode))
-			goto next_record_is_invalid;
-		EndRecPtr = *RecPtr + MAXALIGN(total_len);
-
-		ReadRecPtr = *RecPtr;
-		memcpy(readRecordBuf, record, total_len);
-	}
-
-	/*
-	 * Special processing if it's an XLOG SWITCH record
-	 */
-	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
-	{
-		/* Pretend it extends to end of segment */
-		EndRecPtr += XLogSegSize - 1;
-		EndRecPtr -= EndRecPtr % XLogSegSize;
-
-		/*
-		 * Pretend that readBuf contains the last page of the segment. This is
-		 * just to avoid Assert failure in StartupXLOG if XLOG ends with this
-		 * segment.
-		 */
-		readOff = XLogSegSize - XLOG_BLCKSZ;
-	}
 	return record;
-
-next_record_is_invalid:
-	failedSources |= readSource;
-
-	if (readFile >= 0)
-	{
-		close(readFile);
-		readFile = -1;
-	}
-
-	/* In standby-mode, keep trying */
-	if (StandbyMode)
-		goto retry;
-	else
-		return NULL;
 }
 
 /*
@@ -4223,88 +3865,6 @@ ValidXLogPageHeader(XLogPageHeader hdr, int emode)
 }
 
 /*
- * Validate an XLOG record header.
- *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord.	It's not intended for use from anywhere else.
- */
-static bool
-ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
-					  bool randAccess)
-{
-	/*
-	 * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
-	 * required.
-	 */
-	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
-	{
-		if (record->xl_len != 0)
-		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("invalid xlog switch record at %X/%X",
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			return false;
-		}
-	}
-	else if (record->xl_len == 0)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("record with zero length at %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		return false;
-	}
-	if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
-		record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
-		XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("invalid record length at %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		return false;
-	}
-	if (record->xl_rmid > RM_MAX_ID)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("invalid resource manager ID %u at %X/%X",
-						record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		return false;
-	}
-	if (randAccess)
-	{
-		/*
-		 * We can't exactly verify the prev-link, but surely it should be less
-		 * than the record's own address.
-		 */
-		if (!XLByteLT(record->xl_prev, *RecPtr))
-		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
-							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			return false;
-		}
-	}
-	else
-	{
-		/*
-		 * Record's prev-link should exactly match our previous location. This
-		 * check guards against torn WAL pages where a stale but valid-looking
-		 * WAL record starts on a sector boundary.
-		 */
-		if (!XLByteEQ(record->xl_prev, ReadRecPtr))
-		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
-							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			return false;
-		}
-	}
-
-	return true;
-}
-
-/*
  * Try to read a timeline's history file.
  *
  * If successful, return the list of component TLIs (the given TLI followed by
@@ -6089,6 +5649,7 @@ StartupXLOG(void)
 	bool		backupEndRequired = false;
 	bool		backupFromStandby = false;
 	DBState		dbstate_at_startup;
+	XLogReaderState *xlogreader;
 
 	/*
 	 * Read control file and check XLOG status looks valid.
@@ -6222,6 +5783,8 @@ StartupXLOG(void)
 	if (StandbyMode)
 		OwnLatch(&XLogCtl->recoveryWakeupLatch);
 
+	xlogreader = XLogReaderAllocate(InvalidXLogRecPtr, &XLogPageRead, NULL);
+
 	if (read_backup_label(&checkPointLoc, &backupEndRequired,
 						  &backupFromStandby))
 	{
@@ -6229,7 +5792,7 @@ StartupXLOG(void)
 		 * When a backup_label file is present, we want to roll forward from
 		 * the checkpoint it identifies, rather than using pg_control.
 		 */
-		record = ReadCheckpointRecord(checkPointLoc, 0);
+		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0);
 		if (record != NULL)
 		{
 			memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
@@ -6247,7 +5810,7 @@ StartupXLOG(void)
 			 */
 			if (XLByteLT(checkPoint.redo, checkPointLoc))
 			{
-				if (!ReadRecord(&(checkPoint.redo), LOG, false))
+				if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
 					ereport(FATAL,
 							(errmsg("could not find redo location referenced by checkpoint record"),
 							 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
@@ -6271,7 +5834,7 @@ StartupXLOG(void)
 		 */
 		checkPointLoc = ControlFile->checkPoint;
 		RedoStartLSN = ControlFile->checkPointCopy.redo;
-		record = ReadCheckpointRecord(checkPointLoc, 1);
+		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1);
 		if (record != NULL)
 		{
 			ereport(DEBUG1,
@@ -6290,7 +5853,7 @@ StartupXLOG(void)
 		else
 		{
 			checkPointLoc = ControlFile->prevCheckPoint;
-			record = ReadCheckpointRecord(checkPointLoc, 2);
+			record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2);
 			if (record != NULL)
 			{
 				ereport(LOG,
@@ -6591,7 +6154,7 @@ StartupXLOG(void)
 		 * Allow read-only connections immediately if we're consistent
 		 * already.
 		 */
-		CheckRecoveryConsistency();
+		CheckRecoveryConsistency(EndRecPtr);
 
 		/*
 		 * Find the first record that logically follows the checkpoint --- it
@@ -6600,12 +6163,12 @@ StartupXLOG(void)
 		if (XLByteLT(checkPoint.redo, RecPtr))
 		{
 			/* back up to find the record */
-			record = ReadRecord(&(checkPoint.redo), PANIC, false);
+			record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
 		}
 		else
 		{
 			/* just have to read next record after CheckPoint */
-			record = ReadRecord(NULL, LOG, false);
+			record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 		}
 
 		if (record != NULL)
@@ -6652,7 +6215,7 @@ StartupXLOG(void)
 				HandleStartupProcInterrupts();
 
 				/* Allow read-only connections if we're consistent now */
-				CheckRecoveryConsistency();
+				CheckRecoveryConsistency(EndRecPtr);
 
 				/*
 				 * Have we reached our recovery target?
@@ -6756,7 +6319,7 @@ StartupXLOG(void)
 
 				LastRec = ReadRecPtr;
 
-				record = ReadRecord(NULL, LOG, false);
+				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 			} while (record != NULL && recoveryContinue);
 
 			/*
@@ -6806,7 +6369,7 @@ StartupXLOG(void)
 	 * Re-fetch the last valid or last applied record, so we can identify the
 	 * exact endpoint of what we consider the valid portion of WAL.
 	 */
-	record = ReadRecord(&LastRec, PANIC, false);
+	record = ReadRecord(xlogreader, LastRec, PANIC, false);
 	EndOfLog = EndRecPtr;
 	XLByteToPrevSeg(EndOfLog, endLogSegNo);
 
@@ -6905,8 +6468,15 @@ StartupXLOG(void)
 	 * record spans, not the one it starts in.	The last block is indeed the
 	 * one we want to use.
 	 */
-	Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
-	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
+	if (EndOfLog % XLOG_BLCKSZ == 0)
+	{
+		memset(Insert->currpage, 0, XLOG_BLCKSZ);
+	}
+	else
+	{
+		Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
+		memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
+	}
 	Insert->currpos = (char *) Insert->currpage +
 		(EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
 
@@ -7063,17 +6633,7 @@ StartupXLOG(void)
 		close(readFile);
 		readFile = -1;
 	}
-	if (readBuf)
-	{
-		free(readBuf);
-		readBuf = NULL;
-	}
-	if (readRecordBuf)
-	{
-		free(readRecordBuf);
-		readRecordBuf = NULL;
-		readRecordBufSize = 0;
-	}
+	XLogReaderFree(xlogreader);
 
 	/*
 	 * If any of the critical GUCs have changed, log them before we allow
@@ -7104,7 +6664,7 @@ StartupXLOG(void)
  * that it can start accepting read-only connections.
  */
 static void
-CheckRecoveryConsistency(void)
+CheckRecoveryConsistency(XLogRecPtr EndRecPtr)
 {
 	/*
 	 * During crash recovery, we don't reach a consistent state until we've
@@ -7284,7 +6844,7 @@ LocalSetXLogInsertAllowed(void)
  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
  */
 static XLogRecord *
-ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
+ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt)
 {
 	XLogRecord *record;
 
@@ -7308,7 +6868,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
 		return NULL;
 	}
 
-	record = ReadRecord(&RecPtr, LOG, true);
+	record = ReadRecord(xlogreader, RecPtr, LOG, true);
 
 	if (record == NULL)
 	{
@@ -10100,19 +9660,21 @@ CancelBackup(void)
  * sleep and retry.
  */
 static bool
-XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-			 bool randAccess)
+XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+			 bool randAccess, char *readBuf, void *private_data)
 {
+	/* TODO: these, and fetching_ckpt, would be better in private_data */
 	static XLogRecPtr receivedUpto = 0;
+	static pg_time_t last_fail_time = 0;
+	bool		fetching_ckpt = fetching_ckpt_global;
 	bool		switched_segment = false;
 	uint32		targetPageOff;
 	uint32		targetRecOff;
 	XLogSegNo	targetSegNo;
-	static pg_time_t last_fail_time = 0;
 
-	XLByteToSeg(*RecPtr, targetSegNo);
-	targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-	targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
+	XLByteToSeg(RecPtr, targetSegNo);
+	targetPageOff = ((RecPtr % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+	targetRecOff = RecPtr % XLOG_BLCKSZ;
 
 	/* Fast exit if we have read the record in the current buffer already */
 	if (failedSources == 0 && targetSegNo == readSegNo &&
@@ -10123,7 +9685,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
 	 * See if we need to switch to a new segment because the requested record
 	 * is not in the currently open one.
 	 */
-	if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo))
+	if (readFile >= 0 && !XLByteInSeg(RecPtr, readSegNo))
 	{
 		/*
 		 * Request a restartpoint if we've replayed too much xlog since the
@@ -10144,12 +9706,12 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
 		readSource = 0;
 	}
 
-	XLByteToSeg(*RecPtr, readSegNo);
+	XLByteToSeg(RecPtr, readSegNo);
 
 retry:
 	/* See if we need to retrieve more data */
 	if (readFile < 0 ||
-		(readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
+		(readSource == XLOG_FROM_STREAM && !XLByteLT(RecPtr, receivedUpto)))
 	{
 		if (StandbyMode)
 		{
@@ -10192,17 +9754,17 @@ retry:
 					 * XLogReceiptTime will not advance, so the grace time
 					 * alloted to conflicting queries will decrease.
 					 */
-					if (XLByteLT(*RecPtr, receivedUpto))
+					if (XLByteLT(RecPtr, receivedUpto))
 						havedata = true;
 					else
 					{
 						XLogRecPtr	latestChunkStart;
 
 						receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-						if (XLByteLT(*RecPtr, receivedUpto))
+						if (XLByteLT(RecPtr, receivedUpto))
 						{
 							havedata = true;
-							if (!XLByteLT(*RecPtr, latestChunkStart))
+							if (!XLByteLT(RecPtr, latestChunkStart))
 							{
 								XLogReceiptTime = GetCurrentTimestamp();
 								SetCurrentChunkStartTime(XLogReceiptTime);
@@ -10321,7 +9883,7 @@ retry:
 						if (PrimaryConnInfo)
 						{
 							RequestXLogStreaming(
-									  fetching_ckpt ? RedoStartLSN : *RecPtr,
+									  fetching_ckpt ? RedoStartLSN : RecPtr,
 												 PrimaryConnInfo);
 							continue;
 						}
@@ -10393,7 +9955,7 @@ retry:
 	 */
 	if (readSource == XLOG_FROM_STREAM)
 	{
-		if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
+		if (((RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
 		{
 			readLen = XLOG_BLCKSZ;
 		}
@@ -10417,7 +9979,7 @@ retry:
 		{
 			char fname[MAXFNAMELEN];
 			XLogFileName(fname, curFileTLI, readSegNo);
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
+			ereport(emode_for_corrupt_record(emode, RecPtr),
 					(errcode_for_file_access(),
 					 errmsg("could not read from log segment %s, offset %u: %m",
 							fname, readOff)));
@@ -10433,7 +9995,7 @@ retry:
 	{
 		char fname[MAXFNAMELEN];
 		XLogFileName(fname, curFileTLI, readSegNo);
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
+		ereport(emode_for_corrupt_record(emode, RecPtr),
 				(errcode_for_file_access(),
 		 errmsg("could not seek in log segment %s to offset %u: %m",
 				fname, readOff)));
@@ -10443,7 +10005,7 @@ retry:
 	{
 		char fname[MAXFNAMELEN];
 		XLogFileName(fname, curFileTLI, readSegNo);
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
+		ereport(emode_for_corrupt_record(emode, RecPtr),
 				(errcode_for_file_access(),
 		 errmsg("could not read from log segment %s, offset %u: %m",
 				fname, readOff)));
@@ -10501,7 +10063,7 @@ triggered:
  * you are about to ereport(), or you might cause a later message to be
  * erroneously suppressed.
  */
-static int
+int
 emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
 {
 	static XLogRecPtr lastComplaint = 0;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
new file mode 100644
index 0000000..8ba05b1
--- /dev/null
+++ b/src/backend/access/transam/xlogreader.c
@@ -0,0 +1,496 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *		Generic xlog reading facility
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/access/transam/xlogreader.c
+ *
+ * NOTES
+ *		Documentation about how do use this interface can be found in
+ *		xlogreader.h, more specifically in the definition of the
+ *		XLogReaderState struct where all parameters are documented.
+ *
+ * TODO:
+ * * usable without backend code around
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "catalog/pg_control.h"
+
+static bool ValidXLogRecordHeader(XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr,
+					  XLogRecord *record, int emode, bool randAccess);
+static bool RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode);
+
+/*
+ * Initialize a new xlog reader
+ */
+XLogReaderState *
+XLogReaderAllocate(XLogRecPtr startpoint,
+				   XLogPageReadCB pagereadfunc, void *private_data)
+{
+	XLogReaderState *state;
+
+	state = (XLogReaderState *) palloc0(sizeof(XLogReaderState));
+
+	/*
+	 * First time through, permanently allocate readBuf.  We do it this
+	 * way, rather than just making a static array, for two reasons: (1)
+	 * no need to waste the storage in most instantiations of the backend;
+	 * (2) a static char array isn't guaranteed to have any particular
+	 * alignment, whereas malloc() will provide MAXALIGN'd storage.
+	 */
+	state->readBuf = (char *) palloc(XLOG_BLCKSZ);
+
+	state->read_page = pagereadfunc;
+	state->private_data = private_data;
+	state->EndRecPtr = startpoint;
+
+	return state;
+}
+
+void
+XLogReaderFree(XLogReaderState *state)
+{
+	if (state->readRecordBuf)
+		pfree(state->readRecordBuf);
+	pfree(state->readBuf);
+	pfree(state);
+}
+
+/*
+ * Attempt to read an XLOG record.
+ *
+ * If RecPtr is not NULL, try to read a record at that position.  Otherwise
+ * try to read a record just after the last one previously read.
+ *
+ * If no valid record is available, returns NULL, or fails if emode is PANIC.
+ * (emode must be either PANIC, LOG)
+ *
+ * The record is copied into readRecordBuf, so that on successful return,
+ * the returned record pointer always points there.
+ */
+XLogRecord *
+XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, int emode)
+{
+	XLogRecord *record;
+	XLogRecPtr	tmpRecPtr = state->EndRecPtr;
+	bool		randAccess = false;
+	uint32		len,
+				total_len;
+	uint32		targetRecOff;
+	uint32		pageHeaderSize;
+	bool		gotheader;
+
+	if (RecPtr == InvalidXLogRecPtr)
+	{
+		RecPtr = tmpRecPtr;
+
+		/*
+		 * RecPtr is pointing to end+1 of the previous WAL record.  If
+		 * we're at a page boundary, no more records can fit on the current
+		 * page. We must skip over the page header, but we can't do that
+		 * until we've read in the page, since the header size is variable.
+		 */
+	}
+	else
+	{
+		/*
+		 * In this case, the passed-in record pointer should already be
+		 * pointing to a valid record starting position.
+		 */
+		if (!XRecOffIsValid(RecPtr))
+			ereport(PANIC,
+					(errmsg("invalid record offset at %X/%X",
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		randAccess = true;		/* allow curFileTLI to go backwards too */
+	}
+
+	/* Read the page containing the record */
+	if (!state->read_page(state, RecPtr, emode, randAccess, state->readBuf, state->private_data))
+		return NULL;
+
+	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+	targetRecOff = RecPtr % XLOG_BLCKSZ;
+	if (targetRecOff == 0)
+	{
+		/*
+		 * At page start, so skip over page header.  The Assert checks that
+		 * we're not scribbling on caller's record pointer; it's OK because we
+		 * can only get here in the continuing-from-prev-record case, since
+		 * XRecOffIsValid rejected the zero-page-offset case otherwise.
+		 * XXX: does this assert make sense now that RecPtr is not a pointer?
+		 */
+		Assert(RecPtr == tmpRecPtr);
+		RecPtr += pageHeaderSize;
+		targetRecOff = pageHeaderSize;
+	}
+	else if (targetRecOff < pageHeaderSize)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("invalid record offset at %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		goto next_record_is_invalid;
+	}
+	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+		targetRecOff == pageHeaderSize)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("contrecord is requested by %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		goto next_record_is_invalid;
+	}
+
+	/*
+	 * Read the record length.
+	 *
+	 * NB: Even though we use an XLogRecord pointer here, the whole record
+	 * header might not fit on this page. xl_tot_len is the first field of
+	 * the struct, so it must be on this page (the records are MAXALIGNed),
+	 * but we cannot access any other fields until we've verified that we
+	 * got the whole header.
+	 */
+	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+	total_len = record->xl_tot_len;
+
+	/*
+	 * If the whole record header is on this page, validate it immediately.
+	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
+	 * rest of the header after reading it from the next page.  The xl_tot_len
+	 * check is necessary here to ensure that we enter the "Need to reassemble
+	 * record" code path below; otherwise we might fail to apply
+	 * ValidXLogRecordHeader at all.
+	 */
+	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+	{
+		if (!ValidXLogRecordHeader(RecPtr, state->ReadRecPtr, record, emode, randAccess))
+			goto next_record_is_invalid;
+		gotheader = true;
+	}
+	else
+	{
+		if (total_len < SizeOfXLogRecord)
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("invalid record length at %X/%X",
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			goto next_record_is_invalid;
+		}
+		gotheader = false;
+	}
+
+	/*
+	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
+	 * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
+	 * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
+	 * enough for all "normal" records, but very large commit or abort records
+	 * might need more space.)
+	 */
+	if (total_len > state->readRecordBufSize)
+	{
+		uint32		newSize = total_len;
+
+		newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
+		newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
+		if (state->readRecordBuf)
+			pfree(state->readRecordBuf);
+		state->readRecordBuf = (char *) palloc(newSize);
+		if (!state->readRecordBuf)
+		{
+			state->readRecordBufSize = 0;
+			/* We treat this as a "bogus data" condition */
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("record length %u at %X/%X too long",
+							total_len, (uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			goto next_record_is_invalid;
+		}
+		state->readRecordBufSize = newSize;
+	}
+
+	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
+	if (total_len > len)
+	{
+		/* Need to reassemble record */
+		char	   *contrecord;
+		XLogPageHeader pageHeader;
+		XLogRecPtr	pagelsn;
+		char	   *buffer;
+		uint32		gotlen;
+
+		/* Initialize pagelsn to the beginning of the page this record is on */
+		pagelsn = (RecPtr / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+
+		/* Copy the first fragment of the record from the first page. */
+		memcpy(state->readRecordBuf, state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+		buffer = state->readRecordBuf + len;
+		gotlen = len;
+
+		do
+		{
+			/* Calculate pointer to beginning of next page */
+			XLByteAdvance(pagelsn, XLOG_BLCKSZ);
+			/* Wait for the next page to become available */
+			if (!state->read_page(state, pagelsn, emode, false, state->readBuf, NULL))
+				return NULL;
+
+			/* Check that the continuation on next page looks valid */
+			pageHeader = (XLogPageHeader) state->readBuf;
+			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+			{
+				ereport(emode_for_corrupt_record(emode, RecPtr),
+						(errmsg("there is no contrecord flag at %X/%X",
+								(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+				goto next_record_is_invalid;
+			}
+			/*
+			 * Cross-check that xlp_rem_len agrees with how much of the record
+			 * we expect there to be left.
+			 */
+			if (pageHeader->xlp_rem_len == 0 ||
+				total_len != (pageHeader->xlp_rem_len + gotlen))
+			{
+				ereport(emode_for_corrupt_record(emode, RecPtr),
+						(errmsg("invalid contrecord length %u at %X/%X",
+								pageHeader->xlp_rem_len,
+								(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+				goto next_record_is_invalid;
+			}
+
+			/* Append the continuation from this page to the buffer */
+			pageHeaderSize = XLogPageHeaderSize(pageHeader);
+			contrecord = (char *) state->readBuf + pageHeaderSize;
+			len = XLOG_BLCKSZ - pageHeaderSize;
+			if (pageHeader->xlp_rem_len < len)
+				len = pageHeader->xlp_rem_len;
+			memcpy(buffer, (char *) contrecord, len);
+			buffer += len;
+			gotlen += len;
+
+			/* If we just reassembled the record header, validate it. */
+			if (!gotheader)
+			{
+				record = (XLogRecord *) state->readRecordBuf;
+				if (!ValidXLogRecordHeader(RecPtr, state->ReadRecPtr, record, emode, randAccess))
+					goto next_record_is_invalid;
+				gotheader = true;
+			}
+		} while (pageHeader->xlp_rem_len > len);
+
+		record = (XLogRecord *) state->readRecordBuf;
+		if (!RecordIsValid(record, RecPtr, emode))
+			goto next_record_is_invalid;
+		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+		state->ReadRecPtr = RecPtr;
+		state->EndRecPtr = pagelsn + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len);
+	}
+	else
+	{
+		/* Record does not cross a page boundary */
+		if (!RecordIsValid(record, RecPtr, emode))
+			goto next_record_is_invalid;
+		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+
+		state->ReadRecPtr = RecPtr;
+		memcpy(state->readRecordBuf, record, total_len);
+	}
+
+	/*
+	 * Special processing if it's an XLOG SWITCH record
+	 */
+	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+	{
+		/* Pretend it extends to end of segment */
+		state->EndRecPtr += XLogSegSize - 1;
+		state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
+	}
+	return record;
+
+next_record_is_invalid:
+	return NULL;
+}
+
+/*
+ * Validate an XLOG record header.
+ *
+ * This is just a convenience subroutine to avoid duplicated code in
+ * ReadRecord.	It's not intended for use from anywhere else.
+ */
+static bool
+ValidXLogRecordHeader(XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, int emode,
+					  bool randAccess)
+{
+	/*
+	 * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
+	 * required.
+	 */
+	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+	{
+		if (record->xl_len != 0)
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("invalid xlog switch record at %X/%X",
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			return false;
+		}
+	}
+	else if (record->xl_len == 0)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("record with zero length at %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		return false;
+	}
+	if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
+		record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
+		XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("invalid record length at %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		return false;
+	}
+	if (record->xl_rmid > RM_MAX_ID)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("invalid resource manager ID %u at %X/%X",
+						record->xl_rmid, (uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		return false;
+	}
+	if (randAccess)
+	{
+		/*
+		 * We can't exactly verify the prev-link, but surely it should be less
+		 * than the record's own address.
+		 */
+		if (!XLByteLT(record->xl_prev, RecPtr))
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
+							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			return false;
+		}
+	}
+	else
+	{
+		/*
+		 * Record's prev-link should exactly match our previous location. This
+		 * check guards against torn WAL pages where a stale but valid-looking
+		 * WAL record starts on a sector boundary.
+		 */
+		if (!XLByteEQ(record->xl_prev, PrevRecPtr))
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
+							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			return false;
+		}
+	}
+
+	return true;
+}
+
+
+/*
+ * CRC-check an XLOG record.  We do not believe the contents of an XLOG
+ * record (other than to the minimal extent of computing the amount of
+ * data to read in) until we've checked the CRCs.
+ *
+ * We assume all of the record (that is, xl_tot_len bytes) has been read
+ * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
+ * record's header, which means in particular that xl_tot_len is at least
+ * SizeOfXlogRecord, so it is safe to fetch xl_len.
+ */
+static bool
+RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
+{
+	pg_crc32	crc;
+	int			i;
+	uint32		len = record->xl_len;
+	BkpBlock	bkpb;
+	char	   *blk;
+	size_t		remaining = record->xl_tot_len;
+
+	/* First the rmgr data */
+	if (remaining < SizeOfXLogRecord + len)
+	{
+		/* ValidXLogRecordHeader() should've caught this already... */
+		ereport(emode_for_corrupt_record(emode, recptr),
+				(errmsg("invalid record length at %X/%X",
+						(uint32) (recptr >> 32), (uint32) recptr)));
+		return false;
+	}
+	remaining -= SizeOfXLogRecord + len;
+	INIT_CRC32(crc);
+	COMP_CRC32(crc, XLogRecGetData(record), len);
+
+	/* Add in the backup blocks, if any */
+	blk = (char *) XLogRecGetData(record) + len;
+	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+	{
+		uint32		blen;
+
+		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
+			continue;
+
+		if (remaining < sizeof(BkpBlock))
+		{
+			ereport(emode_for_corrupt_record(emode, recptr),
+					(errmsg("invalid backup block size in record at %X/%X",
+							(uint32) (recptr >> 32), (uint32) recptr)));
+			return false;
+		}
+		memcpy(&bkpb, blk, sizeof(BkpBlock));
+
+		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
+		{
+			ereport(emode_for_corrupt_record(emode, recptr),
+					(errmsg("incorrect hole size in record at %X/%X",
+							(uint32) (recptr >> 32), (uint32) recptr)));
+			return false;
+		}
+		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
+
+		if (remaining < blen)
+		{
+			ereport(emode_for_corrupt_record(emode, recptr),
+					(errmsg("invalid backup block size in record at %X/%X",
+							(uint32) (recptr >> 32), (uint32) recptr)));
+			return false;
+		}
+		remaining -= blen;
+		COMP_CRC32(crc, blk, blen);
+		blk += blen;
+	}
+
+	/* Check that xl_tot_len agrees with our calculation */
+	if (remaining != 0)
+	{
+		ereport(emode_for_corrupt_record(emode, recptr),
+				(errmsg("incorrect total length in record at %X/%X",
+						(uint32) (recptr >> 32), (uint32) recptr)));
+		return false;
+	}
+
+	/* Finally include the record header */
+	COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
+	FIN_CRC32(crc);
+
+	if (!EQ_CRC32(record->xl_crc, crc))
+	{
+		ereport(emode_for_corrupt_record(emode, recptr),
+		(errmsg("incorrect resource manager data checksum in record at %X/%X",
+				(uint32) (recptr >> 32), (uint32) recptr)));
+		return false;
+	}
+
+	return true;
+}
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index b5bfb7b..1ada664 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -229,6 +229,14 @@ extern const RmgrData RmgrTable[];
 extern pg_time_t GetLastSegSwitchTime(void);
 extern XLogRecPtr RequestXLogSwitch(void);
 
+
+/*
+ * Exported so that xlogreader.c can call this. TODO: Should be refactored
+ * into a callback, or just have xlogreader return the error string and have
+ * the caller of XLogReadRecord() do the ereport() call.
+ */
+extern int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
+
 /*
  * These aren't in xlog.h because I'd rather not include fmgr.h there.
  */
