Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.242
diff -c -r1.242 xlog.c
*** src/backend/access/transam/xlog.c	27 Jun 2006 18:59:17 -0000	1.242
--- src/backend/access/transam/xlog.c	11 Jul 2006 16:46:21 -0000
***************
*** 124,129 ****
--- 124,130 ----
  
  /* File path names (all relative to $PGDATA) */
  #define BACKUP_LABEL_FILE		"backup_label"
+ #define BACKUP_LABEL_IN_USE	    "backup_label.in_use"
  #define RECOVERY_COMMAND_FILE	"recovery.conf"
  #define RECOVERY_COMMAND_DONE	"recovery.done"
  
***************
*** 183,188 ****
--- 184,192 ----
  static bool recoveryTargetInclusive = true;
  static TransactionId recoveryTargetXid;
  static time_t recoveryTargetTime;
+ static bool InStandby = false;
+ /* How many XLOG_CHECKPOINT* entries since last recovery checkpoint */
+ static int nCheckpoints = 0;    
  
  /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
  static TransactionId recoveryStopXid;
***************
*** 496,501 ****
--- 500,506 ----
  					 uint32 endLogId, uint32 endLogSeg);
  static void WriteControlFile(void);
  static void ReadControlFile(void);
+ static void ValidateControlFile(void);
  static char *str_time(time_t tnow);
  static void issue_xlog_fsync(void);
  
***************
*** 505,511 ****
  static bool read_backup_label(XLogRecPtr *checkPointLoc);
  static void remove_backup_label(void);
  static void rm_redo_error_callback(void *arg);
! 
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
--- 510,516 ----
  static bool read_backup_label(XLogRecPtr *checkPointLoc);
  static void remove_backup_label(void);
  static void rm_redo_error_callback(void *arg);
! static void CheckPointShmem(XLogRecPtr checkPointRedo);
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 3626,3631 ****
--- 3631,3663 ----
  		ereport(FATAL,
  				(errmsg("incorrect checksum in control file")));
  
+     ValidateControlFile();
+ 
+ 	if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
+ 		ereport(FATAL,
+ 			(errmsg("database files are incompatible with operating system"),
+ 			 errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
+ 					   " which is not recognized by setlocale().",
+ 					   ControlFile->lc_collate),
+ 			 errhint("It looks like you need to initdb or install locale support.")));
+ 	if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
+ 		ereport(FATAL,
+ 			(errmsg("database files are incompatible with operating system"),
+ 		errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
+ 				  " which is not recognized by setlocale().",
+ 				  ControlFile->lc_ctype),
+ 			 errhint("It looks like you need to initdb or install locale support.")));
+ 
+ 	/* Make the fixed locale settings visible as GUC variables, too */
+ 	SetConfigOption("lc_collate", ControlFile->lc_collate,
+ 					PGC_INTERNAL, PGC_S_OVERRIDE);
+ 	SetConfigOption("lc_ctype", ControlFile->lc_ctype,
+ 					PGC_INTERNAL, PGC_S_OVERRIDE);
+ }
+ 
+ static void
+ ValidateControlFile(void)
+ {
  	/*
  	 * Do compatibility checking immediately.  We do this here for 2 reasons:
  	 *
***************
*** 3722,3747 ****
  				  " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
  						   ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
  				 errhint("It looks like you need to recompile or initdb.")));
- 	if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
- 		ereport(FATAL,
- 			(errmsg("database files are incompatible with operating system"),
- 			 errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
- 					   " which is not recognized by setlocale().",
- 					   ControlFile->lc_collate),
- 			 errhint("It looks like you need to initdb or install locale support.")));
- 	if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
- 		ereport(FATAL,
- 			(errmsg("database files are incompatible with operating system"),
- 		errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
- 				  " which is not recognized by setlocale().",
- 				  ControlFile->lc_ctype),
- 			 errhint("It looks like you need to initdb or install locale support.")));
- 
- 	/* Make the fixed locale settings visible as GUC variables, too */
- 	SetConfigOption("lc_collate", ControlFile->lc_collate,
- 					PGC_INTERNAL, PGC_S_OVERRIDE);
- 	SetConfigOption("lc_ctype", ControlFile->lc_ctype,
- 					PGC_INTERNAL, PGC_S_OVERRIDE);
  }
  
  void
