*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1712,1717 **** SET ENABLE_SEQSCAN TO OFF;
--- 1712,1773 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-page-checksums" xreflabel="page_checksums">
+       <indexterm>
+        <primary><varname>page_checksums</> configuration parameter</primary>
+       </indexterm>
+       <term><varname>page_checksums</varname> (<type>boolean</type>)</term>
+       <listitem>
+        <para>
+         When this parameter is on, the <productname>PostgreSQL</> server
+         calculates checksums when it writes main database pages to disk,
+         flagging the page as checksum protected.
+         When pages are read into shared buffers any page flagged with a
+         checksum has the checksum re-calculated and compared against the
+         stored value to provide greatly improved validation of page contents.
+         Each checksum is a 16-bit value, computed using the fast Fletcher 16 algorithm.
+        </para>
+ 
+        <para>
+         When this parameter is off we write only the page version number and
+         blocksize as a standard watermark. If we read in a block that has a
+         checksum while <varname>page_checksums</varname> is disabled, we do
+         not verify the checksum value. Such pages will be reset to the
+         standard watermark if they are rewritten for any reason.
+         The database may thus contain a mix of pages with checksums and pages
+         without checksums.
+        </para>
+ 
+        <para>
+         Pages written via <varname>temp_buffers</varname> are never checksummed.
+        </para>
+ 
+        <para>
+         Turning this parameter off reduces the CPU overhead of reading and
+         writing data from disk, though it might allow data corruption to go
+         unnoticed. Even with this parameter enabled there is still a non-zero
+         probability that an error will go undetected.
+        </para>
+ 
+        <para>
+         The checksum field is 16 bits wide, computed with the fast Fletcher 16 algorithm.
+         Later releases may increase the size of the checksum field and offer
+         alternative checksum or CRC algorithms.
+        </para>
+ 
+        <para>
+         The default is <literal>off</> for backwards compatibility and
+         to allow upgrade. The recommended setting is <literal>on</>, though
+         it should not be enabled until the upgrade has completed successfully
+         and a full set of new backups has been taken.
+        </para>
+ 
+        <para>
+         This parameter can only be set at server start.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       <varlistentry id="guc-wal-buffers" xreflabel="wal_buffers">
        <term><varname>wal_buffers</varname> (<type>integer</type>)</term>
        <indexterm>
*** a/src/backend/access/hash/hashpage.c
--- b/src/backend/access/hash/hashpage.c
***************
*** 730,735 **** _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
--- 730,736 ----
  	MemSet(zerobuf, 0, sizeof(zerobuf));
  
  	RelationOpenSmgr(rel);
+ 	/* no need to set page verification info for all zero pages */
  	smgrextend(rel->rd_smgr, MAIN_FORKNUM, lastblock, zerobuf, false);
  
  	return true;
*** a/src/backend/access/heap/rewriteheap.c
--- b/src/backend/access/heap/rewriteheap.c
***************
*** 266,271 **** end_heap_rewrite(RewriteState state)
--- 266,273 ----
  	/* Write the last page, if any */
  	if (state->rs_buffer_valid)
  	{
+ 		PageSetVerificationInfoInplace(state->rs_buffer);
+ 
  		if (state->rs_use_wal)
  			log_newpage(&state->rs_new_rel->rd_node,
  						MAIN_FORKNUM,
***************
*** 611,616 **** raw_heap_insert(RewriteState state, HeapTuple tup)
--- 613,620 ----
  		{
  			/* Doesn't fit, so write out the existing page */
  
+ 			PageSetVerificationInfoInplace(page);
+ 
  			/* XLOG stuff */
  			if (state->rs_use_wal)
  				log_newpage(&state->rs_new_rel->rd_node,
*** a/src/backend/access/heap/visibilitymap.c
--- b/src/backend/access/heap/visibilitymap.c
***************
*** 568,573 **** vm_extend(Relation rel, BlockNumber vm_nblocks)
--- 568,574 ----
  	/* Now extend the file */
  	while (vm_nblocks_now < vm_nblocks)
  	{
+ 		/* no need to set page verification info for all zero pages */
  		smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
  				   (char *) pg, false);
  		vm_nblocks_now++;
*** a/src/backend/access/nbtree/nbtree.c
--- b/src/backend/access/nbtree/nbtree.c
***************
*** 216,221 **** btbuildempty(PG_FUNCTION_ARGS)
--- 216,222 ----
  	_bt_initmetapage(metapage, P_NONE, 0);
  
  	/* Write the page.	If archiving/streaming, XLOG it. */
+ 	PageSetVerificationInfoInplace(metapage);
  	smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
  			  (char *) metapage, true);
  	if (XLogIsNeeded())
*** a/src/backend/access/nbtree/nbtsort.c
--- b/src/backend/access/nbtree/nbtsort.c
***************
*** 288,299 **** _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
--- 288,302 ----
  	{
  		if (!wstate->btws_zeropage)
  			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
+ 		/* no need to set page verification info for all zero pages */
  		smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM,
  				   wstate->btws_pages_written++,
  				   (char *) wstate->btws_zeropage,
  				   true);
  	}
  
+ 	PageSetVerificationInfoInplace(page);
+ 
  	/*
  	 * Now write the page.	There's no need for smgr to schedule an fsync for
  	 * this write; we'll do it ourselves before ending the build.
*** a/src/backend/access/spgist/spginsert.c
--- b/src/backend/access/spgist/spginsert.c
***************
*** 150,155 **** spgbuildempty(PG_FUNCTION_ARGS)
--- 150,156 ----
  	SpGistInitMetapage(page);
  
  	/* Write the page.	If archiving/streaming, XLOG it. */
+ 	PageSetVerificationInfoInplace(page);
  	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
  			  (char *) page, true);
  	if (XLogIsNeeded())
***************
*** 159,164 **** spgbuildempty(PG_FUNCTION_ARGS)
--- 160,166 ----
  	/* Likewise for the root page. */
  	SpGistInitPage(page, SPGIST_LEAF);
  
+ 	PageSetVerificationInfoInplace(page);
  	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_HEAD_BLKNO,
  			  (char *) page, true);
  	if (XLogIsNeeded())
*** a/src/backend/access/transam/README
--- b/src/backend/access/transam/README
***************
*** 527,532 **** associated with the n'th distinct buffer ID seen in the "rdata" array, and
--- 527,540 ----
  per the above discussion, fully-rewritable buffers shouldn't be mentioned in
  "rdata".)
  
+ Note that we must only use PageSetLSN/PageGetLSN() when we know the action
+ is serialised. Only Startup process may modify data blocks during recovery,
+ so Startup process may execute PageGetLSN() without fear of serialisation
+ problems. All other processes must only call PageSet/GetLSN when holding
+ either an exclusive buffer lock or a shared lock plus buffer header lock,
+ or be writing the data block directly rather than through shared buffers
+ while holding AccessExclusiveLock on the relation.
+ 
  Due to all these constraints, complex changes (such as a multilevel index
  insertion) normally need to be described by a series of atomic-action WAL
  records.  What do you do if the intermediate states are not self-consistent?
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 320,326 **** MarkAsPreparing(TransactionId xid, const char *gid,
  	proc->lxid = (LocalTransactionId) xid;
  	pgxact->xid = xid;
  	pgxact->xmin = InvalidTransactionId;
! 	pgxact->inCommit = false;
  	pgxact->vacuumFlags = 0;
  	proc->pid = 0;
  	proc->backendId = InvalidBackendId;
--- 320,326 ----
  	proc->lxid = (LocalTransactionId) xid;
  	pgxact->xid = xid;
  	pgxact->xmin = InvalidTransactionId;
! 	pgxact->delayChkpt = false;
  	pgxact->vacuumFlags = 0;
  	proc->pid = 0;
  	proc->backendId = InvalidBackendId;
***************
*** 1028,1045 **** EndPrepare(GlobalTransaction gxact)
  	 * odds of a PANIC actually occurring should be very tiny given that we
  	 * were able to write the bogus CRC above.
  	 *
! 	 * We have to set inCommit here, too; otherwise a checkpoint starting
  	 * immediately after the WAL record is inserted could complete without
  	 * fsync'ing our state file.  (This is essentially the same kind of race
  	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
! 	 * uses inCommit for; see notes there.)
  	 *
  	 * We save the PREPARE record's location in the gxact for later use by
  	 * CheckPointTwoPhase.
  	 */
  	START_CRIT_SECTION();
  
! 	MyPgXact->inCommit = true;
  
  	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
  									records.head);
--- 1028,1045 ----
  	 * odds of a PANIC actually occurring should be very tiny given that we
  	 * were able to write the bogus CRC above.
  	 *
! 	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
  	 * immediately after the WAL record is inserted could complete without
  	 * fsync'ing our state file.  (This is essentially the same kind of race
  	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
! 	 * uses delayChkpt for; see notes there.)
  	 *
  	 * We save the PREPARE record's location in the gxact for later use by
  	 * CheckPointTwoPhase.
  	 */
  	START_CRIT_SECTION();
  
! 	MyPgXact->delayChkpt = true;
  
  	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
  									records.head);
