diff --git a/contrib/pageinspect/rawpage.c b/contrib/pageinspect/rawpage.c
index f51a4e3..a857a13 100644
--- a/contrib/pageinspect/rawpage.c
+++ b/contrib/pageinspect/rawpage.c
@@ -214,7 +214,7 @@ page_header(PG_FUNCTION_ARGS)
 	values[3] = UInt16GetDatum(page->pd_lower);
 	values[4] = UInt16GetDatum(page->pd_upper);
 	values[5] = UInt16GetDatum(page->pd_special);
-	values[6] = UInt16GetDatum(PageGetPageSize(page));
+	values[6] = UInt16GetDatum(PageGetPageSizeField(page));
 	values[7] = UInt16GetDatum(PageGetPageLayoutVersion(page));
 	values[8] = TransactionIdGetDatum(page->pd_prune_xid);
 
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0ea9aeb..85795ff 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1712,6 +1712,77 @@ SET ENABLE_SEQSCAN TO OFF;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-page-checksums" xreflabel="page_checksums">
+      <indexterm>
+       <primary><varname>page_checksums</> configuration parameter</primary>
+      </indexterm>
+      <term><varname>page_checksums</varname> (<type>boolean</type>)</term>
+      <listitem>
+       <para>
+        When this parameter is on, the <productname>PostgreSQL</> server
+        calculates a checksum when it writes a main database page to disk,
+        flagging the page as checksum protected.
+        When pages are read into shared buffers, any page flagged with a
+        checksum has its checksum recalculated and compared against the
+        stored value, providing greatly improved validation of page contents.
+        Checksums are 16 bits wide and use the fast Fletcher-16 algorithm.
+       </para>
+
+       <para>
+        When this parameter is off, only the page version number and
+        block size are written, as a standard watermark. If a block that
+        carries a checksum is read while <varname>page_checksums</varname>
+        is disabled, the checksum is not verified. Such pages are reset to
+        the standard watermark if they are rewritten for any reason.
+        The database may thus contain a mix of pages with and without
+        checksums.
+       </para>
+
+       <para>
+        The checksum field on each database page is set only at write time,
+        which allows a smooth and easy transition when enabling
+        <varname>page_checksums</varname>.
+        Use <command>VACUUM FREEZE</> with <varname>vacuum_freeze_table_age</>
+        set to zero if you wish to force checksums onto many data blocks,
+        or use <command>VACUUM FULL</> for a more aggressive rewriting of
+        data blocks. This should not normally be necessary: blocks are
+        gradually rewritten with checksums as ordinary writes occur on the
+        database.
+       </para>
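+
+       <para>
+        For example, one way to force checksums onto an individual table
+        (the table name here is just a placeholder) is:
+<programlisting>
+SET vacuum_freeze_table_age = 0;
+VACUUM FREEZE mytable;
+</programlisting>
+       </para>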
+
+       <para>
+        Writes via <varname>temp_buffers</varname> (that is, pages of
+        temporary tables) are never checksummed.
+       </para>
+
+       <para>
+        Turning this parameter off reduces the CPU overhead of reading and
+        writing data, though it might allow data corruption to go unnoticed.
+        Even with this parameter enabled there remains a small probability
+        that an error goes undetected. Turning this parameter on incurs a
+        CPU cost, as well as slightly higher WAL volume in some cases,
+        depending upon the workload.
+       </para>
+
+       <para>
+        Later releases may increase the size of the checksum field and offer
+        alternative checksum or CRC algorithms.
+       </para>
+
+       <para>
+        The default is <literal>off</> for backwards compatibility and
+        to allow upgrade. The recommended setting is <literal>on</>, though
+        it should not be enabled until any upgrade has completed successfully
+        and a full set of new backups has been taken.
+       </para>
+
+       <para>
+        This parameter can only be set at server start.
+       </para>
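+
+       <para>
+        For example, to enable checksums you could add the following line to
+        <filename>postgresql.conf</> and restart the server:
+<programlisting>
+page_checksums = on
+</programlisting>
+       </para>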
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-wal-buffers" xreflabel="wal_buffers">
       <term><varname>wal_buffers</varname> (<type>integer</type>)</term>
       <indexterm>
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 6b647a8..0bec9a9 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -730,6 +730,7 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	MemSet(zerobuf, 0, sizeof(zerobuf));
 
 	RelationOpenSmgr(rel);
+	/* no need to set page verification info for all zero pages */
 	smgrextend(rel->rd_smgr, MAIN_FORKNUM, lastblock, zerobuf, false);
 
 	return true;
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 31b2b67..0668f8f 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -266,6 +266,8 @@ end_heap_rewrite(RewriteState state)
 	/* Write the last page, if any */
 	if (state->rs_buffer_valid)
 	{
+		PageSetVerificationInfoInplace(state->rs_buffer);
+
 		if (state->rs_use_wal)
 			log_newpage(&state->rs_new_rel->rd_node,
 						MAIN_FORKNUM,
@@ -611,6 +613,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 		{
 			/* Doesn't fit, so write out the existing page */
 
+			PageSetVerificationInfoInplace(page);
+
 			/* XLOG stuff */
 			if (state->rs_use_wal)
 				log_newpage(&state->rs_new_rel->rd_node,
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 6505e76..7da69d5 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -568,6 +568,7 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
 	/* Now extend the file */
 	while (vm_nblocks_now < vm_nblocks)
 	{
+		/* no need to set page verification info for all zero pages */
 		smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
 				   (char *) pg, false);
 		vm_nblocks_now++;
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 7627738..1cb4cda 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -216,6 +216,7 @@ btbuildempty(PG_FUNCTION_ARGS)
 	_bt_initmetapage(metapage, P_NONE, 0);
 
 	/* Write the page.	If archiving/streaming, XLOG it. */
+	PageSetVerificationInfoInplace(metapage);
 	smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
 			  (char *) metapage, true);
 	if (XLogIsNeeded())
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 9aa3a13..59f086d 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -288,12 +288,15 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 	{
 		if (!wstate->btws_zeropage)
 			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
+		/* no need to set page verification info for all zero pages */
 		smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM,
 				   wstate->btws_pages_written++,
 				   (char *) wstate->btws_zeropage,
 				   true);
 	}
 
+	PageSetVerificationInfoInplace(page);
+
 	/*
 	 * Now write the page.	There's no need for smgr to schedule an fsync for
 	 * this write; we'll do it ourselves before ending the build.
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index cbcf655..bdd6692 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -150,6 +150,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
 	SpGistInitMetapage(page);
 
 	/* Write the page.	If archiving/streaming, XLOG it. */
+	PageSetVerificationInfoInplace(page);
 	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
 			  (char *) page, true);
 	if (XLogIsNeeded())
@@ -159,6 +160,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
 	/* Likewise for the root page. */
 	SpGistInitPage(page, SPGIST_LEAF);
 
+	PageSetVerificationInfoInplace(page);
 	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_HEAD_BLKNO,
 			  (char *) page, true);
 	if (XLogIsNeeded())
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index de72f71..b415d3c 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -527,6 +527,14 @@ associated with the n'th distinct buffer ID seen in the "rdata" array, and
 per the above discussion, fully-rewritable buffers shouldn't be mentioned in
 "rdata".)
 
+Note that we must only use PageSetLSN/PageGetLSN() when we know the action
+is serialised. Only the Startup process may modify data blocks during
+recovery, so the Startup process may execute PageGetLSN() without fear of
+serialisation problems. All other processes must only call PageSetLSN() or
+PageGetLSN() while holding either an exclusive buffer content lock, or a
+shared content lock plus the buffer header lock; or they must be writing the
+data block directly rather than through shared buffers while holding
+AccessExclusiveLock on the relation.
+
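+For illustration, a minimal sketch of the share-lock case (this mirrors what
+FlushBuffer does in bufmgr.c; LockBufHdr/UnlockBufHdr and BufferGetLSN are
+existing buffer-manager internals, not a new API):
+
+    XLogRecPtr  lsn;
+
+    /* buffer content lock is held in shared mode */
+    LockBufHdr(buf);            /* take the buffer header spinlock */
+    lsn = BufferGetLSN(buf);    /* reads PageGetLSN() on the buffer's page */
+    UnlockBufHdr(buf);
+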
 Due to all these constraints, complex changes (such as a multilevel index
 insertion) normally need to be described by a series of atomic-action WAL
 records.  What do you do if the intermediate states are not self-consistent?
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3d08e92..fca48bf 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -320,7 +320,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 	proc->lxid = (LocalTransactionId) xid;
 	pgxact->xid = xid;
 	pgxact->xmin = InvalidTransactionId;
-	pgxact->inCommit = false;
+	pgxact->delayChkpt = false;
 	pgxact->vacuumFlags = 0;
 	proc->pid = 0;
 	proc->backendId = InvalidBackendId;
@@ -1028,18 +1028,18 @@ EndPrepare(GlobalTransaction gxact)
 	 * odds of a PANIC actually occurring should be very tiny given that we
 	 * were able to write the bogus CRC above.
 	 *
-	 * We have to set inCommit here, too; otherwise a checkpoint starting
+	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
 	 * immediately after the WAL record is inserted could complete without
 	 * fsync'ing our state file.  (This is essentially the same kind of race
 	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
-	 * uses inCommit for; see notes there.)
+	 * uses delayChkpt for; see notes there.)
 	 *
 	 * We save the PREPARE record's location in the gxact for later use by
 	 * CheckPointTwoPhase.
 	 */
 	START_CRIT_SECTION();
 
-	MyPgXact->inCommit = true;
+	MyPgXact->delayChkpt = true;
 
 	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
 									records.head);
@@ -1087,7 +1087,7 @@ EndPrepare(GlobalTransaction gxact)
 	 * checkpoint starting after this will certainly see the gxact as a
 	 * candidate for fsyncing.
 	 */
-	MyPgXact->inCommit = false;
+	MyPgXact->delayChkpt = false;
 
 	END_CRIT_SECTION();
 
@@ -1977,7 +1977,7 @@ RecoverPreparedTransactions(void)
  *	RecordTransactionCommitPrepared
  *
  * This is basically the same as RecordTransactionCommit: in particular,
- * we must set the inCommit flag to avoid a race condition.
+ * we must set the delayChkpt flag to avoid a race condition.
  *
  * We know the transaction made at least one XLOG entry (its PREPARE),
  * so it is never possible to optimize out the commit record.
@@ -2000,7 +2000,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	START_CRIT_SECTION();
 
 	/* See notes in RecordTransactionCommit */
-	MyPgXact->inCommit = true;
+	MyPgXact->delayChkpt = true;
 
 	/* Emit the XLOG commit record */
 	xlrec.xid = xid;
@@ -2065,7 +2065,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	TransactionIdCommitTree(xid, nchildren, children);
 
 	/* Checkpoint can proceed now */
-	MyPgXact->inCommit = false;
+	MyPgXact->delayChkpt = false;
 
 	END_CRIT_SECTION();
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e22bdac..8d7bf3a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -975,13 +975,13 @@ RecordTransactionCommit(void)
 		 * RecordTransactionAbort.	That's because loss of a transaction abort
 		 * is noncritical; the presumption would be that it aborted, anyway.
 		 *
-		 * It's safe to change the inCommit flag of our own backend without
+		 * It's safe to change the delayChkpt flag of our own backend without
 		 * holding the ProcArrayLock, since we're the only one modifying it.
-		 * This makes checkpoint's determination of which xacts are inCommit a
+		 * This makes checkpoint's determination of which xacts are delayChkpt a
 		 * bit fuzzy, but it doesn't matter.
 		 */
 		START_CRIT_SECTION();
-		MyPgXact->inCommit = true;
+		MyPgXact->delayChkpt = true;
 
 		SetCurrentTransactionStopTimestamp();
 
@@ -1155,7 +1155,7 @@ RecordTransactionCommit(void)
 	 */
 	if (markXidCommitted)
 	{
-		MyPgXact->inCommit = false;
+		MyPgXact->delayChkpt = false;
 		END_CRIT_SECTION();
 	}
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 266c0de..8f1c97d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -733,6 +733,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 	bool		doPageWrites;
 	bool		isLogSwitch = false;
 	bool		fpwChange = false;
+	bool		isHint = false;
 	uint8		info_orig = info;
 
 	/* cross-check on whether we should be here or not */
@@ -760,6 +761,10 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 				fpwChange = true;
 				break;
 
+			case XLOG_HINT:
+				isHint = true;
+				break;
+
 			default:
 				break;
 		}
@@ -998,6 +1003,18 @@ begin:;
 	}
 
 	/*
+	 * If this is a hint record and we don't need a backup block then
+	 * we have no more work to do and can exit quickly without inserting
+	 * a WAL record at all. In that case return InvalidXLogRecPtr.
+	 */
+	if (isHint && !(info & XLR_BKP_BLOCK_MASK))
+	{
+		LWLockRelease(WALInsertLock);
+		END_CRIT_SECTION();
+		return InvalidXLogRecPtr;
+	}
+
+	/*
 	 * If there isn't enough space on the current XLOG page for a record
 	 * header, advance to the next page (leaving the unused space as zeroes).
 	 */
@@ -1280,7 +1297,8 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
 	/*
 	 * XXX We assume page LSN is first data on *every* page that can be passed
 	 * to XLogInsert, whether it otherwise has the standard page layout or
-	 * not.
+	 * not. We don't need the buffer header lock for PageGetLSN because we
+	 * hold an exclusive lock on the page and/or the relation.
 	 */
 	*lsn = PageGetLSN(page);
 
@@ -3724,6 +3742,12 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
 				   BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
 		}
 
+		/*
+		 * Any checksum set on this page will now be invalid. We don't need
+		 * to reset it here, since it will be reset before the page is next
+		 * written out, and changing data when we don't hold a cleanup lock
+		 * is unsafe.
+		 */
+
 		PageSetLSN(page, lsn);
 		PageSetTLI(page, ThisTimeLineID);
 		MarkBufferDirty(buffer);
@@ -7664,8 +7688,8 @@ CreateCheckPoint(int flags)
 	uint32		redo_logSeg;
 	uint32		insert_logId;
 	uint32		insert_logSeg;
-	TransactionId *inCommitXids;
-	int			nInCommit;
+	VirtualTransactionId *vxids;
+	int			nvxids;
 
 	/*
 	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7839,7 +7863,7 @@ CreateCheckPoint(int flags)
 
 	/*
 	 * Before flushing data, we must wait for any transactions that are
-	 * currently in their commit critical sections.  If an xact inserted its
+	 * currently in commit or hint critical sections.  If an xact inserted its
 	 * commit record into XLOG just before the REDO point, then a crash
 	 * restart from the REDO point would not replay that record, which means
 	 * that our flushing had better include the xact's update of pg_clog.  So
@@ -7855,21 +7879,32 @@ CreateCheckPoint(int flags)
 	 * protected by different locks, but again that seems best on grounds of
 	 * minimizing lock contention.)
 	 *
-	 * A transaction that has not yet set inCommit when we look cannot be at
+	 * A transaction that has not yet set delayChkpt when we look cannot be at
 	 * risk, since he's not inserted his commit record yet; and one that's
 	 * already cleared it is not at risk either, since he's done fixing clog
 	 * and we will correctly flush the update below.  So we cannot miss any
 	 * xacts we need to wait for.
 	 */
-	nInCommit = GetTransactionsInCommit(&inCommitXids);
-	if (nInCommit > 0)
+	vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
+	if (nvxids > 0)
 	{
+		int			nwaits = 0;
+
 		do
 		{
 			pg_usleep(10000L);	/* wait for 10 msec */
-		} while (HaveTransactionsInCommit(inCommitXids, nInCommit));
+			nwaits++;
+		} while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
+
+		/*
+		 * We can afford to wait a while before getting bothered about the
+		 * total waiting time. Anything over 1 second seems worth reporting,
+		 * though we don't expect to see even that.
+		 */
+		if (nwaits > 100)
+			elog(LOG, "checkpoint start delayed by %d ms", 10 * nwaits);
 	}
-	pfree(inCommitXids);
+	pfree(vxids);
 
 	/*
 	 * Get the other info we need for the checkpoint record.
@@ -8454,6 +8489,51 @@ XLogRestorePoint(const char *rpName)
 }
 
 /*
+ * Write a backup block if needed when we are setting a hint. Note that
+ * this may be called for a variety of page types, not just heaps.
+ *
+ * Deciding the "if needed" part is delicate and requires us to either
+ * grab WALInsertLock or check the info_lck spinlock. If we check the
+ * spinlock and it says Yes then we will need to get WALInsertLock as well,
+ * so the design choice here is to just go straight for the WALInsertLock
+ * and trust that calls to this function are minimised elsewhere.
+ *
+ * Callable while holding just share lock on the buffer content.
+ *
+ * It is possible that multiple concurrent backends could attempt to write
+ * WAL records for the same buffer. In that case more than one backup block
+ * may be recorded, though that doesn't affect the outcome and the backup
+ * blocks are likely to be identical anyway.
+ */
+#define	XLOG_HINT_WATERMARK		13579
+XLogRecPtr
+XLogSaveBufferForHint(Buffer buffer)
+{
+	/*
+	 * Make an XLOG entry reporting the hint
+	 */
+	XLogRecData rdata[2];
+	int			watermark = XLOG_HINT_WATERMARK;
+
+	/*
+	 * Not allowed to have zero-length records, so use a small watermark
+	 */
+	rdata[0].data = (char *) (&watermark);
+	rdata[0].len = sizeof(int);
+	rdata[0].buffer = InvalidBuffer;
+	rdata[0].buffer_std = false;
+	rdata[0].next = &(rdata[1]);
+
+	rdata[1].data = NULL;
+	rdata[1].len = 0;
+	rdata[1].buffer = buffer;
+	rdata[1].buffer_std = true;
+	rdata[1].next = NULL;
+
+	return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+}
+
+/*
  * Check if any of the GUC parameters that are critical for hot standby
  * have changed, and update the value in pg_control file if necessary.
  */
@@ -8551,8 +8631,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
 
-	/* Backup blocks are not used in xlog records */
-	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+	/* Backup blocks are not used in most xlog records */
+	Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
 
 	if (info == XLOG_NEXTOID)
 	{
@@ -8704,6 +8784,32 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 	{
 		/* nothing to do here */
 	}
+	else if (info == XLOG_HINT)
+	{
+		int	*watermark = (int *) XLogRecGetData(record);
+
+		/* Check the watermark is correct for the hint record */
+		Assert(*watermark == XLOG_HINT_WATERMARK);
+
+		/* Backup blocks must be present for smgr hint records */
+		Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
+
+		/*
+		 * Hint records have no information that needs to be replayed.
+		 * Their sole purpose is to ensure that a hint-bit write does not
+		 * invalidate the page checksum if it causes a torn page. So the
+		 * body of the record is empty, but there must be one backup block.
+		 *
+		 * Since the only change in the backup block is a hint bit,
+		 * there is no conflict with Hot Standby.
+		 *
+		 * This also means there is no corresponding API call for this,
+		 * so an smgr implementation has no need to implement anything,
+		 * and nothing is needed in md.c etc.
+		 */
+		RestoreBkpBlocks(lsn, record, false);
+	}
 	else if (info == XLOG_BACKUP_END)
 	{
 		XLogRecPtr	startpoint;
@@ -8839,6 +8945,10 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
 		appendStringInfo(buf, "restore point: %s", xlrec->rp_name);
 
 	}
+	else if (info == XLOG_HINT)
+	{
+		appendStringInfo(buf, "page hint");
+	}
 	else if (info == XLOG_BACKUP_END)
 	{
 		XLogRecPtr	startpoint;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 07dc326..2c957b8 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8537,6 +8537,8 @@ copy_relation_data(SMgrRelation src, SMgrRelation dst,
 
 		smgrread(src, forkNum, blkno, buf);
 
+		PageSetVerificationInfoInplace(page);
+
 		/* XLOG stuff */
 		if (use_wal)
 			log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 1adb6d3..fe104d8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -34,6 +34,7 @@
 #include <unistd.h>
 
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -440,7 +441,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
 
 			/* check for garbage data */
-			if (!PageHeaderIsValid((PageHeader) bufBlock))
+			if (!PageIsVerified((Page) bufBlock))
 			{
 				if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
 				{
@@ -635,14 +636,23 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 				 * victim.	We need lock to inspect the page LSN, so this
 				 * can't be done inside StrategyGetBuffer.
 				 */
-				if (strategy != NULL &&
-					XLogNeedsFlush(BufferGetLSN(buf)) &&
-					StrategyRejectBuffer(strategy, buf))
+				if (strategy != NULL)
 				{
-					/* Drop lock/pin and loop around for another buffer */
-					LWLockRelease(buf->content_lock);
-					UnpinBuffer(buf, true);
-					continue;
+					XLogRecPtr	lsn;
+
+					/* Read the LSN while holding buffer header lock */
+					LockBufHdr(buf);
+					lsn = BufferGetLSN(buf);
+					UnlockBufHdr(buf);
+
+					if (XLogNeedsFlush(lsn) &&
+						StrategyRejectBuffer(strategy, buf))
+					{
+						/* Drop lock/pin and loop around for another buffer */
+						LWLockRelease(buf->content_lock);
+						UnpinBuffer(buf, true);
+						continue;
+					}
 				}
 
 				/* OK, do the I/O */
@@ -1873,6 +1883,8 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
 {
 	XLogRecPtr	recptr;
 	ErrorContextCallback errcontext;
+	Block		bufBlock;
+	char		*bufToWrite;
 
 	/*
 	 * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
@@ -1901,12 +1913,23 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
 										reln->smgr_rnode.node.dbNode,
 										reln->smgr_rnode.node.relNode);
 
+	LockBufHdr(buf);
+
+	/*
+	 * Read the buffer's LSN (via PageGetLSN) while holding the buffer
+	 * header lock, since we don't hold the buffer content lock exclusively
+	 * in all cases.
+	 */
+	recptr = BufferGetLSN(buf);
+
+	/* To check if block content changes while flushing. - vadim 01/17/97 */
+	buf->flags &= ~BM_JUST_DIRTIED;
+	UnlockBufHdr(buf);
+
 	/*
 	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
 	 * rule that log updates must hit disk before any of the data-file changes
 	 * they describe do.
 	 */
-	recptr = BufferGetLSN(buf);
 	XLogFlush(recptr);
 
 	/*
@@ -1915,15 +1938,17 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
 	 * we have the io_in_progress lock.
 	 */
 
-	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	LockBufHdr(buf);
-	buf->flags &= ~BM_JUST_DIRTIED;
-	UnlockBufHdr(buf);
+	bufBlock = BufHdrGetBlock(buf);
+
+	bufToWrite = PageSetVerificationInfoOnCopy((Page) bufBlock);
 
+	/*
+	 * bufToWrite is either the shared buffer or a copy, as appropriate.
+	 */
 	smgrwrite(reln,
 			  buf->tag.forkNum,
 			  buf->tag.blockNum,
-			  (char *) BufHdrGetBlock(buf),
+			  bufToWrite,
 			  false);
 
 	pgBufferUsage.shared_blks_written++;
@@ -1934,6 +1959,8 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
 	 */
 	TerminateBufferIO(buf, true, 0);
 
+	/* XXX Assert(buf is not BM_JUST_DIRTIED) */
+
 	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
 									   buf->tag.blockNum,
 									   reln->smgr_rnode.node.spcNode,
@@ -2326,6 +2353,7 @@ void
 SetBufferCommitInfoNeedsSave(Buffer buffer)
 {
 	volatile BufferDesc *bufHdr;
+	Page		page = BufferGetPage(buffer);
 
 	if (!BufferIsValid(buffer))
 		elog(ERROR, "bad buffer ID: %d", buffer);
@@ -2354,15 +2382,105 @@ SetBufferCommitInfoNeedsSave(Buffer buffer)
 	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
 		(BM_DIRTY | BM_JUST_DIRTIED))
 	{
+		XLogRecPtr	lsn = {0, 0};
 		bool		dirtied = false;
+		bool		delayChkpt = false;
+
+		/*
+		 * If we are currently writing checksums, or if the page already has
+		 * a checksum set, then we must check whether a full page image is
+		 * required to protect against torn pages.
+		 *
+		 * If page_checksums is set then we must write a new checksum
+		 * for any change, even a hint. If we write a checksum and then crash,
+		 * torn pages could be left on disk. So to protect against that, a
+		 * full page image (an XLOG_HINT record) must be written if this is
+		 * the first change to the block since the last checkpoint.
+		 *
+		 * Even if page_checksums is off, we may also need to write a full
+		 * page image. This protects against events that can occur if
+		 * page_checksums is turned on and off and then on again. In that
+		 * case, if a page has a checksum set and we then write the page
+		 * while page_checksums = off, a torn page could result in a page
+		 * that has a checksum set and yet does not match the actual page
+		 * contents. So even if page_checksums = off we write a full page
+		 * image if the page is marked as having a checksum, because we are
+		 * going to reset the checksum.
+		 *
+		 * We don't check full_page_writes here because that can be turned
+		 * on dynamically during a backup, so even if the usual value is
+		 * off we may still need to use a full page image.
+		 */
+		if (page_checksums || !PageHasNoChecksum(page))
+		{
+			/*
+			 * If we're in recovery we cannot dirty a page because of a hint.
+			 * We can set the hint, just not dirty the page as a result, so
+			 * the hint is lost when we evict the page or shut down.
+			 *
+			 * See long discussion in bufpage.c
+			 */
+			if (RecoveryInProgress())
+				return;
+
+			/*
+			 * If the block is already dirty because we either made a change
+			 * or set a hint already, then we don't need to write a full page
+			 * image.  Note that aggressive cleaning of blocks
+			 * dirtied by hint bit setting would increase the call rate.
+			 * Bulk setting of hint bits would reduce the call rate...
+			 *
+			 * We must issue the WAL record before we mark the buffer dirty.
+			 * Otherwise we might write the page before we write the WAL.
+			 * That causes a race condition, since a checkpoint might occur
+			 * between writing the WAL record and marking the buffer dirty.
+			 * We solve that with a kluge, but one that is already in use
+			 * during transaction commit to prevent race conditions.
+			 * Basically, we simply prevent the checkpoint WAL record from
+			 * being written until we have marked the buffer dirty. We don't
+			 * start the checkpoint flush until we have marked the buffer
+			 * dirty, so the checkpoint must flush the change to disk
+			 * successfully, or else the checkpoint record never gets written
+			 * and crash recovery will fix things up.
+			 *
+			 * It's possible we may enter here without an xid, so it is
+			 * essential that CreateCheckpoint waits for virtual transactions
+			 * rather than full transaction ids.
+			 */
+			MyPgXact->delayChkpt = delayChkpt = true;
+			lsn = XLogSaveBufferForHint(buffer);
+		}
 
 		LockBufHdr(bufHdr);
 		Assert(bufHdr->refcount > 0);
 		if (!(bufHdr->flags & BM_DIRTY))
-			dirtied = true;
+		{
+			dirtied = true;		/* Means "will be dirtied by this action" */
+
+			/*
+			 * Set the page LSN if we wrote a backup block. We aren't
+			 * supposed to set this when only holding a share lock, but
+			 * as long as we serialise it somehow we're OK. We choose to
+			 * set the LSN while holding the buffer header lock, which means
+			 * any reader of the LSN who holds only a share lock must also
+			 * obtain the buffer header lock before using PageGetLSN().
+			 * Fortunately, that's not too many places.
+			 *
+			 * If !page_checksums and PageHasChecksum() you might think
+			 * we should reset the checksum here. That will happen when
+			 * the page is written sometime later in this checkpoint cycle.
+			 */
+			if (!XLogRecPtrIsInvalid(lsn))
+			{
+				PageSetLSN(page, lsn);
+				PageSetTLI(page, ThisTimeLineID);
+			}
+		}
 		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
 		UnlockBufHdr(bufHdr);
 
+		if (delayChkpt)
+			MyPgXact->delayChkpt = false;
+
 		if (dirtied)
 		{
 			VacuumPageDirty++;
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 096d36a..a220310 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -200,6 +200,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 		/* Find smgr relation for buffer */
 		oreln = smgropen(bufHdr->tag.rnode, MyBackendId);
 
+		/* XXX do we want to write checksums for local buffers? An option? */
+
 		/* And write... */
 		smgrwrite(oreln,
 				  bufHdr->tag.forkNum,
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 09b7311..be70240 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -398,7 +398,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 		pgxact->xmin = InvalidTransactionId;
 		/* must be cleared with xid/xmin: */
 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-		pgxact->inCommit = false; /* be sure this is cleared in abort */
+		pgxact->delayChkpt = false; /* be sure this is cleared in abort */
 		proc->recoveryConflictPending = false;
 
 		/* Clear the subtransaction-XID cache too while holding the lock */
@@ -425,7 +425,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 		pgxact->xmin = InvalidTransactionId;
 		/* must be cleared with xid/xmin: */
 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-		pgxact->inCommit = false; /* be sure this is cleared in abort */
+		pgxact->delayChkpt = false; /* be sure this is cleared in abort */
 		proc->recoveryConflictPending = false;
 
 		Assert(pgxact->nxids == 0);
@@ -460,7 +460,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 
 	/* redundant, but just in case */
 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-	pgxact->inCommit = false;
+	pgxact->delayChkpt = false;
 
 	/* Clear the subtransaction-XID cache too */
 	pgxact->nxids = 0;
@@ -1741,65 +1741,70 @@ GetOldestActiveTransactionId(void)
 }
 
 /*
- * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
+ * GetVirtualXIDsDelayingChkpt -- Get the VXIDs of transactions that are
+ * delaying checkpoint because they have critical actions in progress.
  *
- * Constructs an array of XIDs of transactions that are currently in commit
- * critical sections, as shown by having inCommit set in their PGPROC entries.
+ * Constructs an array of VXIDs of transactions that are currently delaying
+ * the checkpoint, as shown by having delayChkpt set in their PGXACT.
  *
- * *xids_p is set to a palloc'd array that should be freed by the caller.
- * The return value is the number of valid entries.
+ * Returns a palloc'd array that should be freed by the caller.
+ * *nvxids is the number of valid entries.
  *
- * Note that because backends set or clear inCommit without holding any lock,
+ * Note that because backends set or clear delayChkpt without holding any lock,
  * the result is somewhat indeterminate, but we don't really care.  Even in
  * a multiprocessor with delayed writes to shared memory, it should be certain
- * that setting of inCommit will propagate to shared memory when the backend
- * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
+ * that setting of delayChkpt will propagate to shared memory when the backend
+ * takes a lock, so we cannot fail to see a virtual xact as delayChkpt if
  * it's already inserted its commit record.  Whether it takes a little while
- * for clearing of inCommit to propagate is unimportant for correctness.
+ * for clearing of delayChkpt to propagate is unimportant for correctness.
  */
-int
-GetTransactionsInCommit(TransactionId **xids_p)
+VirtualTransactionId *
+GetVirtualXIDsDelayingChkpt(int *nvxids)
 {
+	VirtualTransactionId *vxids;
 	ProcArrayStruct *arrayP = procArray;
-	TransactionId *xids;
-	int			nxids;
+	int			count = 0;
 	int			index;
 
-	xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
-	nxids = 0;
+	/* allocate what's certainly enough result space */
+	vxids = (VirtualTransactionId *)
+		palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
 
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	for (index = 0; index < arrayP->numProcs; index++)
 	{
 		int		pgprocno = arrayP->pgprocnos[index];
-		volatile PGXACT *pgxact = &allPgXact[pgprocno];
-		TransactionId pxid;
+		volatile PGPROC    *proc = &allProcs[pgprocno];
+		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
 
-		/* Fetch xid just once - see GetNewTransactionId */
-		pxid = pgxact->xid;
+		if (pgxact->delayChkpt)
+		{
+			VirtualTransactionId vxid;
 
-		if (pgxact->inCommit && TransactionIdIsValid(pxid))
-			xids[nxids++] = pxid;
+			GET_VXID_FROM_PGPROC(vxid, *proc);
+			if (VirtualTransactionIdIsValid(vxid))
+				vxids[count++] = vxid;
+		}
 	}
 
 	LWLockRelease(ProcArrayLock);
 
-	*xids_p = xids;
-	return nxids;
+	*nvxids = count;
+	return vxids;
 }
 
 /*
- * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
+ * HaveVirtualXIDsDelayingChkpt -- Are any of the specified VXIDs delaying?
  *
- * This is used with the results of GetTransactionsInCommit to see if any
- * of the specified XIDs are still in their commit critical sections.
+ * This is used with the results of GetVirtualXIDsDelayingChkpt to see if any
+ * of the specified VXIDs are still in critical sections of code.
  *
- * Note: this is O(N^2) in the number of xacts that are/were in commit, but
+ * Note: this is O(N^2) in the number of vxacts that are/were delaying, but
  * those numbers should be small enough for it not to be a problem.
  */
 bool
-HaveTransactionsInCommit(TransactionId *xids, int nxids)
+HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
 {
 	bool		result = false;
 	ProcArrayStruct *arrayP = procArray;
@@ -1807,30 +1812,32 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids)
 
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
-	for (index = 0; index < arrayP->numProcs; index++)
+	while (nvxids-- > 0)
 	{
-		int		pgprocno = arrayP->pgprocnos[index];
-		volatile PGXACT    *pgxact = &allPgXact[pgprocno];
-		TransactionId	pxid;
-
-		/* Fetch xid just once - see GetNewTransactionId */
-		pxid = pgxact->xid;
-
-		if (pgxact->inCommit && TransactionIdIsValid(pxid))
+		for (index = 0; index < arrayP->numProcs; index++)
 		{
-			int			i;
+			int		pgprocno = arrayP->pgprocnos[index];
+			volatile PGPROC    *proc = &allProcs[pgprocno];
+			volatile PGXACT    *pgxact = &allPgXact[pgprocno];
+			VirtualTransactionId vxid;
 
-			for (i = 0; i < nxids; i++)
+			GET_VXID_FROM_PGPROC(vxid, *proc);
+			if (VirtualTransactionIdIsValid(vxid))
 			{
-				if (xids[i] == pxid)
+				if (VirtualTransactionIdEquals(vxid, *vxids) &&
+					pgxact->delayChkpt)
 				{
 					result = true;
 					break;
 				}
 			}
-			if (result)
-				break;
 		}
+
+		if (result)
+			break;
+
+		/* This virtual transaction is no longer delaying; check the next one */
+		vxids++;
 	}
 
 	LWLockRelease(ProcArrayLock);
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 2196f51..845fcfb 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -356,7 +356,7 @@ InitProcess(void)
 	MyProc->backendId = InvalidBackendId;
 	MyProc->databaseId = InvalidOid;
 	MyProc->roleId = InvalidOid;
-	MyPgXact->inCommit = false;
+	MyPgXact->delayChkpt = false;
 	MyPgXact->vacuumFlags = 0;
 	/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
 	if (IsAutoVacuumWorkerProcess())
@@ -514,7 +514,7 @@ InitAuxiliaryProcess(void)
 	MyProc->backendId = InvalidBackendId;
 	MyProc->databaseId = InvalidOid;
 	MyProc->roleId = InvalidOid;
-	MyPgXact->inCommit = false;
+	MyPgXact->delayChkpt = false;
 	MyPgXact->vacuumFlags = 0;
 	MyProc->lwWaiting = false;
 	MyProc->lwWaitMode = 0;
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 90a731c..1c17ecc 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -16,6 +16,12 @@
 
 #include "access/htup.h"
 
+bool page_checksums = false;
+
+static Page pageCopy;	/* temporary buffer to allow checksum calculation */
+
+static bool PageVerificationInfoOK(Page page);
+static uint16 PageCalcChecksum16(Page page);
 
 /* ----------------------------------------------------------------
  *						Page support functions
@@ -25,6 +31,10 @@
 /*
  * PageInit
  *		Initializes the contents of a page.
+ *		Note that we don't automatically add a checksum, or flag that the
+ *		page has a checksum field. We start with a normal page layout and
+ *		defer the decision on what page verification info will be written
+ *		until just before we write the block to disk.
  */
 void
 PageInit(Page page, Size pageSize, Size specialSize)
@@ -67,21 +77,26 @@ PageInit(Page page, Size pageSize, Size specialSize)
  * will clean up such a page and make it usable.
  */
 bool
-PageHeaderIsValid(PageHeader page)
+PageIsVerified(Page page)
 {
+	PageHeader	p = (PageHeader) page;
 	char	   *pagebytes;
 	int			i;
 
-	/* Check normal case */
-	if (PageGetPageSize(page) == BLCKSZ &&
-		PageGetPageLayoutVersion(page) == PG_PAGE_LAYOUT_VERSION &&
-		(page->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
-		page->pd_lower >= SizeOfPageHeaderData &&
-		page->pd_lower <= page->pd_upper &&
-		page->pd_upper <= page->pd_special &&
-		page->pd_special <= BLCKSZ &&
-		page->pd_special == MAXALIGN(page->pd_special))
-		return true;
+	/*
+	 * Don't verify page data unless the page passes basic non-zero test
+	 */
+	if (p->pd_lower >= SizeOfPageHeaderData)
+	{
+		/* Check normal case */
+		if (PageVerificationInfoOK(page) &&
+			(p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
+			p->pd_lower <= p->pd_upper &&
+			p->pd_upper <= p->pd_special &&
+			p->pd_special <= BLCKSZ &&
+			p->pd_special == MAXALIGN(p->pd_special))
+			return true;
+	}
 
 	/* Check all-zeroes case */
 	pagebytes = (char *) page;
@@ -827,3 +842,311 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 
 	pfree(itemidbase);
 }
+
+/*
+ * Test whether the page verification information is correct or not.
+ *
+ * IMPORTANT NOTE -
+ * Verification info is not valid at all times on a data page. We set
+ * verification info before we flush page/buffer, and implicitly invalidate
+ * verification info when we write to the page. A heavily accessed buffer
+ * might then spend most of its life with invalid page verification info,
+ * so testing verification info on random pages in the buffer pool will tell
+ * you nothing. The reason for this is that page verification info protects
+ * Postgres data from errors on the filesystems on which we rely. We do not
+ * protect buffers against uncorrectable memory errors, since these have a
+ * very low measured incidence according to research on large server farms,
+ * http://www.cs.toronto.edu/~bianca/papers/sigmetrics09.pdf, discussed 2010/12/22.
+ *
+ * To be clear, this means that WAL-logged changes to a page do NOT update
+ * the page verification info, so full page images may not have correct
+ * verification information on them. But those page images are covered by
+ * the WAL CRC and so are verified separately from this mechanism.
+ *
+ * Any write of a data block can cause a torn page if the write is unsuccessful.
+ * Full page writes, stored in WAL, protect us from that. Setting hint bits
+ * when a page is already dirty is OK because a full page image must already
+ * have been written to WAL for that page since the last checkpoint.
+ * Setting hint bits on an otherwise clean page can allow torn pages; this
+ * doesn't normally matter since they are just hints. When the page has
+ * checksums, losing a few bits would cause the checksum to be invalid.
+ * So if we have full_page_writes = on and page_checksums = on then we must
+ * write a WAL record specifically so that we record a full page image in WAL.
+ * New WAL records cannot be written during recovery, so when
+ * page_checksums = on, hint bits set during recovery must not dirty the
+ * page if the buffer is not already dirty.
+ *
+ * So we cannot enable/disable page_checksums except at a checkpoint if
+ * full_page_writes is enabled. We choose to only allow changes at server start.
+ *
+ * WAL replay ignores page verification info unless it writes out or reads in
+ * blocks from disk; restoring full page images does not check verification
+ * info via this function. In recovery, since we dirty a block only when a
+ * full page image is available in case we crash, we can safely use page
+ * verification as normal.
+ *
+ * The best way to understand this is that WAL CRCs protect records entering
+ * the WAL stream, and page verification protects blocks entering the shared
+ * buffer pool. They are similar in purpose, yet completely separate.
+ * Together they ensure we are able to detect errors in data re-entering
+ * PostgreSQL controlled memory. Note also that the WAL checksum is a
+ * 32-bit CRC, whereas the page checksum is a Fletcher16 checksum, not a CRC.
+ *
+ * Note also that the verification mechanism can vary from page to page.
+ * All we do here is look at what the page itself says is the verification
+ * mechanism and then apply that test. This allows us to run without the CPU
+ * cost of verification if we choose, as well as to provide an upgrade path
+ * for anyone doing direct upgrades using pg_upgrade.
+ *
+ * There is some concern that trusting page data to say how to check page
+ * data is dangerously self-referential. To ensure no mistakes we set two
+ * non-adjacent bits to signify that the page has a checksum and
+ * should be verified when that block is read back into a buffer.
+ * We use two bits in case a multiple bit error removes one of the checksum
+ * flags *and* destroys data, which would lead to skipping the checksum check
+ * and silently accepting bad data. We also require that a third bit is
+ * zeroed, to protect against a run of 1s being written to the page header.
+ *
+ * This function returns a boolean, not a full damage assessment.
+ */
+static bool
+PageVerificationInfoOK(Page page)
+{
+	PageHeader	p = (PageHeader) page;
+
+	/* Quick exit if nobody cares about checksumming */
+	if (!page_checksums)
+		return true;
+
+	if (PageHasChecksum(p))
+	{
+		uint16	checksum = PageCalcChecksum16(page);
+
+		if (checksum == p->pd_verify.pd_checksum16)
+		{
+#ifdef CHECK_HOLE
+			/* Also check page hole is all-zeroes */
+			char	   *pagebytes;
+			bool		empty = true;
+			int			i;
+
+			pagebytes = (char *) page;
+			for (i = p->pd_lower; i < p->pd_upper; i++)
+			{
+				if (pagebytes[i] != 0)
+				{
+					empty = false;
+					break;
+				}
+			}
+
+			if (!empty)
+				elog(LOG, "hole was not empty at byte %d pd_lower %d pd_upper %d",
+								i, p->pd_lower, p->pd_upper);
+#endif
+			return true;
+		}
+
+		elog(LOG, "page verification failed - checksum was %u page checksum field is %u",
+						checksum, p->pd_verify.pd_checksum16);
+	}
+	else if (PageHasNoChecksum(p))
+	{
+		uint16		pd_pagesize_version = BLCKSZ | PG_PAGE_LAYOUT_VERSION;
+
+		if (p->pd_verify.pd_pagesize_version == pd_pagesize_version)
+			return true;
+
+		elog(LOG, "page verification failed - page size and version set incorrectly %u",
+						p->pd_verify.pd_checksum16);
+	}
+	else
+		elog(LOG, "page verification failed - page flags are incorrectly set");
+
+	return false;
+}
+
+#define PageSetChecksum(page) \
+do \
+{ \
+	PageHeader	p = (PageHeader) (page); \
+	p->pd_flags |= PD_PAGE_VERSION_PLUS1; \
+	p->pd_flags |= PD_CHECKSUM1; \
+	p->pd_flags &= ~PD_CHECKSUM2; \
+	p->pd_verify.pd_checksum16 = PageCalcChecksum16(page); \
+} while (0)
+
+/* ensure any older checksum info is overwritten with watermark */
+#define PageResetVersion(page) \
+do \
+{ \
+	PageHeader	p = (PageHeader) (page); \
+	if (!PageHasNoChecksum(p)) \
+	{ \
+		p->pd_flags &= ~PD_PAGE_VERSION_PLUS1; \
+		p->pd_flags &= ~PD_CHECKSUM1; \
+		p->pd_flags &= ~PD_CHECKSUM2; \
+		PageSetPageSizeAndVersion(p, BLCKSZ, PG_PAGE_LAYOUT_VERSION); \
+	} \
+} while (0)
+
+/*
+ * Set verification info for page in shared buffers.
+ *
+ * Either we set a new checksum, or we set the standard watermark. We must
+ * not leave an invalid checksum in place. Note that the verification info is
+ * not WAL logged, whereas the data changes to pages are, so data is safe
+ * whether or not we have page_checksums enabled. The purpose of checksums
+ * is to detect page corruption to allow replacement from backup.
+ *
+ * Returns a pointer to the block-sized data that needs to be written. That
+ * allows us to either copy, or not, depending upon whether we checksum.
+ */
+char *
+PageSetVerificationInfoOnCopy(Page page)
+{
+	if (PageIsNew(page))
+		return (char *) page;
+
+	if (page_checksums)
+	{
+		if (pageCopy == NULL)
+		{
+			pageCopy = (Page) malloc(BLCKSZ);
+			if (pageCopy == NULL)
+				ereport(ERROR,
+					(errcode(ERRCODE_OUT_OF_MEMORY),
+					 errmsg("out of memory")));
+		}
+
+		/*
+		 * We make a copy iff we need to calculate a checksum because other
+		 * backends may set hint bits on this page while we write, which
+		 * would mean the checksum differs from the page contents. It doesn't
+		 * matter if we include or exclude hints during the copy, as long
+		 * as we write a valid page and associated checksum.
+		 */
+		memcpy((char *) pageCopy, (char *) page, BLCKSZ);
+		PageSetChecksum(pageCopy);
+		return (char *) pageCopy;
+	}
+
+	PageResetVersion(page);
+	return (char *) page;
+}
+
+/*
+ * Set verification info for page in private memory.
+ *
+ * This is a simpler version of PageSetVerificationInfoOnCopy(). The more
+ * explicit API allows us to more easily see if we're making the correct call
+ * and reduces the amount of additional code specific to page verification.
+ */
+void
+PageSetVerificationInfoInplace(Page page)
+{
+	if (PageIsNew(page))
+		return;
+
+	if (page_checksums)
+	{
+		PageSetChecksum(page);
+		return;
+	}
+
+	PageResetVersion(page);
+	return;
+}
+
+/*
+ * Calculate a checksum for a PostgreSQL page. We do this in steps: first
+ * we sum the page header, skipping the verification field, which is filled
+ * in afterwards; then we sum the rest of the page from pd_prune_xid to the
+ * end of the block. If IGNORE_PAGE_HOLE is defined we instead skip the hole
+ * between pd_lower and pd_upper, summing only the line pointers and the
+ * tail of the page.
+ */
+static uint16
+PageCalcChecksum16(Page page)
+{
+#define PAGE_VERIFICATION_USES_FLETCHER16 (true)
+#ifdef PAGE_VERIFICATION_USES_FLETCHER16
+	/*
+	 * The following calculation is a Fletcher-16 checksum. The calculation
+	 * is isolated here so that tuning and/or replacement algorithms are
+	 * possible.
+	 */
+	PageHeader	p = (PageHeader) page;
+	uint32	page_header_stop = (uint32) (offsetof(PageHeaderData, pd_special) + sizeof(LocationIndex));
+	uint32	page_lower_start = (uint32) offsetof(PageHeaderData, pd_prune_xid);
+	uint32	page_lower_stop;
+	uint32	sum1 = 0;
+	uint64	sum2 = 0;
+	int		i;
+
+	/*
+	 * Avoid calculating checksum if page is new, just return a value that
+	 * will cause the check to fail. We may still pass the all-zeroes check.
+	 */
+	if (PageIsNew(page))
+		return 1;
+
+	/*
+	 * Just add in the pd_prune_xid if there are no line pointers yet.
+	 */
+	page_lower_stop = p->pd_lower;
+	if (page_lower_stop == 0)
+		page_lower_stop = page_lower_start + sizeof(TransactionId);
+
+	Assert(p->pd_upper != 0);
+
+#ifdef DEBUG_CHECKSUM
+	elog(LOG, "calculating checksum for %u-%u %u-%u %u-%u",
+			0,	/* page_header_start */
+			page_header_stop,
+			page_lower_start,
+			page_lower_stop,
+			p->pd_upper,
+			BLCKSZ
+			);
+#endif
+
+#define	COMP_F16(from, to) \
+do { \
+	for (i = (from); i < (to); i++) \
+	{ \
+		sum1 = sum1 + (uint8) page[i]; \
+		sum2 = sum1 + sum2; \
+	} \
+	sum1 %= 255; \
+	sum2 %= 255; \
+} while (0)
+
+#ifdef IGNORE_PAGE_HOLE
+	COMP_F16(0,
+			 page_header_stop);
+
+	/* ignore the checksum field since not done yet... */
+
+	COMP_F16(page_lower_start,
+			 page_lower_stop);
+
+	/* ignore the hole in the middle of the block */
+
+	COMP_F16(p->pd_upper,
+			 BLCKSZ);
+#else
+	COMP_F16(0,
+			 page_header_stop);
+
+	/* ignore the checksum field since not done yet... */
+
+	COMP_F16(page_lower_start,
+			 BLCKSZ);
+#endif
+
+#ifdef DEBUG_CHECKSUM
+	elog(LOG, "checksum %u", ((sum2 << 8) | sum1));
+#endif
+
+	return ((sum2 << 8) | sum1);
+#endif
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 7df5292..6c7421e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -830,6 +830,20 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"page_checksums", PGC_POSTMASTER, WAL_SETTINGS,
+			gettext_noop("Marks database blocks with a checksum before writing them to disk. "),
+			gettext_noop("When enabled all database blocks will be marked with a checksum before writing to disk. "
+						 "When we read a database block from disk the checksum is checked, if it exists. "
+						 "If there is no checksum marked yet then no check is performed, though a "
+						 "checksum will be added later when we re-write the database block. "
+						 "When disabled checksums will be ignored, even if the block was marked "
+						 "with checksum. When disabled checksums will not be added to database blocks.")
+		},
+		&page_checksums,
+		false,
+		NULL, NULL, NULL
+	},
+	{
 		{"full_page_writes", PGC_SIGHUP, WAL_SETTINGS,
 			gettext_noop("Writes full pages to WAL when first modified after a checkpoint."),
 			gettext_noop("A page write in process during an operating system crash might be "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 400c52b..de02b5f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -150,15 +150,21 @@
 
 
 #------------------------------------------------------------------------------
-# WRITE AHEAD LOG
+# WRITE AHEAD LOG & RELIABILITY
 #------------------------------------------------------------------------------
 
-# - Settings -
+# - Reliability -
 
-#wal_level = minimal			# minimal, archive, or hot_standby
-					# (change requires restart)
+#page_checksums = off			# calculate checksum before database I/O
+#full_page_writes = on			# recover from partial page writes
 #fsync = on				# turns forced synchronization on or off
+
 #synchronous_commit = on		# synchronization level; on, off, or local
+
+# - Write Ahead Log -
+
+#wal_level = minimal			# minimal, archive, or hot_standby
+					# (change requires restart)
 #wal_sync_method = fsync		# the default is the first option
 					# supported by the operating system:
 					#   open_datasync
@@ -166,7 +172,6 @@
 					#   fsync
 					#   fsync_writethrough
 					#   open_sync
-#full_page_writes = on			# recover from partial page writes
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
 					# (change requires restart)
 #wal_writer_delay = 200ms		# 1-10000 milliseconds
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index f8aecef..5f18d4d 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -272,6 +272,7 @@ extern int XLogFileInit(uint32 log, uint32 seg,
 			 bool *use_existent, bool use_lock);
 extern int	XLogFileOpen(uint32 log, uint32 seg);
 
+extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer);
 
 extern void XLogGetLastRemoved(uint32 *log, uint32 *seg);
 extern void XLogSetAsyncXactLSN(XLogRecPtr record);
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 1031e56..445a3e0 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -61,7 +61,8 @@ typedef struct CheckPoint
 #define XLOG_BACKUP_END					0x50
 #define XLOG_PARAMETER_CHANGE			0x60
 #define XLOG_RESTORE_POINT				0x70
-#define XLOG_FPW_CHANGE				0x80
+#define XLOG_FPW_CHANGE					0x80
+#define XLOG_HINT						0x90
 
 
 /*
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 1ab64e0..722cdb5 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -18,6 +18,8 @@
 #include "storage/item.h"
 #include "storage/off.h"
 
+extern bool page_checksums;
+
 /*
  * A postgres disk page is an abstraction layered on top of a postgres
  * disk block (which is simply a unit of i/o, see block.h).
@@ -93,7 +95,7 @@ typedef uint16 LocationIndex;
  *		pd_lower	- offset to start of free space.
  *		pd_upper	- offset to end of free space.
  *		pd_special	- offset to start of special space.
- *		pd_pagesize_version - size in bytes and page layout version number.
+ *		pd_verify	- page verification information (checksum or watermark)
  *		pd_prune_xid - oldest XID among potentially prunable tuples on page.
  *
  * The LSN is used by the buffer manager to enforce the basic rule of WAL:
@@ -106,7 +108,8 @@ typedef uint16 LocationIndex;
  * pd_prune_xid is a hint field that helps determine whether pruning will be
  * useful.	It is currently unused in index pages.
  *
- * The page version number and page size are packed together into a single
+ * For verification we store either a 16-bit checksum or a watermark of
+ * the page version number and page size packed together into a single
  * uint16 field.  This is for historical reasons: before PostgreSQL 7.3,
  * there was no concept of a page version number, and doing it this way
  * lets us pretend that pre-7.3 databases have page version number zero.
@@ -130,7 +133,13 @@ typedef struct PageHeaderData
 	LocationIndex pd_lower;		/* offset to start of free space */
 	LocationIndex pd_upper;		/* offset to end of free space */
 	LocationIndex pd_special;	/* offset to start of special space */
-	uint16		pd_pagesize_version;
+
+	union
+	{
+		uint16		pd_pagesize_version;
+		uint16		pd_checksum16;
+	} pd_verify;				/* page verification data */
+
 	TransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */
 	ItemIdData	pd_linp[1];		/* beginning of line pointer array */
 } PageHeaderData;
@@ -155,7 +164,42 @@ typedef PageHeaderData *PageHeader;
 #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
 										 * everyone */
 
-#define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
+#define PD_VALID_FLAG_BITS	0x400F		/* OR of all valid pd_flags bits; PD_CHECKSUM2 must be zero */
+
+#define PD_CHECKSUM1		0x0008		/* First checksum flag bit - 1 when page has a checksum */
+#define PD_CHECKSUM2		0x8000		/* Second checksum flag bit - must always be 0 */
+
+/*
+ * Page layout version number increment bits (still part of flags).
+ * Next flag would be called PD_PAGE_VERSION_PLUS2, PD_PAGE_VERSION_PLUS4 etc
+ */
+#define PD_PAGE_VERSION_PLUS1	0x4000
+
+/*
+ * PD_SET_CHECKSUM is a bitmask of multiple flags
+ *
+ * When we wish to set a checksum we require exact settings for 3 bits.
+ *
+ * (1) First, we must set the version correctly, so we set all increment bits.
+ *
+ * (2) Next we set PD_CHECKSUM2 to be zero, the opposite setting of the increment
+ * bits. So any error that causes a run of multiple zeroes or ones to be written
+ * to the block will be detected.
+ *
+ * (3) Next we set PD_CHECKSUM1 to be one, to indicate the block has a checksum.
+ * A setting of just the increment bits with no checksum flag is version-legal,
+ * but we treat that as an error too, to reduce the chances of mistakes.
+ */
+#define PD_SET_CHECKSUM		0x4008		/* All 3 bits set correctly for checksum */
+
+#define PageHasChecksum(page) \
+	((((PageHeader) (page))->pd_flags & PD_SET_CHECKSUM) == PD_SET_CHECKSUM)
+
+#define PageHasNoChecksum(page) \
+	((((PageHeader) (page))->pd_flags & PD_SET_CHECKSUM) == 0)
+
+#define PageHasChecksumFlagError(page) \
+	(!PageHasChecksum(page) && !PageHasNoChecksum(page))
 
 /*
  * Page layout version number 0 is for pre-7.3 Postgres releases.
@@ -165,9 +209,29 @@ typedef PageHeaderData *PageHeader;
  * Release 8.3 uses 4; it changed the HeapTupleHeader layout again, and
  *		added the pd_flags field (by stealing some bits from pd_tli),
  *		as well as adding the pd_prune_xid field (which enlarges the header).
+ * Release 9.2 uses versions 4 and/or 5. Each page may only have one version,
+ * though different pages may have different versions. This is arranged
+ * deliberately to allow us to upgrade from one major version to another.
  */
 #define PG_PAGE_LAYOUT_VERSION		4
 
+/*
+ * PageGetPageLayoutVersion
+ *		Returns the page layout version of a page.
+ *
+ * To calculate the page layout version we take the basic page layout (4)
+ * and increment that according to the incremental bits above. If further
+ * versions of the page are required AND those page versions must be
+ * compatible with v4 and v5 then we would do this by setting new
+ * incremental bits and adding them into the definition below.
+ * Note that this works for all page versions because we don't look at the
+ * actual data field if we have the version increment bits set.
+ */
+#define PageGetPageLayoutVersion(page) \
+	((((PageHeader) (page))->pd_flags & PD_PAGE_VERSION_PLUS1) == PD_PAGE_VERSION_PLUS1 ? \
+			PG_PAGE_LAYOUT_VERSION + 1 : \
+			((PageHeader) (page))->pd_verify.pd_pagesize_version - BLCKSZ)
+
 
 /* ----------------------------------------------------------------
  *						page support macros
@@ -231,19 +295,19 @@ typedef PageHeaderData *PageHeader;
  * PageGetPageSize
  *		Returns the page size of a page.
  *
- * this can only be called on a formatted page (unlike
- * BufferGetPageSize, which can be called on an unformatted page).
- * however, it can be called on a page that is not stored in a buffer.
+ * Since PageSizeIsValid() accepts only pagesize == BLCKSZ, this simply
+ * returns BLCKSZ. It can be called on any page, initialised or not, in or
+ * out of buffers. You might think the page size could vary at runtime, but
+ * it cannot: pages frequently need to occupy buffers and are copied from
+ * one buffer to another, so many hidden assumptions rely on this simple
+ * definition being true.
  */
-#define PageGetPageSize(page) \
-	((Size) (((PageHeader) (page))->pd_pagesize_version & (uint16) 0xFF00))
+#define PageGetPageSize(page) (BLCKSZ)
+
+#define PageGetPageSizeField(page) \
+	((((PageHeader) (page))->pd_flags & PD_PAGE_VERSION_PLUS1) == PD_PAGE_VERSION_PLUS1 ? \
+			(BLCKSZ) : \
+			(((PageHeader) (page))->pd_verify.pd_pagesize_version & 0xFF00))
 
-/*
- * PageGetPageLayoutVersion
- *		Returns the page layout version of a page.
- */
-#define PageGetPageLayoutVersion(page) \
-	(((PageHeader) (page))->pd_pagesize_version & 0x00FF)
 
 /*
  * PageSetPageSizeAndVersion
@@ -251,12 +315,14 @@ typedef PageHeaderData *PageHeader;
  *
  * We could support setting these two values separately, but there's
  * no real need for it at the moment.
+ *
+ * Must not be used on a page that is flagged for checksums.
  */
 #define PageSetPageSizeAndVersion(page, size, version) \
 ( \
 	AssertMacro(((size) & 0xFF00) == (size)), \
 	AssertMacro(((version) & 0x00FF) == (version)), \
-	((PageHeader) (page))->pd_pagesize_version = (size) | (version) \
+	((PageHeader) (page))->pd_verify.pd_pagesize_version = (size) | (version) \
 )
 
 /* ----------------
@@ -368,7 +434,7 @@ do { \
  */
 
 extern void PageInit(Page page, Size pageSize, Size specialSize);
-extern bool PageHeaderIsValid(PageHeader page);
+extern bool PageIsVerified(Page page);
 extern OffsetNumber PageAddItem(Page page, Item item, Size size,
 			OffsetNumber offsetNumber, bool overwrite, bool is_heap);
 extern Page PageGetTempPage(Page page);
@@ -381,5 +447,7 @@ extern Size PageGetExactFreeSpace(Page page);
 extern Size PageGetHeapFreeSpace(Page page);
 extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
 extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
+extern char *PageSetVerificationInfoOnCopy(Page page);
+extern void PageSetVerificationInfoInplace(Page page);
 
 #endif   /* BUFPAGE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 415c093..ea4a13f 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -168,7 +168,8 @@ typedef struct PGXACT
 
 	uint8		vacuumFlags;	/* vacuum-related flags, see above */
 	bool		overflowed;
-	bool		inCommit;		/* true if within commit critical section */
+	bool		delayChkpt;		/* true if this proc delays checkpoint start;
+								 * previously called inCommit */
 
 	uint8		nxids;
 } PGXACT;
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 0b0aa35..8648bdb 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -52,8 +52,8 @@ extern bool TransactionIdIsActive(TransactionId xid);
 extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
 extern TransactionId GetOldestActiveTransactionId(void);
 
-extern int	GetTransactionsInCommit(TransactionId **xids_p);
-extern bool HaveTransactionsInCommit(TransactionId *xids, int nxids);
+extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
+extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);
 
 extern PGPROC *BackendPidGetProc(int pid);
 extern int	BackendXidGetPid(TransactionId xid);