--- 3754,3759 ----
***************
*** 3749,3754 ****
--- 3761,3768 ----
  {
  	int			fd;
  
+     ValidateControlFile();
+ 
  	INIT_CRC32(ControlFile->crc);
  	COMP_CRC32(ControlFile->crc,
  			   (char *) ControlFile,
***************
*** 4095,4100 ****
--- 4109,4123 ----
  					(errmsg("restore_command = \"%s\"",
  							recoveryRestoreCommand)));
  		}
+ 		else if (strcmp(tok1, "standby_mode") == 0)
+ 		{
+ 			if (strcmp(tok2, "true") == 0)
+             {
+                 InStandby = true;
+ 				ereport(LOG,
+ 						(errmsg("standby_mode = true")));
+             }
+         }
  		else if (strcmp(tok1, "recovery_target_timeline") == 0)
  		{
  			rtliGiven = true;
***************
*** 4230,4235 ****
--- 4253,4259 ----
  	 * We are no longer in archive recovery state.
  	 */
  	InArchiveRecovery = false;
+ 	InStandby = false;
  
  	/*
  	 * We should have the ending log segment currently open.  Verify, and then
***************
*** 4465,4476 ****
  		ereport(LOG,
  				(errmsg("database system shutdown was interrupted at %s",
  						str_time(ControlFile->time))));
! 	else if (ControlFile->state == DB_IN_RECOVERY)
  		ereport(LOG,
  		   (errmsg("database system was interrupted while in recovery at %s",
  				   str_time(ControlFile->time)),
  			errhint("This probably means that some data is corrupted and"
  					" you will have to use the last backup for recovery.")));
  	else if (ControlFile->state == DB_IN_PRODUCTION)
  		ereport(LOG,
  				(errmsg("database system was interrupted at %s",
--- 4489,4506 ----
  		ereport(LOG,
  				(errmsg("database system shutdown was interrupted at %s",
  						str_time(ControlFile->time))));
! 	else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
  		ereport(LOG,
  		   (errmsg("database system was interrupted while in recovery at %s",
  				   str_time(ControlFile->time)),
  			errhint("This probably means that some data is corrupted and"
  					" you will have to use the last backup for recovery.")));
+ 	else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
+ 		ereport(LOG,
+ 		   (errmsg("database system was interrupted while in recovery at log time %s",
+ 				   str_time(ControlFile->time)),
+ 			errhint("If this has occurred more than once some data may be corrupted"
+ 					" and you may need to choose an earlier recovery target.")));
  	else if (ControlFile->state == DB_IN_PRODUCTION)
  		ereport(LOG,
  				(errmsg("database system was interrupted at %s",
***************
*** 4626,4641 ****
  	{
  		int			rmid;
  
  		if (InArchiveRecovery)
! 			ereport(LOG,
  					(errmsg("automatic recovery in progress")));
  		else
  			ereport(LOG,
  					(errmsg("database system was not properly shut down; "
  							"automatic recovery in progress")));
! 		ControlFile->state = DB_IN_RECOVERY;
  		ControlFile->time = time(NULL);
  		UpdateControlFile();
  
  		/* Start up the recovery environment */
  		XLogInitRelationCache();
--- 4656,4685 ----
  	{
  		int			rmid;
  
+         /*
+          * If we are in Archive Recovery then we create recovery checkpoints
+          * to avoid needing to start right from the beginning again. 
+          */
+     	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
  		if (InArchiveRecovery)
!         {		
!         	ereport(LOG,
  					(errmsg("automatic recovery in progress")));
+     		ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+         }
  		else
+         {
  			ereport(LOG,
  					(errmsg("database system was not properly shut down; "
  							"automatic recovery in progress")));
!     		ControlFile->state = DB_IN_CRASH_RECOVERY;
!         }
  		ControlFile->time = time(NULL);
+     	ControlFile->prevCheckPoint = ControlFile->checkPoint;
+     	ControlFile->checkPoint = checkPointLoc;
+     	ControlFile->checkPointCopy = checkPoint;
  		UpdateControlFile();
+     	LWLockRelease(ControlFileLock);
  
  		/* Start up the recovery environment */
  		XLogInitRelationCache();
***************
*** 4668,4673 ****
--- 4712,4719 ----
  			ErrorContextCallback	errcontext;
  
  			InRedo = true;
+             nCheckpoints = 0;
+ 
  			ereport(LOG,
  					(errmsg("redo starts at %X/%X",
  							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
***************
*** 5334,5345 ****
  		ereport(DEBUG2,
  				(errmsg("checkpoint starting")));
  
! 	CheckPointCLOG();
! 	CheckPointSUBTRANS();
! 	CheckPointMultiXact();
! 	FlushBufferPool();
! 	/* We deliberately delay 2PC checkpointing as long as possible */
! 	CheckPointTwoPhase(checkPoint.redo);
  
  	START_CRIT_SECTION();
  
--- 5380,5389 ----
  		ereport(DEBUG2,
  				(errmsg("checkpoint starting")));
  
!     /*
!      * Ensure all of shared memory gets checkpointed
!      */
!     CheckPointShmem(checkPoint.redo);
  
  	START_CRIT_SECTION();
  
***************
*** 5458,5463 ****
--- 5502,5508 ----
  xlog_redo(XLogRecPtr lsn, XLogRecord *record)
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+ 	CheckPoint	checkPoint;
  
  	if (info == XLOG_NEXTOID)
  	{
***************
*** 5469,5479 ****
  			ShmemVariableCache->nextOid = nextOid;
  			ShmemVariableCache->oidCount = 0;
  		}
  	}
  	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
  	{
- 		CheckPoint	checkPoint;
- 
  		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  		/* In a SHUTDOWN checkpoint, believe the counters exactly */
  		ShmemVariableCache->nextXid = checkPoint.nextXid;
--- 5514,5523 ----
  			ShmemVariableCache->nextOid = nextOid;
  			ShmemVariableCache->oidCount = 0;
  		}
+         return;
  	}
  	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
  	{
  		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  		/* In a SHUTDOWN checkpoint, believe the counters exactly */
  		ShmemVariableCache->nextXid = checkPoint.nextXid;
***************
*** 5499,5506 ****
  	}
  	else if (info == XLOG_CHECKPOINT_ONLINE)
  	{
- 		CheckPoint	checkPoint;
- 
  		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  		/* In an ONLINE checkpoint, treat the counters like NEXTOID */
  		if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
--- 5543,5548 ----
***************
*** 5519,5524 ****
--- 5561,5609 ----
  					(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
  							checkPoint.ThisTimeLineID, ThisTimeLineID)));
  	}
+ 
+ #define RECOVERY_CHECKPOINT_INTERVAL 100
+ 
+     /*
+      * During archive recovery: if we are in Standby mode, do a recovery
+      * checkpoint for each checkpoint record found in WAL replay. Otherwise,
+      * do one only after RECOVERY_CHECKPOINT_INTERVAL checkpoint records
+      * have been skipped, since doing this frequently slows down recovery.
+      * A recovery checkpoint is simply a recreation of the database
+      * state after the original checkpoint: all database changes
+      * are written to disk, allowing us to restart recovery from that point.
+      *
+      * Note: Should recovery ever be parallelised in the future,
+      * all work *must* stop until the recovery checkpoint has
+      * completed.
+      */
+     if (InArchiveRecovery && (InStandby || nCheckpoints >= RECOVERY_CHECKPOINT_INTERVAL))
+     {
+         CheckPointShmem(lsn);
+ 
+     	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+    		ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+     	ControlFile->prevCheckPoint = ControlFile->checkPoint;
+         /*
+          * The checkpoint record starts at ReadRecPtr; lsn is a pointer to
+          * the next xlog record, so it must not be used here.
+          */
+     	ControlFile->checkPoint = ReadRecPtr;
+     	ControlFile->checkPointCopy = checkPoint;
+         /*
+          * Make it look like we started from this point, so this is *not*
+          * the current time but the original checkpoint time.
+          */
+     	ControlFile->time = checkPoint.time;
+     	UpdateControlFile();
+     	LWLockRelease(ControlFileLock);
+ 		ereport(LOG,
+ 				(errmsg("recovery checkpoint at %X/%X",
+ 						lsn.xlogid, lsn.xrecoff)));
+         nCheckpoints = 0;
+     }
+     else
+         nCheckpoints++;
  }
  
  void