***************
*** 1087,1093 **** EndPrepare(GlobalTransaction gxact)
  	 * checkpoint starting after this will certainly see the gxact as a
  	 * candidate for fsyncing.
  	 */
! 	MyPgXact->inCommit = false;
  
  	END_CRIT_SECTION();
  
--- 1087,1093 ----
  	 * checkpoint starting after this will certainly see the gxact as a
  	 * candidate for fsyncing.
  	 */
! 	MyPgXact->delayChkpt = false;
  
  	END_CRIT_SECTION();
  
***************
*** 1971,1977 **** RecoverPreparedTransactions(void)
   *	RecordTransactionCommitPrepared
   *
   * This is basically the same as RecordTransactionCommit: in particular,
!  * we must set the inCommit flag to avoid a race condition.
   *
   * We know the transaction made at least one XLOG entry (its PREPARE),
   * so it is never possible to optimize out the commit record.
--- 1971,1977 ----
   *	RecordTransactionCommitPrepared
   *
   * This is basically the same as RecordTransactionCommit: in particular,
!  * we must set the delayChkpt flag to avoid a race condition.
   *
   * We know the transaction made at least one XLOG entry (its PREPARE),
   * so it is never possible to optimize out the commit record.
***************
*** 1994,2000 **** RecordTransactionCommitPrepared(TransactionId xid,
  	START_CRIT_SECTION();
  
  	/* See notes in RecordTransactionCommit */
! 	MyPgXact->inCommit = true;
  
  	/* Emit the XLOG commit record */
  	xlrec.xid = xid;
--- 1994,2000 ----
  	START_CRIT_SECTION();
  
  	/* See notes in RecordTransactionCommit */
! 	MyPgXact->delayChkpt = true;
  
  	/* Emit the XLOG commit record */
  	xlrec.xid = xid;
***************
*** 2059,2065 **** RecordTransactionCommitPrepared(TransactionId xid,
  	TransactionIdCommitTree(xid, nchildren, children);
  
  	/* Checkpoint can proceed now */
! 	MyPgXact->inCommit = false;
  
  	END_CRIT_SECTION();
  
--- 2059,2065 ----
  	TransactionIdCommitTree(xid, nchildren, children);
  
  	/* Checkpoint can proceed now */
! 	MyPgXact->delayChkpt = false;
  
  	END_CRIT_SECTION();
  
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 975,987 **** RecordTransactionCommit(void)
  		 * RecordTransactionAbort.	That's because loss of a transaction abort
  		 * is noncritical; the presumption would be that it aborted, anyway.
  		 *
! 		 * It's safe to change the inCommit flag of our own backend without
  		 * holding the ProcArrayLock, since we're the only one modifying it.
! 		 * This makes checkpoint's determination of which xacts are inCommit a
  		 * bit fuzzy, but it doesn't matter.
  		 */
  		START_CRIT_SECTION();
! 		MyPgXact->inCommit = true;
  
  		SetCurrentTransactionStopTimestamp();
  
--- 975,987 ----
  		 * RecordTransactionAbort.	That's because loss of a transaction abort
  		 * is noncritical; the presumption would be that it aborted, anyway.
  		 *
! 		 * It's safe to change the delayChkpt flag of our own backend without
  		 * holding the ProcArrayLock, since we're the only one modifying it.
! 		 * This makes checkpoint's determination of which xacts are delayChkpt a
  		 * bit fuzzy, but it doesn't matter.
  		 */
  		START_CRIT_SECTION();
! 		MyPgXact->delayChkpt = true;
  
  		SetCurrentTransactionStopTimestamp();
  
***************
*** 1155,1161 **** RecordTransactionCommit(void)
  	 */
  	if (markXidCommitted)
  	{
! 		MyPgXact->inCommit = false;
  		END_CRIT_SECTION();
  	}
  
--- 1155,1161 ----
  	 */
  	if (markXidCommitted)
  	{
! 		MyPgXact->delayChkpt = false;
  		END_CRIT_SECTION();
  	}
  
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 733,738 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
--- 733,739 ----
  	bool		doPageWrites;
  	bool		isLogSwitch = false;
  	bool		fpwChange = false;
+ 	bool		isHint = false;
  	uint8		info_orig = info;
  
  	/* cross-check on whether we should be here or not */
***************
*** 760,765 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
--- 761,770 ----
  				fpwChange = true;
  				break;
  
+ 			case XLOG_HINT:
+ 				isHint = true;
+ 				break;
+ 
  			default:
  				break;
  		}
***************
*** 998,1003 **** begin:;
--- 1003,1020 ----
  	}
  
  	/*
+ 	 * If this is a hint record and we don't need a backup block then
+ 	 * we have no more work to do and can exit quickly without inserting
+ 	 * a WAL record at all. In that case return InvalidXLogRecPtr.
+ 	 */
+ 	if (isHint && !(info & XLR_BKP_BLOCK_MASK))
+ 	{
+ 		LWLockRelease(WALInsertLock);
+ 		END_CRIT_SECTION();
+ 		return InvalidXLogRecPtr;
+ 	}
+ 
+ 	/*
  	 * If there isn't enough space on the current XLOG page for a record
  	 * header, advance to the next page (leaving the unused space as zeroes).
  	 */
***************
*** 1280,1286 **** XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  	/*
  	 * XXX We assume page LSN is first data on *every* page that can be passed
  	 * to XLogInsert, whether it otherwise has the standard page layout or
! 	 * not.
  	 */
  	*lsn = PageGetLSN(page);
  
--- 1297,1304 ----
  	/*
  	 * XXX We assume page LSN is first data on *every* page that can be passed
  	 * to XLogInsert, whether it otherwise has the standard page layout or
! 	 * not. We don't need the buffer header lock for PageGetLSN because we
! 	 * have exclusive lock on the page and/or the relation.
  	 */
  	*lsn = PageGetLSN(page);
  
***************
*** 3724,3729 **** RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
--- 3742,3753 ----
  				   BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
  		}
  
+ 		/*
+ 		 * Any checksum set on this page will be invalid. We don't need
+ 		 * to reset it here since it will be reset before being written
+ 		 * and changing data when we don't have a cleanup lock is bad juju.
+ 		 */
+ 
  		PageSetLSN(page, lsn);
  		PageSetTLI(page, ThisTimeLineID);
  		MarkBufferDirty(buffer);
***************
*** 7653,7660 **** CreateCheckPoint(int flags)
  	uint32		redo_logSeg;
  	uint32		insert_logId;
  	uint32		insert_logSeg;
! 	TransactionId *inCommitXids;
! 	int			nInCommit;
  
  	/*
  	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
--- 7677,7684 ----
  	uint32		redo_logSeg;
  	uint32		insert_logId;
  	uint32		insert_logSeg;
! 	VirtualTransactionId *vxids;
! 	int	nvxids;
  
  	/*
  	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
***************
*** 7828,7834 **** CreateCheckPoint(int flags)
  
  	/*
  	 * Before flushing data, we must wait for any transactions that are
! 	 * currently in their commit critical sections.  If an xact inserted its
  	 * commit record into XLOG just before the REDO point, then a crash
  	 * restart from the REDO point would not replay that record, which means
  	 * that our flushing had better include the xact's update of pg_clog.  So
--- 7852,7858 ----
  
  	/*
  	 * Before flushing data, we must wait for any transactions that are
! 	 * currently in commit or hint critical sections.  If an xact inserted its
  	 * commit record into XLOG just before the REDO point, then a crash
  	 * restart from the REDO point would not replay that record, which means
  	 * that our flushing had better include the xact's update of pg_clog.  So
***************
*** 7844,7864 **** CreateCheckPoint(int flags)
  	 * protected by different locks, but again that seems best on grounds of
  	 * minimizing lock contention.)
  	 *
! 	 * A transaction that has not yet set inCommit when we look cannot be at
  	 * risk, since he's not inserted his commit record yet; and one that's
  	 * already cleared it is not at risk either, since he's done fixing clog
  	 * and we will correctly flush the update below.  So we cannot miss any
  	 * xacts we need to wait for.
  	 */
! 	nInCommit = GetTransactionsInCommit(&inCommitXids);
! 	if (nInCommit > 0)
  	{
  		do
  		{
  			pg_usleep(10000L);	/* wait for 10 msec */
! 		} while (HaveTransactionsInCommit(inCommitXids, nInCommit));
  	}
! 	pfree(inCommitXids);
  
  	/*
  	 * Get the other info we need for the checkpoint record.
--- 7868,7888 ----
  	 * protected by different locks, but again that seems best on grounds of
  	 * minimizing lock contention.)
  	 *
! 	 * A transaction that has not yet set delayChkpt when we look cannot be at
  	 * risk, since he's not inserted his commit record yet; and one that's
  	 * already cleared it is not at risk either, since he's done fixing clog
  	 * and we will correctly flush the update below.  So we cannot miss any
  	 * xacts we need to wait for.
  	 */
! 	vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
! 	if (nvxids > 0)
  	{
  		do
  		{
  			pg_usleep(10000L);	/* wait for 10 msec */
! 		} while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
  	}
