*** a/src/backend/access/transam/Makefile
--- b/src/backend/access/transam/Makefile
***************
*** 14,20 **** include $(top_builddir)/src/Makefile.global
  
  OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
  	timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
! 	xlogutils.o
  
  include $(top_srcdir)/src/backend/common.mk
  
--- 14,20 ----
  
  OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
  	timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
! 	xlogreader.o xlogutils.o
  
  include $(top_srcdir)/src/backend/common.mk
  
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 30,35 ****
--- 30,36 ----
  #include "access/twophase.h"
  #include "access/xact.h"
  #include "access/xlog_internal.h"
+ #include "access/xlogreader.h"
  #include "access/xlogutils.h"
  #include "catalog/catversion.h"
  #include "catalog/pg_control.h"
***************
*** 192,205 **** static bool LocalHotStandbyActive = false;
   */
  static int	LocalXLogInsertAllowed = -1;
  
! /* Are we recovering using offline XLOG archives? (only valid in the startup process) */
! bool InArchiveRecovery = false;
  
  /* Was the last xlog file restored from archive, or local? */
  static bool restoredFromArchive = false;
  
  /* options taken from recovery.conf for archive recovery */
! char *recoveryRestoreCommand = NULL;
  static char *recoveryEndCommand = NULL;
  static char *archiveCleanupCommand = NULL;
  static RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET;
--- 193,209 ----
   */
  static int	LocalXLogInsertAllowed = -1;
  
! /*
!  * Are we recovering using offline XLOG archives? (only valid in the startup
!  * process)
!  */
! bool		InArchiveRecovery = false;
  
  /* Was the last xlog file restored from archive, or local? */
  static bool restoredFromArchive = false;
  
  /* options taken from recovery.conf for archive recovery */
! char	   *recoveryRestoreCommand = NULL;
  static char *recoveryEndCommand = NULL;
  static char *archiveCleanupCommand = NULL;
  static RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET;
***************
*** 210,216 **** static TimestampTz recoveryTargetTime;
  static char *recoveryTargetName;
  
  /* options taken from recovery.conf for XLOG streaming */
! bool StandbyMode = false;
  static char *PrimaryConnInfo = NULL;
  static char *TriggerFile = NULL;
  
--- 214,220 ----
  static char *recoveryTargetName;
  
  /* options taken from recovery.conf for XLOG streaming */
! bool		StandbyMode = false;
  static char *PrimaryConnInfo = NULL;
  static char *TriggerFile = NULL;
  
***************
*** 389,395 **** typedef struct XLogCtlData
  	uint32		ckptXidEpoch;	/* nextXID & epoch of latest checkpoint */
  	TransactionId ckptXid;
  	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
! 	XLogSegNo	lastRemovedSegNo; /* latest removed/recycled XLOG segment */
  
  	/* Protected by WALWriteLock: */
  	XLogCtlWrite Write;
--- 393,400 ----
  	uint32		ckptXidEpoch;	/* nextXID & epoch of latest checkpoint */
  	TransactionId ckptXid;
  	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
! 	XLogSegNo	lastRemovedSegNo;		/* latest removed/recycled XLOG
! 										 * segment */
  
  	/* Protected by WALWriteLock: */
  	XLogCtlWrite Write;
***************
*** 530,554 **** static XLogSegNo openLogSegNo = 0;
  static uint32 openLogOff = 0;
  
  /*
!  * These variables are used similarly to the ones above, but for reading
   * the XLOG.  Note, however, that readOff generally represents the offset
   * of the page just read, not the seek position of the FD itself, which
   * will be just past that page. readLen indicates how much of the current
   * page has been read into readBuf, and readSource indicates where we got
   * the currently open file from.
   */
! static int	readFile = -1;
! static XLogSegNo readSegNo = 0;
! static uint32 readOff = 0;
! static uint32 readLen = 0;
! static bool	readFileHeaderValidated = false;
! static int	readSource = 0;		/* XLOG_FROM_* code */
! 
! /*
!  * Keeps track of which sources we've tried to read the current WAL
!  * record from and failed.
!  */
! static int	failedSources = 0;	/* OR of XLOG_FROM_* codes */
  
  /*
   * These variables track when we last obtained some WAL data to process,
--- 535,563 ----
  static uint32 openLogOff = 0;
  
  /*
!  * Status data for XLogPageRead.
!  *
!  * The first three are used similarly to the ones above, but for reading
   * the XLOG.  Note, however, that readOff generally represents the offset
   * of the page just read, not the seek position of the FD itself, which
   * will be just past that page. readLen indicates how much of the current
   * page has been read into readBuf, and readSource indicates where we got
   * the currently open file from.
+  *
+  * failedSources keeps track of which sources we've tried to read the current
+  * WAL record from and failed.
   */
! typedef struct XLogPageReadPrivate
! {
! 	int			readFile;		/* currently open file (fd) being read */
! 	XLogSegNo	readSegNo;		/* segment number of that file */
! 	uint32		readOff;		/* offset of the page just read */
! 	uint32		readLen;		/* bytes of current page read so far */
! 	bool		readFileHeaderValidated;	/* cleared by XLogFileRead on open */
! 	bool		fetching_ckpt;	/* are we fetching a checkpoint record? */
! 	int			readSource;		/* XLOG_FROM_* code */
! 	int			failedSources;	/* OR of XLOG_FROM_* codes */
! } XLogPageReadPrivate;
  
  /*
   * These variables track when we last obtained some WAL data to process,
***************
*** 559,571 **** static int	failedSources = 0;	/* OR of XLOG_FROM_* codes */
  static TimestampTz XLogReceiptTime = 0;
  static int	XLogReceiptSource = 0;		/* XLOG_FROM_* code */
  
- /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
- static char *readBuf = NULL;
- 
- /* Buffer for current ReadRecord result (expandable) */
- static char *readRecordBuf = NULL;
- static uint32 readRecordBufSize = 0;
- 
  /* State information for XLOG reading */
  static XLogRecPtr ReadRecPtr;	/* start of last record read */
  static XLogRecPtr EndRecPtr;	/* end+1 of last record read */
--- 568,573 ----
***************
*** 609,615 **** typedef struct xl_restore_point
  
  
  static void readRecoveryCommandFile(void);
! static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
  static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
  static void recoveryPausesHere(void);
  static void SetLatestXTime(TimestampTz xtime);
--- 611,618 ----
  
  
  static void readRecoveryCommandFile(void);
! static void exitArchiveRecovery(XLogPageReadPrivate *private, TimeLineID endTLI,
! 					XLogSegNo endLogSegNo);
  static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
  static void recoveryPausesHere(void);
  static void SetLatestXTime(TimestampTz xtime);
***************
*** 628,641 **** static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
  static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock);
! static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
! 			 int source, bool notexistOk);
! static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
! static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
! 			 bool randAccess);
! static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
! 							bool fetching_ckpt);
! static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
  static void XLogFileClose(void);
  static void PreallocXlogFiles(XLogRecPtr endptr);
  static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr);
--- 631,644 ----
  static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock);
! static int XLogFileRead(XLogPageReadPrivate *private, XLogSegNo segno,
! 			 int emode, TimeLineID tli, int source, bool notexistOk);
! static int XLogFileReadAnyTLI(XLogPageReadPrivate *private, XLogSegNo segno,
! 				   int emode, int sources);
! static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
! 			 int emode, bool randAccess, char *readBuf, void *private_data);
! static bool WaitForWALToBecomeAvailable(XLogPageReadPrivate *private,
! 							XLogRecPtr RecPtr, bool randAccess);
  static void XLogFileClose(void);
  static void PreallocXlogFiles(XLogRecPtr endptr);
  static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr);
***************
*** 643,654 **** static void UpdateLastRemovedPtr(char *filename);
  static void ValidateXLOGDirectoryStructure(void);
  static void CleanupBackupHistory(void);
  static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
! static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
! static void CheckRecoveryConsistency(void);
! static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly);
! static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
! 					  int emode, bool randAccess);
! static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
  static bool rescanLatestTimeLine(void);
  static void WriteControlFile(void);
  static void ReadControlFile(void);
--- 646,658 ----
  static void ValidateXLOGDirectoryStructure(void);
  static void CleanupBackupHistory(void);
  static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
! static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
! 		   int emode, bool fetching_ckpt);
! static void CheckRecoveryConsistency(XLogRecPtr EndRecPtr);
! static bool ValidXLogPageHeader(XLogSegNo segno, uint32 offset, int source,
! 					XLogPageHeader hdr, int emode, bool segmentonly);
! static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
! 					 XLogRecPtr RecPtr, int whichChkpt);
  static bool rescanLatestTimeLine(void);
  static void WriteControlFile(void);
  static void ReadControlFile(void);
***************
*** 1515,1521 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  		 */
  		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
! 				 (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
  				 (uint32) (XLogCtl->xlblocks[curridx] >> 32),
  				 (uint32) XLogCtl->xlblocks[curridx]);
  
--- 1519,1525 ----
  		 */
  		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
! 			(uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
  				 (uint32) (XLogCtl->xlblocks[curridx] >> 32),
  				 (uint32) XLogCtl->xlblocks[curridx]);
  
***************
*** 1581,1589 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  				if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
  					ereport(PANIC,
  							(errcode_for_file_access(),
! 							 errmsg("could not seek in log file %s to offset %u: %m",
! 									XLogFileNameP(ThisTimeLineID, openLogSegNo),
! 									startoffset)));
  				openLogOff = startoffset;
  			}
  
--- 1585,1593 ----
  				if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
  					ereport(PANIC,
  							(errcode_for_file_access(),
! 					 errmsg("could not seek in log file %s to offset %u: %m",
! 							XLogFileNameP(ThisTimeLineID, openLogSegNo),
! 							startoffset)));
  				openLogOff = startoffset;
  			}
  
***************
*** 1824,1830 **** UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
  		if (!force && XLByteLT(newMinRecoveryPoint, lsn))
  			elog(WARNING,
  			   "xlog min recovery request %X/%X is past current point %X/%X",
! 				 (uint32) (lsn >> 32) , (uint32) lsn,
  				 (uint32) (newMinRecoveryPoint >> 32),
  				 (uint32) newMinRecoveryPoint);
  
--- 1828,1834 ----
  		if (!force && XLByteLT(newMinRecoveryPoint, lsn))
  			elog(WARNING,
  			   "xlog min recovery request %X/%X is past current point %X/%X",
! 				 (uint32) (lsn >> 32), (uint32) lsn,
  				 (uint32) (newMinRecoveryPoint >> 32),
  				 (uint32) newMinRecoveryPoint);
  