***************
*** 6106,6111 ****
--- 6191,6207 ----
  							histfilepath)));
  	}
  
+ 	/*
+ 	 * Rename the backup label file out of the way, so that we don't accidentally
+ 	 * re-start recovery from the beginning.
+ 	 */
+ 	unlink(BACKUP_LABEL_IN_USE);
+ 	if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_IN_USE) != 0)
+ 		ereport(FATAL,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+ 						BACKUP_LABEL_FILE, BACKUP_LABEL_IN_USE)));
+ 
  	return true;
  }
  
***************
*** 6119,6130 ****
  static void
  remove_backup_label(void)
  {
! 	if (unlink(BACKUP_LABEL_FILE) != 0)
! 		if (errno != ENOENT)
! 			ereport(FATAL,
! 					(errcode_for_file_access(),
! 					 errmsg("could not remove file \"%s\": %m",
! 							BACKUP_LABEL_FILE)));
  }
  
  /*
--- 6215,6226 ----
  static void
  remove_backup_label(void)
  {
!     if (unlink(BACKUP_LABEL_IN_USE) != 0)    /* remove the "in use" label file */
!         if (errno != ENOENT)                 /* an already-missing file is not an error */
!             ereport(FATAL,
!                     (errcode_for_file_access(),
!                     errmsg("could not remove file \"%s\": %m",
!                                     BACKUP_LABEL_IN_USE)));
  }
  
  /*
***************
*** 6147,6149 ****
--- 6243,6258 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * Flush all shared memory data zones and ensure fsync; 2PC state goes last.
+  */
+ static void CheckPointShmem(XLogRecPtr checkPointRedo)
+ {
+ 	CheckPointCLOG();
+ 	CheckPointSUBTRANS();
+ 	CheckPointMultiXact();
+ 	FlushBufferPool();     /* performs all required fsyncs */
+ 	/* We deliberately delay 2PC checkpointing as long as possible */
+ 	CheckPointTwoPhase(checkPointRedo);	/* the only use of checkPointRedo */
+ }
Index: src/include/catalog/pg_control.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_control.h,v
retrieving revision 1.29
diff -c -r1.29 pg_control.h
*** src/include/catalog/pg_control.h	4 Apr 2006 22:39:59 -0000	1.29
--- src/include/catalog/pg_control.h	11 Jul 2006 16:46:24 -0000
***************
*** 55,61 ****
  	DB_STARTUP = 0,
  	DB_SHUTDOWNED,
  	DB_SHUTDOWNING,
! 	DB_IN_RECOVERY,
  	DB_IN_PRODUCTION
  } DBState;
  
--- 55,62 ----
  	DB_STARTUP = 0,
  	DB_SHUTDOWNED,
  	DB_SHUTDOWNING,
! 	DB_IN_CRASH_RECOVERY,
! 	DB_IN_ARCHIVE_RECOVERY,
  	DB_IN_PRODUCTION
  } DBState;
  