! 	pfree(vxids);
  
  	/*
  	 * Get the other info we need for the checkpoint record.
***************
*** 8443,8448 **** XLogRestorePoint(const char *rpName)
--- 8467,8517 ----
  }
  
  /*
+  * Write a backup block if needed when we are setting a hint. Note that
+  * this may be called for a variety of page types, not just heaps.
+  *
+  * Deciding the "if needed" part is delicate and requires us to either
+  * grab WALInsertLock or check the info_lck spinlock. If we check the
+  * spinlock and it says Yes then we will need to get WALInsertLock as well,
+  * so the design choice here is to just go straight for the WALInsertLock
+  * and trust that calls to this function are minimised elsewhere.
+  *
+  * Callable while holding just share lock on the buffer content.
+  *
+  * Possible that multiple concurrent backends could attempt to write
+  * WAL records. In that case, more than one backup block may be recorded
+  * though that isn't important to the outcome and the backup blocks are
+  * likely to be identical anyway.
+  */
+ #define	XLOG_HINT_WATERMARK		13579
+ XLogRecPtr
+ XLogRecordHint(Buffer buffer)
+ {
+ 	/*
+ 	 * Make an XLOG entry reporting the hint
+ 	 */
+ 	XLogRecData rdata[2];
+ 	int			watermark = XLOG_HINT_WATERMARK;
+ 
+ 	/*
+ 	 * Not allowed to have zero-length records, so use a small watermark
+ 	 */
+ 	rdata[0].data = (char *) (&watermark);
+ 	rdata[0].len = sizeof(int);
+ 	rdata[0].buffer = InvalidBuffer;
+ 	rdata[0].buffer_std = false;
+ 	rdata[0].next = &(rdata[1]);
+ 
+ 	rdata[1].data = NULL;
+ 	rdata[1].len = 0;
+ 	rdata[1].buffer = buffer;
+ 	rdata[1].buffer_std = true;
+ 	rdata[1].next = NULL;
+ 
+ 	return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+ }
+ 
+ /*
   * Check if any of the GUC parameters that are critical for hot standby
   * have changed, and update the value in pg_control file if necessary.
   */