***************
*** 1878,1884 **** XLogFlush(XLogRecPtr record)
  		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
  			 (uint32) (record >> 32), (uint32) record,
  			 (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
! 			 (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
  #endif
  
  	START_CRIT_SECTION();
--- 1882,1888 ----
  		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
  			 (uint32) (record >> 32), (uint32) record,
  			 (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
! 		   (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
  #endif
  
  	START_CRIT_SECTION();
***************
*** 1942,1949 **** XLogFlush(XLogRecPtr record)
  		/*
  		 * Sleep before flush! By adding a delay here, we may give further
  		 * backends the opportunity to join the backlog of group commit
! 		 * followers; this can significantly improve transaction throughput, at
! 		 * the risk of increasing transaction latency.
  		 *
  		 * We do not sleep if enableFsync is not turned on, nor if there are
  		 * fewer than CommitSiblings other backends with active transactions.
--- 1946,1953 ----
  		/*
  		 * Sleep before flush! By adding a delay here, we may give further
  		 * backends the opportunity to join the backlog of group commit
! 		 * followers; this can significantly improve transaction throughput,
! 		 * at the risk of increasing transaction latency.
  		 *
  		 * We do not sleep if enableFsync is not turned on, nor if there are
  		 * fewer than CommitSiblings other backends with active transactions.
***************
*** 1958,1964 **** XLogFlush(XLogRecPtr record)
  			XLogCtlInsert *Insert = &XLogCtl->Insert;
  			uint32		freespace = INSERT_FREESPACE(Insert);
  
! 			if (freespace == 0)		/* buffer is full */
  				WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
  			else
  			{
--- 1962,1968 ----
  			XLogCtlInsert *Insert = &XLogCtl->Insert;
  			uint32		freespace = INSERT_FREESPACE(Insert);
  
! 			if (freespace == 0) /* buffer is full */
  				WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
  			else
  			{
***************
*** 2011,2017 **** XLogFlush(XLogRecPtr record)
  		elog(ERROR,
  		"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
  			 (uint32) (record >> 32), (uint32) record,
! 			 (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
  }
  
  /*
--- 2015,2021 ----
  		elog(ERROR,
  		"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
  			 (uint32) (record >> 32), (uint32) record,
! 		   (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
  }
  
  /*
***************
*** 2090,2096 **** XLogBackgroundFlush(void)
  		elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
  			 (uint32) (WriteRqstPtr >> 32), (uint32) WriteRqstPtr,
  			 (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
! 			 (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
  #endif
  
  	START_CRIT_SECTION();
--- 2094,2100 ----
  		elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
  			 (uint32) (WriteRqstPtr >> 32), (uint32) WriteRqstPtr,
  			 (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
! 		   (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
  #endif
  
  	START_CRIT_SECTION();
***************
*** 2330,2336 **** XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
  	if (fd < 0)
  		ereport(ERROR,
  				(errcode_for_file_access(),
! 		   errmsg("could not open file \"%s\": %m", path)));
  
  	elog(DEBUG2, "done creating and filling new WAL file");
  
--- 2334,2340 ----
  	if (fd < 0)
  		ereport(ERROR,
  				(errcode_for_file_access(),
! 				 errmsg("could not open file \"%s\": %m", path)));
  
  	elog(DEBUG2, "done creating and filling new WAL file");
  
***************
*** 2569,2576 **** XLogFileOpen(XLogSegNo segno)
   * Otherwise, it's assumed to be already available in pg_xlog.
   */
  static int
! XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
! 			 int source, bool notfoundOk)
  {
  	char		xlogfname[MAXFNAMELEN];
  	char		activitymsg[MAXFNAMELEN + 16];
--- 2573,2580 ----
   * Otherwise, it's assumed to be already available in pg_xlog.
   */
  static int
! XLogFileRead(XLogPageReadPrivate *private, XLogSegNo segno, int emode,
! 			 TimeLineID tli, int source, bool notfoundOk)
  {
  	char		xlogfname[MAXFNAMELEN];
  	char		activitymsg[MAXFNAMELEN + 16];
***************
*** 2618,2626 **** XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
  		XLogFilePath(xlogfpath, tli, segno);
  		if (stat(xlogfpath, &statbuf) == 0)
  		{
! 			char oldpath[MAXPGPATH];
  #ifdef WIN32
  			static unsigned int deletedcounter = 1;
  			/*
  			 * On Windows, if another process (e.g a walsender process) holds
  			 * the file open in FILE_SHARE_DELETE mode, unlink will succeed,
--- 2622,2632 ----
  		XLogFilePath(xlogfpath, tli, segno);
  		if (stat(xlogfpath, &statbuf) == 0)
  		{
! 			char		oldpath[MAXPGPATH];
! 
  #ifdef WIN32
  			static unsigned int deletedcounter = 1;
+ 
  			/*
  			 * On Windows, if another process (e.g a walsender process) holds
  			 * the file open in FILE_SHARE_DELETE mode, unlink will succeed,
***************
*** 2687,2700 **** XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
  		set_ps_display(activitymsg, false);
  
  		/* Track source of data in assorted state variables */
! 		readSource = source;
  		XLogReceiptSource = source;
  		/* In FROM_STREAM case, caller tracks receipt time, not me */
  		if (source != XLOG_FROM_STREAM)
  			XLogReceiptTime = GetCurrentTimestamp();
  
  		/* The file header needs to be validated on first access */
! 		readFileHeaderValidated = false;
  
  		return fd;
  	}
--- 2693,2706 ----
  		set_ps_display(activitymsg, false);
  
  		/* Track source of data in assorted state variables */
! 		private->readSource = source;
  		XLogReceiptSource = source;
  		/* In FROM_STREAM case, caller tracks receipt time, not me */
  		if (source != XLOG_FROM_STREAM)
  			XLogReceiptTime = GetCurrentTimestamp();
  
  		/* The file header needs to be validated on first access */
! 		private->readFileHeaderValidated = false;
  
  		return fd;
  	}
***************
*** 2711,2717 **** XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
   * This version searches for the segment with any TLI listed in expectedTLIs.
   */
  static int
! XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
  {
  	char		path[MAXPGPATH];
  	ListCell   *cell;
--- 2717,2724 ----
   * This version searches for the segment with any TLI listed in expectedTLIs.
   */
  static int
! XLogFileReadAnyTLI(XLogPageReadPrivate *private, XLogSegNo segno, int emode,
! 				   int sources)
  {
  	char		path[MAXPGPATH];
  	ListCell   *cell;
***************
*** 2736,2742 **** XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
  
  		if (sources & XLOG_FROM_ARCHIVE)
  		{
! 			fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
  			if (fd != -1)
  			{
  				elog(DEBUG1, "got WAL segment from archive");
--- 2743,2750 ----
  
  		if (sources & XLOG_FROM_ARCHIVE)
  		{
! 			fd = XLogFileRead(private, segno, emode, tli,
! 							  XLOG_FROM_ARCHIVE, true);
  			if (fd != -1)
  			{
  				elog(DEBUG1, "got WAL segment from archive");
***************
*** 2746,2752 **** XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
  
  		if (sources & XLOG_FROM_PG_XLOG)
  		{
! 			fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
  			if (fd != -1)
  				return fd;
  		}
--- 2754,2761 ----
  
  		if (sources & XLOG_FROM_PG_XLOG)
  		{
! 			fd = XLogFileRead(private, segno, emode, tli,
! 							  XLOG_FROM_PG_XLOG, true);
  			if (fd != -1)
  				return fd;
  		}
***************
*** 3179,3280 **** RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
  }
  
  /*
-  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
-  * record (other than to the minimal extent of computing the amount of
-  * data to read in) until we've checked the CRCs.
-  *
-  * We assume all of the record (that is, xl_tot_len bytes) has been read
-  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
-  * record's header, which means in particular that xl_tot_len is at least
-  * SizeOfXlogRecord, so it is safe to fetch xl_len.
-  */
- static bool
- RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
- {
- 	pg_crc32	crc;
- 	int			i;
- 	uint32		len = record->xl_len;
- 	BkpBlock	bkpb;
- 	char	   *blk;
- 	size_t		remaining = record->xl_tot_len;
- 
- 	/* First the rmgr data */
- 	if (remaining < SizeOfXLogRecord + len)
- 	{
- 		/* ValidXLogRecordHeader() should've caught this already... */
- 		ereport(emode_for_corrupt_record(emode, recptr),
- 				(errmsg("invalid record length at %X/%X",
- 						(uint32) (recptr >> 32), (uint32) recptr)));
- 		return false;
- 	}
- 	remaining -= SizeOfXLogRecord + len;
- 	INIT_CRC32(crc);
- 	COMP_CRC32(crc, XLogRecGetData(record), len);
- 
- 	/* Add in the backup blocks, if any */
- 	blk = (char *) XLogRecGetData(record) + len;
- 	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- 	{
- 		uint32		blen;
- 
- 		if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- 			continue;
- 
- 		if (remaining < sizeof(BkpBlock))
- 		{
- 			ereport(emode_for_corrupt_record(emode, recptr),
- 					(errmsg("invalid backup block size in record at %X/%X",
- 							(uint32) (recptr >> 32), (uint32) recptr)));
- 			return false;
- 		}
- 		memcpy(&bkpb, blk, sizeof(BkpBlock));
- 
- 		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
- 		{
- 			ereport(emode_for_corrupt_record(emode, recptr),
- 					(errmsg("incorrect hole size in record at %X/%X",
- 							(uint32) (recptr >> 32), (uint32) recptr)));
- 			return false;
- 		}
- 		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
- 
- 		if (remaining < blen)
- 		{
- 			ereport(emode_for_corrupt_record(emode, recptr),
- 					(errmsg("invalid backup block size in record at %X/%X",
- 							(uint32) (recptr >> 32), (uint32) recptr)));
- 			return false;
- 		}
- 		remaining -= blen;
- 		COMP_CRC32(crc, blk, blen);
- 		blk += blen;
- 	}
- 
- 	/* Check that xl_tot_len agrees with our calculation */
- 	if (remaining != 0)
- 	{
- 		ereport(emode_for_corrupt_record(emode, recptr),
- 				(errmsg("incorrect total length in record at %X/%X",
- 						(uint32) (recptr >> 32), (uint32) recptr)));
- 		return false;
- 	}
- 
- 	/* Finally include the record header */
- 	COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
- 	FIN_CRC32(crc);
- 
- 	if (!EQ_CRC32(record->xl_crc, crc))
- 	{
- 		ereport(emode_for_corrupt_record(emode, recptr),
- 		(errmsg("incorrect resource manager data checksum in record at %X/%X",
- 				(uint32) (recptr >> 32), (uint32) recptr)));
- 		return false;
- 	}
- 
- 	return true;
- }
- 
- /*
   * Attempt to read an XLOG record.
   *
   * If RecPtr is not NULL, try to read a record at that position.  Otherwise
--- 3188,3193 ----
***************
*** 3287,3608 **** RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
   * the returned record pointer always points there.
   */
  static XLogRecord *
! ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
  {
  	XLogRecord *record;
! 	XLogRecPtr	tmpRecPtr = EndRecPtr;
! 	bool		randAccess = false;
! 	uint32		len,
! 				total_len;
! 	uint32		targetRecOff;
! 	uint32		pageHeaderSize;
! 	bool		gotheader;
! 
! 	if (readBuf == NULL)
! 	{
! 		/*
! 		 * First time through, permanently allocate readBuf.  We do it this
! 		 * way, rather than just making a static array, for two reasons: (1)
! 		 * no need to waste the storage in most instantiations of the backend;
! 		 * (2) a static char array isn't guaranteed to have any particular
! 		 * alignment, whereas malloc() will provide MAXALIGN'd storage.
! 		 */
! 		readBuf = (char *) malloc(XLOG_BLCKSZ);
! 		Assert(readBuf != NULL);
! 	}
! 
! 	if (RecPtr == NULL)
! 	{
! 		RecPtr = &tmpRecPtr;
! 
! 		/*
! 		 * RecPtr is pointing to end+1 of the previous WAL record.  If
! 		 * we're at a page boundary, no more records can fit on the current
! 		 * page. We must skip over the page header, but we can't do that
! 		 * until we've read in the page, since the header size is variable.
! 		 */
! 	}
! 	else
! 	{
! 		/*
! 		 * In this case, the passed-in record pointer should already be
! 		 * pointing to a valid record starting position.
! 		 */
! 		if (!XRecOffIsValid(*RecPtr))
! 			ereport(PANIC,
! 					(errmsg("invalid record offset at %X/%X",
! 							(uint32) (*RecPtr >> 32), (uint32) *RecPtr)));
  
! 		/*
! 		 * Since we are going to a random position in WAL, forget any prior
! 		 * state about what timeline we were in, and allow it to be any
! 		 * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
! 		 * to go backwards (but we can't reset that variable right here, since
! 		 * we might not change files at all).
! 		 */
  		/* see comment in ValidXLogPageHeader */
! 		lastPageTLI = lastSegmentTLI = 0;
! 		randAccess = true;		/* allow curFileTLI to go backwards too */
! 	}
  
! 	/* This is the first try to read this page. */
! 	failedSources = 0;
! retry:
! 	/* Read the page containing the record */
! 	if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
! 		return NULL;
! 
! 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
! 	targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
! 	if (targetRecOff == 0)
! 	{
! 		/*
! 		 * At page start, so skip over page header.  The Assert checks that
! 		 * we're not scribbling on caller's record pointer; it's OK because we
! 		 * can only get here in the continuing-from-prev-record case, since
! 		 * XRecOffIsValid rejected the zero-page-offset case otherwise.
! 		 */
! 		Assert(RecPtr == &tmpRecPtr);
! 		(*RecPtr) += pageHeaderSize;
! 		targetRecOff = pageHeaderSize;
! 	}
! 	else if (targetRecOff < pageHeaderSize)
! 	{
! 		ereport(emode_for_corrupt_record(emode, *RecPtr),
! 				(errmsg("invalid record offset at %X/%X",
! 						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
! 		goto next_record_is_invalid;
! 	}
! 	if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
! 		targetRecOff == pageHeaderSize)
! 	{
! 		ereport(emode_for_corrupt_record(emode, *RecPtr),
! 				(errmsg("contrecord is requested by %X/%X",
! 						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
! 		goto next_record_is_invalid;
! 	}
  
! 	/*
! 	 * Read the record length.
! 	 *
! 	 * NB: Even though we use an XLogRecord pointer here, the whole record
! 	 * header might not fit on this page. xl_tot_len is the first field of
! 	 * the struct, so it must be on this page (the records are MAXALIGNed),
! 	 * but we cannot access any other fields until we've verified that we
! 	 * got the whole header.
! 	 */
! 	record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ);
! 	total_len = record->xl_tot_len;
! 
! 	/*
! 	 * If the whole record header is on this page, validate it immediately.
! 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
! 	 * rest of the header after reading it from the next page.  The xl_tot_len
! 	 * check is necessary here to ensure that we enter the "Need to reassemble
! 	 * record" code path below; otherwise we might fail to apply
! 	 * ValidXLogRecordHeader at all.
! 	 */
! 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
! 	{
! 		if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
! 			goto next_record_is_invalid;
! 		gotheader = true;
! 	}
! 	else
! 	{
! 		if (total_len < SizeOfXLogRecord)
! 		{
! 			ereport(emode_for_corrupt_record(emode, *RecPtr),
! 					(errmsg("invalid record length at %X/%X",
! 							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
! 			goto next_record_is_invalid;
! 		}
! 		gotheader = false;
! 	}
! 
! 	/*
! 	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
! 	 * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
! 	 * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
! 	 * enough for all "normal" records, but very large commit or abort records
! 	 * might need more space.)
! 	 */
! 	if (total_len > readRecordBufSize)
! 	{
! 		uint32		newSize = total_len;
! 
! 		newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
! 		newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
! 		if (readRecordBuf)
! 			free(readRecordBuf);
! 		readRecordBuf = (char *) malloc(newSize);
! 		if (!readRecordBuf)
! 		{
! 			readRecordBufSize = 0;
! 			/* We treat this as a "bogus data" condition */
! 			ereport(emode_for_corrupt_record(emode, *RecPtr),
! 					(errmsg("record length %u at %X/%X too long",
! 							total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
! 			goto next_record_is_invalid;
! 		}
! 		readRecordBufSize = newSize;
! 	}
! 
! 	len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ;
! 	if (total_len > len)
  	{
! 		/* Need to reassemble record */
! 		char	   *contrecord;
! 		XLogPageHeader pageHeader;
! 		XLogRecPtr	pagelsn;
! 		char	   *buffer;
! 		uint32		gotlen;
! 
! 		/* Initialize pagelsn to the beginning of the page this record is on */
! 		pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
! 
! 		/* Copy the first fragment of the record from the first page. */
! 		memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len);
! 		buffer = readRecordBuf + len;
! 		gotlen = len;
! 
! 		do
  		{
! 			/* Calculate pointer to beginning of next page */
! 			XLByteAdvance(pagelsn, XLOG_BLCKSZ);
! 			/* Wait for the next page to become available */
! 			if (!XLogPageRead(&pagelsn, emode, false, false))
! 				return NULL;
! 
! 			/* Check that the continuation on next page looks valid */
! 			pageHeader = (XLogPageHeader) readBuf;
! 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
! 			{
! 				ereport(emode_for_corrupt_record(emode, *RecPtr),
! 						(errmsg("there is no contrecord flag in log segment %s, offset %u",
! 								XLogFileNameP(curFileTLI, readSegNo),
! 								readOff)));
! 				goto next_record_is_invalid;
! 			}
! 			/*
! 			 * Cross-check that xlp_rem_len agrees with how much of the record
! 			 * we expect there to be left.
! 			 */
! 			if (pageHeader->xlp_rem_len == 0 ||
! 				total_len != (pageHeader->xlp_rem_len + gotlen))
! 			{
! 				ereport(emode_for_corrupt_record(emode, *RecPtr),
! 						(errmsg("invalid contrecord length %u in log segment %s, offset %u",
! 								pageHeader->xlp_rem_len,
! 								XLogFileNameP(curFileTLI, readSegNo),
! 								readOff)));
! 				goto next_record_is_invalid;
! 			}
  
! 			/* Append the continuation from this page to the buffer */
! 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
! 			contrecord = (char *) readBuf + pageHeaderSize;
! 			len = XLOG_BLCKSZ - pageHeaderSize;
! 			if (pageHeader->xlp_rem_len < len)
! 				len = pageHeader->xlp_rem_len;
! 			memcpy(buffer, (char *) contrecord, len);
! 			buffer += len;
! 			gotlen += len;
! 
! 			/* If we just reassembled the record header, validate it. */
! 			if (!gotheader)
  			{
! 				record = (XLogRecord *) readRecordBuf;
! 				if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
! 					goto next_record_is_invalid;
! 				gotheader = true;
  			}
! 		} while (pageHeader->xlp_rem_len > len);
! 
! 		record = (XLogRecord *) readRecordBuf;
! 		if (!RecordIsValid(record, *RecPtr, emode))
! 			goto next_record_is_invalid;
! 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
! 		XLogSegNoOffsetToRecPtr(
! 			readSegNo,
! 			readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
! 			EndRecPtr);
! 		ReadRecPtr = *RecPtr;
! 	}
! 	else
! 	{
! 		/* Record does not cross a page boundary */
! 		if (!RecordIsValid(record, *RecPtr, emode))
! 			goto next_record_is_invalid;
! 		EndRecPtr = *RecPtr + MAXALIGN(total_len);
! 
! 		ReadRecPtr = *RecPtr;
! 		memcpy(readRecordBuf, record, total_len);
! 	}
! 
! 	/*
! 	 * Special processing if it's an XLOG SWITCH record
! 	 */
! 	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
! 	{
! 		/* Pretend it extends to end of segment */
! 		EndRecPtr += XLogSegSize - 1;
! 		EndRecPtr -= EndRecPtr % XLogSegSize;
  
- 		/*
- 		 * Pretend that readBuf contains the last page of the segment. This is
- 		 * just to avoid Assert failure in StartupXLOG if XLOG ends with this
- 		 * segment.
- 		 */
- 		readOff = XLogSegSize - XLOG_BLCKSZ;
- 	}
  	return record;
- 
- next_record_is_invalid:
- 	failedSources |= readSource;
- 
- 	if (readFile >= 0)
- 	{
- 		close(readFile);
- 		readFile = -1;
- 	}
- 
- 	/* In standby-mode, keep trying */
- 	if (StandbyMode)
- 		goto retry;
- 	else
- 		return NULL;
  }
  
  /*
   * Check whether the xlog header of a page just read in looks valid.
   *
   * This is just a convenience subroutine to avoid duplicated code in
!  * ReadRecord.	It's not intended for use from anywhere else.
   */
  static bool
! ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
  {
  	XLogRecPtr	recaddr;
  
! 	XLogSegNoOffsetToRecPtr(readSegNo, readOff, recaddr);
  
  	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
  	{
! 		ereport(emode_for_corrupt_record(emode, recaddr),
! 				(errmsg("invalid magic number %04X in log segment %s, offset %u",
! 						hdr->xlp_magic,
! 						XLogFileNameP(curFileTLI, readSegNo),
! 						readOff)));
  		return false;
  	}
  	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
  	{
! 		ereport(emode_for_corrupt_record(emode, recaddr),
  				(errmsg("invalid info bits %04X in log segment %s, offset %u",
  						hdr->xlp_info,
! 						XLogFileNameP(curFileTLI, readSegNo),
! 						readOff)));
  		return false;
  	}
  	if (hdr->xlp_info & XLP_LONG_HEADER)
--- 3200,3270 ----
   * the returned record pointer always points there.
   */
  static XLogRecord *
! ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
! 		   bool fetching_ckpt)
  {
  	XLogRecord *record;
! 	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
  
! 	if (!XLogRecPtrIsInvalid(RecPtr))
  		/* see comment in ValidXLogPageHeader */
! 		lastPageTLI = lastSegmentTLI = 0;		
  
! 	/* Pass the caller's fetching_ckpt flag on to XLogPageRead via private */
! 	private->fetching_ckpt = fetching_ckpt;
  
! 	/* New read request: forget which sources failed on earlier attempts */
! 	private->failedSources = 0;
! 	do
  	{
! 		record = XLogReadRecord(xlogreader, RecPtr, emode);
! 		ReadRecPtr = xlogreader->ReadRecPtr;	/* copied even when the read failed */
! 		EndRecPtr = xlogreader->EndRecPtr;
! 		if (record == NULL)
  		{
! 			private->failedSources |= private->readSource;	/* mark current source as failed */
  
! 			if (private->readFile >= 0)
  			{
! 				close(private->readFile);
! 				private->readFile = -1;
  			}
! 		}
! 	} while (StandbyMode && record == NULL);	/* in standby mode, retry until a record is read */
  
  	return record;
  }
  
  /*
   * Check whether the xlog header of a page just read in looks valid.
   *
   * This is just a convenience subroutine to avoid duplicated code in
!  * XLogPageRead.  It's not intended for use from anywhere else.
   */
  static bool
! ValidXLogPageHeader(XLogSegNo segno, uint32 offset, int source,
! 					XLogPageHeader hdr, int emode, bool segmentonly)
  {
  	XLogRecPtr	recaddr;
  
! 	XLogSegNoOffsetToRecPtr(segno, offset, recaddr);
  
  	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
  	{
! 		ereport(emode_for_corrupt_record(emode, source, recaddr),
! 			(errmsg("invalid magic number %04X in log segment %s, offset %u",
! 					hdr->xlp_magic,
! 					XLogFileNameP(curFileTLI, segno),
! 					offset)));
  		return false;
  	}
  	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
  	{
! 		ereport(emode_for_corrupt_record(emode, source, recaddr),
  				(errmsg("invalid info bits %04X in log segment %s, offset %u",
  						hdr->xlp_info,
! 						XLogFileNameP(curFileTLI, segno),
! 						offset)));
  		return false;
  	}
  	if (hdr->xlp_info & XLP_LONG_HEADER)
***************
*** 3622,3628 **** ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
  					 longhdr->xlp_sysid);
  			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
  					 ControlFile->system_identifier);
! 			ereport(emode_for_corrupt_record(emode, recaddr),
  					(errmsg("WAL file is from different database system"),
  					 errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.",
  							   fhdrident_str, sysident_str)));
--- 3284,3290 ----
  					 longhdr->xlp_sysid);
  			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
  					 ControlFile->system_identifier);
! 			ereport(emode_for_corrupt_record(emode, source, recaddr),
  					(errmsg("WAL file is from different database system"),
  					 errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.",
  							   fhdrident_str, sysident_str)));
***************
*** 3630,3666 **** ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
  		}
  		if (longhdr->xlp_seg_size != XLogSegSize)
  		{
! 			ereport(emode_for_corrupt_record(emode, recaddr),
  					(errmsg("WAL file is from different database system"),
  					 errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
  			return false;
  		}
  		if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
  		{
! 			ereport(emode_for_corrupt_record(emode, recaddr),
  					(errmsg("WAL file is from different database system"),
  					 errdetail("Incorrect XLOG_BLCKSZ in page header.")));
  			return false;
  		}
  	}
! 	else if (readOff == 0)
  	{
  		/* hmm, first page of file doesn't have a long header? */
! 		ereport(emode_for_corrupt_record(emode, recaddr),
  				(errmsg("invalid info bits %04X in log segment %s, offset %u",
  						hdr->xlp_info,
! 						XLogFileNameP(curFileTLI, readSegNo),
! 						readOff)));
  		return false;
  	}
  
  	if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
  	{
! 		ereport(emode_for_corrupt_record(emode, recaddr),
! 				(errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u",
! 						(uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
! 						XLogFileNameP(curFileTLI, readSegNo),
! 						readOff)));
  		return false;
  	}
  
--- 3292,3328 ----
  		}
  		if (longhdr->xlp_seg_size != XLogSegSize)
  		{
! 			ereport(emode_for_corrupt_record(emode, source, recaddr),
  					(errmsg("WAL file is from different database system"),
  					 errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
  			return false;
  		}
  		if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
  		{
! 			ereport(emode_for_corrupt_record(emode, source, recaddr),
  					(errmsg("WAL file is from different database system"),
  					 errdetail("Incorrect XLOG_BLCKSZ in page header.")));
  			return false;
  		}
  	}
! 	else if (offset == 0)
  	{
  		/* hmm, first page of file doesn't have a long header? */
! 		ereport(emode_for_corrupt_record(emode, source, recaddr),
  				(errmsg("invalid info bits %04X in log segment %s, offset %u",
  						hdr->xlp_info,
! 						XLogFileNameP(curFileTLI, segno),
! 						offset)));
  		return false;
  	}
  
  	if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
  	{
! 		ereport(emode_for_corrupt_record(emode, source, recaddr),
! 			(errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u",
! 			  (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
! 					XLogFileNameP(curFileTLI, segno),
! 					offset)));
  		return false;
  	}
  
***************
*** 3669,3679 **** ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
  	 */
  	if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
  	{
! 		ereport(emode_for_corrupt_record(emode, recaddr),
! 				(errmsg("unexpected timeline ID %u in log segment %s, offset %u",
! 						hdr->xlp_tli,
! 						XLogFileNameP(curFileTLI, readSegNo),
! 						readOff)));
  		return false;
  	}
  
--- 3331,3341 ----
  	 */
  	if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
  	{
! 		ereport(emode_for_corrupt_record(emode, source, recaddr),
! 			(errmsg("unexpected timeline ID %u in log segment %s, offset %u",
! 					hdr->xlp_tli,
! 					XLogFileNameP(curFileTLI, segno),
! 					offset)));
  		return false;
  	}
  
***************
*** 3697,3708 **** ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
  	 */
  	if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI))
  	{
! 		ereport(emode_for_corrupt_record(emode, recaddr),
  				(errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
  						hdr->xlp_tli,
  						segmentonly ? lastSegmentTLI : lastPageTLI,
! 						XLogFileNameP(curFileTLI, readSegNo),
! 						readOff)));
  		return false;
  	}
  	lastPageTLI = hdr->xlp_tli;
--- 3359,3370 ----
  	 */
  	if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI))
  	{
! 		ereport(emode_for_corrupt_record(emode, source, recaddr),
  				(errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
  						hdr->xlp_tli,
  						segmentonly ? lastSegmentTLI : lastPageTLI,
! 						XLogFileNameP(curFileTLI, segno),
! 						offset)));
  		return false;
  	}
  	lastPageTLI = hdr->xlp_tli;
***************
*** 3713,3800 **** ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
  }
  
  /*
-  * Validate an XLOG record header.
-  *
-  * This is just a convenience subroutine to avoid duplicated code in
-  * ReadRecord.	It's not intended for use from anywhere else.
-  */
- static bool
- ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
- 					  bool randAccess)
- {
- 	/*
- 	 * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
- 	 * required.
- 	 */
- 	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- 	{
- 		if (record->xl_len != 0)
- 		{
- 			ereport(emode_for_corrupt_record(emode, *RecPtr),
- 					(errmsg("invalid xlog switch record at %X/%X",
- 							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- 			return false;
- 		}
- 	}
- 	else if (record->xl_len == 0)
- 	{
- 		ereport(emode_for_corrupt_record(emode, *RecPtr),
- 				(errmsg("record with zero length at %X/%X",
- 						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- 		return false;
- 	}
- 	if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
- 		record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
- 		XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
- 	{
- 		ereport(emode_for_corrupt_record(emode, *RecPtr),
- 				(errmsg("invalid record length at %X/%X",
- 						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- 		return false;
- 	}
- 	if (record->xl_rmid > RM_MAX_ID)
- 	{
- 		ereport(emode_for_corrupt_record(emode, *RecPtr),
- 				(errmsg("invalid resource manager ID %u at %X/%X",
- 						record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- 		return false;
- 	}
- 	if (randAccess)
- 	{
- 		/*
- 		 * We can't exactly verify the prev-link, but surely it should be less
- 		 * than the record's own address.
- 		 */
- 		if (!XLByteLT(record->xl_prev, *RecPtr))
- 		{
- 			ereport(emode_for_corrupt_record(emode, *RecPtr),
- 					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
- 							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
- 							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- 			return false;
- 		}
- 	}
- 	else
- 	{
- 		/*
- 		 * Record's prev-link should exactly match our previous location. This
- 		 * check guards against torn WAL pages where a stale but valid-looking
- 		 * WAL record starts on a sector boundary.
- 		 */
- 		if (!XLByteEQ(record->xl_prev, ReadRecPtr))
- 		{
- 			ereport(emode_for_corrupt_record(emode, *RecPtr),
- 					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
- 							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
- 							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- 			return false;
- 		}
- 	}
- 
- 	return true;
- }
- 
- /*
   * Scan for new timelines that might have appeared in the archive since we
   * started recovery.
   *
--- 3375,3380 ----
***************
*** 4755,4761 **** readRecoveryCommandFile(void)
   * Exit archive-recovery state
   */
  static void
! exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
  {
  	char		recoveryPath[MAXPGPATH];
  	char		xlogpath[MAXPGPATH];
--- 4335,4342 ----
   * Exit archive-recovery state
   */
  static void
! exitArchiveRecovery(XLogPageReadPrivate *private, TimeLineID endTLI,
! 					XLogSegNo endLogSegNo)
  {
  	char		recoveryPath[MAXPGPATH];
  	char		xlogpath[MAXPGPATH];
***************
*** 4774,4783 **** exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
  	 * If the ending log segment is still open, close it (to avoid problems on
  	 * Windows with trying to rename or delete an open file).
  	 */
! 	if (readFile >= 0)
  	{
! 		close(readFile);
! 		readFile = -1;
  	}
  
  	/*
--- 4355,4364 ----
  	 * If the ending log segment is still open, close it (to avoid problems on
  	 * Windows with trying to rename or delete an open file).
  	 */
! 	if (private->readFile >= 0)
  	{
! 		close(private->readFile);
! 		private->readFile = -1;
  	}
  
  	/*
***************
*** 5212,5217 **** StartupXLOG(void)
--- 4793,4800 ----
  	bool		backupEndRequired = false;
  	bool		backupFromStandby = false;
  	DBState		dbstate_at_startup;
+ 	XLogReaderState *xlogreader;
+ 	XLogPageReadPrivate *private;
  
  	/*
  	 * Read control file and check XLOG status looks valid.
***************
*** 5345,5350 **** StartupXLOG(void)
--- 4928,4942 ----
  	if (StandbyMode)
  		OwnLatch(&XLogCtl->recoveryWakeupLatch);
  
+ 	private = palloc0(sizeof(XLogPageReadPrivate));
+ 	private->readFile = -1;
+ 	xlogreader = XLogReaderAllocate(InvalidXLogRecPtr, &XLogPageRead, private);
+ 	if (!xlogreader)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_OUT_OF_MEMORY),
+ 				 errmsg("out of memory"),
+ 				 errdetail("Failed while allocating an XLog reading processor")));
+ 
  	if (read_backup_label(&checkPointLoc, &backupEndRequired,
  						  &backupFromStandby))
  	{
***************
*** 5352,5365 **** StartupXLOG(void)
  		 * When a backup_label file is present, we want to roll forward from
  		 * the checkpoint it identifies, rather than using pg_control.
  		 */
! 		record = ReadCheckpointRecord(checkPointLoc, 0);
  		if (record != NULL)
  		{
  			memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  			wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
  			ereport(DEBUG1,
  					(errmsg("checkpoint record is at %X/%X",
! 							(uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
  			InRecovery = true;	/* force recovery even if SHUTDOWNED */
  
  			/*
--- 4944,4957 ----
  		 * When a backup_label file is present, we want to roll forward from
  		 * the checkpoint it identifies, rather than using pg_control.
  		 */
! 		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0);
  		if (record != NULL)
  		{
  			memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  			wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
  			ereport(DEBUG1,
  					(errmsg("checkpoint record is at %X/%X",
! 				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
  			InRecovery = true;	/* force recovery even if SHUTDOWNED */
  
  			/*
***************
*** 5370,5376 **** StartupXLOG(void)
  			 */
  			if (XLByteLT(checkPoint.redo, checkPointLoc))
  			{
! 				if (!ReadRecord(&(checkPoint.redo), LOG, false))
  					ereport(FATAL,
  							(errmsg("could not find redo location referenced by checkpoint record"),
  							 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
--- 4962,4968 ----
  			 */
  			if (XLByteLT(checkPoint.redo, checkPointLoc))
  			{
! 				if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
  					ereport(FATAL,
  							(errmsg("could not find redo location referenced by checkpoint record"),
  							 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
***************
*** 5394,5405 **** StartupXLOG(void)
  		 */
  		checkPointLoc = ControlFile->checkPoint;
  		RedoStartLSN = ControlFile->checkPointCopy.redo;
! 		record = ReadCheckpointRecord(checkPointLoc, 1);
  		if (record != NULL)
  		{
  			ereport(DEBUG1,
  					(errmsg("checkpoint record is at %X/%X",
! 							(uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
  		}
  		else if (StandbyMode)
  		{
--- 4986,4997 ----
  		 */
  		checkPointLoc = ControlFile->checkPoint;
  		RedoStartLSN = ControlFile->checkPointCopy.redo;
! 		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1);
  		if (record != NULL)
  		{
  			ereport(DEBUG1,
  					(errmsg("checkpoint record is at %X/%X",
! 				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
  		}
  		else if (StandbyMode)
  		{
***************
*** 5413,5424 **** StartupXLOG(void)
  		else
  		{
  			checkPointLoc = ControlFile->prevCheckPoint;
! 			record = ReadCheckpointRecord(checkPointLoc, 2);
  			if (record != NULL)
  			{
  				ereport(LOG,
  						(errmsg("using previous checkpoint record at %X/%X",
! 								(uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
  				InRecovery = true;		/* force recovery even if SHUTDOWNED */
  			}
  			else
--- 5005,5016 ----
  		else
  		{
  			checkPointLoc = ControlFile->prevCheckPoint;
! 			record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2);
  			if (record != NULL)
  			{
  				ereport(LOG,
  						(errmsg("using previous checkpoint record at %X/%X",
! 				   (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
  				InRecovery = true;		/* force recovery even if SHUTDOWNED */
  			}
  			else
***************
*** 5433,5439 **** StartupXLOG(void)
  
  	ereport(DEBUG1,
  			(errmsg("redo record is at %X/%X; shutdown %s",
! 					(uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
  					wasShutdown ? "TRUE" : "FALSE")));
  	ereport(DEBUG1,
  			(errmsg("next transaction ID: %u/%u; next OID: %u",
--- 5025,5031 ----
  
  	ereport(DEBUG1,
  			(errmsg("redo record is at %X/%X; shutdown %s",
! 				  (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
  					wasShutdown ? "TRUE" : "FALSE")));
  	ereport(DEBUG1,
  			(errmsg("next transaction ID: %u/%u; next OID: %u",
***************
*** 5714,5720 **** StartupXLOG(void)
  		 * Allow read-only connections immediately if we're consistent
  		 * already.
  		 */
! 		CheckRecoveryConsistency();
  
  		/*
  		 * Find the first record that logically follows the checkpoint --- it
--- 5306,5312 ----
  		 * Allow read-only connections immediately if we're consistent
  		 * already.
  		 */
! 		CheckRecoveryConsistency(EndRecPtr);
  
  		/*
  		 * Find the first record that logically follows the checkpoint --- it
***************
*** 5723,5734 **** StartupXLOG(void)
  		if (XLByteLT(checkPoint.redo, RecPtr))
  		{
  			/* back up to find the record */
! 			record = ReadRecord(&(checkPoint.redo), PANIC, false);
  		}
  		else
  		{
  			/* just have to read next record after CheckPoint */
! 			record = ReadRecord(NULL, LOG, false);
  		}
  
  		if (record != NULL)
--- 5315,5326 ----
  		if (XLByteLT(checkPoint.redo, RecPtr))
  		{
  			/* back up to find the record */
! 			record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
  		}
  		else
  		{
  			/* just have to read next record after CheckPoint */
! 			record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
  		}
  
  		if (record != NULL)
***************
*** 5743,5749 **** StartupXLOG(void)
  
  			ereport(LOG,
  					(errmsg("redo starts at %X/%X",
! 							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
  
  			/*
  			 * main redo apply loop
--- 5335,5341 ----
  
  			ereport(LOG,
  					(errmsg("redo starts at %X/%X",
! 						 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
  
  			/*
  			 * main redo apply loop
***************
*** 5759,5766 **** StartupXLOG(void)
  
  					initStringInfo(&buf);
  					appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
! 									 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
! 									 (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
  					xlog_outrec(&buf, record);
  					appendStringInfo(&buf, " - ");
  					RmgrTable[record->xl_rmid].rm_desc(&buf,
--- 5351,5358 ----
  
  					initStringInfo(&buf);
  					appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
! 							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
! 							 (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
  					xlog_outrec(&buf, record);
  					appendStringInfo(&buf, " - ");
  					RmgrTable[record->xl_rmid].rm_desc(&buf,
***************
*** 5775,5781 **** StartupXLOG(void)
  				HandleStartupProcInterrupts();
  
  				/* Allow read-only connections if we're consistent now */
! 				CheckRecoveryConsistency();
  
  				/*
  				 * Have we reached our recovery target?
--- 5367,5373 ----
  				HandleStartupProcInterrupts();
  
  				/* Allow read-only connections if we're consistent now */
! 				CheckRecoveryConsistency(EndRecPtr);
  
  				/*
  				 * Have we reached our recovery target?
***************
*** 5879,5885 **** StartupXLOG(void)
  
  				LastRec = ReadRecPtr;
  
! 				record = ReadRecord(NULL, LOG, false);
  			} while (record != NULL && recoveryContinue);
  
  			/*
--- 5471,5477 ----
  
  				LastRec = ReadRecPtr;
  
! 				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
  			} while (record != NULL && recoveryContinue);
  
  			/*
***************
*** 5888,5894 **** StartupXLOG(void)
  
  			ereport(LOG,
  					(errmsg("redo done at %X/%X",
! 							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
  			xtime = GetLatestXTime();
  			if (xtime)
  				ereport(LOG,
--- 5480,5486 ----
  
  			ereport(LOG,
  					(errmsg("redo done at %X/%X",
! 						 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
  			xtime = GetLatestXTime();
  			if (xtime)
  				ereport(LOG,
***************
*** 5929,5935 **** StartupXLOG(void)
  	 * Re-fetch the last valid or last applied record, so we can identify the
  	 * exact endpoint of what we consider the valid portion of WAL.
  	 */
! 	record = ReadRecord(&LastRec, PANIC, false);
  	EndOfLog = EndRecPtr;
  	XLByteToPrevSeg(EndOfLog, endLogSegNo);
  
--- 5521,5527 ----
  	 * Re-fetch the last valid or last applied record, so we can identify the
  	 * exact endpoint of what we consider the valid portion of WAL.
  	 */
! 	record = ReadRecord(xlogreader, LastRec, PANIC, false);
  	EndOfLog = EndRecPtr;
  	XLByteToPrevSeg(EndOfLog, endLogSegNo);
  
***************
*** 5992,5998 **** StartupXLOG(void)
  	 */
  	if (InArchiveRecovery)
  	{
! 		char	reason[200];
  
  		ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
  		ereport(LOG,
--- 5584,5590 ----
  	 */
  	if (InArchiveRecovery)
  	{
! 		char		reason[200];
  
  		ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
  		ereport(LOG,
***************
*** 6033,6039 **** StartupXLOG(void)
  	 * we will use that below.)
  	 */
  	if (InArchiveRecovery)
! 		exitArchiveRecovery(curFileTLI, endLogSegNo);
  
  	/*
  	 * Prepare to write WAL starting at EndOfLog position, and init xlog
--- 5625,5631 ----
  	 * we will use that below.)
  	 */
  	if (InArchiveRecovery)
! 		exitArchiveRecovery(private, curFileTLI, endLogSegNo);
  
  	/*
  	 * Prepare to write WAL starting at EndOfLog position, and init xlog
***************
*** 6052,6059 **** StartupXLOG(void)
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
! 	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
  	Insert->currpos = (char *) Insert->currpage +
  		(EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
  
--- 5644,5658 ----
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	if (EndOfLog % XLOG_BLCKSZ == 0)
! 	{
! 		memset(Insert->currpage, 0, XLOG_BLCKSZ);
! 	}
! 	else
! 	{
! 		Assert(private->readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
! 		memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
! 	}
  	Insert->currpos = (char *) Insert->currpage +
  		(EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
  
***************
*** 6205,6226 **** StartupXLOG(void)
  		ShutdownRecoveryTransactionEnvironment();
  
  	/* Shut down readFile facility, free space */
! 	if (readFile >= 0)
! 	{
! 		close(readFile);
! 		readFile = -1;
! 	}
! 	if (readBuf)
  	{
! 		free(readBuf);
! 		readBuf = NULL;
! 	}
! 	if (readRecordBuf)
! 	{
! 		free(readRecordBuf);
! 		readRecordBuf = NULL;
! 		readRecordBufSize = 0;
  	}
  
  	/*
  	 * If any of the critical GUCs have changed, log them before we allow
--- 5804,5818 ----
  		ShutdownRecoveryTransactionEnvironment();
  
  	/* Shut down readFile facility, free space */
! 	private = (XLogPageReadPrivate *) xlogreader->private_data;
! 	if (private->readFile >= 0)
  	{
! 		close(private->readFile);
! 		private->readFile = -1;
  	}
+ 	/* private_data was palloc0'd in StartupXLOG: release with pfree(), not free() */
+ 	pfree(private);
+ 	XLogReaderFree(xlogreader);
  
  	/*
  	 * If any of the critical GUCs have changed, log them before we allow
***************
*** 6251,6257 **** StartupXLOG(void)
   * that it can start accepting read-only connections.
   */
  static void
! CheckRecoveryConsistency(void)
  {
  	/*
  	 * During crash recovery, we don't reach a consistent state until we've
--- 5843,5849 ----
   * that it can start accepting read-only connections.
   */
  static void
! CheckRecoveryConsistency(XLogRecPtr EndRecPtr)
  {
  	/*
  	 * During crash recovery, we don't reach a consistent state until we've
***************
*** 6431,6437 **** LocalSetXLogInsertAllowed(void)
   * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
   */
  static XLogRecord *
! ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
  {
  	XLogRecord *record;
  
--- 6023,6029 ----
   * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
   */
  static XLogRecord *
! ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt)
  {
  	XLogRecord *record;
  
***************
*** 6455,6461 **** ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
  		return NULL;
  	}
  
! 	record = ReadRecord(&RecPtr, LOG, true);
  
  	if (record == NULL)
  	{
--- 6047,6053 ----
  		return NULL;
  	}
  
! 	record = ReadRecord(xlogreader, RecPtr, LOG, true);
  
  	if (record == NULL)
  	{
***************
*** 6683,6689 **** GetRecoveryTargetTLI(void)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
! 	TimeLineID result;
  
  	SpinLockAcquire(&xlogctl->info_lck);
  	result = xlogctl->RecoveryTargetTLI;
--- 6275,6281 ----
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
! 	TimeLineID	result;
  
  	SpinLockAcquire(&xlogctl->info_lck);
  	result = xlogctl->RecoveryTargetTLI;
***************
*** 6968,6974 **** CreateCheckPoint(int flags)
  		XLogRecPtr	curInsert;
  
  		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
! 		if (curInsert == ControlFile->checkPoint + 
  			MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
  			ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
  		{
--- 6560,6566 ----
  		XLogRecPtr	curInsert;
  
  		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
! 		if (curInsert == ControlFile->checkPoint +
  			MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
  			ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
  		{
***************
*** 7398,7404 **** CreateRestartPoint(int flags)
  	{
  		ereport(DEBUG2,
  				(errmsg("skipping restartpoint, already performed at %X/%X",
! 						(uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo)));
  
  		UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
  		if (flags & CHECKPOINT_IS_SHUTDOWN)
--- 6990,6996 ----
  	{
  		ereport(DEBUG2,
  				(errmsg("skipping restartpoint, already performed at %X/%X",
! 		(uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo)));
  
  		UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
  		if (flags & CHECKPOINT_IS_SHUTDOWN)
***************
*** 7508,7514 **** CreateRestartPoint(int flags)
  	xtime = GetLatestXTime();
  	ereport((log_checkpoints ? LOG : DEBUG2),
  			(errmsg("recovery restart point at %X/%X",
! 					(uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
  		   xtime ? errdetail("last completed transaction was at log time %s",
  							 timestamptz_to_str(xtime)) : 0));
  
--- 7100,7106 ----
  	xtime = GetLatestXTime();
  	ereport((log_checkpoints ? LOG : DEBUG2),
  			(errmsg("recovery restart point at %X/%X",
! 		 (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
  		   xtime ? errdetail("last completed transaction was at log time %s",
  							 timestamptz_to_str(xtime)) : 0));
  
***************
*** 8033,8039 **** xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
  		appendStringInfo(buf, "checkpoint: redo %X/%X; "
  				   "tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
  						 "oldest xid %u in DB %u; oldest running xid %u; %s",
! 						 (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
  						 checkpoint->ThisTimeLineID,
  						 checkpoint->fullPageWrites ? "true" : "false",
  						 checkpoint->nextXidEpoch, checkpoint->nextXid,
--- 7625,7631 ----
  		appendStringInfo(buf, "checkpoint: redo %X/%X; "
  				   "tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
  						 "oldest xid %u in DB %u; oldest running xid %u; %s",
! 				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
  						 checkpoint->ThisTimeLineID,
  						 checkpoint->fullPageWrites ? "true" : "false",
  						 checkpoint->nextXidEpoch, checkpoint->nextXid,
***************
*** 8214,8220 **** assign_xlog_sync_method(int new_sync_method, void *extra)
  				ereport(PANIC,
  						(errcode_for_file_access(),
  						 errmsg("could not fsync log segment %s: %m",
! 								XLogFileNameP(ThisTimeLineID, openLogSegNo))));
  			if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
  				XLogFileClose();
  		}
--- 7806,7812 ----
  				ereport(PANIC,
  						(errcode_for_file_access(),
  						 errmsg("could not fsync log segment %s: %m",
! 							  XLogFileNameP(ThisTimeLineID, openLogSegNo))));
  			if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
  				XLogFileClose();
  		}
***************
*** 8245,8252 **** issue_xlog_fsync(int fd, XLogSegNo segno)
  			if (pg_fsync_writethrough(fd) != 0)
  				ereport(PANIC,
  						(errcode_for_file_access(),
! 						 errmsg("could not fsync write-through log file %s: %m",
! 								XLogFileNameP(ThisTimeLineID, segno))));
  			break;
  #endif
  #ifdef HAVE_FDATASYNC
--- 7837,7844 ----
  			if (pg_fsync_writethrough(fd) != 0)
  				ereport(PANIC,
  						(errcode_for_file_access(),
! 					  errmsg("could not fsync write-through log file %s: %m",
! 							 XLogFileNameP(ThisTimeLineID, segno))));
  			break;
  #endif
  #ifdef HAVE_FDATASYNC
***************
*** 8275,8280 **** char *
--- 7867,7873 ----
  XLogFileNameP(TimeLineID tli, XLogSegNo segno)
  {
  	char	   *result = palloc(MAXFNAMELEN);
+ 
  	XLogFileName(result, tli, segno);
  	return result;
  }
***************
*** 8520,8528 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  					"%Y-%m-%d %H:%M:%S %Z",
  					pg_localtime(&stamp_time, log_timezone));
  		appendStringInfo(&labelfbuf, "START WAL LOCATION: %X/%X (file %s)\n",
! 						 (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
  		appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n",
! 						 (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
  		appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n",
  						 exclusive ? "pg_start_backup" : "streamed");
  		appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n",
--- 8113,8121 ----
  					"%Y-%m-%d %H:%M:%S %Z",
  					pg_localtime(&stamp_time, log_timezone));
  		appendStringInfo(&labelfbuf, "START WAL LOCATION: %X/%X (file %s)\n",
! 			 (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
  		appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n",
! 					 (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
  		appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n",
  						 exclusive ? "pg_start_backup" : "streamed");
  		appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n",
***************
*** 8870,8876 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  				 errmsg("could not create file \"%s\": %m",
  						histfilepath)));
  	fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
! 			(uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
  	fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
  			(uint32) (stoppoint >> 32), (uint32) stoppoint, stopxlogfilename);
  	/* transfer remaining lines from label to history file */
--- 8463,8469 ----
  				 errmsg("could not create file \"%s\": %m",
  						histfilepath)));
  	fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
! 		(uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
  	fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
  			(uint32) (stoppoint >> 32), (uint32) stoppoint, stopxlogfilename);
  	/* transfer remaining lines from label to history file */
***************
*** 9261,9287 **** CancelBackup(void)
   * sleep and retry.
   */
  static bool
! XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
! 			 bool randAccess)
  {
  	uint32		targetPageOff;
  	uint32		targetRecOff;
  	XLogSegNo	targetSegNo;
  
! 	XLByteToSeg(*RecPtr, targetSegNo);
! 	targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
! 	targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
  
  	/* Fast exit if we have read the record in the current buffer already */
! 	if (failedSources == 0 && targetSegNo == readSegNo &&
! 		targetPageOff == readOff && targetRecOff < readLen)
  		return true;
  
  	/*
  	 * See if we need to switch to a new segment because the requested record
  	 * is not in the currently open one.
  	 */
! 	if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo))
  	{
  		/*
  		 * Request a restartpoint if we've replayed too much xlog since the
--- 8854,8881 ----
   * sleep and retry.
   */
  static bool
! XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
! 			 bool randAccess, char *readBuf, void *private_data)
  {
+ 	XLogPageReadPrivate *private = (XLogPageReadPrivate *) private_data;
  	uint32		targetPageOff;
  	uint32		targetRecOff;
  	XLogSegNo	targetSegNo;
  
! 	XLByteToSeg(RecPtr, targetSegNo);
! 	targetPageOff = ((RecPtr % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
! 	targetRecOff = RecPtr % XLOG_BLCKSZ;
  
  	/* Fast exit if we have read the record in the current buffer already */
! 	if (private->failedSources == 0 && targetSegNo == private->readSegNo &&
! 		targetPageOff == private->readOff && targetRecOff < private->readLen)
  		return true;
  
  	/*
  	 * See if we need to switch to a new segment because the requested record
  	 * is not in the currently open one.
  	 */
! 	if (private->readFile >= 0 && !XLByteInSeg(RecPtr, private->readSegNo))
  	{
  		/*
  		 * Request a restartpoint if we've replayed too much xlog since the
***************
*** 9289,9324 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  		 */
  		if (StandbyMode && bgwriterLaunched)
  		{
! 			if (XLogCheckpointNeeded(readSegNo))
  			{
  				(void) GetRedoRecPtr();
! 				if (XLogCheckpointNeeded(readSegNo))
  					RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
  			}
  		}
  
! 		close(readFile);
! 		readFile = -1;
! 		readSource = 0;
  	}
  
! 	XLByteToSeg(*RecPtr, readSegNo);
  
  retry:
  	/* See if we need to retrieve more data */
! 	if (readFile < 0 ||
! 		(readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
  	{
  		if (StandbyMode)
  		{
! 			if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
! 											 fetching_ckpt))
  				goto triggered;
  		}
  		else
  		{
  			/* In archive or crash recovery. */
! 			if (readFile < 0)
  			{
  				int			sources;
  
--- 8883,8918 ----
  		 */
  		if (StandbyMode && bgwriterLaunched)
  		{
! 			if (XLogCheckpointNeeded(private->readSegNo))
  			{
  				(void) GetRedoRecPtr();
! 				if (XLogCheckpointNeeded(private->readSegNo))
  					RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
  			}
  		}
  
! 		close(private->readFile);
! 		private->readFile = -1;
! 		private->readSource = 0;
  	}
  
! 	XLByteToSeg(RecPtr, private->readSegNo);
  
  retry:
  	/* See if we need to retrieve more data */
! 	if (private->readFile < 0 ||
! 		(private->readSource == XLOG_FROM_STREAM &&
! 		 !XLByteLT(RecPtr, receivedUpto)))
  	{
  		if (StandbyMode)
  		{
! 			if (!WaitForWALToBecomeAvailable(private, RecPtr, randAccess))
  				goto triggered;
  		}
  		else
  		{
  			/* In archive or crash recovery. */
! 			if (private->readFile < 0)
  			{
  				int			sources;
  
***************
*** 9330,9337 **** retry:
  				if (InArchiveRecovery)
  					sources |= XLOG_FROM_ARCHIVE;
  
! 				readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
! 				if (readFile < 0)
  					return false;
  			}
  		}
--- 8924,8933 ----
  				if (InArchiveRecovery)
  					sources |= XLOG_FROM_ARCHIVE;
  
! 				private->readFile =
! 					XLogFileReadAnyTLI(private, private->readSegNo, emode,
! 									   sources);
! 				if (private->readFile < 0)
  					return false;
  			}
  		}
***************
*** 9341,9347 **** retry:
  	 * At this point, we have the right segment open and if we're streaming we
  	 * know the requested record is in it.
  	 */
! 	Assert(readFile != -1);
  
  	/*
  	 * If the current segment is being streamed from master, calculate how
--- 8937,8943 ----
  	 * At this point, we have the right segment open and if we're streaming we
  	 * know the requested record is in it.
  	 */
! 	Assert(private->readFile != -1);
  
  	/*
  	 * If the current segment is being streamed from master, calculate how
***************
*** 9349,9367 **** retry:
  	 * requested record has been received, but this is for the benefit of
  	 * future calls, to allow quick exit at the top of this function.
  	 */
! 	if (readSource == XLOG_FROM_STREAM)
  	{
! 		if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
  		{
! 			readLen = XLOG_BLCKSZ;
  		}
  		else
! 			readLen = receivedUpto % XLogSegSize - targetPageOff;
  	}
  	else
! 		readLen = XLOG_BLCKSZ;
  
! 	if (!readFileHeaderValidated && targetPageOff != 0)
  	{
  		/*
  		 * Whenever switching to a new WAL segment, we read the first page of
--- 8945,8963 ----
  	 * requested record has been received, but this is for the benefit of
  	 * future calls, to allow quick exit at the top of this function.
  	 */
! 	if (private->readSource == XLOG_FROM_STREAM)
  	{
! 		if (((RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
  		{
! 			private->readLen = XLOG_BLCKSZ;
  		}
  		else
! 			private->readLen = receivedUpto % XLogSegSize - targetPageOff;
  	}
  	else
! 		private->readLen = XLOG_BLCKSZ;
  
! 	if (!private->readFileHeaderValidated && targetPageOff != 0)
  	{
  		/*
  		 * Whenever switching to a new WAL segment, we read the first page of
***************
*** 9370,9431 **** retry:
  		 * identification info that is present in the first page's "long"
  		 * header.
  		 */
! 		readOff = 0;
! 		if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
  		{
! 			char fname[MAXFNAMELEN];
! 			XLogFileName(fname, curFileTLI, readSegNo);
! 			ereport(emode_for_corrupt_record(emode, *RecPtr),
  					(errcode_for_file_access(),
! 					 errmsg("could not read from log segment %s, offset %u: %m",
! 							fname, readOff)));
  			goto next_record_is_invalid;
  		}
! 		if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, true))
  			goto next_record_is_invalid;
  	}
  
  	/* Read the requested page */
! 	readOff = targetPageOff;
! 	if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
  	{
! 		char fname[MAXFNAMELEN];
! 		XLogFileName(fname, curFileTLI, readSegNo);
! 		ereport(emode_for_corrupt_record(emode, *RecPtr),
  				(errcode_for_file_access(),
! 		 errmsg("could not seek in log segment %s to offset %u: %m",
! 				fname, readOff)));
  		goto next_record_is_invalid;
  	}
! 	if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
  	{
! 		char fname[MAXFNAMELEN];
! 		XLogFileName(fname, curFileTLI, readSegNo);
! 		ereport(emode_for_corrupt_record(emode, *RecPtr),
  				(errcode_for_file_access(),
! 		 errmsg("could not read from log segment %s, offset %u: %m",
! 				fname, readOff)));
  		goto next_record_is_invalid;
  	}
! 	if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, false))
  		goto next_record_is_invalid;
  
! 	readFileHeaderValidated = true;
  
! 	Assert(targetSegNo == readSegNo);
! 	Assert(targetPageOff == readOff);
! 	Assert(targetRecOff < readLen);
  
  	return true;
  
  next_record_is_invalid:
! 	failedSources |= readSource;
  
! 	if (readFile >= 0)
! 		close(readFile);
! 	readFile = -1;
! 	readLen = 0;
! 	readSource = 0;
  
  	/* In standby-mode, keep trying */
  	if (StandbyMode)
--- 8966,9034 ----
  		 * identification info that is present in the first page's "long"
  		 * header.
  		 */
! 		private->readOff = 0;
! 		if (read(private->readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
  		{
! 			char		fname[MAXFNAMELEN];
! 
! 			XLogFileName(fname, curFileTLI, private->readSegNo);
! 			ereport(emode_for_corrupt_record(emode, private->readSource, RecPtr),
  					(errcode_for_file_access(),
! 				  errmsg("could not read from log segment %s, offset %u: %m",
! 						 fname, private->readOff)));
  			goto next_record_is_invalid;
  		}
! 		if (!ValidXLogPageHeader(private->readSegNo, private->readOff,
! 							   private->readSource, (XLogPageHeader) readBuf,
! 								 emode, true))
  			goto next_record_is_invalid;
  	}
  
  	/* Read the requested page */
! 	private->readOff = targetPageOff;
! 	if (lseek(private->readFile, (off_t) private->readOff, SEEK_SET) < 0)
  	{
! 		char		fname[MAXFNAMELEN];
! 
! 		XLogFileName(fname, curFileTLI, private->readSegNo);
! 		ereport(emode_for_corrupt_record(emode, private->readSource, RecPtr),
  				(errcode_for_file_access(),
! 				 errmsg("could not seek in log segment %s to offset %u: %m",
! 						fname, private->readOff)));
  		goto next_record_is_invalid;
  	}
! 	if (read(private->readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
  	{
! 		char		fname[MAXFNAMELEN];
! 
! 		XLogFileName(fname, curFileTLI, private->readSegNo);
! 		ereport(emode_for_corrupt_record(emode, private->readSource, RecPtr),
  				(errcode_for_file_access(),
! 				 errmsg("could not read from log segment %s, offset %u: %m",
! 						fname, private->readOff)));
  		goto next_record_is_invalid;
  	}
! 	if (!ValidXLogPageHeader(private->readSegNo, private->readOff,
! 							 private->readSource, (XLogPageHeader) readBuf,
! 							 emode, false))
  		goto next_record_is_invalid;
  
! 	private->readFileHeaderValidated = true;
  
! 	Assert(targetSegNo == private->readSegNo);
! 	Assert(targetPageOff == private->readOff);
! 	Assert(targetRecOff < private->readLen);
  
  	return true;
  
  next_record_is_invalid:
! 	private->failedSources |= private->readSource;
  
! 	if (private->readFile >= 0)
! 		close(private->readFile);
! 	private->readFile = -1;
! 	private->readLen = 0;
! 	private->readSource = 0;
  
  	/* In standby-mode, keep trying */
  	if (StandbyMode)
***************
*** 9434,9444 **** next_record_is_invalid:
  		return false;
  
  triggered:
! 	if (readFile >= 0)
! 		close(readFile);
! 	readFile = -1;
! 	readLen = 0;
! 	readSource = 0;
  
  	return false;
  }
--- 9037,9047 ----
  		return false;
  
  triggered:
! 	if (private->readFile >= 0)
! 		close(private->readFile);
! 	private->readFile = -1;
! 	private->readLen = 0;
! 	private->readSource = 0;
  
  	return false;
  }
***************
*** 9455,9462 **** triggered:
   * false.
   */
  static bool
! WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
! 							bool fetching_ckpt)
  {
  	static pg_time_t last_fail_time = 0;
  
--- 9058,9065 ----
   * false.
   */
  static bool
! WaitForWALToBecomeAvailable(XLogPageReadPrivate *private, XLogRecPtr RecPtr,
! 							bool randAccess)
  {
  	static pg_time_t last_fail_time = 0;
  
***************
*** 9475,9481 **** WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
  			 * the archive should be identical to what was streamed, so it's
  			 * unlikely that it helps, but one can hope...
  			 */
! 			if (failedSources & XLOG_FROM_STREAM)
  			{
  				ShutdownWalRcv();
  				continue;
--- 9078,9084 ----
  			 * the archive should be identical to what was streamed, so it's
  			 * unlikely that it helps, but one can hope...
  			 */
! 			if (private->failedSources & XLOG_FROM_STREAM)
  			{
  				ShutdownWalRcv();
  				continue;
***************
*** 9514,9534 **** WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
  			if (havedata)
  			{
  				/*
! 				 * Great, streamed far enough.  Open the file if it's not open
  				 * already.  Use XLOG_FROM_STREAM so that source info is set
  				 * correctly and XLogReceiptTime isn't changed.
  				 */
! 				if (readFile < 0)
  				{
! 					readFile = XLogFileRead(readSegNo, PANIC,
! 											recoveryTargetTLI,
! 											XLOG_FROM_STREAM, false);
! 					Assert(readFile >= 0);
  				}
  				else
  				{
  					/* just make sure source info is correct... */
! 					readSource = XLOG_FROM_STREAM;
  					XLogReceiptSource = XLOG_FROM_STREAM;
  				}
  				break;
--- 9117,9138 ----
  			if (havedata)
  			{
  				/*
! 				 * Great, streamed far enough.	Open the file if it's not open
  				 * already.  Use XLOG_FROM_STREAM so that source info is set
  				 * correctly and XLogReceiptTime isn't changed.
  				 */
! 				if (private->readFile < 0)
  				{
! 					private->readFile =
! 						XLogFileRead(private, private->readSegNo, PANIC,
! 									 recoveryTargetTLI,
! 									 XLOG_FROM_STREAM, false);
! 					Assert(private->readFile >= 0);
  				}
  				else
  				{
  					/* just make sure source info is correct... */
! 					private->readSource = XLOG_FROM_STREAM;
  					XLogReceiptSource = XLOG_FROM_STREAM;
  				}
  				break;
***************
*** 9557,9566 **** WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
  			int			sources;
  			pg_time_t	now;
  
! 			if (readFile >= 0)
  			{
! 				close(readFile);
! 				readFile = -1;
  			}
  			/* Reset curFileTLI if random fetch. */
  			if (randAccess)
--- 9161,9170 ----
  			int			sources;
  			pg_time_t	now;
  
! 			if (private->readFile >= 0)
  			{
! 				close(private->readFile);
! 				private->readFile = -1;
  			}
  			/* Reset curFileTLI if random fetch. */
  			if (randAccess)
***************
*** 9571,9582 **** WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
  			 * from pg_xlog.
  			 */
  			sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
! 			if (!(sources & ~failedSources))
  			{
  				/*
  				 * We've exhausted all options for retrieving the file. Retry.
  				 */
! 				failedSources = 0;
  
  				/*
  				 * Before we sleep, re-scan for possible new timelines if we
--- 9175,9186 ----
  			 * from pg_xlog.
  			 */
  			sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
! 			if (!(sources & ~private->failedSources))
  			{
  				/*
  				 * We've exhausted all options for retrieving the file. Retry.
  				 */
! 				private->failedSources = 0;
  
  				/*
  				 * Before we sleep, re-scan for possible new timelines if we
***************
*** 9605,9634 **** WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
  				 * stream the missing WAL, before retrying to restore from
  				 * archive/pg_xlog.
  				 *
! 				 * If fetching_ckpt is TRUE, RecPtr points to the initial
! 				 * checkpoint location. In that case, we use RedoStartLSN as
! 				 * the streaming start position instead of RecPtr, so that
! 				 * when we later jump backwards to start redo at RedoStartLSN,
! 				 * we will have the logs streamed already.
  				 */
  				if (PrimaryConnInfo)
  				{
! 					XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
  
  					RequestXLogStreaming(ptr, PrimaryConnInfo);
  					continue;
  				}
  			}
  			/* Don't try to read from a source that just failed */
! 			sources &= ~failedSources;
! 			readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
! 			if (readFile >= 0)
  				break;
  
  			/*
  			 * Nope, not found in archive and/or pg_xlog.
  			 */
! 			failedSources |= sources;
  
  			/*
  			 * Check to see if the trigger file exists. Note that we do this
--- 9209,9240 ----
  				 * stream the missing WAL, before retrying to restore from
  				 * archive/pg_xlog.
  				 *
! 				 * If we're fetching a checkpoint record, RecPtr points to the
! 				 * initial checkpoint location. In that case, we use
! 				 * RedoStartLSN as the streaming start position instead of
! 				 * RecPtr, so that when we later jump backwards to start redo
! 				 * at RedoStartLSN, we will have the logs streamed already.
  				 */
  				if (PrimaryConnInfo)
  				{
! 					XLogRecPtr	ptr = private->fetching_ckpt ?
! 					RedoStartLSN : RecPtr;
  
  					RequestXLogStreaming(ptr, PrimaryConnInfo);
  					continue;
  				}
  			}
  			/* Don't try to read from a source that just failed */
! 			sources &= ~private->failedSources;
! 			private->readFile = XLogFileReadAnyTLI(private, private->readSegNo,
! 												   DEBUG2, sources);
! 			if (private->readFile >= 0)
  				break;
  
  			/*
  			 * Nope, not found in archive and/or pg_xlog.
  			 */
! 			private->failedSources |= sources;
  
  			/*
  			 * Check to see if the trigger file exists. Note that we do this
***************
*** 9668,9679 **** WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
   * you are about to ereport(), or you might cause a later message to be
   * erroneously suppressed.
   */
! static int
! emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
  {
  	static XLogRecPtr lastComplaint = 0;
  
! 	if (readSource == XLOG_FROM_PG_XLOG && emode == LOG)
  	{
  		if (XLByteEQ(RecPtr, lastComplaint))
  			emode = DEBUG1;
--- 9274,9285 ----
   * you are about to ereport(), or you might cause a later message to be
   * erroneously suppressed.
   */
! int
! emode_for_corrupt_record(int emode, int source, XLogRecPtr RecPtr)
  {
  	static XLogRecPtr lastComplaint = 0;
  
! 	if (source == XLOG_FROM_PG_XLOG && emode == LOG)
  	{
  		if (XLByteEQ(RecPtr, lastComplaint))
  			emode = DEBUG1;
*** /dev/null
--- b/src/backend/access/transam/xlogreader.c
***************
*** 0 ****
--- 1,542 ----
+ /*-------------------------------------------------------------------------
+  *
+  * xlogreader.c
+  *		Generic xlog reading facility
+  *
+  * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *		src/backend/access/transam/xlogreader.c
+  *
+  * NOTES
+  *		Documentation about how to use this interface can be found in
+  *		xlogreader.h, more specifically in the definition of the
+  *		XLogReaderState struct where all parameters are documented.
+  *
+  * TODO:
+  * * usable without backend code around
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "postgres.h"
+ 
+ #include "access/transam.h"
+ #include "access/xlog_internal.h"
+ #include "access/xlogreader.h"
+ #include "catalog/pg_control.h"
+ 
+ static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
+ static bool ValidXLogRecordHeader(XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr,
+ 					  XLogRecord *record, int emode, bool randAccess);
+ static bool RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode);
+ 
+ /*
+  * Allocate and initialize a new xlog reader
+  *
+  * startpoint becomes the initial EndRecPtr; pagereadfunc and private_data
+  * are stored as the read_page callback and the argument passed to it.
+  *
+  * Returns NULL if the xlogreader couldn't be allocated.
+  */
+ XLogReaderState *
+ XLogReaderAllocate(XLogRecPtr startpoint,
+ 				   XLogPageReadCB pagereadfunc, void *private_data)
+ {
+ 	XLogReaderState *state;
+ 
+ 	state = (XLogReaderState *) malloc(sizeof(XLogReaderState));
+ 	if (!state)
+ 		return NULL;
+ 	MemSet(state, 0, sizeof(XLogReaderState));
+ 
+ 	/*
+ 	 * Permanently allocate readBuf.  We do it this way, rather than just
+ 	 * making a static array, for two reasons: (1) no need to waste the
+ 	 * storage in most instantiations of the backend; (2) a static char array
+ 	 * isn't guaranteed to have any particular alignment, whereas malloc()
+ 	 * will provide MAXALIGN'd storage.
+ 	 */
+ 	state->readBuf = (char *) malloc(XLOG_BLCKSZ);
+ 	if (!state->readBuf)
+ 	{
+ 		free(state);			/* malloc'd above, so free() not pfree() */
+ 		return NULL;
+ 	}
+ 
+ 	state->read_page = pagereadfunc;
+ 	state->private_data = private_data;
+ 	state->EndRecPtr = startpoint;
+ 
+ 	/* Start with a minimal readRecordBuf; it can be enlarged later. */
+ 	if (!allocate_recordbuf(state, 0))
+ 	{
+ 		free(state->readBuf);
+ 		free(state);
+ 		return NULL;
+ 	}
+ 
+ 	return state;
+ }
+ 
+ void
+ XLogReaderFree(XLogReaderState *state)
+ {
+ 	/* Release a reader; it and its buffers are malloc'd, so use free(). */
+ 	free(state->readRecordBuf);
+ 	free(state->readBuf);
+ 	free(state);
+ }
+ 
+ /*
+  * Allocate readRecordBuf to fit a record of at least the given length.
+  * Returns true if successful, false if out of memory or if rounding the
+  * length up would overflow uint32 (the old buffer is kept in that case).
+  *
+  * readRecordBufSize is set to the new buffer size, or 0 if malloc fails.
+  * To avoid useless small increases, round its size to a multiple of
+  * XLOG_BLCKSZ, and make sure it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start
+  * with.  (That is enough for all "normal" records, but very large commit or
+  * abort records might need more space.)
+  */
+ static bool
+ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
+ {
+ 	uint32		newSize = reclength;
+ 
+ 	newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
+ 	if (newSize < reclength)
+ 		return false;			/* round-up wrapped; buffer would be too small */
+ 	newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
+ 
+ 	free(state->readRecordBuf);	/* free(NULL) is a no-op */
+ 	state->readRecordBuf = (char *) malloc(newSize);
+ 	if (!state->readRecordBuf)
+ 	{
+ 		state->readRecordBufSize = 0;
+ 		return false;
+ 	}
+ 	state->readRecordBufSize = newSize;
+ 	return true;
+ }
+ 
+ /*
+  * Attempt to read an XLOG record.
+  *
+  * If RecPtr is not InvalidXLogRecPtr, try to read a record at that position.
+  * Otherwise continue at state->EndRecPtr, just after the last record read.
+  *
+  * If no valid record is available, returns NULL, or fails if emode is PANIC.
+  * (emode must be either PANIC or LOG)
+  *
+  * The record is always copied into readRecordBuf.  NOTE(review): for a record
+  * that fits on one page the returned pointer still refers into readBuf.
+  */
+ XLogRecord *
+ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, int emode)
+ {
+ 	XLogRecord *record;
+ 	XLogRecPtr	tmpRecPtr = state->EndRecPtr;
+ 	bool		randAccess = false;
+ 	uint32		len,
+ 				total_len;
+ 	uint32		targetRecOff;
+ 	uint32		pageHeaderSize;
+ 	bool		gotheader;
+ 
+ 	if (RecPtr == InvalidXLogRecPtr)
+ 	{
+ 		RecPtr = tmpRecPtr;
+ 
+ 		/*
+ 		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
+ 		 * at a page boundary, no more records can fit on the current page. We
+ 		 * must skip over the page header, but we can't do that until we've
+ 		 * read in the page, since the header size is variable.
+ 		 */
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * In this case, the passed-in record pointer should already be
+ 		 * pointing to a valid record starting position.
+ 		 */
+ 		if (!XRecOffIsValid(RecPtr))
+ 			ereport(PANIC,
+ 					(errmsg("invalid record offset at %X/%X",
+ 							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 		randAccess = true;		/* allow curFileTLI to go backwards too */
+ 	}
+ 
+ 	/* Read the page containing the record */
+ 	if (!state->read_page(state, RecPtr, emode, randAccess, state->readBuf,
+ 						  state->private_data))
+ 		return NULL;
+ 
+ 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ 	targetRecOff = RecPtr % XLOG_BLCKSZ;
+ 	if (targetRecOff == 0)
+ 	{
+ 		/*
+ 		 * At page start, so skip over page header.  The Assert checks that
+ 		 * we're not scribbling on caller's record pointer; it's OK because we
+ 		 * can only get here in the continuing-from-prev-record case, since
+ 		 * XRecOffIsValid rejected the zero-page-offset case otherwise. XXX:
+ 		 * does this assert make sense now that RecPtr is not a pointer?
+ 		 */
+ 		Assert(RecPtr == tmpRecPtr);
+ 		RecPtr += pageHeaderSize;
+ 		targetRecOff = pageHeaderSize;
+ 	}
+ 	else if (targetRecOff < pageHeaderSize)
+ 	{
+ 		ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 				(errmsg("invalid record offset at %X/%X",
+ 						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 		return NULL;
+ 	}
+ 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+ 		targetRecOff == pageHeaderSize)
+ 	{
+ 		ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 				(errmsg("contrecord is requested by %X/%X",
+ 						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 		return NULL;
+ 	}
+ 
+ 	/*
+ 	 * Read the record length.
+ 	 *
+ 	 * NB: Even though we use an XLogRecord pointer here, the whole record
+ 	 * header might not fit on this page. xl_tot_len is the first field of the
+ 	 * struct, so it must be on this page (the records are MAXALIGNed), but we
+ 	 * cannot access any other fields until we've verified that we got the
+ 	 * whole header.
+ 	 */
+ 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+ 	total_len = record->xl_tot_len;
+ 
+ 	/*
+ 	 * If the whole record header is on this page, validate it immediately.
+ 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
+ 	 * rest of the header after reading it from the next page.  The xl_tot_len
+ 	 * check is necessary here to ensure that we enter the "Need to reassemble
+ 	 * record" code path below; otherwise we might fail to apply
+ 	 * ValidXLogRecordHeader at all.
+ 	 */
+ 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+ 	{
+ 		if (!ValidXLogRecordHeader(RecPtr, state->ReadRecPtr, record, emode,
+ 								   randAccess))
+ 			return NULL;
+ 		gotheader = true;
+ 	}
+ 	else
+ 	{
+ 		if (total_len < SizeOfXLogRecord)
+ 		{
+ 			ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 					(errmsg("invalid record length at %X/%X",
+ 							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 			return NULL;
+ 		}
+ 		gotheader = false;
+ 	}
+ 
+ 	/*
+ 	 * Enlarge readRecordBuf as needed.
+ 	 */
+ 	if (total_len > state->readRecordBufSize &&
+ 		!allocate_recordbuf(state, total_len))
+ 	{
+ 		/* We treat this as a "bogus data" condition */
+ 		ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 				(errmsg("record length %u at %X/%X too long",
+ 					  total_len, (uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 		return NULL;
+ 	}
+ 
+ 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
+ 	if (total_len > len)
+ 	{
+ 		/* Need to reassemble record */
+ 		char	   *contrecord;
+ 		XLogPageHeader pageHeader;
+ 		XLogRecPtr	pagelsn;
+ 		char	   *buffer;
+ 		uint32		gotlen;
+ 
+ 		/* Initialize pagelsn to the beginning of the page this record is on */
+ 		pagelsn = (RecPtr / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+ 
+ 		/* Copy the first fragment of the record from the first page. */
+ 		memcpy(state->readRecordBuf,
+ 			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+ 		buffer = state->readRecordBuf + len;
+ 		gotlen = len;
+ 
+ 		do
+ 		{
+ 			/* Calculate pointer to beginning of next page */
+ 			XLByteAdvance(pagelsn, XLOG_BLCKSZ);
+ 			/* Wait for the next page to become available */
+ 			if (!state->read_page(state, pagelsn, emode, false, state->readBuf,
+ 								  state->private_data))
+ 				return NULL;
+ 
+ 			/* Check that the continuation on next page looks valid */
+ 			pageHeader = (XLogPageHeader) state->readBuf;
+ 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+ 			{
+ 				ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 						(errmsg("there is no contrecord flag at %X/%X",
+ 								(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 				return NULL;
+ 			}
+ 
+ 			/*
+ 			 * Cross-check that xlp_rem_len agrees with how much of the record
+ 			 * we expect there to be left.
+ 			 */
+ 			if (pageHeader->xlp_rem_len == 0 ||
+ 				total_len != (pageHeader->xlp_rem_len + gotlen))
+ 			{
+ 				ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 						(errmsg("invalid contrecord length %u at %X/%X",
+ 								pageHeader->xlp_rem_len,
+ 								(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 				return NULL;
+ 			}
+ 
+ 			/* Append the continuation from this page to the buffer */
+ 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
+ 			contrecord = (char *) state->readBuf + pageHeaderSize;
+ 			len = XLOG_BLCKSZ - pageHeaderSize;
+ 			if (pageHeader->xlp_rem_len < len)
+ 				len = pageHeader->xlp_rem_len;
+ 			memcpy(buffer, (char *) contrecord, len);
+ 			buffer += len;
+ 			gotlen += len;
+ 
+ 			/* If we just reassembled the record header, validate it. */
+ 			if (!gotheader)
+ 			{
+ 				record = (XLogRecord *) state->readRecordBuf;
+ 				if (!ValidXLogRecordHeader(RecPtr, state->ReadRecPtr, record,
+ 										   emode, randAccess))
+ 					return NULL;
+ 				gotheader = true;
+ 			}
+ 		} while (pageHeader->xlp_rem_len > len);
+ 
+ 		record = (XLogRecord *) state->readRecordBuf;
+ 		if (!RecordIsValid(record, RecPtr, emode))
+ 			return NULL;
+ 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ 		state->ReadRecPtr = RecPtr;
+ 		state->EndRecPtr = pagelsn + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len);
+ 	}
+ 	else
+ 	{
+ 		/* Record does not cross a page boundary */
+ 		if (!RecordIsValid(record, RecPtr, emode))
+ 			return NULL;
+ 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+ 
+ 		state->ReadRecPtr = RecPtr;
+ 		memcpy(state->readRecordBuf, record, total_len);
+ 	}
+ 
+ 	/*
+ 	 * Special processing if it's an XLOG SWITCH record
+ 	 */
+ 	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+ 	{
+ 		/* Pretend it extends to end of segment */
+ 		state->EndRecPtr += XLogSegSize - 1;
+ 		state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
+ 	}
+ 
+ 	return record;
+ }
+ 
+ /*
+  * Validate an XLOG record header (xl_len, xl_tot_len, xl_rmid, xl_prev).
+  * Returns true if sane; otherwise ereports the problem and returns false.
+  * This is just a convenience subroutine to avoid duplicated code in
+  * XLogReadRecord.  It's not intended for use from anywhere else.
+  */
+ static bool
+ ValidXLogRecordHeader(XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr,
+ 					  XLogRecord *record, int emode, bool randAccess)
+ {
+ 	/*
+ 	 * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
+ 	 * required.
+ 	 */
+ 	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+ 	{
+ 		if (record->xl_len != 0)
+ 		{
+ 			ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 					(errmsg("invalid xlog switch record at %X/%X",
+ 							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 			return false;
+ 		}
+ 	}
+ 	else if (record->xl_len == 0)
+ 	{
+ 		ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 				(errmsg("record with zero length at %X/%X",
+ 						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 		return false;
+ 	}
+ 	if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
+ 		record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
+ 		XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+ 	{
+ 		ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 				(errmsg("invalid record length at %X/%X",
+ 						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 		return false;
+ 	}
+ 	if (record->xl_rmid > RM_MAX_ID)
+ 	{
+ 		ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 				(errmsg("invalid resource manager ID %u at %X/%X",
+ 						record->xl_rmid, (uint32) (RecPtr >> 32),
+ 						(uint32) RecPtr)));
+ 		return false;
+ 	}
+ 	if (randAccess)
+ 	{
+ 		/*
+ 		 * We can't exactly verify the prev-link, but surely it should be less
+ 		 * than the record's own address.
+ 		 */
+ 		if (!XLByteLT(record->xl_prev, RecPtr))
+ 		{
+ 			ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
+ 							(uint32) (record->xl_prev >> 32),
+ 							(uint32) record->xl_prev,
+ 							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 			return false;
+ 		}
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * Record's prev-link should exactly match our previous location. This
+ 		 * check guards against torn WAL pages where a stale but valid-looking
+ 		 * WAL record starts on a sector boundary.
+ 		 */
+ 		if (!XLByteEQ(record->xl_prev, PrevRecPtr))
+ 		{
+ 			ereport(emode_for_corrupt_record(emode, 0, RecPtr),
+ 					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
+ 							(uint32) (record->xl_prev >> 32),
+ 							(uint32) record->xl_prev,
+ 							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+ 			return false;
+ 		}
+ 	}
+ 
+ 	return true;
+ }
+ 
+ 
+ /*
+  * CRC-check an XLOG record.  Returns false, after an ereport(), if invalid.
+  * We do not believe a record's contents (other than to the minimal extent
+  * of computing the amount of data to read in) until we've checked the CRCs.
+  *
+  * We assume all of the record (that is, xl_tot_len bytes) has been read
+  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
+  * record's header, which means in particular that xl_tot_len is at least
+  * SizeOfXLogRecord, so it is safe to fetch xl_len.
+  */
+ static bool
+ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
+ {
+ 	pg_crc32	crc;
+ 	int			i;
+ 	uint32		len = record->xl_len;
+ 	BkpBlock	bkpb;
+ 	char	   *blk;
+ 	size_t		remaining = record->xl_tot_len; /* bytes not yet accounted for */
+ 
+ 	/* First the rmgr data (xl_len bytes following the record header) */
+ 	if (remaining < SizeOfXLogRecord + len)
+ 	{
+ 		/* ValidXLogRecordHeader() should've caught this already... */
+ 		ereport(emode_for_corrupt_record(emode, 0, recptr),
+ 				(errmsg("invalid record length at %X/%X",
+ 						(uint32) (recptr >> 32), (uint32) recptr)));
+ 		return false;
+ 	}
+ 	remaining -= SizeOfXLogRecord + len;
+ 	INIT_CRC32(crc);
+ 	COMP_CRC32(crc, XLogRecGetData(record), len);
+ 
+ 	/* Add in the backup blocks flagged in xl_info, if any */
+ 	blk = (char *) XLogRecGetData(record) + len;
+ 	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ 	{
+ 		uint32		blen;
+ 
+ 		if (!(record->xl_info & XLR_BKP_BLOCK(i)))
+ 			continue;
+ 
+ 		if (remaining < sizeof(BkpBlock))
+ 		{
+ 			ereport(emode_for_corrupt_record(emode, 0, recptr),
+ 					(errmsg("invalid backup block size in record at %X/%X",
+ 							(uint32) (recptr >> 32), (uint32) recptr)));
+ 			return false;
+ 		}
+ 		memcpy(&bkpb, blk, sizeof(BkpBlock));
+ 
+ 		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
+ 		{
+ 			ereport(emode_for_corrupt_record(emode, 0, recptr),
+ 					(errmsg("incorrect hole size in record at %X/%X",
+ 							(uint32) (recptr >> 32), (uint32) recptr)));
+ 			return false;
+ 		}
+ 		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
+ 
+ 		if (remaining < blen)
+ 		{
+ 			ereport(emode_for_corrupt_record(emode, 0, recptr),
+ 					(errmsg("invalid backup block size in record at %X/%X",
+ 							(uint32) (recptr >> 32), (uint32) recptr)));
+ 			return false;
+ 		}
+ 		remaining -= blen;
+ 		COMP_CRC32(crc, blk, blen);
+ 		blk += blen;
+ 	}
+ 
+ 	/* Check that xl_tot_len agrees with our calculation (nothing left over) */
+ 	if (remaining != 0)
+ 	{
+ 		ereport(emode_for_corrupt_record(emode, 0, recptr),
+ 				(errmsg("incorrect total length in record at %X/%X",
+ 						(uint32) (recptr >> 32), (uint32) recptr)));
+ 		return false;
+ 	}
+ 
+ 	/* Finally include the record header fields that precede xl_crc */
+ 	COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
+ 	FIN_CRC32(crc);
+ 
+ 	if (!EQ_CRC32(record->xl_crc, crc))
+ 	{
+ 		ereport(emode_for_corrupt_record(emode, 0, recptr),
+ 		(errmsg("incorrect resource manager data checksum in record at %X/%X",
+ 				(uint32) (recptr >> 32), (uint32) recptr)));
+ 		return false;
+ 	}
+ 
+ 	return true;
+ }
*** a/src/include/access/xlog_internal.h
--- b/src/include/access/xlog_internal.h
***************
*** 231,236 **** extern XLogRecPtr RequestXLogSwitch(void);
--- 231,244 ----
  
  extern void GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli);
  
+ 
+ /*
+  * Exported so that xlogreader.c can call this. TODO: Should be refactored
+  * into a callback, or just have xlogreader return the error string and have
+  * the caller of XLogReadRecord() do the ereport() call.
+  */
+ extern int	emode_for_corrupt_record(int emode, int readSource, XLogRecPtr RecPtr);
+ 
  /*
   * Exported for the functions in timeline.c and xlogarchive.c.  Only valid
   * in the startup process.
*** /dev/null
--- b/src/include/access/xlogreader.h
***************
*** 0 ****
--- 1,97 ----
+ /*-------------------------------------------------------------------------
+  *
+  * xlogreader.h
+  *
+  *		Generic xlog reading facility.
+  *
+  * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *		src/include/access/xlogreader.h
+  *
+  * NOTES
+  *		Check the definition of the XLogReaderState struct for instructions on
+  *		how to use the XLogReader infrastructure.
+  *
+  *		The basic idea is to allocate an XLogReaderState via
+  *		XLogReaderAllocate, and call XLogReadRecord() until it returns NULL.
+  *-------------------------------------------------------------------------
+  */
+ #ifndef XLOGREADER_H
+ #define XLOGREADER_H
+ 
+ #include "access/xlog_internal.h"
+ 
+ struct XLogReaderState;
+ 
+ /*
+  * Page-read callback type, stored in XLogReaderState.read_page.  Its
+  * contract is explained next to that field inside the struct.
+  */
+ typedef bool (*XLogPageReadCB) (struct XLogReaderState *state,
+ 											XLogRecPtr RecPtr, int emode,
+ 											bool randAccess,
+ 											char *readBuf,
+ 											void *private_data);
+ 
+ typedef struct XLogReaderState
+ {
+ 	/* ----------------------------------------
+ 	 * Public parameters
+ 	 * ----------------------------------------
+ 	 */
+ 
+ 	/*
+ 	 * Data input callback (mandatory).
+ 	 *
+ 	 * This callback shall read XLOG_BLCKSZ bytes, from the location 'RecPtr',
+ 	 * into the memory pointed at by the 'readBuf' parameter.  The callback
+ 	 * shall return true on success, false if the page could not be read.
+ 	 */
+ 	XLogPageReadCB read_page;
+ 
+ 	/*
+ 	 * Opaque data for callbacks to use.  Not used by XLogReader.
+ 	 */
+ 	void	   *private_data;
+ 
+ 	/*
+ 	 * Start and end+1 locations of the last record read
+ 	 */
+ 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
+ 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
+ 
+ 	/* ----------------------------------------
+ 	 * private/internal state
+ 	 * ----------------------------------------
+ 	 */
+ 
+ 	/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+ 	char	   *readBuf;
+ 
+ 	/* Buffer for current ReadRecord result (expandable) */
+ 	char	   *readRecordBuf;
+ 	uint32		readRecordBufSize;
+ } XLogReaderState;
+ 
+ /*
+  * Get a new XLogReader
+  *
+  * The read_page callback (mandatory), the start point and the callbacks'
+  * private_data are all supplied as arguments.
+  */
+ extern XLogReaderState *XLogReaderAllocate(XLogRecPtr startpoint,
+ 				   XLogPageReadCB pagereadfunc, void *private_data);
+ 
+ /*
+  * Free an XLogReader
+  */
+ extern void XLogReaderFree(XLogReaderState *state);
+ 
+ /*
+  * Read the next record from xlog. Returns NULL on end-of-WAL or on failure.
+  */
+ extern XLogRecord *XLogReadRecord(XLogReaderState *state, XLogRecPtr ptr,
+ 			   int emode);
+ 
+ #endif   /* XLOGREADER_H */