***************
*** 8540,8547 **** xlog_redo(XLogRecPtr lsn, XLogRecord *record)
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
! 	/* Backup blocks are not used in xlog records */
! 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
  
  	if (info == XLOG_NEXTOID)
  	{
--- 8609,8616 ----
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
! 	/* Backup blocks are not used in most xlog records */
! 	Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
  
  	if (info == XLOG_NEXTOID)
  	{
***************
*** 8681,8686 **** xlog_redo(XLogRecPtr lsn, XLogRecord *record)
--- 8750,8781 ----
  	{
  		/* nothing to do here */
  	}
+ 	else if (info == XLOG_HINT)
+ 	{
+ 		int	*watermark = (int *) XLogRecGetData(record);
+ 
+ 		/* Check the watermark is correct for the hint record */
+ 		Assert(*watermark == XLOG_HINT_WATERMARK);
+ 
+ 		/* Backup blocks must be present for smgr hint records */
+ 		Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
+ 
+ 		/*
+ 		 * Hint records have no information that needs to be replayed.
+ 		 * The sole purpose of them is to ensure that a hint bit does
+ 		 * not cause a checksum invalidation if a hint bit write should
+ 		 * cause a torn page. So the body of the record is empty but
+ 		 * there must be one backup block.
+ 		 *
+ 		 * Since the only change in the backup block is a hint bit,
+ 		 * there is no conflict with Hot Standby.
+ 		 *
+ 		 * This also means there is no corresponding API call for this,
+ 		 * so an smgr implementation has no need to implement anything.
+ 		 * Which means nothing is needed in md.c etc
+ 		 */
+ 		RestoreBkpBlocks(lsn, record, false);
+ 	}
  	else if (info == XLOG_BACKUP_END)
  	{
  		XLogRecPtr	startpoint;
***************
*** 8816,8821 **** xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
--- 8911,8920 ----
  		appendStringInfo(buf, "restore point: %s", xlrec->rp_name);
  
  	}
+ 	else if (info == XLOG_HINT)
+ 	{
+ 		appendStringInfo(buf, "page hint");
+ 	}
  	else if (info == XLOG_BACKUP_END)
  	{
  		XLogRecPtr	startpoint;
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 8537,8542 **** copy_relation_data(SMgrRelation src, SMgrRelation dst,
--- 8537,8544 ----
  
  		smgrread(src, forkNum, blkno, buf);
  
+ 		PageSetVerificationInfoInplace(page);
+ 
  		/* XLOG stuff */
  		if (use_wal)
  			log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page);
*** a/src/backend/storage/buffer/bufmgr.c
--- b/src/backend/storage/buffer/bufmgr.c
***************
*** 34,39 ****
--- 34,40 ----
  #include <unistd.h>
  
  #include "catalog/catalog.h"
+ #include "catalog/storage.h"
  #include "executor/instrument.h"
  #include "miscadmin.h"
  #include "pg_trace.h"
***************
*** 440,446 **** ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  			smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
  
  			/* check for garbage data */
! 			if (!PageHeaderIsValid((PageHeader) bufBlock))
  			{
  				if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
  				{
--- 441,447 ----
  			smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
  
  			/* check for garbage data */
! 			if (!PageIsVerified((Page) bufBlock))
  			{
  				if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
  				{
***************
*** 465,471 **** ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  	{
  		/* Only need to adjust flags */
  		bufHdr->flags |= BM_VALID;
! 	}
  	else
  	{
  		/* Set BM_VALID, terminate IO, and wake up any waiters */
--- 466,472 ----
  	{
  		/* Only need to adjust flags */
  		bufHdr->flags |= BM_VALID;
! 	}
  	else
  	{
  		/* Set BM_VALID, terminate IO, and wake up any waiters */
***************
*** 635,648 **** BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  				 * victim.	We need lock to inspect the page LSN, so this
  				 * can't be done inside StrategyGetBuffer.
  				 */
! 				if (strategy != NULL &&
! 					XLogNeedsFlush(BufferGetLSN(buf)) &&
! 					StrategyRejectBuffer(strategy, buf))
  				{
! 					/* Drop lock/pin and loop around for another buffer */
! 					LWLockRelease(buf->content_lock);
! 					UnpinBuffer(buf, true);
! 					continue;
  				}
  
  				/* OK, do the I/O */
--- 638,660 ----
  				 * victim.	We need lock to inspect the page LSN, so this
  				 * can't be done inside StrategyGetBuffer.
  				 */
! 				if (strategy != NULL)
  				{
! 					XLogRecPtr	lsn;
! 
! 					/* Read the LSN while holding buffer header lock */
! 					LockBufHdr(buf);
! 					lsn = BufferGetLSN(buf);
! 					UnlockBufHdr(buf);
! 
! 					if (XLogNeedsFlush(lsn) &&
! 						StrategyRejectBuffer(strategy, buf))
! 					{
! 						/* Drop lock/pin and loop around for another buffer */
! 						LWLockRelease(buf->content_lock);
! 						UnpinBuffer(buf, true);
! 						continue;
! 					}
  				}
  
  				/* OK, do the I/O */
***************
*** 1873,1878 **** FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
--- 1885,1892 ----
  {
  	XLogRecPtr	recptr;
  	ErrorContextCallback errcontext;
+ 	Block		bufBlock;
+ 	char		*bufToWrite;
  
  	/*
  	 * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
***************
*** 1901,1912 **** FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
  										reln->smgr_rnode.node.dbNode,
  										reln->smgr_rnode.node.relNode);
  
  	/*
  	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
  	 * rule that log updates must hit disk before any of the data-file changes
  	 * they describe do.
  	 */
- 	recptr = BufferGetLSN(buf);
  	XLogFlush(recptr);
  
  	/*
--- 1915,1936 ----
  										reln->smgr_rnode.node.dbNode,
  										reln->smgr_rnode.node.relNode);
  
+ 	LockBufHdr(buf);
+ 
+ 	/*
+ 	 * Run PageGetLSN while holding header lock.
+ 	 */
+ 	recptr = BufferGetLSN(buf);
+ 
+ 	/* To check if block content changes while flushing. - vadim 01/17/97 */
+ 	buf->flags &= ~BM_JUST_DIRTIED;
+ 	UnlockBufHdr(buf);
+ 
  	/*
  	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
  	 * rule that log updates must hit disk before any of the data-file changes
  	 * they describe do.
  	 */
  	XLogFlush(recptr);
  
  	/*
***************
*** 1915,1929 **** FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
  	 * we have the io_in_progress lock.
  	 */
  
! 	/* To check if block content changes while flushing. - vadim 01/17/97 */
! 	LockBufHdr(buf);
! 	buf->flags &= ~BM_JUST_DIRTIED;
! 	UnlockBufHdr(buf);
  
  	smgrwrite(reln,
  			  buf->tag.forkNum,
  			  buf->tag.blockNum,
! 			  (char *) BufHdrGetBlock(buf),
  			  false);
  
  	pgBufferUsage.shared_blks_written++;
--- 1939,1955 ----
  	 * we have the io_in_progress lock.
  	 */
  
! 	bufBlock = BufHdrGetBlock(buf);
  
+ 	bufToWrite = PageSetVerificationInfoOnCopy((Page) bufBlock);
+ 
+ 	/*
+ 	 * bufToWrite is either the shared buffer or a copy, as appropriate.
+ 	 */
  	smgrwrite(reln,
  			  buf->tag.forkNum,
  			  buf->tag.blockNum,
! 			  bufToWrite,
  			  false);
  
  	pgBufferUsage.shared_blks_written++;
***************
*** 1934,1939 **** FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
--- 1960,1967 ----
  	 */
  	TerminateBufferIO(buf, true, 0);
  
+ 	/* XXX Assert(buf is not BM_JUST_DIRTIED) */
+ 
  	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
  									   buf->tag.blockNum,
  									   reln->smgr_rnode.node.spcNode,
***************
*** 2326,2331 **** void
--- 2354,2360 ----
  SetBufferCommitInfoNeedsSave(Buffer buffer)
  {
  	volatile BufferDesc *bufHdr;
+ 	Page 	page = BufferGetPage(buffer);
  
  	if (!BufferIsValid(buffer))
  		elog(ERROR, "bad buffer ID: %d", buffer);
***************
*** 2354,2368 **** SetBufferCommitInfoNeedsSave(Buffer buffer)
  	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
  		(BM_DIRTY | BM_JUST_DIRTIED))
  	{
  		bool		dirtied = false;
  
  		LockBufHdr(bufHdr);
  		Assert(bufHdr->refcount > 0);
  		if (!(bufHdr->flags & BM_DIRTY))
! 			dirtied = true;
  		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
  		UnlockBufHdr(bufHdr);
  
  		if (dirtied)
  		{
  			VacuumPageDirty++;
--- 2383,2487 ----
  	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
  		(BM_DIRTY | BM_JUST_DIRTIED))
  	{
+ 		XLogRecPtr	lsn = {0, 0};
  		bool		dirtied = false;
+ 		bool		delayChkpt = false;
+ 
+ 		/*
+ 		 * If we are currently writing checksums, or if the page has a
+ 		 * checksum set then we must check to see if a full page image is
+ 		 * required to protect against torn pages.
+ 		 *
+ 		 * If page_checksums is set then we must write a new checksum
+ 		 * for any change, even a hint. If we write a checksum and we crash
+ 		 * it could leave torn pages on disk. So to protect against that
+ 		 * a hint must be written if this is the first change on the block
+ 		 * since the last checkpoint.
+ 		 *
+ 		 * Even if page_checksums is off, we may also need to write a full
+ 		 * page image. This protects against events that can occur if
+ 		 * page_checksums is turned on and off and then on again. In that case,
+ 		 * if a page has a checksum set and then we write a page
+ 		 * while page_checksums = off a torn page could result in a page
+ 		 * that has a checksum set and yet does not match the actual page.
+ 		 * So even if page_checksums = off we write a full page image if the
+ 		 * page is marked as having a checksum, even though we are going
+ 		 * to reset the checksum.
+ 		 *
+ 		 * We don't check full_page_writes here because that can be set
+ 		 * on dynamically during a backup, so even if the usual value is
+ 		 * off we may still need to use a full page image.
+ 		 */
+ 		if (page_checksums || !PageHasNoChecksum(page))
+ 		{
+ 			/*
+ 			 * If we're in recovery we cannot dirty a page because of a hint.
+ 			 * We can set the hint, just not dirty the page as a result so
+ 			 * the hint is lost when we evict the page or shutdown.
+ 			 *
+ 			 * See long discussion in bufpage.c
+ 			 */
+ 			if (RecoveryInProgress())
+ 				return;
+ 
+ 			/*
+ 			 * If the block is already dirty because we either made a change
+ 			 * or set a hint already, then we don't need to write a full page
+ 			 * image.  Note that aggressive cleaning of blocks
+ 			 * dirtied by hint bit setting would increase the call rate.
+ 			 * Bulk setting of hint bits would reduce the call rate...
+ 			 *
+ 			 * We must issue the WAL record before we mark the buffer dirty.
+ 			 * Otherwise we might write the page before we write the WAL.
+ 			 * That causes a race condition, since a checkpoint might occur
+ 			 * between writing the WAL record and marking the buffer dirty.
+ 			 * We solve that with a kluge, but one that is already in use
+ 			 * during transaction commit to prevent race conditions.
+ 			 * Basically, we simply prevent the checkpoint WAL record from
+ 			 * being written until we have marked the buffer dirty. We don't
+ 			 * start the checkpoint flush until we have marked dirty, so our
+ 			 * checkpoint must flush the change to disk successfully or the
+ 			 * checkpoint never gets written, so crash recovery will fix.
+ 			 *
+ 			 * It's possible we may enter here without an xid, so it is
+ 			 * essential that CreateCheckpoint waits for virtual transactions
+ 			 * rather than full transactionids.
+ 			 */
+ 			MyPgXact->delayChkpt = delayChkpt = true;
+ 			lsn = XLogRecordHint(buffer);
+ 		}
  
  		LockBufHdr(bufHdr);
  		Assert(bufHdr->refcount > 0);
  		if (!(bufHdr->flags & BM_DIRTY))
! 		{
! 			dirtied = true;		/* Means "will be dirtied by this action" */
! 
! 			/*
! 			 * Set the page LSN if we wrote a backup block. We aren't
! 			 * supposed to set this when only holding a share lock but
! 			 * as long as we serialise it somehow we're OK. We choose to
! 			 * set LSN while holding the buffer header lock, which causes
! 			 * any reader of an LSN who holds only a share lock to also
! 			 * obtain a buffer header lock before using PageGetLSN().
! 			 * Fortunately, that's not too many places.
! 			 *
! 			 * If !page_checksums and PageHasChecksum() you might think
! 			 * we should reset the checksum here. That will happen when
! 			 * the page is written sometime later in this checkpoint cycle.
! 			 */
! 			if (!XLogRecPtrIsInvalid(lsn))
! 			{
! 				PageSetLSN(page, lsn);
! 				PageSetTLI(page, ThisTimeLineID);
! 			}
! 		}
  		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
  		UnlockBufHdr(bufHdr);
  
+ 		if (delayChkpt)
+ 			MyPgXact->delayChkpt = false;
+ 
  		if (dirtied)
  		{
  			VacuumPageDirty++;
*** a/src/backend/storage/buffer/localbuf.c
--- b/src/backend/storage/buffer/localbuf.c
***************
*** 200,205 **** LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
--- 200,207 ----
  		/* Find smgr relation for buffer */
  		oreln = smgropen(bufHdr->tag.rnode, MyBackendId);
  
+ 		/* XXX do we want to write checksums for local buffers? An option? */
+ 
  		/* And write... */
  		smgrwrite(oreln,
  				  bufHdr->tag.forkNum,
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 398,404 **** ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
  		pgxact->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		pgxact->inCommit = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		/* Clear the subtransaction-XID cache too while holding the lock */
--- 398,404 ----
  		pgxact->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		pgxact->delayChkpt = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		/* Clear the subtransaction-XID cache too while holding the lock */
***************
*** 425,431 **** ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
  		pgxact->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		pgxact->inCommit = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		Assert(pgxact->nxids == 0);
--- 425,431 ----
  		pgxact->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		pgxact->delayChkpt = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		Assert(pgxact->nxids == 0);
***************
*** 460,466 **** ProcArrayClearTransaction(PGPROC *proc)
  
  	/* redundant, but just in case */
  	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 	pgxact->inCommit = false;
  
  	/* Clear the subtransaction-XID cache too */
  	pgxact->nxids = 0;
--- 460,466 ----
  
  	/* redundant, but just in case */
  	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 	pgxact->delayChkpt = false;
  
  	/* Clear the subtransaction-XID cache too */
  	pgxact->nxids = 0;
***************
*** 1723,1787 **** GetOldestActiveTransactionId(void)
  }
  
  /*
!  * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
   *
!  * Constructs an array of XIDs of transactions that are currently in commit
!  * critical sections, as shown by having inCommit set in their PGPROC entries.
   *
!  * *xids_p is set to a palloc'd array that should be freed by the caller.
!  * The return value is the number of valid entries.
   *
!  * Note that because backends set or clear inCommit without holding any lock,
   * the result is somewhat indeterminate, but we don't really care.  Even in
   * a multiprocessor with delayed writes to shared memory, it should be certain
!  * that setting of inCommit will propagate to shared memory when the backend
!  * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
   * it's already inserted its commit record.  Whether it takes a little while
!  * for clearing of inCommit to propagate is unimportant for correctness.
   */
! int
! GetTransactionsInCommit(TransactionId **xids_p)
  {
  	ProcArrayStruct *arrayP = procArray;
! 	TransactionId *xids;
! 	int			nxids;
  	int			index;
  
! 	xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
! 	nxids = 0;
  
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
  	for (index = 0; index < arrayP->numProcs; index++)
  	{
  		int		pgprocno = arrayP->pgprocnos[index];
! 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
! 		TransactionId pxid;
  
! 		/* Fetch xid just once - see GetNewTransactionId */
! 		pxid = pgxact->xid;
  
! 		if (pgxact->inCommit && TransactionIdIsValid(pxid))
! 			xids[nxids++] = pxid;
  	}
  
  	LWLockRelease(ProcArrayLock);
  
! 	*xids_p = xids;
! 	return nxids;
  }
  
  /*
!  * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
   *
!  * This is used with the results of GetTransactionsInCommit to see if any
!  * of the specified XIDs are still in their commit critical sections.
   *
!  * Note: this is O(N^2) in the number of xacts that are/were in commit, but
   * those numbers should be small enough for it not to be a problem.
   */
  bool
! HaveTransactionsInCommit(TransactionId *xids, int nxids)
  {
  	bool		result = false;
  	ProcArrayStruct *arrayP = procArray;
--- 1723,1792 ----
  }
  
  /*
!  * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
!  * delaying checkpoint because they have critical actions in progress.
   *
!  * Constructs an array of VXIDs of transactions that are currently in commit
!  * critical sections, as shown by having delayChkpt set in their PGXACT.
   *
!  * Returns a palloc'd array that should be freed by the caller.
!  * *nvxids is the number of valid entries.
   *
!  * Note that because backends set or clear delayChkpt without holding any lock,
   * the result is somewhat indeterminate, but we don't really care.  Even in
   * a multiprocessor with delayed writes to shared memory, it should be certain
!  * that setting of delayChkpt will propagate to shared memory when the backend
!  * takes a lock, so we cannot fail to see a virtual xact as delayChkpt if
   * it's already inserted its commit record.  Whether it takes a little while
!  * for clearing of delayChkpt to propagate is unimportant for correctness.
   */
! VirtualTransactionId *
! GetVirtualXIDsDelayingChkpt(int *nvxids)
  {
+ 	VirtualTransactionId *vxids;
  	ProcArrayStruct *arrayP = procArray;
! 	int			count = 0;
  	int			index;
  
! 	/* allocate what's certainly enough result space */
! 	vxids = (VirtualTransactionId *)
! 		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
  
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
  	for (index = 0; index < arrayP->numProcs; index++)
  	{
  		int		pgprocno = arrayP->pgprocnos[index];
! 		volatile PGPROC    *proc = &allProcs[pgprocno];
! 		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
  
! 		if (pgxact->delayChkpt)
! 		{
! 			VirtualTransactionId vxid;
  
! 			GET_VXID_FROM_PGPROC(vxid, *proc);
! 			if (VirtualTransactionIdIsValid(vxid))
! 				vxids[count++] = vxid;
! 		}
  	}
  
  	LWLockRelease(ProcArrayLock);
  
! 	*nvxids = count;
! 	return vxids;
  }
  
  /*
!  * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
   *
!  * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
!  * of the specified VXIDs are still in critical sections of code.
   *
!  * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
   * those numbers should be small enough for it not to be a problem.
   */
  bool
! HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
  {
  	bool		result = false;
  	ProcArrayStruct *arrayP = procArray;
***************
*** 1789,1818 **** HaveTransactionsInCommit(TransactionId *xids, int nxids)
  
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
! 	for (index = 0; index < arrayP->numProcs; index++)
  	{
! 		int		pgprocno = arrayP->pgprocnos[index];
! 		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
! 		TransactionId	pxid;
! 
! 		/* Fetch xid just once - see GetNewTransactionId */
! 		pxid = pgxact->xid;
! 
! 		if (pgxact->inCommit && TransactionIdIsValid(pxid))
  		{
! 			int			i;
  
! 			for (i = 0; i < nxids; i++)
  			{
! 				if (xids[i] == pxid)
  				{
  					result = true;
  					break;
  				}
  			}
- 			if (result)
- 				break;
  		}
  	}
  
  	LWLockRelease(ProcArrayLock);
--- 1794,1825 ----
  
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
! 	while (VirtualTransactionIdIsValid(*vxids))
  	{
! 		for (index = 0; index < arrayP->numProcs; index++)
  		{
! 			int		pgprocno = arrayP->pgprocnos[index];
! 			volatile PGPROC    *proc = &allProcs[pgprocno];
! 			volatile PGXACT    *pgxact = &allPgXact[pgprocno];
! 			VirtualTransactionId vxid;
  
! 			GET_VXID_FROM_PGPROC(vxid, *proc);
! 			if (VirtualTransactionIdIsValid(vxid))
  			{
! 				if (VirtualTransactionIdEquals(vxid, *vxids) &&
! 					pgxact->delayChkpt)
  				{
  					result = true;
  					break;
  				}
  			}
  		}
+ 
+ 		if (result)
+ 			break;
+ 
+ 		/* That virtual transaction is gone now, so check the next one */
+ 		vxids++;
  	}
  
  	LWLockRelease(ProcArrayLock);
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 356,362 **** InitProcess(void)
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyPgXact->inCommit = false;
  	MyPgXact->vacuumFlags = 0;
  	/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
  	if (IsAutoVacuumWorkerProcess())
--- 356,362 ----
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyPgXact->delayChkpt = false;
  	MyPgXact->vacuumFlags = 0;
  	/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
  	if (IsAutoVacuumWorkerProcess())
***************
*** 514,520 **** InitAuxiliaryProcess(void)
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyPgXact->inCommit = false;
  	MyPgXact->vacuumFlags = 0;
  	MyProc->lwWaiting = false;
  	MyProc->lwWaitMode = 0;
--- 514,520 ----
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyPgXact->delayChkpt = false;
  	MyPgXact->vacuumFlags = 0;
  	MyProc->lwWaiting = false;
  	MyProc->lwWaitMode = 0;
*** a/src/backend/storage/page/bufpage.c
--- b/src/backend/storage/page/bufpage.c
***************
*** 16,21 ****
--- 16,27 ----
  
  #include "access/htup.h"
  
+ bool page_checksums = false;
+ 
+ static Page pageCopy;	/* temporary buffer to allow checksum calculation */
+ 
+ static bool PageVerificationInfoOK(Page page);
+ static uint16 PageCalcChecksum16(Page page);
  
  /* ----------------------------------------------------------------
   *						Page support functions
***************
*** 25,30 ****
--- 31,40 ----
  /*
   * PageInit
   *		Initializes the contents of a page.
+  *		Note that we don't automatically add a checksum, or flag that the
+  * 		page has a checksum field. We start with a normal page layout and defer
+  *		the decision on what page verification will be written just before
+  *		we write the block to disk.
   */
  void
  PageInit(Page page, Size pageSize, Size specialSize)
***************
*** 67,87 **** PageInit(Page page, Size pageSize, Size specialSize)
   * will clean up such a page and make it usable.
   */
  bool
! PageHeaderIsValid(PageHeader page)
  {
  	char	   *pagebytes;
  	int			i;
  
! 	/* Check normal case */
! 	if (PageGetPageSize(page) == BLCKSZ &&
! 		PageGetPageLayoutVersion(page) == PG_PAGE_LAYOUT_VERSION &&
! 		(page->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
! 		page->pd_lower >= SizeOfPageHeaderData &&
! 		page->pd_lower <= page->pd_upper &&
! 		page->pd_upper <= page->pd_special &&
! 		page->pd_special <= BLCKSZ &&
! 		page->pd_special == MAXALIGN(page->pd_special))
! 		return true;
  
  	/* Check all-zeroes case */
  	pagebytes = (char *) page;
--- 77,102 ----
   * will clean up such a page and make it usable.
   */
  bool
! PageIsVerified(Page page)
  {
+ 	PageHeader	p = (PageHeader) page;
  	char	   *pagebytes;
  	int			i;
  
! 	/*
! 	 * Don't verify page data unless the page passes basic non-zero test
! 	 */
! 	if (p->pd_lower >= SizeOfPageHeaderData)
! 	{
! 		/* Check normal case */
! 		if (PageVerificationInfoOK(page) &&
! 			(p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
! 			p->pd_lower <= p->pd_upper &&
! 			p->pd_upper <= p->pd_special &&
! 			p->pd_special <= BLCKSZ &&
! 			p->pd_special == MAXALIGN(p->pd_special))
! 			return true;
! 	}
  
  	/* Check all-zeroes case */
  	pagebytes = (char *) page;
***************
*** 827,829 **** PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
--- 842,1152 ----
  
  	pfree(itemidbase);
  }
+ 
+ /*
+  * Test whether the page verification information is correct or not.
+  *
+  * IMPORTANT NOTE -
+  * Verification info is not valid at all times on a data page. We set
+  * verification info before we flush page/buffer, and implicitly invalidate
+  * verification info when we write to the page. A heavily accessed buffer
+  * might then spend most of its life with invalid page verification info,
+  * so testing verification info on random pages in the buffer pool will tell
+  * you nothing. The reason for this is that page verification info protects
+  * Postgres data from errors on the filesystems on which we rely. We do not
+  * protect buffers against uncorrectable memory errors, since these have a
+  * very low measured incidence according to research on large server farms,
+  * http://www.cs.toronto.edu/~bianca/papers/sigmetrics09.pdf, discussed 2010/12/22.
+  *
+  * To confirm your understanding that means that WAL-logged changes to a page
+  * do NOT update the page verification info, so full page images may not have
+  * correct verification information on them. But those page images have the
+  * WAL CRC covering them and so are verified separately from this mechanism.
+  *
+  * Any write of a data block can cause a torn page if the write is unsuccessful.
+  * Full page writes protect us from that, which are stored in WAL. Setting
+  * hint bits when a page is already dirty is OK because a full page write
+  * must already have been written for that since the last checkpoint.
+  * Setting hint bits on an otherwise clean page can allow torn pages; this
+  * doesn't normally matter since they are just hints. When the page has
+  * checksums, losing a few bits would cause the checksum to be invalid.
+  * So if we have full_page_writes = on and page_checksums = on then we must
+  * write a WAL record specifically so that we record a full page image in WAL.
+  * New WAL records cannot be written during recovery, so hint bits set
+  * during recovery must not dirty the page if the buffer is not already dirty,
+  * when page_checksums = on.
+  *
+  * So we cannot enable/disable page_checksums except at a checkpoint if
+  * full_page_writes is enabled. We choose to only allow changes at server start.
+  *
+  * WAL replay ignores page verification info unless it writes out or reads in
+  * blocks from disk; restoring full page images does not check verification
+  * info via this function. In recovery, since we only dirty a block when we
+  * have a full page image available if we crash, we are fully OK to use page
+  * verification normally.
+  *
+  * The best way to understand this is that WAL CRCs protect records entering
+  * the WAL stream, and page verification protects blocks entering and leaving
+  * the buffer pool. They are similar in purpose, yet completely separate.
+  * Together they ensure we are able to detect errors in data leaving and
+  * re-entering PostgreSQL controlled memory. Note also that the WAL checksum
+  * is a CRC, whereas the page checksum is a Fletcher16 checksum, not a CRC.
+  *
+  * Note also that the verification mechanism can vary from page to page.
+  * All we do here is look at what the page itself says is the verification
+  * mechanism and then apply that test. This allows us to run without the CPU
+  * cost of verification if we choose, as well as to provide an upgrade path
+  * for anyone doing direct upgrades using pg_upgrade.
+  *
+  * There is some concern that trusting page data to say how to check page
+  * data is dangerously self-referential. To ensure no mistakes we set two
+  * non-adjacent bits to signify that the page has a checksum and
+  * should be verified when that block is read back into a buffer.
+  * We use two bits in case a multiple bit error removes one of the checksum
+  * flags *and* destroys data, which would lead to skipping the checksum check
+  * and silently accepting bad data. We also require that a third bit is
+  * zeroed, to protect against a run of 1s being written to the page header.
+  *
+  * This function returns a boolean, not a full damage assessment.
+  */
+ static bool
+ PageVerificationInfoOK(Page page)
+ {
+ 	PageHeader	p = (PageHeader) page;
+ 
+ 	/* Quick exit if nobody cares about checksumming */
+ 	if (!page_checksums)
+ 		return true;
+ 
+ 	if (PageHasChecksum(p))
+ 	{
+ 		uint16	checksum = PageCalcChecksum16(page);
+ 
+ 		if (checksum == p->pd_verify.pd_checksum16)
+ 		{
+ #ifdef CHECK_HOLE
+ 			/* Also check page hole is all-zeroes */
+ 			char	   *pagebytes;
+ 			bool		empty = true;
+ 			int			i;
+ 
+ 			pagebytes = (char *) page;
+ 			for (i = p->pd_lower; i < p->pd_upper; i++)
+ 			{
+ 				if (pagebytes[i] != 0)
+ 				{
+ 					empty = false;
+ 					break;
+ 				}
+ 			}
+ 
+ 			if (!empty)
+ 				elog(LOG, "hole was not empty at byte %d pd_lower %d pd_upper %d",
+ 								i, p->pd_lower, p->pd_upper);
+ #endif
+ 			return true;
+ 		}
+ 
+ 		elog(LOG, "page verification failed - checksum was %u page checksum field is %u",
+ 						checksum, p->pd_verify.pd_checksum16);
+ 	}
+ 	else if (PageHasNoChecksum(p))
+ 	{
+ 		uint16		pd_pagesize_version = BLCKSZ | PG_PAGE_LAYOUT_VERSION;
+ 
+ 		if (p->pd_verify.pd_pagesize_version == pd_pagesize_version)
+ 			return true;
+ 
+ 		elog(LOG, "page verification failed - page size and version set incorrectly %u",
+ 						p->pd_verify.pd_checksum16);
+ 	}
+ 	else
+ 		elog(LOG, "page verification failed - page flags are incorrectly set");
+ 
+ 	return false;
+ }
+ 
+ #define PageSetChecksum(page) \
+ do \
+ { \
+ 	PageHeader	p = (PageHeader) page; \
+ 	p->pd_flags |= PD_PAGE_VERSION_PLUS1; \
+ 	p->pd_flags |= PD_CHECKSUM1; \
+ 	p->pd_flags &= ~PD_CHECKSUM2; \
+ 	p->pd_verify.pd_checksum16 = PageCalcChecksum16(page); \
+ } while (0);
+ 
+ /* ensure any older checksum info is overwritten with watermark */
+ #define PageResetVersion(page) \
+ do \
+ { \
+ 	PageHeader	p = (PageHeader) page; \
+ 	if (!PageHasNoChecksum(p)) \
+ 	{ \
+ 		p->pd_flags &= ~PD_PAGE_VERSION_PLUS1; \
+ 		p->pd_flags &= ~PD_CHECKSUM1; \
+ 		p->pd_flags &= ~PD_CHECKSUM2; \
+ 		PageSetPageSizeAndVersion(p, BLCKSZ, PG_PAGE_LAYOUT_VERSION); \
+ 	} \
+ } while (0);
+ 
+ /*
+  * Set verification info for page in shared buffers.
+  *
+  * Either we set a new checksum, or we set the standard watermark. We must
+  * not leave an invalid checksum in place. Note that the verification info is
+  * not WAL logged, whereas the data changes to pages are, so data is safe
+  * whether or not we have page_checksums enabled. The purpose of checksums
+  * is to detect page corruption to allow replacement from backup.
+  *
+  * Returns a pointer to the block-sized data that needs to be written. That
+  * allows us to either copy, or not, depending upon whether we checksum.
+  */
+ char *
+ PageSetVerificationInfoOnCopy(Page page)
+ {
+ 	if (PageIsNew(page))
+ 		return (char *) page;
+ 
+ 	if (page_checksums)
+ 	{
+ 		if (pageCopy == NULL)
+ 		{
+ 			pageCopy = (Page) malloc(BLCKSZ);
+ 			if (pageCopy == NULL)
+ 				ereport(ERROR,
+ 					(errcode(ERRCODE_OUT_OF_MEMORY),
+ 					 errmsg("out of memory")));
+ 		}
+ 
+ 		/*
+ 		 * We make a copy iff we need to calculate a checksum because other
+ 		 * backends may set hint bits on this page while we write, which
+ 		 * would mean the checksum differs from the page contents. It doesn't
+ 		 * matter if we include or exclude hints during the copy, as long
+ 		 * as we write a valid page and associated checksum.
+ 		 */
+ 		memcpy((char *) pageCopy, (char *) page, BLCKSZ);
+ 		PageSetChecksum(pageCopy);
+ 		return (char *) pageCopy;
+ 	}
+ 
+ 	PageResetVersion(page);
+ 	return (char *) page;
+ }
+ 
+ /*
+  * Set verification info for page in private memory.
+  *
+  * This is a simpler version of PageSetVerificationInfoOnCopy(). The more
+  * explicit API allows us to more easily see if we're making the correct call
+  * and reduces the amount of additional code specific to page verification.
+  */
+ void
+ PageSetVerificationInfoInplace(Page page)
+ {
+ 	if (PageIsNew(page))
+ 		return;
+ 
+ 	if (page_checksums)
+ 	{
+ 		PageSetChecksum(page);
+ 		return;
+ 	}
+ 
+ 	PageResetVersion(page);
+ 	return;
+ }
+ 
+ /*
+  * Calculate checksum for a PostgreSQL Page. We do this in 3 steps, first
+  * we calculate the checksum for the header, avoiding the verification
+  * info, which will be added afterwards. Next, we add the line pointers up to
+  * the hole in the middle of the block at pd_lower. Last, we add the tail
+  * of the page from pd_upper to the end of page.
+  */
+ static uint16
+ PageCalcChecksum16(Page page)
+ {
+ #define PAGE_VERIFICATION_USES_FLETCHER16 (true)
+ #ifdef PAGE_VERIFICATION_USES_FLETCHER16
+ 	/*
+ 	 * The following calculation is a Fletcher's 16 checksum. The calc is
+ 	 * isolated here so that tuning and/or replacement algorithms are possible.
+ 	 */
+ 	PageHeader	p = (PageHeader) page;
+ 	uint	page_header_stop = (uint)(offsetof(PageHeaderData, pd_special) + sizeof(LocationIndex));
+ 	uint	page_lower_start = (uint)(offsetof(PageHeaderData, pd_prune_xid));
+ 	uint	page_lower_stop;
+ 	uint 	sum1 = 0;
+ 	uint64	sum2 = 0;
+ 	int		i;
+ 
+ 	/*
+ 	 * Avoid calculating checksum if page is new, just return a value that
+ 	 * will cause the check to fail. We may still pass the all-zeroes check.
+ 	 */
+ 	if (PageIsNew(page))
+ 		return 1;
+ 
+ 	/*
+ 	 * Just add in the pd_prune_xid if there are no line pointers yet.
+ 	 */
+ 	page_lower_stop = p->pd_lower;
+ 	if (page_lower_stop == 0)
+ 		page_lower_stop = page_lower_start + sizeof(TransactionId);
+ 
+ 	Assert(p->pd_upper != 0);
+ 
+ #ifdef DEBUG_CHECKSUM
+ 	elog(LOG, "calculating checksum for %u-%u %u-%u %u-%u",
+ 			0,	/* page_header_start */
+ 			page_header_stop,
+ 			page_lower_start,
+ 			page_lower_stop,
+ 			p->pd_upper,
+ 			BLCKSZ
+ 			);
+ #endif
+ 
+ #define	COMP_F16(from, to) \
+ do { \
+ 	for (i = from; i < to; i++) \
+ 	{ \
+ 			sum1 = sum1 + page[i]; \
+ 			sum2 = sum1 + sum2; \
+ 	} \
+ 	sum1 %= 255; \
+ 	sum2 %= 255; \
+ } while (0); \
+ 
+ #ifdef IGNORE_PAGE_HOLE
+ 	COMP_F16(0,
+ 			 page_header_stop);
+ 
+ 	/* ignore the checksum field since not done yet... */
+ 
+ 	COMP_F16(page_lower_start,
+ 			 page_lower_stop);
+ 
+ 	/* ignore the hole in the middle of the block */
+ 
+ 	COMP_F16(p->pd_upper,
+ 			 BLCKSZ - 1);
+ #else
+ 	COMP_F16(0,
+ 			 page_header_stop);
+ 
+ 	/* ignore the checksum field since not done yet... */
+ 
+ 	COMP_F16(page_lower_start,
+ 			 BLCKSZ - 1);
+ #endif
+ 
+ #ifdef DEBUG_CHECKSUM
+ 	elog(LOG, "checksum %u", ((sum2 << 8) | sum1));
+ #endif
+ 
+ 	return ((sum2 << 8) | sum1);
+ #endif
+ }
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 830,835 **** static struct config_bool ConfigureNamesBool[] =
--- 830,849 ----
  		NULL, NULL, NULL
  	},
  	{
+ 		{"page_checksums", PGC_POSTMASTER, WAL_SETTINGS,
+ 			gettext_noop("Marks database blocks with a checksum before writing them to disk. "),
+ 			gettext_noop("When enabled all database blocks will be marked with a checksum before writing to disk. "
+ 						 "When we read a database block from disk the checksum is checked, if it exists. "
+ 						 "If there is no checksum marked yet then no check is performed, though a "
+ 						 "checksum will be added later when we re-write the database block. "
+ 						 "When disabled checksums will be ignored, even if the block was marked "
+ 						 "with checksum. When disabled checksums will not be added to database blocks.")
+ 		},
+ 		&page_checksums,
+ 		true,
+ 		NULL, NULL, NULL
+ 	},
+ 	{
  		{"full_page_writes", PGC_SIGHUP, WAL_SETTINGS,
  			gettext_noop("Writes full pages to WAL when first modified after a checkpoint."),
  			gettext_noop("A page write in process during an operating system crash might be "
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 150,164 ****
  
  
  #------------------------------------------------------------------------------
! # WRITE AHEAD LOG
  #------------------------------------------------------------------------------
  
! # - Settings -
  
! #wal_level = minimal			# minimal, archive, or hot_standby
! 					# (change requires restart)
  #fsync = on				# turns forced synchronization on or off
  #synchronous_commit = on		# synchronization level; on, off, or local
  #wal_sync_method = fsync		# the default is the first option
  					# supported by the operating system:
  					#   open_datasync
--- 150,170 ----
  
  
  #------------------------------------------------------------------------------
! # WRITE AHEAD LOG & RELIABILITY
  #------------------------------------------------------------------------------
  
! # - Reliability -
  
! #page_checksums = off			# calculate checksum before database I/O
! #full_page_writes = on			# recover from partial page writes
  #fsync = on				# turns forced synchronization on or off
+ 
  #synchronous_commit = on		# synchronization level; on, off, or local
+ 
+ # - Write Ahead Log -
+ 
+ #wal_level = minimal			# minimal, archive, or hot_standby
+ 					# (change requires restart)
  #wal_sync_method = fsync		# the default is the first option
  					# supported by the operating system:
  					#   open_datasync
***************
*** 166,172 ****
  					#   fsync
  					#   fsync_writethrough
  					#   open_sync
- #full_page_writes = on			# recover from partial page writes
  #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
  					# (change requires restart)
  #wal_writer_delay = 200ms		# 1-10000 milliseconds
--- 172,177 ----
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 272,277 **** extern int XLogFileInit(uint32 log, uint32 seg,
--- 272,278 ----
  			 bool *use_existent, bool use_lock);
  extern int	XLogFileOpen(uint32 log, uint32 seg);
  
+ extern XLogRecPtr XLogRecordHint(Buffer buffer);
  
  extern void XLogGetLastRemoved(uint32 *log, uint32 *seg);
  extern void XLogSetAsyncXactLSN(XLogRecPtr record);
*** a/src/include/catalog/pg_control.h
--- b/src/include/catalog/pg_control.h
***************
*** 61,67 **** typedef struct CheckPoint
  #define XLOG_BACKUP_END					0x50
  #define XLOG_PARAMETER_CHANGE			0x60
  #define XLOG_RESTORE_POINT				0x70
! #define XLOG_FPW_CHANGE				0x80
  
  
  /*
--- 61,68 ----
  #define XLOG_BACKUP_END					0x50
  #define XLOG_PARAMETER_CHANGE			0x60
  #define XLOG_RESTORE_POINT				0x70
! #define XLOG_FPW_CHANGE					0x80
! #define XLOG_HINT						0x90
  
  
  /*
*** a/src/include/catalog/storage.h
--- b/src/include/catalog/storage.h
***************
*** 36,41 **** extern void PostPrepare_smgr(void);
--- 36,42 ----
  
  extern void log_smgrcreate(RelFileNode *rnode, ForkNumber forkNum);
  
+ extern void smgr_hint(Buffer buffer);
  extern void smgr_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void smgr_desc(StringInfo buf, uint8 xl_info, char *rec);
  
*** a/src/include/storage/bufpage.h
--- b/src/include/storage/bufpage.h
***************
*** 18,23 ****
--- 18,25 ----
  #include "storage/item.h"
  #include "storage/off.h"
  
+ extern bool page_checksums;
+ 
  /*
   * A postgres disk page is an abstraction layered on top of a postgres
   * disk block (which is simply a unit of i/o, see block.h).
***************
*** 93,99 **** typedef uint16 LocationIndex;
   *		pd_lower	- offset to start of free space.
   *		pd_upper	- offset to end of free space.
   *		pd_special	- offset to start of special space.
!  *		pd_pagesize_version - size in bytes and page layout version number.
   *		pd_prune_xid - oldest XID among potentially prunable tuples on page.
   *
   * The LSN is used by the buffer manager to enforce the basic rule of WAL:
--- 95,101 ----
   *		pd_lower	- offset to start of free space.
   *		pd_upper	- offset to end of free space.
   *		pd_special	- offset to start of special space.
!  *		pd_verify	- page verification information of different kinds
   *		pd_prune_xid - oldest XID among potentially prunable tuples on page.
   *
   * The LSN is used by the buffer manager to enforce the basic rule of WAL:
***************
*** 106,112 **** typedef uint16 LocationIndex;
   * pd_prune_xid is a hint field that helps determine whether pruning will be
   * useful.	It is currently unused in index pages.
   *
!  * The page version number and page size are packed together into a single
   * uint16 field.  This is for historical reasons: before PostgreSQL 7.3,
   * there was no concept of a page version number, and doing it this way
   * lets us pretend that pre-7.3 databases have page version number zero.
--- 108,115 ----
   * pd_prune_xid is a hint field that helps determine whether pruning will be
   * useful.	It is currently unused in index pages.
   *
!  * For verification we store either a 16 bit checksum or a watermark of
!  * the page version number and page size packed together into a single
   * uint16 field.  This is for historical reasons: before PostgreSQL 7.3,
   * there was no concept of a page version number, and doing it this way
   * lets us pretend that pre-7.3 databases have page version number zero.
***************
*** 130,136 **** typedef struct PageHeaderData
  	LocationIndex pd_lower;		/* offset to start of free space */
  	LocationIndex pd_upper;		/* offset to end of free space */
  	LocationIndex pd_special;	/* offset to start of special space */
! 	uint16		pd_pagesize_version;
  	TransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */
  	ItemIdData	pd_linp[1];		/* beginning of line pointer array */
  } PageHeaderData;
--- 133,145 ----
  	LocationIndex pd_lower;		/* offset to start of free space */
  	LocationIndex pd_upper;		/* offset to end of free space */
  	LocationIndex pd_special;	/* offset to start of special space */
! 
! 	union
! 	{
! 		uint16		pd_pagesize_version;
! 		uint16		pd_checksum16;
! 	} pd_verify;				/* page verification data */
! 
  	TransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */
  	ItemIdData	pd_linp[1];		/* beginning of line pointer array */
  } PageHeaderData;
***************
*** 155,161 **** typedef PageHeaderData *PageHeader;
  #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
  										 * everyone */
  
! #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
--- 164,205 ----
  #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
  										 * everyone */
  
! #define PD_VALID_FLAG_BITS	0x400F		/* OR of all non-checksum pd_flags bits */
! 
! #define PD_CHECKSUM1		0x0008		/* First checksum bit - should be 1 */
! #define PD_CHECKSUM2		0x8000		/* Second checksum bit - should be 0 */
! 
! /*
!  * Page layout version number increment bits (still part of flags).
!  * Next flag would be called PD_PAGE_VERSION_PLUS2, PD_PAGE_VERSION_PLUS4 etc
!  */
! #define PD_PAGE_VERSION_PLUS1	0x4000
! 
! /*
!  * PD_SET_CHECKSUM is a bitmask of multiple flags
!  *
!  * When we wish to set a checksum we require exact settings for 3 bits.
!  *
!  * (1) First, we must set the version correctly, so we set all increment bits.
!  *
!  * (2) Next we set PD_CHECKSUM2 to be zero, the opposite setting of the increment
!  * bits. So any error that causes a run of multiple zeroes or ones to be written
!  * to the block will be detected.
!  *
!  * (3) Next we set PD_CHECKSUM1 to be one, to indicate the block has a checksum.
!  * A setting with just the increment bits but no checksum bit would be a legal
!  * version, but we treat it as an error too, to reduce the opportunities for error.
!  */
! #define PD_SET_CHECKSUM		0x4008		/* All 3 bits set correctly for checksum */
! 
! #define PageHasChecksum(page) \
! 	((((PageHeader) (page))->pd_flags & PD_SET_CHECKSUM) == PD_SET_CHECKSUM)
! 
! #define PageHasNoChecksum(page) \
! 	((((PageHeader) (page))->pd_flags & PD_SET_CHECKSUM) == 0)
! 
! #define PageHasChecksumFlagError(page) \
! 	(!PageHasChecksum(page) && !PageHasNoChecksum(page))
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
***************
*** 165,173 **** typedef PageHeaderData *PageHeader;
--- 209,238 ----
   * Release 8.3 uses 4; it changed the HeapTupleHeader layout again, and
   *		added the pd_flags field (by stealing some bits from pd_tli),
   *		as well as adding the pd_prune_xid field (which enlarges the header).
+  * Release 9.2 uses versions 4 and/or 5. Each page may only have one version,
+  * though different pages may have different versions. This is arranged
+  * deliberately to allow us to upgrade from one major version to another.
   */
  #define PG_PAGE_LAYOUT_VERSION		4
  
+ /*
+  * PageGetPageLayoutVersion
+  *		Returns the page layout version of a page.
+  *
+  * To calculate the page layout version we take the basic page layout (4)
+  * and increment that according to the incremental bits above. If further
+  * versions of the page are required AND those page versions must be
+  * compatible with v4 and v5 then we would do this by setting new
+  * incremental bits and adding them into the definition below.
+  * Note that this works for all page versions because we don't look at the
+  * actual data field if we have the version increment bits set.
+  * Not that any of the code actually uses this, but someone might.
+  */
+ #define PageGetPageLayoutVersion(page) \
+ 	((((PageHeader) (page))->pd_flags & PD_PAGE_VERSION_PLUS1) == PD_PAGE_VERSION_PLUS1 ? \
+ 			PG_PAGE_LAYOUT_VERSION + 1 : \
+ 			((PageHeader) (page))->pd_verify.pd_pagesize_version)
+ 
  
  /* ----------------------------------------------------------------
   *						page support macros
***************
*** 231,249 **** typedef PageHeaderData *PageHeader;
   * PageGetPageSize
   *		Returns the page size of a page.
   *
!  * this can only be called on a formatted page (unlike
!  * BufferGetPageSize, which can be called on an unformatted page).
!  * however, it can be called on a page that is not stored in a buffer.
   */
! #define PageGetPageSize(page) \
! 	((Size) (((PageHeader) (page))->pd_pagesize_version & (uint16) 0xFF00))
! 
! /*
!  * PageGetPageLayoutVersion
!  *		Returns the page layout version of a page.
!  */
! #define PageGetPageLayoutVersion(page) \
! 	(((PageHeader) (page))->pd_pagesize_version & 0x00FF)
  
  /*
   * PageSetPageSizeAndVersion
--- 296,308 ----
   * PageGetPageSize
   *		Returns the page size of a page.
   *
!  * Since PageSizeIsValid() holds only when pagesize == BLCKSZ, we simply write BLCKSZ here.
!  * This can be called on any page, initialised or not, in or out of buffers.
!  * You might think this can vary at runtime but you'd be wrong, since pages
!  * frequently need to occupy buffers and pages are copied from one to another
!  * so there are many hidden assumptions that this simple definition is true.
   */
! #define PageGetPageSize(page) (BLCKSZ)
  
  /*
   * PageSetPageSizeAndVersion
***************
*** 251,262 **** typedef PageHeaderData *PageHeader;
   *
   * We could support setting these two values separately, but there's
   * no real need for it at the moment.
   */
  #define PageSetPageSizeAndVersion(page, size, version) \
  ( \
  	AssertMacro(((size) & 0xFF00) == (size)), \
  	AssertMacro(((version) & 0x00FF) == (version)), \
! 	((PageHeader) (page))->pd_pagesize_version = (size) | (version) \
  )
  
  /* ----------------
--- 310,323 ----
   *
   * We could support setting these two values separately, but there's
   * no real need for it at the moment.
+  *
+  * Must not be used on a page that is flagged for checksums.
   */
  #define PageSetPageSizeAndVersion(page, size, version) \
  ( \
  	AssertMacro(((size) & 0xFF00) == (size)), \
  	AssertMacro(((version) & 0x00FF) == (version)), \
! 	((PageHeader) (page))->pd_verify.pd_pagesize_version = (size) | (version) \
  )
  
  /* ----------------
***************
*** 368,374 **** do { \
   */
  
  extern void PageInit(Page page, Size pageSize, Size specialSize);
! extern bool PageHeaderIsValid(PageHeader page);
  extern OffsetNumber PageAddItem(Page page, Item item, Size size,
  			OffsetNumber offsetNumber, bool overwrite, bool is_heap);
  extern Page PageGetTempPage(Page page);
--- 429,435 ----
   */
  
  extern void PageInit(Page page, Size pageSize, Size specialSize);
! extern bool PageIsVerified(Page page);
  extern OffsetNumber PageAddItem(Page page, Item item, Size size,
  			OffsetNumber offsetNumber, bool overwrite, bool is_heap);
  extern Page PageGetTempPage(Page page);
***************
*** 381,385 **** extern Size PageGetExactFreeSpace(Page page);
--- 442,448 ----
  extern Size PageGetHeapFreeSpace(Page page);
  extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
  extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
+ extern char *PageSetVerificationInfoOnCopy(Page page);
+ extern void PageSetVerificationInfoInplace(Page page);
  
  #endif   /* BUFPAGE_H */
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 168,174 **** typedef struct PGXACT
  
  	uint8		vacuumFlags;	/* vacuum-related flags, see above */
  	bool		overflowed;
! 	bool		inCommit;		/* true if within commit critical section */
  
  	uint8		nxids;
  } PGXACT;
--- 168,175 ----
  
  	uint8		vacuumFlags;	/* vacuum-related flags, see above */
  	bool		overflowed;
! 	bool		delayChkpt; 	/* true if this proc delays checkpoint start */
! 								/* previously called InCommit */
  
  	uint8		nxids;
  } PGXACT;
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
***************
*** 52,59 **** extern bool TransactionIdIsActive(TransactionId xid);
  extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
  extern TransactionId GetOldestActiveTransactionId(void);
  
! extern int	GetTransactionsInCommit(TransactionId **xids_p);
! extern bool HaveTransactionsInCommit(TransactionId *xids, int nxids);
  
  extern PGPROC *BackendPidGetProc(int pid);
  extern int	BackendXidGetPid(TransactionId xid);
--- 52,59 ----
  extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
  extern TransactionId GetOldestActiveTransactionId(void);
  
! extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
! extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);
  
  extern PGPROC *BackendPidGetProc(int pid);
  extern int	BackendXidGetPid(TransactionId xid);
