diff --git a/contrib/pg_upgrade/file.c b/contrib/pg_upgrade/file.c
index a5d92c6..d8cd8f5
*** a/contrib/pg_upgrade/file.c
--- b/contrib/pg_upgrade/file.c
*************** copy_file(const char *srcfile, const cha
*** 221,281 ****
  #endif
  
  
- /*
-  * load_directory()
-  *
-  * Read all the file names in the specified directory, and return them as
-  * an array of "char *" pointers.  The array address is returned in
-  * *namelist, and the function result is the count of file names.
-  *
-  * To free the result data, free each (char *) array member, then free the
-  * namelist array itself.
-  */
- int
- load_directory(const char *dirname, char ***namelist)
- {
- 	DIR		   *dirdesc;
- 	struct dirent *direntry;
- 	int			count = 0;
- 	int			allocsize = 64;		/* initial array size */
- 
- 	*namelist = (char **) pg_malloc(allocsize * sizeof(char *));
- 
- 	if ((dirdesc = opendir(dirname)) == NULL)
- 		pg_log(PG_FATAL, "could not open directory \"%s\": %s\n",
- 			   dirname, getErrorText(errno));
- 
- 	while (errno = 0, (direntry = readdir(dirdesc)) != NULL)
- 	{
- 		if (count >= allocsize)
- 		{
- 			allocsize *= 2;
- 			*namelist = (char **)
- 						pg_realloc(*namelist, allocsize * sizeof(char *));
- 		}
- 
- 		(*namelist)[count++] = pg_strdup(direntry->d_name);
- 	}
- 
- #ifdef WIN32
- 	/*
- 	 * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but not in
- 	 * released version
- 	 */
- 	if (GetLastError() == ERROR_NO_MORE_FILES)
- 		errno = 0;
- #endif
- 
- 	if (errno)
- 		pg_log(PG_FATAL, "could not read directory \"%s\": %s\n",
- 			   dirname, getErrorText(errno));
- 
- 	closedir(dirdesc);
- 
- 	return count;
- }
- 
- 
  void
  check_hard_link(void)
  {
--- 221,226 ----
diff --git a/contrib/pg_upgrade/pg_upgrade.h b/contrib/pg_upgrade/pg_upgrade.h
index 3058343..f35ce75
*** a/contrib/pg_upgrade/pg_upgrade.h
--- b/contrib/pg_upgrade/pg_upgrade.h
***************
*** 7,13 ****
  
  #include <unistd.h>
  #include <assert.h>
- #include <dirent.h>
  #include <sys/stat.h>
  #include <sys/time.h>
  
--- 7,12 ----
*************** const char *setupPageConverter(pageCnvCt
*** 366,372 ****
  typedef void *pageCnvCtx;
  #endif
  
- int			load_directory(const char *dirname, char ***namelist);
  const char *copyAndUpdateFile(pageCnvCtx *pageConverter, const char *src,
  				  const char *dst, bool force);
  const char *linkAndUpdateFile(pageCnvCtx *pageConverter, const char *src,
--- 365,370 ----
diff --git a/contrib/pg_upgrade/relfilenode.c b/contrib/pg_upgrade/relfilenode.c
index 33a867f..d763ba7
*** a/contrib/pg_upgrade/relfilenode.c
--- b/contrib/pg_upgrade/relfilenode.c
***************
*** 17,25 ****
  
  static void transfer_single_new_db(pageCnvCtx *pageConverter,
  					   FileNameMap *maps, int size);
! static void transfer_relfile(pageCnvCtx *pageConverter,
! 				 const char *fromfile, const char *tofile,
! 				 const char *nspname, const char *relname);
  
  
  /*
--- 17,24 ----
  
  static void transfer_single_new_db(pageCnvCtx *pageConverter,
  					   FileNameMap *maps, int size);
! static void transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map,
! 							 const char *suffix);
  
  
  /*
*************** static void
*** 131,185 ****
  transfer_single_new_db(pageCnvCtx *pageConverter,
  					   FileNameMap *maps, int size)
  {
- 	char		old_dir[MAXPGPATH];
- 	char		file_pattern[MAXPGPATH];
- 	char		**namelist = NULL;
- 	int			numFiles = 0;
  	int			mapnum;
! 	int			fileno;
! 	bool		vm_crashsafe_change = false;
! 
! 	old_dir[0] = '\0';
! 
! 	/* Do not copy non-crashsafe vm files for binaries that assume crashsafety */
  	if (old_cluster.controldata.cat_ver < VISIBILITY_MAP_CRASHSAFE_CAT_VER &&
  		new_cluster.controldata.cat_ver >= VISIBILITY_MAP_CRASHSAFE_CAT_VER)
! 		vm_crashsafe_change = true;
  
  	for (mapnum = 0; mapnum < size; mapnum++)
  	{
! 		char		old_file[MAXPGPATH];
! 		char		new_file[MAXPGPATH];
! 
! 		/* Changed tablespaces?  Need a new directory scan? */
! 		if (strcmp(maps[mapnum].old_dir, old_dir) != 0)
! 		{
! 			if (numFiles > 0)
! 			{
! 				for (fileno = 0; fileno < numFiles; fileno++)
! 					pg_free(namelist[fileno]);
! 				pg_free(namelist);
! 			}
! 
! 			snprintf(old_dir, sizeof(old_dir), "%s", maps[mapnum].old_dir);
! 			numFiles = load_directory(old_dir, &namelist);
! 		}
! 
! 		/* Copying files might take some time, so give feedback. */
! 
! 		snprintf(old_file, sizeof(old_file), "%s/%u", maps[mapnum].old_dir,
! 				 maps[mapnum].old_relfilenode);
! 		snprintf(new_file, sizeof(new_file), "%s/%u", maps[mapnum].new_dir,
! 				 maps[mapnum].new_relfilenode);
! 		pg_log(PG_REPORT, OVERWRITE_MESSAGE, old_file);
! 
! 		/*
! 		 * Copy/link the relation's primary file (segment 0 of main fork)
! 		 * to the new cluster
! 		 */
! 		unlink(new_file);
! 		transfer_relfile(pageConverter, old_file, new_file,
! 						 maps[mapnum].nspname, maps[mapnum].relname);
  
  		/* fsm/vm files added in PG 8.4 */
  		if (GET_MAJOR_VERSION(old_cluster.major_version) >= 804)
--- 130,150 ----
  transfer_single_new_db(pageCnvCtx *pageConverter,
  					   FileNameMap *maps, int size)
  {
  	int			mapnum;
! 	bool		vm_crashsafe_match = true;
! 
! 	/*
! 	 * Do the old and new clusters disagree on the crash-safety of the vm
! 	 * files?  If so, do not copy them.
! 	 */
  	if (old_cluster.controldata.cat_ver < VISIBILITY_MAP_CRASHSAFE_CAT_VER &&
  		new_cluster.controldata.cat_ver >= VISIBILITY_MAP_CRASHSAFE_CAT_VER)
! 		vm_crashsafe_match = false;
  
  	for (mapnum = 0; mapnum < size; mapnum++)
  	{
! 		/* transfer primary file */
! 		transfer_relfile(pageConverter, &maps[mapnum], "");
  
  		/* fsm/vm files added in PG 8.4 */
  		if (GET_MAJOR_VERSION(old_cluster.major_version) >= 804)
*************** transfer_single_new_db(pageCnvCtx *pageC
*** 187,253 ****
  			/*
  			 * Copy/link any fsm and vm files, if they exist
  			 */
! 			snprintf(file_pattern, sizeof(file_pattern), "%u_",
! 					 maps[mapnum].old_relfilenode);
! 
! 			for (fileno = 0; fileno < numFiles; fileno++)
! 			{
! 				char	   *vm_offset = strstr(namelist[fileno], "_vm");
! 				bool		is_vm_file = false;
! 
! 				/* Is a visibility map file? (name ends with _vm) */
! 				if (vm_offset && strlen(vm_offset) == strlen("_vm"))
! 					is_vm_file = true;
! 
! 				if (strncmp(namelist[fileno], file_pattern,
! 							strlen(file_pattern)) == 0 &&
! 					(!is_vm_file || !vm_crashsafe_change))
! 				{
! 					snprintf(old_file, sizeof(old_file), "%s/%s", maps[mapnum].old_dir,
! 							 namelist[fileno]);
! 					snprintf(new_file, sizeof(new_file), "%s/%u%s", maps[mapnum].new_dir,
! 							 maps[mapnum].new_relfilenode, strchr(namelist[fileno], '_'));
! 
! 					unlink(new_file);
! 					transfer_relfile(pageConverter, old_file, new_file,
! 								 maps[mapnum].nspname, maps[mapnum].relname);
! 				}
! 			}
! 		}
! 
! 		/*
! 		 * Now copy/link any related segments as well. Remember, PG breaks
! 		 * large files into 1GB segments, the first segment has no extension,
! 		 * subsequent segments are named relfilenode.1, relfilenode.2,
! 		 * relfilenode.3, ...  'fsm' and 'vm' files use underscores so are not
! 		 * copied.
! 		 */
! 		snprintf(file_pattern, sizeof(file_pattern), "%u.",
! 				 maps[mapnum].old_relfilenode);
! 
! 		for (fileno = 0; fileno < numFiles; fileno++)
! 		{
! 			if (strncmp(namelist[fileno], file_pattern,
! 						strlen(file_pattern)) == 0)
! 			{
! 				snprintf(old_file, sizeof(old_file), "%s/%s", maps[mapnum].old_dir,
! 						 namelist[fileno]);
! 				snprintf(new_file, sizeof(new_file), "%s/%u%s", maps[mapnum].new_dir,
! 						 maps[mapnum].new_relfilenode, strchr(namelist[fileno], '.'));
! 
! 				unlink(new_file);
! 				transfer_relfile(pageConverter, old_file, new_file,
! 								 maps[mapnum].nspname, maps[mapnum].relname);
! 			}
  		}
  	}
- 
- 	if (numFiles > 0)
- 	{
- 		for (fileno = 0; fileno < numFiles; fileno++)
- 			pg_free(namelist[fileno]);
- 		pg_free(namelist);
- 	}
  }
  
  
--- 152,162 ----
  			/*
  			 * Copy/link any fsm and vm files, if they exist
  			 */
! 			transfer_relfile(pageConverter, &maps[mapnum], "_fsm");
! 			if (vm_crashsafe_match)
! 				transfer_relfile(pageConverter, &maps[mapnum], "_vm");
  		}
  	}
  }
  
  
*************** transfer_single_new_db(pageCnvCtx *pageC
*** 257,287 ****
   * Copy or link file from old cluster to new one.
   */
  static void
! transfer_relfile(pageCnvCtx *pageConverter, const char *old_file,
! 			  const char *new_file, const char *nspname, const char *relname)
  {
  	const char *msg;
! 
! 	if ((user_opts.transfer_mode == TRANSFER_MODE_LINK) && (pageConverter != NULL))
! 		pg_log(PG_FATAL, "This upgrade requires page-by-page conversion, "
! 			   "you must use copy mode instead of link mode.\n");
! 
! 	if (user_opts.transfer_mode == TRANSFER_MODE_COPY)
  	{
! 		pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n", old_file, new_file);
  
! 		if ((msg = copyAndUpdateFile(pageConverter, old_file, new_file, true)) != NULL)
! 			pg_log(PG_FATAL, "error while copying relation \"%s.%s\" (\"%s\" to \"%s\"): %s\n",
! 				   nspname, relname, old_file, new_file, msg);
! 	}
! 	else
! 	{
! 		pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n", old_file, new_file);
  
- 		if ((msg = linkAndUpdateFile(pageConverter, old_file, new_file)) != NULL)
- 			pg_log(PG_FATAL,
- 				   "error while creating link for relation \"%s.%s\" (\"%s\" to \"%s\"): %s\n",
- 				   nspname, relname, old_file, new_file, msg);
- 	}
  	return;
  }
--- 166,243 ----
   * Copy or link file from old cluster to new one.
   */
  static void
! transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map,
! 				 const char *type_suffix)
  {
  	const char *msg;
! 	char		old_file[MAXPGPATH];
! 	char		new_file[MAXPGPATH];
! 	int			fd;
! 	int			segno;
! 	char		extent_suffix[65];
! 
! 	/*
! 	 * Copy/link the relation file, including any related segments.
! 	 * Remember, PG breaks large files into 1GB segments; the first segment
! 	 * has no extension, subsequent segments are named relfilenode.1,
! 	 * relfilenode.2, relfilenode.3, ...
! 	 */
! 	for (segno = 0;; segno++)
  	{
! 		if (segno == 0)
! 			extent_suffix[0] = '\0';
! 		else
! 			snprintf(extent_suffix, sizeof(extent_suffix), ".%d", segno);
  
! 		snprintf(old_file, sizeof(old_file), "%s/%u%s%s", map->old_dir,
! 				 map->old_relfilenode, type_suffix, extent_suffix);
! 		snprintf(new_file, sizeof(new_file), "%s/%u%s%s", map->new_dir,
! 				 map->new_relfilenode, type_suffix, extent_suffix);
! 	
! 		/* Is it an extent, fsm, or vm file? */
! 		if (type_suffix[0] != '\0' || segno != 0)
! 		{
! 			/* Did file open fail? */
! 			if ((fd = open(old_file, O_RDONLY)) == -1)
! 			{
! 				/* File does not exist?  That's OK, just return */
! 				if (errno == ENOENT)
! 					return;
! 				else
! 					pg_log(PG_FATAL, "error while checking for file existence of relation \"%s.%s\" (\"%s\" to \"%s\")\n",
! 						   map->nspname, map->relname, old_file, new_file);
! 			}
! 			close(fd);
! 		}
! 
! 		unlink(new_file);
! 	
! 		/* Copying files might take some time, so give feedback. */
! 		pg_log(PG_REPORT, OVERWRITE_MESSAGE, old_file);
! 	
! 		if ((user_opts.transfer_mode == TRANSFER_MODE_LINK) && (pageConverter != NULL))
! 			pg_log(PG_FATAL, "This upgrade requires page-by-page conversion, "
! 				   "you must use copy mode instead of link mode.\n");
! 	
! 		if (user_opts.transfer_mode == TRANSFER_MODE_COPY)
! 		{
! 			pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n", old_file, new_file);
! 	
! 			if ((msg = copyAndUpdateFile(pageConverter, old_file, new_file, true)) != NULL)
! 				pg_log(PG_FATAL, "error while copying relation \"%s.%s\" (\"%s\" to \"%s\"): %s\n",
! 					   map->nspname, map->relname, old_file, new_file, msg);
! 		}
! 		else
! 		{
! 			pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n", old_file, new_file);
! 	
! 			if ((msg = linkAndUpdateFile(pageConverter, old_file, new_file)) != NULL)
! 				pg_log(PG_FATAL,
! 					   "error while creating link for relation \"%s.%s\" (\"%s\" to \"%s\"): %s\n",
! 					   map->nspname, map->relname, old_file, new_file, msg);
! 		}
! 	}
  
  	return;
  }
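
Reviewer note on the pg_upgrade hunks above: with load_directory() gone,
transfer_relfile() no longer scans the directory for matching names; it
probes each candidate segment file directly and treats ENOENT as "no more
segments".  Below is a minimal, self-contained sketch of that probing idiom;
probe_segments() and the fixed-size path buffer are illustrative stand-ins,
not code from the patch.

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Visit "<dir>/<relfilenode>", "<dir>/<relfilenode>.1", ... until a segment
 * is missing.  ENOENT on segment N just means the relation has N segments;
 * any other open() failure is a real error.  Returns the segment count,
 * or -1 on error.
 */
static int
probe_segments(const char *dir, unsigned int relfilenode)
{
	char		path[1024];
	int			segno;

	for (segno = 0;; segno++)
	{
		int			fd;

		if (segno == 0)
			snprintf(path, sizeof(path), "%s/%u", dir, relfilenode);
		else
			snprintf(path, sizeof(path), "%s/%u.%d", dir, relfilenode, segno);

		if ((fd = open(path, O_RDONLY)) == -1)
		{
			if (errno == ENOENT)
				return segno;	/* no more segments */
			return -1;			/* real error; caller reports it */
		}
		close(fd);

		/* a real caller would copy or link this segment here */
	}
}
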
diff --git a/doc/src/sgml/Makefile b/doc/src/sgml/Makefile
index f40a1fe..5c3afad
*** a/doc/src/sgml/Makefile
--- b/doc/src/sgml/Makefile
*************** postgres.xml: $(srcdir)/postgres.sgml $(
*** 255,266 ****
  	rm postgres.xmltmp
  # ' hello Emacs
  
! xslthtml: xslthtml-stamp
! 
! xslthtml-stamp: stylesheet.xsl postgres.xml
  	$(XSLTPROC) $(XSLTPROCFLAGS) $(XSLTPROC_HTML_FLAGS) $^
- 	cp $(srcdir)/stylesheet.css html/
- 	touch $@
  
  htmlhelp: stylesheet-hh.xsl postgres.xml
  	$(XSLTPROC) $(XSLTPROCFLAGS) $^
--- 255,262 ----
  	rm postgres.xmltmp
  # ' hello Emacs
  
! xslthtml: stylesheet.xsl postgres.xml
  	$(XSLTPROC) $(XSLTPROCFLAGS) $(XSLTPROC_HTML_FLAGS) $^
  
  htmlhelp: stylesheet-hh.xsl postgres.xml
  	$(XSLTPROC) $(XSLTPROCFLAGS) $^
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 8872920..445ca40
*** a/doc/src/sgml/ref/create_table.sgml
--- b/doc/src/sgml/ref/create_table.sgml
*************** CREATE TABLE employees OF employee_type
*** 1453,1459 ****
    <simplelist type="inline">
     <member><xref linkend="sql-altertable"></member>
     <member><xref linkend="sql-droptable"></member>
-    <member><xref linkend="sql-createtableas"></member>
     <member><xref linkend="sql-createtablespace"></member>
     <member><xref linkend="sql-createtype"></member>
    </simplelist>
--- 1453,1458 ----
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
index 55df02a..b9bfde2
*** a/src/backend/access/gin/ginfast.c
--- b/src/backend/access/gin/ginfast.c
*************** ginHeapTupleFastInsert(GinState *ginstat
*** 290,296 ****
  		if (metadata->head == InvalidBlockNumber)
  		{
  			/*
! 			 * Main list is empty, so just insert sublist as main list
  			 */
  			START_CRIT_SECTION();
  
--- 290,296 ----
  		if (metadata->head == InvalidBlockNumber)
  		{
  			/*
! 			 * Main list is empty, so just copy sublist into main list
  			 */
  			START_CRIT_SECTION();
  
*************** ginHeapTupleFastInsert(GinState *ginstat
*** 313,326 ****
  			LockBuffer(buffer, GIN_EXCLUSIVE);
  			page = BufferGetPage(buffer);
  
- 			rdata[0].next = rdata + 1;
- 
- 			rdata[1].buffer = buffer;
- 			rdata[1].buffer_std = true;
- 			rdata[1].data = NULL;
- 			rdata[1].len = 0;
- 			rdata[1].next = NULL;
- 
  			Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
  
  			START_CRIT_SECTION();
--- 313,318 ----
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index 4536c9c..250619c
*** a/src/backend/access/gin/ginxlog.c
--- b/src/backend/access/gin/ginxlog.c
*************** ginRedoCreateIndex(XLogRecPtr lsn, XLogR
*** 77,85 ****
  				MetaBuffer;
  	Page		page;
  
- 	/* Backup blocks are not used in create_index records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
  	Assert(BufferIsValid(MetaBuffer));
  	page = (Page) BufferGetPage(MetaBuffer);
--- 77,82 ----
*************** ginRedoCreatePTree(XLogRecPtr lsn, XLogR
*** 112,120 ****
  	Buffer		buffer;
  	Page		page;
  
- 	/* Backup blocks are not used in create_ptree records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	buffer = XLogReadBuffer(data->node, data->blkno, true);
  	Assert(BufferIsValid(buffer));
  	page = (Page) BufferGetPage(buffer);
--- 109,114 ----
*************** ginRedoInsert(XLogRecPtr lsn, XLogRecord
*** 165,176 ****
  		}
  	}
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	buffer = XLogReadBuffer(data->node, data->blkno, false);
  	if (!BufferIsValid(buffer))
--- 159,167 ----
  		}
  	}
  
! 	/* nothing else to do if page was backed up */
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	buffer = XLogReadBuffer(data->node, data->blkno, false);
  	if (!BufferIsValid(buffer))
*************** ginRedoSplit(XLogRecPtr lsn, XLogRecord
*** 265,273 ****
  	if (data->isData)
  		flags |= GIN_DATA;
  
- 	/* Backup blocks are not used in split records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
  	Assert(BufferIsValid(lbuffer));
  	lpage = (Page) BufferGetPage(lbuffer);
--- 256,261 ----
*************** ginRedoVacuumPage(XLogRecPtr lsn, XLogRe
*** 381,392 ****
  	Buffer		buffer;
  	Page		page;
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	buffer = XLogReadBuffer(data->node, data->blkno, false);
  	if (!BufferIsValid(buffer))
--- 369,377 ----
  	Buffer		buffer;
  	Page		page;
  
! 	/* nothing to do if page was backed up (and no info to do it with) */
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	buffer = XLogReadBuffer(data->node, data->blkno, false);
  	if (!BufferIsValid(buffer))
*************** static void
*** 435,472 ****
  ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
  {
  	ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
! 	Buffer		dbuffer;
! 	Buffer		pbuffer;
! 	Buffer		lbuffer;
  	Page		page;
  
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
! 	else
  	{
! 		dbuffer = XLogReadBuffer(data->node, data->blkno, false);
! 		if (BufferIsValid(dbuffer))
  		{
! 			page = BufferGetPage(dbuffer);
  			if (!XLByteLE(lsn, PageGetLSN(page)))
  			{
  				Assert(GinPageIsData(page));
  				GinPageGetOpaque(page)->flags = GIN_DELETED;
  				PageSetLSN(page, lsn);
  				PageSetTLI(page, ThisTimeLineID);
! 				MarkBufferDirty(dbuffer);
  			}
  		}
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 		pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
! 	else
  	{
! 		pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
! 		if (BufferIsValid(pbuffer))
  		{
! 			page = BufferGetPage(pbuffer);
  			if (!XLByteLE(lsn, PageGetLSN(page)))
  			{
  				Assert(GinPageIsData(page));
--- 420,452 ----
  ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
  {
  	ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
! 	Buffer		buffer;
  	Page		page;
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
! 		buffer = XLogReadBuffer(data->node, data->blkno, false);
! 		if (BufferIsValid(buffer))
  		{
! 			page = BufferGetPage(buffer);
  			if (!XLByteLE(lsn, PageGetLSN(page)))
  			{
  				Assert(GinPageIsData(page));
  				GinPageGetOpaque(page)->flags = GIN_DELETED;
  				PageSetLSN(page, lsn);
  				PageSetTLI(page, ThisTimeLineID);
! 				MarkBufferDirty(buffer);
  			}
+ 			UnlockReleaseBuffer(buffer);
  		}
  	}
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_2))
  	{
! 		buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
! 		if (BufferIsValid(buffer))
  		{
! 			page = BufferGetPage(buffer);
  			if (!XLByteLE(lsn, PageGetLSN(page)))
  			{
  				Assert(GinPageIsData(page));
*************** ginRedoDeletePage(XLogRecPtr lsn, XLogRe
*** 474,508 ****
  				GinPageDeletePostingItem(page, data->parentOffset);
  				PageSetLSN(page, lsn);
  				PageSetTLI(page, ThisTimeLineID);
! 				MarkBufferDirty(pbuffer);
  			}
  		}
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK(2))
! 		(void) RestoreBackupBlock(lsn, record, 2, false, false);
! 	else if (data->leftBlkno != InvalidBlockNumber)
  	{
! 		lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
! 		if (BufferIsValid(lbuffer))
  		{
! 			page = BufferGetPage(lbuffer);
  			if (!XLByteLE(lsn, PageGetLSN(page)))
  			{
  				Assert(GinPageIsData(page));
  				GinPageGetOpaque(page)->rightlink = data->rightLink;
  				PageSetLSN(page, lsn);
  				PageSetTLI(page, ThisTimeLineID);
! 				MarkBufferDirty(lbuffer);
  			}
! 			UnlockReleaseBuffer(lbuffer);
  		}
  	}
- 
- 	if (BufferIsValid(pbuffer))
- 		UnlockReleaseBuffer(pbuffer);
- 	if (BufferIsValid(dbuffer))
- 		UnlockReleaseBuffer(dbuffer);
  }
  
  static void
--- 454,482 ----
  				GinPageDeletePostingItem(page, data->parentOffset);
  				PageSetLSN(page, lsn);
  				PageSetTLI(page, ThisTimeLineID);
! 				MarkBufferDirty(buffer);
  			}
+ 			UnlockReleaseBuffer(buffer);
  		}
  	}
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
  	{
! 		buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
! 		if (BufferIsValid(buffer))
  		{
! 			page = BufferGetPage(buffer);
  			if (!XLByteLE(lsn, PageGetLSN(page)))
  			{
  				Assert(GinPageIsData(page));
  				GinPageGetOpaque(page)->rightlink = data->rightLink;
  				PageSetLSN(page, lsn);
  				PageSetTLI(page, ThisTimeLineID);
! 				MarkBufferDirty(buffer);
  			}
! 			UnlockReleaseBuffer(buffer);
  		}
  	}
  }
  
  static void
*************** ginRedoUpdateMetapage(XLogRecPtr lsn, XL
*** 531,539 ****
  		/*
  		 * insert into tail page
  		 */
! 		if (record->xl_info & XLR_BKP_BLOCK(0))
! 			(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 		else
  		{
  			buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
  			if (BufferIsValid(buffer))
--- 505,511 ----
  		/*
  		 * insert into tail page
  		 */
! 		if (!(record->xl_info & XLR_BKP_BLOCK_1))
  		{
  			buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
  			if (BufferIsValid(buffer))
*************** ginRedoUpdateMetapage(XLogRecPtr lsn, XL
*** 581,605 ****
  		/*
  		 * New tail
  		 */
! 		if (record->xl_info & XLR_BKP_BLOCK(0))
! 			(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 		else
  		{
! 			buffer = XLogReadBuffer(data->node, data->prevTail, false);
! 			if (BufferIsValid(buffer))
! 			{
! 				Page		page = BufferGetPage(buffer);
  
! 				if (!XLByteLE(lsn, PageGetLSN(page)))
! 				{
! 					GinPageGetOpaque(page)->rightlink = data->newRightlink;
  
! 					PageSetLSN(page, lsn);
! 					PageSetTLI(page, ThisTimeLineID);
! 					MarkBufferDirty(buffer);
! 				}
! 				UnlockReleaseBuffer(buffer);
  			}
  		}
  	}
  
--- 553,572 ----
  		/*
  		 * New tail
  		 */
! 		buffer = XLogReadBuffer(data->node, data->prevTail, false);
! 		if (BufferIsValid(buffer))
  		{
! 			Page		page = BufferGetPage(buffer);
  
! 			if (!XLByteLE(lsn, PageGetLSN(page)))
! 			{
! 				GinPageGetOpaque(page)->rightlink = data->newRightlink;
  
! 				PageSetLSN(page, lsn);
! 				PageSetTLI(page, ThisTimeLineID);
! 				MarkBufferDirty(buffer);
  			}
+ 			UnlockReleaseBuffer(buffer);
  		}
  	}
  
*************** ginRedoInsertListPage(XLogRecPtr lsn, XL
*** 618,629 ****
  				tupsize;
  	IndexTuple	tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	buffer = XLogReadBuffer(data->node, data->blkno, true);
  	Assert(BufferIsValid(buffer));
--- 585,592 ----
  				tupsize;
  	IndexTuple	tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	buffer = XLogReadBuffer(data->node, data->blkno, true);
  	Assert(BufferIsValid(buffer));
*************** ginRedoDeleteListPages(XLogRecPtr lsn, X
*** 669,677 ****
  	Page		metapage;
  	int			i;
  
- 	/* Backup blocks are not used in delete_listpage records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
  	if (!BufferIsValid(metabuffer))
  		return;					/* assume index was deleted, nothing to do */
--- 632,637 ----
*************** ginRedoDeleteListPages(XLogRecPtr lsn, X
*** 685,700 ****
  		MarkBufferDirty(metabuffer);
  	}
  
- 	/*
- 	 * In normal operation, shiftList() takes exclusive lock on all the
- 	 * pages-to-be-deleted simultaneously.	During replay, however, it should
- 	 * be all right to lock them one at a time.  This is dependent on the fact
- 	 * that we are deleting pages from the head of the list, and that readers
- 	 * share-lock the next page before releasing the one they are on. So we
- 	 * cannot get past a reader that is on, or due to visit, any page we are
- 	 * going to delete.  New incoming readers will block behind our metapage
- 	 * lock and then see a fully updated page list.
- 	 */
  	for (i = 0; i < data->ndeleted; i++)
  	{
  		Buffer		buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
--- 645,650 ----
*************** gin_redo(XLogRecPtr lsn, XLogRecord *rec
*** 728,733 ****
--- 678,684 ----
  	 * implement a similar optimization as we have in b-tree, and remove
  	 * killed tuples outside VACUUM, we'll need to handle that here.
  	 */
+ 	RestoreBkpBlocks(lsn, record, false);
  
  	topCtx = MemoryContextSwitchTo(opCtx);
  	switch (info)
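
Reviewer note: the ginxlog.c hunks above (and the xlog hunks in the files
that follow) consistently rewrite XLR_BKP_BLOCK(n) tests as the older
fixed-name flags and reintroduce up-front RestoreBkpBlocks() calls.  Both
spellings name the same xl_info bits; per src/include/access/xlog.h around
these revisions:

/* Older spelling: one named flag per backup block */
#define XLR_BKP_BLOCK_MASK		0x0F	/* all info bits used for bkp blocks */
#define XLR_BKP_BLOCK_1			0x08
#define XLR_BKP_BLOCK_2			0x04
#define XLR_BKP_BLOCK_3			0x02
#define XLR_BKP_BLOCK_4			0x01

/* Newer spelling: the same bits, computed from a 0-based block index */
#define XLR_BKP_BLOCK(iblk)		(0x08 >> (iblk))	/* iblk in 0..3 */

So XLR_BKP_BLOCK(0) and XLR_BKP_BLOCK_1 test the same bit; only the
numbering convention (0-based vs. 1-based) differs.
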
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 4440499..76029d9
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
*************** typedef struct
*** 32,79 ****
  static MemoryContext opCtx;		/* working memory for operations */
  
  /*
!  * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
!  *
!  * Even if the WAL record includes a full-page image, we have to update the
!  * follow-right flag, because that change is not included in the full-page
!  * image.  To be sure that the intermediate state with the wrong flag value is
!  * not visible to concurrent Hot Standby queries, this function handles
!  * restoring the full-page image as well as updating the flag.  (Note that
!  * we never need to do anything else to the child page in the current WAL
!  * action.)
   */
  static void
! gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
! 						 RelFileNode node, BlockNumber childblkno)
  {
  	Buffer		buffer;
- 	Page		page;
  
! 	if (record->xl_info & XLR_BKP_BLOCK(block_index))
! 		buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
! 	else
  	{
! 		buffer = XLogReadBuffer(node, childblkno, false);
! 		if (!BufferIsValid(buffer))
! 			return;				/* page was deleted, nothing to do */
! 	}
! 	page = (Page) BufferGetPage(buffer);
  
! 	/*
! 	 * Note that we still update the page even if page LSN is equal to the LSN
! 	 * of this record, because the updated NSN is not included in the full
! 	 * page image.
! 	 */
! 	if (!XLByteLT(lsn, PageGetLSN(page)))
! 	{
! 		GistPageGetOpaque(page)->nsn = lsn;
! 		GistClearFollowRight(page);
  
! 		PageSetLSN(page, lsn);
! 		PageSetTLI(page, ThisTimeLineID);
! 		MarkBufferDirty(buffer);
  	}
- 	UnlockReleaseBuffer(buffer);
  }
  
  /*
--- 32,66 ----
  static MemoryContext opCtx;		/* working memory for operations */
  
  /*
!  * Replay the clearing of F_FOLLOW_RIGHT flag.
   */
  static void
! gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
! 						 BlockNumber leftblkno)
  {
  	Buffer		buffer;
  
! 	buffer = XLogReadBuffer(node, leftblkno, false);
! 	if (BufferIsValid(buffer))
  	{
! 		Page		page = (Page) BufferGetPage(buffer);
  
! 		/*
! 		 * Note that we still update the page even if page LSN is equal to the
! 		 * LSN of this record, because the updated NSN is not included in the
! 		 * full page image.
! 		 */
! 		if (!XLByteLT(lsn, PageGetLSN(page)))
! 		{
! 			GistPageGetOpaque(page)->nsn = lsn;
! 			GistClearFollowRight(page);
  
! 			PageSetLSN(page, lsn);
! 			PageSetTLI(page, ThisTimeLineID);
! 			MarkBufferDirty(buffer);
! 		}
! 		UnlockReleaseBuffer(buffer);
  	}
  }
  
  /*
*************** gistRedoPageUpdateRecord(XLogRecPtr lsn,
*** 88,124 ****
  	Page		page;
  	char	   *data;
  
- 	/*
- 	 * We need to acquire and hold lock on target page while updating the left
- 	 * child page.  If we have a full-page image of target page, getting the
- 	 * lock is a side-effect of restoring that image.  Note that even if the
- 	 * target page no longer exists, we'll still attempt to replay the change
- 	 * on the child page.
- 	 */
- 	if (record->xl_info & XLR_BKP_BLOCK(0))
- 		buffer = RestoreBackupBlock(lsn, record, 0, false, true);
- 	else
- 		buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- 
- 	/* Fix follow-right data on left child page */
  	if (BlockNumberIsValid(xldata->leftchild))
! 		gistRedoClearFollowRight(lsn, record, 1,
! 								 xldata->node, xldata->leftchild);
! 
! 	/* Done if target page no longer exists */
! 	if (!BufferIsValid(buffer))
! 		return;
  
  	/* nothing more to do if page was backed up (and no info to do it with) */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		UnlockReleaseBuffer(buffer);
  		return;
- 	}
  
  	page = (Page) BufferGetPage(buffer);
  
- 	/* nothing more to do if change already applied */
  	if (XLByteLE(lsn, PageGetLSN(page)))
  	{
  		UnlockReleaseBuffer(buffer);
--- 75,92 ----
  	Page		page;
  	char	   *data;
  
  	if (BlockNumberIsValid(xldata->leftchild))
! 		gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
  
  	/* nothing more to do if page was backed up (and no info to do it with) */
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
+ 	buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+ 	if (!BufferIsValid(buffer))
+ 		return;
  	page = (Page) BufferGetPage(buffer);
  
  	if (XLByteLE(lsn, PageGetLSN(page)))
  	{
  		UnlockReleaseBuffer(buffer);
*************** gistRedoPageUpdateRecord(XLogRecPtr lsn,
*** 172,187 ****
  			GistClearTuplesDeleted(page);
  	}
  
! 	if (!GistPageIsLeaf(page) &&
! 		PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
! 		xldata->blkno == GIST_ROOT_BLKNO)
! 	{
  		/*
  		 * all links on the non-leaf root page were deleted by vacuum full, so
  		 * the root page becomes a leaf
  		 */
  		GistPageSetLeaf(page);
- 	}
  
  	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
  	PageSetLSN(page, lsn);
--- 140,152 ----
  			GistClearTuplesDeleted(page);
  	}
  
! 	if (!GistPageIsLeaf(page) &&
! 		PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
! 		xldata->blkno == GIST_ROOT_BLKNO)
! 
  		/*
  		 * all links on the non-leaf root page were deleted by vacuum full, so
  		 * the root page becomes a leaf
  		 */
  		GistPageSetLeaf(page);
  
  	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
  	PageSetLSN(page, lsn);
*************** gistRedoPageUpdateRecord(XLogRecPtr lsn,
*** 191,196 ****
--- 156,185 ----
  }
  
  static void
+ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
+ 	Buffer		buffer;
+ 	Page		page;
+ 
+ 	/* nothing else to do if page was backed up (and no info to do it with) */
+ 	if (record->xl_info & XLR_BKP_BLOCK_1)
+ 		return;
+ 
+ 	buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+ 	if (!BufferIsValid(buffer))
+ 		return;
+ 
+ 	page = (Page) BufferGetPage(buffer);
+ 	GistPageSetDeleted(page);
+ 
+ 	PageSetLSN(page, lsn);
+ 	PageSetTLI(page, ThisTimeLineID);
+ 	MarkBufferDirty(buffer);
+ 	UnlockReleaseBuffer(buffer);
+ }
+ 
+ static void
  decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
  {
  	char	   *begin = XLogRecGetData(record),
*************** gistRedoPageSplitRecord(XLogRecPtr lsn,
*** 226,247 ****
  {
  	gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
  	PageSplitRecord xlrec;
- 	Buffer		firstbuffer = InvalidBuffer;
  	Buffer		buffer;
  	Page		page;
  	int			i;
  	bool		isrootsplit = false;
  
  	decodePageSplitRecord(&xlrec, record);
  
- 	/*
- 	 * We must hold lock on the first-listed page throughout the action,
- 	 * including while updating the left child page (if any).  We can unlock
- 	 * remaining pages in the list as soon as they've been written, because
- 	 * there is no path for concurrent queries to reach those pages without
- 	 * first visiting the first-listed page.
- 	 */
- 
  	/* loop around all pages */
  	for (i = 0; i < xlrec.data->npage; i++)
  	{
--- 215,229 ----
  {
  	gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
  	PageSplitRecord xlrec;
  	Buffer		buffer;
  	Page		page;
  	int			i;
  	bool		isrootsplit = false;
  
+ 	if (BlockNumberIsValid(xldata->leftchild))
+ 		gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
  	decodePageSplitRecord(&xlrec, record);
  
  	/* loop around all pages */
  	for (i = 0; i < xlrec.data->npage; i++)
  	{
*************** gistRedoPageSplitRecord(XLogRecPtr lsn,
*** 291,310 ****
  		PageSetLSN(page, lsn);
  		PageSetTLI(page, ThisTimeLineID);
  		MarkBufferDirty(buffer);
! 
! 		if (i == 0)
! 			firstbuffer = buffer;
! 		else
! 			UnlockReleaseBuffer(buffer);
  	}
- 
- 	/* Fix follow-right data on left child page, if any */
- 	if (BlockNumberIsValid(xldata->leftchild))
- 		gistRedoClearFollowRight(lsn, record, 0,
- 								 xldata->node, xldata->leftchild);
- 
- 	/* Finally, release lock on the first page */
- 	UnlockReleaseBuffer(firstbuffer);
  }
  
  static void
--- 273,280 ----
  		PageSetLSN(page, lsn);
  		PageSetTLI(page, ThisTimeLineID);
  		MarkBufferDirty(buffer);
! 		UnlockReleaseBuffer(buffer);
  	}
  }
  
  static void
*************** gistRedoCreateIndex(XLogRecPtr lsn, XLog
*** 314,322 ****
  	Buffer		buffer;
  	Page		page;
  
- 	/* Backup blocks are not used in create_index records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
  	Assert(BufferIsValid(buffer));
  	page = (Page) BufferGetPage(buffer);
--- 284,289 ----
*************** gist_redo(XLogRecPtr lsn, XLogRecord *re
*** 341,346 ****
--- 308,314 ----
  	 * implement a similar optimization we have in b-tree, and remove killed
  	 * tuples outside VACUUM, we'll need to handle that here.
  	 */
+ 	RestoreBkpBlocks(lsn, record, false);
  
  	oldCxt = MemoryContextSwitchTo(opCtx);
  	switch (info)
*************** gist_redo(XLogRecPtr lsn, XLogRecord *re
*** 348,353 ****
--- 316,324 ----
  		case XLOG_GIST_PAGE_UPDATE:
  			gistRedoPageUpdateRecord(lsn, record);
  			break;
+ 		case XLOG_GIST_PAGE_DELETE:
+ 			gistRedoPageDeleteRecord(lsn, record);
+ 			break;
  		case XLOG_GIST_PAGE_SPLIT:
  			gistRedoPageSplitRecord(lsn, record);
  			break;
*************** out_gistxlogPageUpdate(StringInfo buf, g
*** 377,382 ****
--- 348,361 ----
  }
  
  static void
+ out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
+ {
+ 	appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
+ 				xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+ 					 xlrec->blkno);
+ }
+ 
+ static void
  out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
  {
  	appendStringInfo(buf, "page_split: ");
*************** gist_desc(StringInfo buf, uint8 xl_info,
*** 396,401 ****
--- 375,383 ----
  			appendStringInfo(buf, "page_update: ");
  			out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
  			break;
+ 		case XLOG_GIST_PAGE_DELETE:
+ 			out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
+ 			break;
  		case XLOG_GIST_PAGE_SPLIT:
  			out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
  			break;
*************** gistXLogUpdate(RelFileNode node, Buffer
*** 516,545 ****
  			   Buffer leftchildbuf)
  {
  	XLogRecData *rdata;
! 	gistxlogPageUpdate xlrec;
  	int			cur,
  				i;
  	XLogRecPtr	recptr;
  
! 	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
  
! 	xlrec.node = node;
! 	xlrec.blkno = BufferGetBlockNumber(buffer);
! 	xlrec.ntodelete = ntodelete;
! 	xlrec.leftchild =
  		BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
  
! 	rdata[0].data = (char *) &xlrec;
! 	rdata[0].len = sizeof(gistxlogPageUpdate);
! 	rdata[0].buffer = InvalidBuffer;
  	rdata[0].next = &(rdata[1]);
  
! 	rdata[1].data = (char *) todelete;
! 	rdata[1].len = sizeof(OffsetNumber) * ntodelete;
! 	rdata[1].buffer = buffer;
! 	rdata[1].buffer_std = true;
  
! 	cur = 2;
  
  	/* new tuples */
  	for (i = 0; i < ituplen; i++)
--- 498,534 ----
  			   Buffer leftchildbuf)
  {
  	XLogRecData *rdata;
! 	gistxlogPageUpdate *xlrec;
  	int			cur,
  				i;
  	XLogRecPtr	recptr;
  
! 	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
! 	xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
  
! 	xlrec->node = node;
! 	xlrec->blkno = BufferGetBlockNumber(buffer);
! 	xlrec->ntodelete = ntodelete;
! 	xlrec->leftchild =
  		BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
  
! 	rdata[0].buffer = buffer;
! 	rdata[0].buffer_std = true;
! 	rdata[0].data = NULL;
! 	rdata[0].len = 0;
  	rdata[0].next = &(rdata[1]);
  
! 	rdata[1].data = (char *) xlrec;
! 	rdata[1].len = sizeof(gistxlogPageUpdate);
! 	rdata[1].buffer = InvalidBuffer;
! 	rdata[1].next = &(rdata[2]);
  
! 	rdata[2].data = (char *) todelete;
! 	rdata[2].len = sizeof(OffsetNumber) * ntodelete;
! 	rdata[2].buffer = buffer;
! 	rdata[2].buffer_std = true;
! 
! 	cur = 3;
  
  	/* new tuples */
  	for (i = 0; i < ituplen; i++)
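
Reviewer note on the gistXLogUpdate() hunk above: the rewritten rdata chain
prepends an entry that carries no payload but points at the updated buffer.
Under the pre-9.5 XLogRecData convention, that is how a record ties itself
to a page so that a full-page backup block can stand in for the logged
per-entry data.  A sketch of the idiom (the two-entry layout is
illustrative, not the patch's exact chain):

	XLogRecData rdata[2];
	XLogRecPtr	recptr;

	/*
	 * Entry 0: no data of its own; it only associates the record with
	 * 'buffer' so XLogInsert() can attach a full-page image when needed.
	 */
	rdata[0].data = NULL;
	rdata[0].len = 0;
	rdata[0].buffer = buffer;
	rdata[0].buffer_std = true;
	rdata[0].next = &rdata[1];

	/* Entry 1: the record's fixed payload, not tied to any buffer. */
	rdata[1].data = (char *) &xlrec;
	rdata[1].len = sizeof(xlrec);
	rdata[1].buffer = InvalidBuffer;
	rdata[1].next = NULL;

	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
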
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 64aecf2..570cf95
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
*************** heap_xlog_cleanup_info(XLogRecPtr lsn, X
*** 4620,4628 ****
  	 * conflict processing to occur before we begin index vacuum actions. see
  	 * vacuumlazy.c and also comments in btvacuumpage()
  	 */
- 
- 	/* Backup blocks are not used in cleanup_info records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
  }
  
  /*
--- 4620,4625 ----
*************** heap_xlog_clean(XLogRecPtr lsn, XLogReco
*** 4655,4669 ****
  		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
  											xlrec->node);
  
! 	/*
! 	 * If we have a full-page image, restore it (using a cleanup lock) and
! 	 * we're done.
! 	 */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, true, false);
  		return;
- 	}
  
  	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
  	if (!BufferIsValid(buffer))
--- 4652,4661 ----
  		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
  											xlrec->node);
  
! 	RestoreBkpBlocks(lsn, record, true);
! 
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
  	if (!BufferIsValid(buffer))
*************** heap_xlog_freeze(XLogRecPtr lsn, XLogRec
*** 4729,4744 ****
  	if (InHotStandby)
  		ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
! 	buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
  
  	if (XLByteLE(lsn, PageGetLSN(page)))
--- 4721,4735 ----
  	if (InHotStandby)
  		ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
  
! 	RestoreBkpBlocks(lsn, record, false);
! 
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
  	if (!BufferIsValid(buffer))
  		return;
+ 	LockBufferForCleanup(buffer);
  	page = (Page) BufferGetPage(buffer);
  
  	if (XLByteLE(lsn, PageGetLSN(page)))
*************** heap_xlog_visible(XLogRecPtr lsn, XLogRe
*** 4788,4793 ****
--- 4779,4796 ----
  	Page		page;
  
  	/*
+ 	 * Read the heap page, if it still exists.	If the heap file has been
+ 	 * dropped or truncated later in recovery, this might fail.  In that case,
+ 	 * there's no point in doing anything further, since the visibility map
+ 	 * will have to be cleared out at the same time.
+ 	 */
+ 	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
+ 									RBM_NORMAL);
+ 	if (!BufferIsValid(buffer))
+ 		return;
+ 	page = (Page) BufferGetPage(buffer);
+ 
+ 	/*
  	 * If there are any Hot Standby transactions running that have an xmin
  	 * horizon old enough that this page isn't all-visible for them, they
  	 * might incorrectly decide that an index-only scan can skip a heap fetch.
*************** heap_xlog_visible(XLogRecPtr lsn, XLogRe
*** 4799,4848 ****
  	if (InHotStandby)
  		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
  
  	/*
! 	 * Read the heap page, if it still exists.	If the heap file has been
! 	 * dropped or truncated later in recovery, we don't need to update the
! 	 * page, but we'd better still update the visibility map.
  	 */
! 	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
! 									RBM_NORMAL);
! 	if (BufferIsValid(buffer))
  	{
! 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 
! 		page = (Page) BufferGetPage(buffer);
! 
! 		/*
! 		 * We don't bump the LSN of the heap page when setting the visibility
! 		 * map bit, because that would generate an unworkable volume of
! 		 * full-page writes.  This exposes us to torn page hazards, but since
! 		 * we're not inspecting the existing page contents in any way, we
! 		 * don't care.
! 		 *
! 		 * However, all operations that clear the visibility map bit *do* bump
! 		 * the LSN, and those operations will only be replayed if the XLOG LSN
! 		 * follows the page LSN.  Thus, if the page LSN has advanced past our
! 		 * XLOG record's LSN, we mustn't mark the page all-visible, because
! 		 * the subsequent update won't be replayed to clear the flag.
! 		 */
! 		if (!XLByteLE(lsn, PageGetLSN(page)))
! 		{
! 			PageSetAllVisible(page);
! 			MarkBufferDirty(buffer);
! 		}
! 
! 		/* Done with heap page. */
! 		UnlockReleaseBuffer(buffer);
  	}
  
  	/*
! 	 * Even if we skipped the heap page update due to the LSN interlock, it's
  	 * still safe to update the visibility map.  Any WAL record that clears
  	 * the visibility map bit does so before checking the page LSN, so any
  	 * bits that need to be cleared will still be cleared.
  	 */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  	else
  	{
  		Relation	reln;
--- 4802,4838 ----
  	if (InHotStandby)
  		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
  
+ 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ 
  	/*
! 	 * We don't bump the LSN of the heap page when setting the visibility map
! 	 * bit, because that would generate an unworkable volume of full-page
! 	 * writes.	This exposes us to torn page hazards, but since we're not
! 	 * inspecting the existing page contents in any way, we don't care.
! 	 *
! 	 * However, all operations that clear the visibility map bit *do* bump the
! 	 * LSN, and those operations will only be replayed if the XLOG LSN follows
! 	 * the page LSN.  Thus, if the page LSN has advanced past our XLOG
! 	 * record's LSN, we mustn't mark the page all-visible, because the
! 	 * subsequent update won't be replayed to clear the flag.
  	 */
! 	if (!XLByteLE(lsn, PageGetLSN(page)))
  	{
! 		PageSetAllVisible(page);
! 		MarkBufferDirty(buffer);
  	}
  
+ 	/* Done with heap page. */
+ 	UnlockReleaseBuffer(buffer);
+ 
  	/*
! 	 * Even if we skipped the heap page update due to the LSN interlock, it's
  	 * still safe to update the visibility map.  Any WAL record that clears
  	 * the visibility map bit does so before checking the page LSN, so any
  	 * bits that need to be cleared will still be cleared.
  	 */
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
! 		RestoreBkpBlocks(lsn, record, false);
  	else
  	{
  		Relation	reln;
*************** heap_xlog_visible(XLogRecPtr lsn, XLogRe
*** 4854,4866 ****
  		/*
  		 * Don't set the bit if replay has already passed this point.
  		 *
! 		 * It might be safe to do this unconditionally; if replay has passed
  		 * this point, we'll replay at least as far this time as we did
  		 * before, and if this bit needs to be cleared, the record responsible
  		 * for doing so should be again replayed, and clear it.  For right
  		 * now, out of an abundance of conservatism, we use the same test here
! 		 * we did for the heap page.  If this results in a dropped bit, no
! 		 * real harm is done; and the next VACUUM will fix it.
  		 */
  		if (!XLByteLE(lsn, PageGetLSN(BufferGetPage(vmbuffer))))
  			visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
--- 4844,4856 ----
  		/*
  		 * Don't set the bit if replay has already passed this point.
  		 *
! 		 * It might be safe to do this unconditionally; if replay has passed
  		 * this point, we'll replay at least as far this time as we did
  		 * before, and if this bit needs to be cleared, the record responsible
  		 * for doing so should be again replayed, and clear it.  For right
  		 * now, out of an abundance of conservatism, we use the same test here
! 		 * we did for the heap page; if this results in a dropped bit, no real
! 		 * harm is done; and the next VACUUM will fix it.
  		 */
  		if (!XLByteLE(lsn, PageGetLSN(BufferGetPage(vmbuffer))))
  			visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
*************** heap_xlog_newpage(XLogRecPtr lsn, XLogRe
*** 4878,4886 ****
  	Buffer		buffer;
  	Page		page;
  
- 	/* Backup blocks are not used in newpage records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	/*
  	 * Note: the NEWPAGE log record is used for both heaps and indexes, so do
  	 * not do anything that assumes we are touching a heap.
--- 4868,4873 ----
*************** heap_xlog_delete(XLogRecPtr lsn, XLogRec
*** 4936,4947 ****
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
  	if (!BufferIsValid(buffer))
--- 4923,4930 ----
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
  	if (!BufferIsValid(buffer))
*************** heap_xlog_insert(XLogRecPtr lsn, XLogRec
*** 5021,5032 ****
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
--- 5004,5011 ----
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
*************** heap_xlog_multi_insert(XLogRecPtr lsn, X
*** 5128,5133 ****
--- 5107,5114 ----
  	 * required.
  	 */
  
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	xlrec = (xl_heap_multi_insert *) recdata;
  	recdata += SizeOfHeapMultiInsert;
  
*************** heap_xlog_multi_insert(XLogRecPtr lsn, X
*** 5156,5167 ****
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	if (isinit)
  	{
--- 5137,5144 ----
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	if (isinit)
  	{
*************** static void
*** 5255,5264 ****
  heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
  {
  	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
  	bool		samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
  							ItemPointerGetBlockNumber(&(xlrec->target.tid)));
- 	Buffer		obuffer,
- 				nbuffer;
  	Page		page;
  	OffsetNumber offnum;
  	ItemId		lp = NULL;
--- 5232,5240 ----
  heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
  {
  	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
+ 	Buffer		buffer;
  	bool		samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
  							ItemPointerGetBlockNumber(&(xlrec->target.tid)));
  	Page		page;
  	OffsetNumber offnum;
  	ItemId		lp = NULL;
*************** heap_xlog_update(XLogRecPtr lsn, XLogRec
*** 5289,5332 ****
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	/*
! 	 * In normal operation, it is important to lock the two pages in
! 	 * page-number order, to avoid possible deadlocks against other update
! 	 * operations going the other way.	However, during WAL replay there can
! 	 * be no other update happening, so we don't need to worry about that. But
! 	 * we *do* need to worry that we don't expose an inconsistent state to Hot
! 	 * Standby queries --- so the original page can't be unlocked before we've
! 	 * added the new tuple to the new page.
! 	 */
! 
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
  	{
- 		obuffer = RestoreBackupBlock(lsn, record, 0, false, true);
  		if (samepage)
! 		{
! 			/* backup block covered both changes, so we're done */
! 			UnlockReleaseBuffer(obuffer);
! 			return;
! 		}
  		goto newt;
  	}
  
  	/* Deal with old tuple version */
  
! 	obuffer = XLogReadBuffer(xlrec->target.node,
! 							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
! 							 false);
! 	if (!BufferIsValid(obuffer))
  		goto newt;
! 	page = (Page) BufferGetPage(obuffer);
  
  	if (XLByteLE(lsn, PageGetLSN(page)))		/* changes are applied */
  	{
  		if (samepage)
- 		{
- 			UnlockReleaseBuffer(obuffer);
  			return;
- 		}
  		goto newt;
  	}
  
--- 5265,5291 ----
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  	{
  		if (samepage)
! 			return;				/* backup block covered both changes */
  		goto newt;
  	}
  
  	/* Deal with old tuple version */
  
! 	buffer = XLogReadBuffer(xlrec->target.node,
! 							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
! 							false);
! 	if (!BufferIsValid(buffer))
  		goto newt;
! 	page = (Page) BufferGetPage(buffer);
  
  	if (XLByteLE(lsn, PageGetLSN(page)))		/* changes are applied */
  	{
+ 		UnlockReleaseBuffer(buffer);
  		if (samepage)
  			return;
  		goto newt;
  	}
  
*************** heap_xlog_update(XLogRecPtr lsn, XLogRec
*** 5364,5377 ****
  	 * is already applied
  	 */
  	if (samepage)
- 	{
- 		nbuffer = obuffer;
  		goto newsame;
- 	}
- 
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
! 	MarkBufferDirty(obuffer);
  
  	/* Deal with new tuple */
  
--- 5323,5333 ----
  	 * is already applied
  	 */
  	if (samepage)
  		goto newsame;
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
! 	MarkBufferDirty(buffer);
! 	UnlockReleaseBuffer(buffer);
  
  	/* Deal with new tuple */
  
*************** newt:;
*** 5393,5430 ****
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 		if (BufferIsValid(obuffer))
! 			UnlockReleaseBuffer(obuffer);
  		return;
- 	}
  
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
! 		nbuffer = XLogReadBuffer(xlrec->target.node,
! 								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
! 								 true);
! 		Assert(BufferIsValid(nbuffer));
! 		page = (Page) BufferGetPage(nbuffer);
  
! 		PageInit(page, BufferGetPageSize(nbuffer), 0);
  	}
  	else
  	{
! 		nbuffer = XLogReadBuffer(xlrec->target.node,
! 								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
! 								 false);
! 		if (!BufferIsValid(nbuffer))
  			return;
! 		page = (Page) BufferGetPage(nbuffer);
  
  		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
  		{
! 			UnlockReleaseBuffer(nbuffer);
! 			if (BufferIsValid(obuffer))
! 				UnlockReleaseBuffer(obuffer);
  			return;
  		}
  	}
--- 5349,5379 ----
  		FreeFakeRelcacheEntry(reln);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK_2)
  		return;
  
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
! 		buffer = XLogReadBuffer(xlrec->target.node,
! 								ItemPointerGetBlockNumber(&(xlrec->newtid)),
! 								true);
! 		Assert(BufferIsValid(buffer));
! 		page = (Page) BufferGetPage(buffer);
  
! 		PageInit(page, BufferGetPageSize(buffer), 0);
  	}
  	else
  	{
! 		buffer = XLogReadBuffer(xlrec->target.node,
! 								ItemPointerGetBlockNumber(&(xlrec->newtid)),
! 								false);
! 		if (!BufferIsValid(buffer))
  			return;
! 		page = (Page) BufferGetPage(buffer);
  
  		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
  		{
! 			UnlockReleaseBuffer(buffer);
  			return;
  		}
  	}
*************** newsame:;
*** 5469,5482 ****
  
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
! 	MarkBufferDirty(nbuffer);
! 	UnlockReleaseBuffer(nbuffer);
! 
! 	if (BufferIsValid(obuffer) && obuffer != nbuffer)
! 		UnlockReleaseBuffer(obuffer);
  
  	/*
! 	 * If the new page is running low on free space, update the FSM as well.
  	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
  	 * better than that without knowing the fill-factor for the table.
  	 *
--- 5418,5428 ----
  
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
! 	MarkBufferDirty(buffer);
! 	UnlockReleaseBuffer(buffer);
  
  	/*
! 	 * If the page is running low on free space, update the FSM as well.
  	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
  	 * better than that without knowing the fill-factor for the table.
  	 *
*************** newsame:;
*** 5492,5499 ****
  	 */
  	if (!hot_update && freespace < BLCKSZ / 5)
  		XLogRecordPageWithFreeSpace(xlrec->target.node,
! 								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
! 									freespace);
  }
  
  static void
--- 5438,5444 ----
  	 */
  	if (!hot_update && freespace < BLCKSZ / 5)
  		XLogRecordPageWithFreeSpace(xlrec->target.node,
! 					 ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace);
  }
  
  static void
*************** heap_xlog_lock(XLogRecPtr lsn, XLogRecor
*** 5506,5517 ****
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	buffer = XLogReadBuffer(xlrec->target.node,
  							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
--- 5451,5458 ----
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	buffer = XLogReadBuffer(xlrec->target.node,
  							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
*************** heap_xlog_inplace(XLogRecPtr lsn, XLogRe
*** 5569,5580 ****
  	uint32		oldlen;
  	uint32		newlen;
  
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
- 	}
  
  	buffer = XLogReadBuffer(xlrec->target.node,
  							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
--- 5510,5517 ----
  	uint32		oldlen;
  	uint32		newlen;
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	buffer = XLogReadBuffer(xlrec->target.node,
  							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
*************** heap_redo(XLogRecPtr lsn, XLogRecord *re
*** 5623,5628 ****
--- 5560,5567 ----
  	 * required. The ones in heap2 rmgr do.
  	 */
  
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	switch (info & XLOG_HEAP_OPMASK)
  	{
  		case XLOG_HEAP_INSERT:
*************** heap2_redo(XLogRecPtr lsn, XLogRecord *r
*** 5656,5661 ****
--- 5595,5605 ----
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
+ 	/*
+ 	 * Note that RestoreBkpBlocks() is called after conflict processing within
+ 	 * each record type handling function.
+ 	 */
+ 
  	switch (info & XLOG_HEAP_OPMASK)
  	{
  		case XLOG_HEAP2_FREEZE:
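
Reviewer note: the heapam.c hunks above restore the calling convention in
which the rmgr (or each redo routine) calls RestoreBkpBlocks() once to
replay every full-page image in the record, after which xl_info tests are
enough to skip per-page work.  A condensed sketch of the restored pattern;
example_redo() and example_xlog_rec are illustrative stand-ins, not code
from the patch:

typedef struct example_xlog_rec
{
	RelFileNode node;			/* target relation */
	BlockNumber blkno;			/* target block */
} example_xlog_rec;

static void
example_redo(XLogRecPtr lsn, XLogRecord *record)
{
	example_xlog_rec *xlrec = (example_xlog_rec *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	/* Replay every full-page image carried by the record. */
	RestoreBkpBlocks(lsn, record, false);	/* true => take cleanup locks */

	/* If block 1 was backed up, it was already restored verbatim above. */
	if (record->xl_info & XLR_BKP_BLOCK_1)
		return;

	buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false);
	if (!BufferIsValid(buffer))
		return;					/* page has since been dropped/truncated */
	page = (Page) BufferGetPage(buffer);

	/* The usual LSN interlock: skip if the change is already applied. */
	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		UnlockReleaseBuffer(buffer);
		return;
	}

	/* ... re-apply the logged change to 'page' here ... */

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}
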
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 8f53480..72ea171
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
*************** btree_xlog_insert(bool isleaf, bool isme
*** 218,226 ****
  		datalen -= sizeof(xl_btree_metadata);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xlrec->target.node,
  							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
--- 218,227 ----
  		datalen -= sizeof(xl_btree_metadata);
  	}
  
! 	if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
! 		return;					/* nothing to do */
! 
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xlrec->target.node,
  							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
*************** btree_xlog_insert(bool isleaf, bool isme
*** 248,260 ****
  		}
  	}
  
- 	/*
- 	 * Note: in normal operation, we'd update the metapage while still holding
- 	 * lock on the page we inserted into.  But during replay it's not
- 	 * necessary to hold that lock, since no other index updates can be
- 	 * happening concurrently, and readers will cope fine with following an
- 	 * obsolete link from the metapage.
- 	 */
  	if (ismeta)
  		_bt_restore_meta(xlrec->target.node, lsn,
  						 md.root, md.level,
--- 249,254 ----
*************** btree_xlog_split(bool onleft, bool isroo
*** 296,302 ****
  		forget_matching_split(xlrec->node, downlink, false);
  
  		/* Extract left hikey and its size (still assuming 16-bit alignment) */
! 		if (!(record->xl_info & XLR_BKP_BLOCK(0)))
  		{
  			/* We assume 16-bit alignment is enough for IndexTupleSize */
  			left_hikey = (Item) datapos;
--- 290,296 ----
  		forget_matching_split(xlrec->node, downlink, false);
  
  		/* Extract left hikey and its size (still assuming 16-bit alignment) */
! 		if (!(record->xl_info & XLR_BKP_BLOCK_1))
  		{
  			/* We assume 16-bit alignment is enough for IndexTupleSize */
  			left_hikey = (Item) datapos;
*************** btree_xlog_split(bool onleft, bool isroo
*** 316,322 ****
  		datalen -= sizeof(OffsetNumber);
  	}
  
! 	if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
  	{
  		/*
  		 * We assume that 16-bit alignment is enough to apply IndexTupleSize
--- 310,316 ----
  		datalen -= sizeof(OffsetNumber);
  	}
  
! 	if (onleft && !(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		/*
  		 * We assume that 16-bit alignment is enough to apply IndexTupleSize
*************** btree_xlog_split(bool onleft, bool isroo
*** 329,335 ****
  		datalen -= newitemsz;
  	}
  
! 	/* Reconstruct right (new) sibling page from scratch */
  	rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
  	Assert(BufferIsValid(rbuf));
  	rpage = (Page) BufferGetPage(rbuf);
--- 323,329 ----
  		datalen -= newitemsz;
  	}
  
! 	/* Reconstruct right (new) sibling from scratch */
  	rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
  	Assert(BufferIsValid(rbuf));
  	rpage = (Page) BufferGetPage(rbuf);
*************** btree_xlog_split(bool onleft, bool isroo
*** 363,383 ****
  
  	/* don't release the buffer yet; we touch right page's first item below */
  
! 	/* Now reconstruct left (original) sibling page */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		Buffer		lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
  
  		if (BufferIsValid(lbuf))
  		{
- 			/*
- 			 * Note that this code ensures that the items remaining on the
- 			 * left page are in the correct item number order, but it does not
- 			 * reproduce the physical order they would have had.  Is this
- 			 * worth changing?  See also _bt_restore_page().
- 			 */
  			Page		lpage = (Page) BufferGetPage(lbuf);
  			BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
  
--- 357,374 ----
  
  	/* don't release the buffer yet; we touch right page's first item below */
  
! 	/*
! 	 * Reconstruct left (original) sibling if needed.  Note that this code
! 	 * ensures that the items remaining on the left page are in the correct
! 	 * item number order, but it does not reproduce the physical order they
! 	 * would have had.	Is this worth changing?  See also _bt_restore_page().
! 	 */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		Buffer		lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
  
  		if (BufferIsValid(lbuf))
  		{
  			Page		lpage = (Page) BufferGetPage(lbuf);
  			BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
  
*************** btree_xlog_split(bool onleft, bool isroo
*** 441,457 ****
  	/* We no longer need the right buffer */
  	UnlockReleaseBuffer(rbuf);
  
! 	/*
! 	 * Fix left-link of the page to the right of the new right sibling.
! 	 *
! 	 * Note: in normal operation, we do this while still holding lock on the
! 	 * two split pages.  However, that's not necessary for correctness in WAL
! 	 * replay, because no other index update can be in progress, and readers
! 	 * will cope properly when following an obsolete left-link.
! 	 */
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 	else if (xlrec->rnext != P_NONE)
  	{
  		Buffer		buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
  
--- 432,439 ----
  	/* We no longer need the right buffer */
  	UnlockReleaseBuffer(rbuf);
  
! 	/* Fix left-link of the page to the right of the new right sibling */
! 	if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
  	{
  		Buffer		buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
  
*************** btree_xlog_split(bool onleft, bool isroo
*** 481,491 ****
  static void
  btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
  {
! 	xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
  	Buffer		buffer;
  	Page		page;
  	BTPageOpaque opaque;
  
  	/*
  	 * If queries might be active then we need to ensure every block is
  	 * unpinned between the lastBlockVacuumed and the current block, if there
--- 463,475 ----
  static void
  btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
  {
! 	xl_btree_vacuum *xlrec;
  	Buffer		buffer;
  	Page		page;
  	BTPageOpaque opaque;
  
+ 	xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
+ 
  	/*
  	 * If queries might be active then we need to ensure every block is
  	 * unpinned between the lastBlockVacuumed and the current block, if there
*************** btree_xlog_vacuum(XLogRecPtr lsn, XLogRe
*** 518,531 ****
  	}
  
  	/*
! 	 * If we have a full-page image, restore it (using a cleanup lock) and
! 	 * we're done.
  	 */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, true, false);
  		return;
- 	}
  
  	/*
  	 * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
--- 502,514 ----
  	}
  
  	/*
! 	 * If the block was restored from a full page image, nothing more to do.
! 	 * The RestoreBkpBlocks() call already pinned it and took a cleanup lock.
! 	 * XXX: Perhaps we should call RestoreBkpBlocks() *after* the loop above,
! 	 * to make the disk access more sequential.
  	 */
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	/*
  	 * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
*************** btree_xlog_vacuum(XLogRecPtr lsn, XLogRe
*** 580,587 ****
   * XXX optimise later with something like XLogPrefetchBuffer()
   */
  static TransactionId
! btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
  {
  	OffsetNumber *unused;
  	Buffer		ibuffer,
  				hbuffer;
--- 563,571 ----
   * XXX optimise later with something like XLogPrefetchBuffer()
   */
  static TransactionId
! btree_xlog_delete_get_latestRemovedXid(XLogRecord *record)
  {
+ 	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
  	OffsetNumber *unused;
  	Buffer		ibuffer,
  				hbuffer;
*************** btree_xlog_delete_get_latestRemovedXid(x
*** 718,752 ****
  static void
  btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  {
! 	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
  	Buffer		buffer;
  	Page		page;
  	BTPageOpaque opaque;
  
! 	/*
! 	 * If we have any conflict processing to do, it must happen before we
! 	 * update the page.
! 	 *
! 	 * Btree delete records can conflict with standby queries.  You might
! 	 * think that vacuum records would conflict as well, but we've handled
! 	 * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
! 	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
! 	 * just once when that arrives.  After that we know that no conflicts
! 	 * exist from individual btree vacuum records on that index.
! 	 */
! 	if (InHotStandby)
! 	{
! 		TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec);
! 
! 		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
! 	}
! 
! 	/* If we have a full-page image, restore it and we're done */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 	{
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  		return;
! 	}
  
  	/*
  	 * We don't need to take a cleanup lock to apply these changes. See
--- 702,716 ----
  static void
  btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  {
! 	xl_btree_delete *xlrec;
  	Buffer		buffer;
  	Page		page;
  	BTPageOpaque opaque;
  
! 	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
! 
! 	xlrec = (xl_btree_delete *) XLogRecGetData(record);
  
  	/*
  	 * We don't need to take a cleanup lock to apply these changes. See
*************** btree_xlog_delete_page(uint8 info, XLogR
*** 802,819 ****
  	leftsib = xlrec->leftblk;
  	rightsib = xlrec->rightblk;
  
- 	/*
- 	 * In normal operation, we would lock all the pages this WAL record
- 	 * touches before changing any of them.  In WAL replay, it should be okay
- 	 * to lock just one page at a time, since no concurrent index updates can
- 	 * be happening, and readers should not care whether they arrive at the
- 	 * target page or not (since it's surely empty).
- 	 */
- 
  	/* parent page */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, parent, false);
  		if (BufferIsValid(buffer))
--- 766,773 ----
  	leftsib = xlrec->leftblk;
  	rightsib = xlrec->rightblk;
  
  	/* parent page */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, parent, false);
  		if (BufferIsValid(buffer))
*************** btree_xlog_delete_page(uint8 info, XLogR
*** 859,867 ****
  	}
  
  	/* Fix left-link of right sibling */
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
  		if (BufferIsValid(buffer))
--- 813,819 ----
  	}
  
  	/* Fix left-link of right sibling */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_2))
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
  		if (BufferIsValid(buffer))
*************** btree_xlog_delete_page(uint8 info, XLogR
*** 885,893 ****
  	}
  
  	/* Fix right-link of left sibling, if any */
! 	if (record->xl_info & XLR_BKP_BLOCK(2))
! 		(void) RestoreBackupBlock(lsn, record, 2, false, false);
! 	else
  	{
  		if (leftsib != P_NONE)
  		{
--- 837,843 ----
  	}
  
  	/* Fix right-link of left sibling, if any */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_3))
  	{
  		if (leftsib != P_NONE)
  		{
*************** btree_xlog_newroot(XLogRecPtr lsn, XLogR
*** 961,969 ****
  	BTPageOpaque pageop;
  	BlockNumber downlink = 0;
  
- 	/* Backup blocks are not used in newroot records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
  	Assert(BufferIsValid(buffer));
  	page = (Page) BufferGetPage(buffer);
--- 911,916 ----
*************** btree_xlog_newroot(XLogRecPtr lsn, XLogR
*** 1005,1040 ****
  		forget_matching_split(xlrec->node, downlink, true);
  }
  
! static void
! btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record)
  {
! 	xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
  
  	/*
! 	 * Btree reuse_page records exist to provide a conflict point when we
! 	 * reuse pages in the index via the FSM.  That's all they do though.
! 	 *
! 	 * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
! 	 * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
! 	 * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
! 	 * Consequently, one XID value achieves the same exclusion effect on
! 	 * master and standby.
  	 */
  	if (InHotStandby)
  	{
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
! 											xlrec->node);
! 	}
  
! 	/* Backup blocks are not used in reuse_page records */
! 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
! }
  
  
! void
! btree_redo(XLogRecPtr lsn, XLogRecord *record)
! {
! 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
  	switch (info)
  	{
--- 952,1018 ----
  		forget_matching_split(xlrec->node, downlink, true);
  }
  
! 
! void
! btree_redo(XLogRecPtr lsn, XLogRecord *record)
  {
! 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
  	/*
! 	 * If we have any conflict processing to do, it must happen before we
! 	 * update the page.
  	 */
  	if (InHotStandby)
  	{
! 		switch (info)
! 		{
! 			case XLOG_BTREE_DELETE:
  
! 				/*
! 				 * Btree delete records can conflict with standby queries. You
! 				 * might think that vacuum records would conflict as well, but
! 				 * we've handled that already. XLOG_HEAP2_CLEANUP_INFO records
! 				 * provide the highest xid cleaned by the vacuum of the heap
! 				 * and so we can resolve any conflicts just once when that
! 				 * arrives. After that we know that no conflicts exist
! 				 * from individual btree vacuum records on that index.
! 				 */
! 				{
! 					TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
! 					xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
! 
! 					ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
! 				}
! 				break;
  
+ 			case XLOG_BTREE_REUSE_PAGE:
  
! 				/*
! 				 * Btree reuse page records exist to provide a conflict point
! 				 * when we reuse pages in the index via the FSM. That's all they
! 				 * do though. latestRemovedXid was the page's btpo.xact. The
! 				 * btpo.xact < RecentGlobalXmin test in _bt_page_recyclable()
! 				 * conceptually mirrors the pgxact->xmin > limitXmin test in
! 				 * GetConflictingVirtualXIDs().  Consequently, one XID value
! 				 * achieves the same exclusion effect on master and standby.
! 				 */
! 				{
! 					xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
! 
! 					ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
! 				}
! 				return;
! 
! 			default:
! 				break;
! 		}
! 	}
! 
! 	/*
! 	 * Vacuum needs to pin and take cleanup lock on every leaf page, a regular
! 	 * exclusive lock is enough for all other purposes.
! 	 */
! 	RestoreBkpBlocks(lsn, record, (info == XLOG_BTREE_VACUUM));
  
  	switch (info)
  	{
*************** btree_redo(XLogRecPtr lsn, XLogRecord *r
*** 1074,1080 ****
  			btree_xlog_newroot(lsn, record);
  			break;
  		case XLOG_BTREE_REUSE_PAGE:
! 			btree_xlog_reuse_page(lsn, record);
  			break;
  		default:
  			elog(PANIC, "btree_redo: unknown op code %u", info);
--- 1052,1058 ----
  			btree_xlog_newroot(lsn, record);
  			break;
  		case XLOG_BTREE_REUSE_PAGE:
! 			/* Handled above before restoring bkp block */
  			break;
  		default:
  			elog(PANIC, "btree_redo: unknown op code %u", info);
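Generalizing from btree_redo() above, the shape this patch gives a redo entry point is:
resolve Hot Standby conflicts first, restore backup blocks next (with a cleanup lock only
for the record types that need one), and dispatch last.  A minimal sketch, where
rm_resolve_conflicts() and RM_OP_NEEDING_CLEANUP_LOCK are stand-ins for a real resource
manager's helpers:

	void
	rm_redo_sketch(XLogRecPtr lsn, XLogRecord *record)
	{
		uint8		info = record->xl_info & ~XLR_INFO_MASK;

		/* 1. Resolve Hot Standby conflicts before touching any page. */
		if (InHotStandby)
			rm_resolve_conflicts(record);	/* hypothetical helper */

		/* 2. Restore full-page images; cleanup lock only where required. */
		RestoreBkpBlocks(lsn, record, info == RM_OP_NEEDING_CLEANUP_LOCK);

		/* 3. Dispatch on the record type and apply incremental changes. */
		switch (info)
		{
				/* ... per-record replay routines ... */
		}
	}
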
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
new file mode 100644
index 8746b35..54e78f1
*** a/src/backend/access/spgist/spgxlog.c
--- b/src/backend/access/spgist/spgxlog.c
*************** spgRedoCreateIndex(XLogRecPtr lsn, XLogR
*** 76,84 ****
  	Buffer		buffer;
  	Page		page;
  
- 	/* Backup blocks are not used in create_index records */
- 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- 
  	buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true);
  	Assert(BufferIsValid(buffer));
  	page = (Page) BufferGetPage(buffer);
--- 76,81 ----
*************** spgRedoAddLeaf(XLogRecPtr lsn, XLogRecor
*** 120,133 ****
  	ptr += sizeof(spgxlogAddLeaf);
  	leafTuple = (SpGistLeafTuple) ptr;
  
! 	/*
! 	 * In normal operation we would have both current and parent pages locked
! 	 * simultaneously; but in WAL replay it should be safe to update the leaf
! 	 * page before updating the parent.
! 	 */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf,
  								xldata->newPage);
--- 117,123 ----
  	ptr += sizeof(spgxlogAddLeaf);
  	leafTuple = (SpGistLeafTuple) ptr;
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf,
  								xldata->newPage);
*************** spgRedoAddLeaf(XLogRecPtr lsn, XLogRecor
*** 179,187 ****
  	}
  
  	/* update parent downlink if necessary */
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 	else if (xldata->blknoParent != InvalidBlockNumber)
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  		if (BufferIsValid(buffer))
--- 169,176 ----
  	}
  
  	/* update parent downlink if necessary */
! 	if (xldata->blknoParent != InvalidBlockNumber &&
! 		!(record->xl_info & XLR_BKP_BLOCK_2))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  		if (BufferIsValid(buffer))
*************** spgRedoMoveLeafs(XLogRecPtr lsn, XLogRec
*** 230,245 ****
  
  	/* now ptr points to the list of leaf tuples */
  
- 	/*
- 	 * In normal operation we would have all three pages (source, dest, and
- 	 * parent) locked simultaneously; but in WAL replay it should be safe to
- 	 * update them one at a time, as long as we do it in the right order.
- 	 */
- 
  	/* Insert tuples on the dest page (do first, so redirect is valid) */
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoDst,
  								xldata->newPage);
--- 219,226 ----
  
  	/* now ptr points to the list of leaf tuples */
  
  	/* Insert tuples on the dest page (do first, so redirect is valid) */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_2))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoDst,
  								xldata->newPage);
*************** spgRedoMoveLeafs(XLogRecPtr lsn, XLogRec
*** 272,280 ****
  	}
  
  	/* Delete tuples from the source page, inserting a redirection pointer */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
  		if (BufferIsValid(buffer))
--- 253,259 ----
  	}
  
  	/* Delete tuples from the source page, inserting a redirection pointer */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
  		if (BufferIsValid(buffer))
*************** spgRedoMoveLeafs(XLogRecPtr lsn, XLogRec
*** 297,305 ****
  	}
  
  	/* And update the parent downlink */
! 	if (record->xl_info & XLR_BKP_BLOCK(2))
! 		(void) RestoreBackupBlock(lsn, record, 2, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  		if (BufferIsValid(buffer))
--- 276,282 ----
  	}
  
  	/* And update the parent downlink */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_3))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  		if (BufferIsValid(buffer))
*************** spgRedoAddNode(XLogRecPtr lsn, XLogRecor
*** 345,353 ****
  	{
  		/* update in place */
  		Assert(xldata->blknoParent == InvalidBlockNumber);
! 		if (record->xl_info & XLR_BKP_BLOCK(0))
! 			(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 		else
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  			if (BufferIsValid(buffer))
--- 322,328 ----
  	{
  		/* update in place */
  		Assert(xldata->blknoParent == InvalidBlockNumber);
! 		if (!(record->xl_info & XLR_BKP_BLOCK_1))
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  			if (BufferIsValid(buffer))
*************** spgRedoAddNode(XLogRecPtr lsn, XLogRecor
*** 372,393 ****
  	}
  	else
  	{
- 		/*
- 		 * In normal operation we would have all three pages (source, dest,
- 		 * and parent) locked simultaneously; but in WAL replay it should be
- 		 * safe to update them one at a time, as long as we do it in the right
- 		 * order.
- 		 *
- 		 * The logic here depends on the assumption that blkno != blknoNew,
- 		 * else we can't tell which BKP bit goes with which page, and the LSN
- 		 * checks could go wrong too.
- 		 */
- 		Assert(xldata->blkno != xldata->blknoNew);
- 
  		/* Install new tuple first so redirect is valid */
! 		if (record->xl_info & XLR_BKP_BLOCK(1))
! 			(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 		else
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blknoNew,
  									xldata->newPage);
--- 347,354 ----
  	}
  	else
  	{
  		/* Install new tuple first so redirect is valid */
! 		if (!(record->xl_info & XLR_BKP_BLOCK_2))
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blknoNew,
  									xldata->newPage);
*************** spgRedoAddNode(XLogRecPtr lsn, XLogRecor
*** 404,420 ****
  					addOrReplaceTuple(page, (Item) innerTuple,
  									  innerTuple->size, xldata->offnumNew);
  
! 					/*
! 					 * If parent is in this same page, don't advance LSN;
! 					 * doing so would fool us into not applying the parent
! 					 * downlink update below.  We'll update the LSN when we
! 					 * fix the parent downlink.
! 					 */
! 					if (xldata->blknoParent != xldata->blknoNew)
! 					{
! 						PageSetLSN(page, lsn);
! 						PageSetTLI(page, ThisTimeLineID);
! 					}
  					MarkBufferDirty(buffer);
  				}
  				UnlockReleaseBuffer(buffer);
--- 365,372 ----
  					addOrReplaceTuple(page, (Item) innerTuple,
  									  innerTuple->size, xldata->offnumNew);
  
! 					PageSetLSN(page, lsn);
! 					PageSetTLI(page, ThisTimeLineID);
  					MarkBufferDirty(buffer);
  				}
  				UnlockReleaseBuffer(buffer);
*************** spgRedoAddNode(XLogRecPtr lsn, XLogRecor
*** 422,430 ****
  		}
  
  		/* Delete old tuple, replacing it with redirect or placeholder tuple */
! 		if (record->xl_info & XLR_BKP_BLOCK(0))
! 			(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 		else
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  			if (BufferIsValid(buffer))
--- 374,380 ----
  		}
  
  		/* Delete old tuple, replacing it with redirect or placeholder tuple */
! 		if (!(record->xl_info & XLR_BKP_BLOCK_1))
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  			if (BufferIsValid(buffer))
*************** spgRedoAddNode(XLogRecPtr lsn, XLogRecor
*** 455,471 ****
  					else
  						SpGistPageGetOpaque(page)->nRedirection++;
  
! 					/*
! 					 * If parent is in this same page, don't advance LSN;
! 					 * doing so would fool us into not applying the parent
! 					 * downlink update below.  We'll update the LSN when we
! 					 * fix the parent downlink.
! 					 */
! 					if (xldata->blknoParent != xldata->blkno)
! 					{
! 						PageSetLSN(page, lsn);
! 						PageSetTLI(page, ThisTimeLineID);
! 					}
  					MarkBufferDirty(buffer);
  				}
  				UnlockReleaseBuffer(buffer);
--- 405,412 ----
  					else
  						SpGistPageGetOpaque(page)->nRedirection++;
  
! 					PageSetLSN(page, lsn);
! 					PageSetTLI(page, ThisTimeLineID);
  					MarkBufferDirty(buffer);
  				}
  				UnlockReleaseBuffer(buffer);
*************** spgRedoAddNode(XLogRecPtr lsn, XLogRecor
*** 484,495 ****
  		else
  			bbi = 2;
  
! 		if (record->xl_info & XLR_BKP_BLOCK(bbi))
! 		{
! 			if (bbi == 2)		/* else we already did it */
! 				(void) RestoreBackupBlock(lsn, record, bbi, false, false);
! 		}
! 		else
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  			if (BufferIsValid(buffer))
--- 425,431 ----
  		else
  			bbi = 2;
  
! 		if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
  		{
  			buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  			if (BufferIsValid(buffer))
*************** spgRedoSplitTuple(XLogRecPtr lsn, XLogRe
*** 531,546 ****
  	ptr += prefixTuple->size;
  	postfixTuple = (SpGistInnerTuple) ptr;
  
- 	/*
- 	 * In normal operation we would have both pages locked simultaneously; but
- 	 * in WAL replay it should be safe to update them one at a time, as long
- 	 * as we do it in the right order.
- 	 */
- 
  	/* insert postfix tuple first to avoid dangling link */
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 	else if (xldata->blknoPostfix != xldata->blknoPrefix)
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix,
  								xldata->newPage);
--- 467,475 ----
  	ptr += prefixTuple->size;
  	postfixTuple = (SpGistInnerTuple) ptr;
  
  	/* insert postfix tuple first to avoid dangling link */
! 	if (xldata->blknoPostfix != xldata->blknoPrefix &&
! 		!(record->xl_info & XLR_BKP_BLOCK_2))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix,
  								xldata->newPage);
*************** spgRedoSplitTuple(XLogRecPtr lsn, XLogRe
*** 566,574 ****
  	}
  
  	/* now handle the original page */
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false);
  		if (BufferIsValid(buffer))
--- 495,501 ----
  	}
  
  	/* now handle the original page */
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false);
  		if (BufferIsValid(buffer))
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 608,615 ****
  	uint8	   *leafPageSelect;
  	Buffer		srcBuffer;
  	Buffer		destBuffer;
- 	Page		srcPage;
- 	Page		destPage;
  	Page		page;
  	int			bbi;
  	int			i;
--- 535,540 ----
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 638,651 ****
  	{
  		/* when splitting root, we touch it only in the guise of new inner */
  		srcBuffer = InvalidBuffer;
- 		srcPage = NULL;
  	}
  	else if (xldata->initSrc)
  	{
  		/* just re-init the source page */
  		srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true);
  		Assert(BufferIsValid(srcBuffer));
! 		srcPage = (Page) BufferGetPage(srcBuffer);
  
  		SpGistInitBuffer(srcBuffer,
  					 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
--- 563,575 ----
  	{
  		/* when splitting root, we touch it only in the guise of new inner */
  		srcBuffer = InvalidBuffer;
  	}
  	else if (xldata->initSrc)
  	{
  		/* just re-init the source page */
  		srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true);
  		Assert(BufferIsValid(srcBuffer));
! 		page = (Page) BufferGetPage(srcBuffer);
  
  		SpGistInitBuffer(srcBuffer,
  					 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 653,676 ****
  	}
  	else
  	{
! 		/*
! 		 * Delete the specified tuples from source page.  (In case we're in
! 		 * Hot Standby, we need to hold lock on the page till we're done
! 		 * inserting leaf tuples and the new inner tuple, else the added
! 		 * redirect tuple will be a dangling link.)
! 		 */
! 		if (record->xl_info & XLR_BKP_BLOCK(bbi))
! 		{
! 			srcBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
! 			srcPage = NULL;		/* don't need to do any page updates */
! 		}
! 		else
  		{
  			srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
  			if (BufferIsValid(srcBuffer))
  			{
! 				srcPage = BufferGetPage(srcBuffer);
! 				if (!XLByteLE(lsn, PageGetLSN(srcPage)))
  				{
  					/*
  					 * We have it a bit easier here than in doPickSplit(),
--- 577,590 ----
  	}
  	else
  	{
! 		/* delete the specified tuples from source page */
! 		if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
  		{
  			srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
  			if (BufferIsValid(srcBuffer))
  			{
! 				page = BufferGetPage(srcBuffer);
! 				if (!XLByteLE(lsn, PageGetLSN(page)))
  				{
  					/*
  					 * We have it a bit easier here than in doPickSplit(),
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 678,691 ****
  					 * we can inject the correct redirection tuple now.
  					 */
  					if (!state.isBuild)
! 						spgPageIndexMultiDelete(&state, srcPage,
  												toDelete, xldata->nDelete,
  												SPGIST_REDIRECT,
  												SPGIST_PLACEHOLDER,
  												xldata->blknoInner,
  												xldata->offnumInner);
  					else
! 						spgPageIndexMultiDelete(&state, srcPage,
  												toDelete, xldata->nDelete,
  												SPGIST_PLACEHOLDER,
  												SPGIST_PLACEHOLDER,
--- 592,605 ----
  					 * we can inject the correct redirection tuple now.
  					 */
  					if (!state.isBuild)
! 						spgPageIndexMultiDelete(&state, page,
  												toDelete, xldata->nDelete,
  												SPGIST_REDIRECT,
  												SPGIST_PLACEHOLDER,
  												xldata->blknoInner,
  												xldata->offnumInner);
  					else
! 						spgPageIndexMultiDelete(&state, page,
  												toDelete, xldata->nDelete,
  												SPGIST_PLACEHOLDER,
  												SPGIST_PLACEHOLDER,
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 694,705 ****
  
  					/* don't update LSN etc till we're done with it */
  				}
- 				else
- 					srcPage = NULL;		/* don't do any page updates */
  			}
- 			else
- 				srcPage = NULL;
  		}
  		bbi++;
  	}
  
--- 608,617 ----
  
  					/* don't update LSN etc till we're done with it */
  				}
  			}
  		}
+ 		else
+ 			srcBuffer = InvalidBuffer;
  		bbi++;
  	}
  
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 707,720 ****
  	if (xldata->blknoDest == InvalidBlockNumber)
  	{
  		destBuffer = InvalidBuffer;
- 		destPage = NULL;
  	}
  	else if (xldata->initDest)
  	{
  		/* just re-init the dest page */
  		destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true);
  		Assert(BufferIsValid(destBuffer));
! 		destPage = (Page) BufferGetPage(destBuffer);
  
  		SpGistInitBuffer(destBuffer,
  					 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
--- 619,631 ----
  	if (xldata->blknoDest == InvalidBlockNumber)
  	{
  		destBuffer = InvalidBuffer;
  	}
  	else if (xldata->initDest)
  	{
  		/* just re-init the dest page */
  		destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true);
  		Assert(BufferIsValid(destBuffer));
! 		page = (Page) BufferGetPage(destBuffer);
  
  		SpGistInitBuffer(destBuffer,
  					 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 722,748 ****
  	}
  	else
  	{
! 		/*
! 		 * We could probably release the page lock immediately in the
! 		 * full-page-image case, but for safety let's hold it till later.
! 		 */
! 		if (record->xl_info & XLR_BKP_BLOCK(bbi))
! 		{
! 			destBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
! 			destPage = NULL;	/* don't need to do any page updates */
! 		}
! 		else
! 		{
  			destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
! 			if (BufferIsValid(destBuffer))
! 			{
! 				destPage = (Page) BufferGetPage(destBuffer);
! 				if (XLByteLE(lsn, PageGetLSN(destPage)))
! 					destPage = NULL;	/* don't do any page updates */
! 			}
! 			else
! 				destPage = NULL;
! 		}
  		bbi++;
  	}
  
--- 633,642 ----
  	}
  	else
  	{
! 		if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
  			destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
! 		else
! 			destBuffer = InvalidBuffer;
  		bbi++;
  	}
  
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 750,783 ****
  	for (i = 0; i < xldata->nInsert; i++)
  	{
  		SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
  
  		ptr += lt->size;
  
! 		page = leafPageSelect[i] ? destPage : srcPage;
! 		if (page == NULL)
  			continue;			/* no need to touch this page */
  
! 		addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
  	}
  
! 	/* Now update src and dest page LSNs if needed */
! 	if (srcPage != NULL)
  	{
! 		PageSetLSN(srcPage, lsn);
! 		PageSetTLI(srcPage, ThisTimeLineID);
! 		MarkBufferDirty(srcBuffer);
  	}
! 	if (destPage != NULL)
  	{
! 		PageSetLSN(destPage, lsn);
! 		PageSetTLI(destPage, ThisTimeLineID);
! 		MarkBufferDirty(destBuffer);
  	}
  
  	/* restore new inner tuple */
! 	if (record->xl_info & XLR_BKP_BLOCK(bbi))
! 		(void) RestoreBackupBlock(lsn, record, bbi, false, false);
! 	else
  	{
  		Buffer		buffer = XLogReadBuffer(xldata->node, xldata->blknoInner,
  											xldata->initInner);
--- 644,690 ----
  	for (i = 0; i < xldata->nInsert; i++)
  	{
  		SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
+ 		Buffer		leafBuffer;
  
  		ptr += lt->size;
  
! 		leafBuffer = leafPageSelect[i] ? destBuffer : srcBuffer;
! 		if (!BufferIsValid(leafBuffer))
  			continue;			/* no need to touch this page */
+ 		page = BufferGetPage(leafBuffer);
  
! 		if (!XLByteLE(lsn, PageGetLSN(page)))
! 		{
! 			addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
! 		}
  	}
  
! 	/* Now update src and dest page LSNs */
! 	if (BufferIsValid(srcBuffer))
  	{
! 		page = BufferGetPage(srcBuffer);
! 		if (!XLByteLE(lsn, PageGetLSN(page)))
! 		{
! 			PageSetLSN(page, lsn);
! 			PageSetTLI(page, ThisTimeLineID);
! 			MarkBufferDirty(srcBuffer);
! 		}
! 		UnlockReleaseBuffer(srcBuffer);
  	}
! 	if (BufferIsValid(destBuffer))
  	{
! 		page = BufferGetPage(destBuffer);
! 		if (!XLByteLE(lsn, PageGetLSN(page)))
! 		{
! 			PageSetLSN(page, lsn);
! 			PageSetTLI(page, ThisTimeLineID);
! 			MarkBufferDirty(destBuffer);
! 		}
! 		UnlockReleaseBuffer(destBuffer);
  	}
  
  	/* restore new inner tuple */
! 	if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
  	{
  		Buffer		buffer = XLogReadBuffer(xldata->node, xldata->blknoInner,
  											xldata->initInner);
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 815,829 ****
  	}
  	bbi++;
  
- 	/*
- 	 * Now we can release the leaf-page locks.	It's okay to do this before
- 	 * updating the parent downlink.
- 	 */
- 	if (BufferIsValid(srcBuffer))
- 		UnlockReleaseBuffer(srcBuffer);
- 	if (BufferIsValid(destBuffer))
- 		UnlockReleaseBuffer(destBuffer);
- 
  	/* update parent downlink, unless we did it above */
  	if (xldata->blknoParent == InvalidBlockNumber)
  	{
--- 722,727 ----
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 832,840 ****
  	}
  	else if (xldata->blknoInner != xldata->blknoParent)
  	{
! 		if (record->xl_info & XLR_BKP_BLOCK(bbi))
! 			(void) RestoreBackupBlock(lsn, record, bbi, false, false);
! 		else
  		{
  			Buffer		buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  
--- 730,736 ----
  	}
  	else if (xldata->blknoInner != xldata->blknoParent)
  	{
! 		if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
  		{
  			Buffer		buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
  
*************** spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRe
*** 892,900 ****
  	ptr += sizeof(OffsetNumber) * xldata->nChain;
  	chainDest = (OffsetNumber *) ptr;
  
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  		if (BufferIsValid(buffer))
--- 788,794 ----
  	ptr += sizeof(OffsetNumber) * xldata->nChain;
  	chainDest = (OffsetNumber *) ptr;
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  		if (BufferIsValid(buffer))
*************** spgRedoVacuumRoot(XLogRecPtr lsn, XLogRe
*** 963,971 ****
  	ptr += sizeof(spgxlogVacuumRoot);
  	toDelete = (OffsetNumber *) ptr;
  
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  		if (BufferIsValid(buffer))
--- 857,863 ----
  	ptr += sizeof(spgxlogVacuumRoot);
  	toDelete = (OffsetNumber *) ptr;
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  		if (BufferIsValid(buffer))
*************** spgRedoVacuumRedirect(XLogRecPtr lsn, XL
*** 997,1016 ****
  	ptr += sizeof(spgxlogVacuumRedirect);
  	itemToPlaceholder = (OffsetNumber *) ptr;
  
! 	/*
! 	 * If any redirection tuples are being removed, make sure there are no
! 	 * live Hot Standby transactions that might need to see them.
! 	 */
! 	if (InHotStandby)
! 	{
! 		if (TransactionIdIsValid(xldata->newestRedirectXid))
! 			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
! 												xldata->node);
! 	}
! 
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 	else
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  
--- 889,895 ----
  	ptr += sizeof(spgxlogVacuumRedirect);
  	itemToPlaceholder = (OffsetNumber *) ptr;
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
  	{
  		buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
  
*************** spg_redo(XLogRecPtr lsn, XLogRecord *rec
*** 1075,1080 ****
--- 954,989 ----
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  	MemoryContext oldCxt;
  
+ 	/*
+ 	 * If we have any conflict processing to do, it must happen before we
+ 	 * update the page.
+ 	 */
+ 	if (InHotStandby)
+ 	{
+ 		switch (info)
+ 		{
+ 			case XLOG_SPGIST_VACUUM_REDIRECT:
+ 				{
+ 					spgxlogVacuumRedirect *xldata =
+ 					(spgxlogVacuumRedirect *) XLogRecGetData(record);
+ 
+ 					/*
+ 					 * If any redirection tuples are being removed, make sure
+ 					 * there are no live Hot Standby transactions that might
+ 					 * need to see them.
+ 					 */
+ 					if (TransactionIdIsValid(xldata->newestRedirectXid))
+ 						ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+ 															xldata->node);
+ 					break;
+ 				}
+ 			default:
+ 				break;
+ 		}
+ 	}
+ 
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	oldCxt = MemoryContextSwitchTo(opCtx);
  	switch (info)
  	{
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
new file mode 100644
index f8ebf57..573c9ad
*** a/src/backend/access/transam/README
--- b/src/backend/access/transam/README
*************** critical section.)
*** 438,446 ****
  4. Mark the shared buffer(s) as dirty with MarkBufferDirty().  (This must
  happen before the WAL record is inserted; see notes in SyncOneBuffer().)
  
! 5. If the relation requires WAL-logging, build a WAL log record and pass it
! to XLogInsert(); then update the page's LSN and TLI using the returned XLOG
! location.  For instance,
  
  		recptr = XLogInsert(rmgr_id, info, rdata);
  
--- 438,445 ----
  4. Mark the shared buffer(s) as dirty with MarkBufferDirty().  (This must
  happen before the WAL record is inserted; see notes in SyncOneBuffer().)
  
! 5. Build a WAL log record and pass it to XLogInsert(); then update the page's
! LSN and TLI using the returned XLOG location.  For instance,
  
  		recptr = XLogInsert(rmgr_id, info, rdata);
  
*************** which buffers were handled that way ---
*** 467,475 ****
  what the XLOG record actually contains.  XLOG records that describe multi-page
  changes therefore require some care to design: you must be certain that you
  know what data is indicated by each "BKP" bit.  An example of the trickiness
! is that in a HEAP_UPDATE record, BKP(0) normally is associated with the source
! page and BKP(1) is associated with the destination page --- but if these are
! the same page, only BKP(0) would have been set.
  
  For this reason as well as the risk of deadlocking on buffer locks, it's best
  to design WAL records so that they reflect small atomic actions involving just
--- 466,474 ----
  what the XLOG record actually contains.  XLOG records that describe multi-page
  changes therefore require some care to design: you must be certain that you
  know what data is indicated by each "BKP" bit.  An example of the trickiness
! is that in a HEAP_UPDATE record, BKP(1) normally is associated with the source
! page and BKP(2) is associated with the destination page --- but if these are
! the same page, only BKP(1) would have been set.
  
  For this reason as well as the risk of deadlocking on buffer locks, it's best
  to design WAL records so that they reflect small atomic actions involving just
*************** incrementally update the page, the rdata
*** 498,516 ****
  ID at least once; otherwise there is no defense against torn-page problems.
  The standard replay-routine pattern for this case is
  
! 	if (record->xl_info & XLR_BKP_BLOCK(N))
! 	{
! 		/* apply the change from the full-page image */
! 		(void) RestoreBackupBlock(lsn, record, N, false, false);
! 		return;
! 	}
  
  	buffer = XLogReadBuffer(rnode, blkno, false);
  	if (!BufferIsValid(buffer))
! 	{
! 		/* page has been deleted, so we need do nothing */
! 		return;
! 	}
  	page = (Page) BufferGetPage(buffer);
  
  	if (XLByteLE(lsn, PageGetLSN(page)))
--- 497,508 ----
  ID at least once; otherwise there is no defense against torn-page problems.
  The standard replay-routine pattern for this case is
  
! 	if (record->xl_info & XLR_BKP_BLOCK_n)
! 		<< do nothing, page was rewritten from logged copy >>;
  
  	buffer = XLogReadBuffer(rnode, blkno, false);
  	if (!BufferIsValid(buffer))
! 		<< do nothing, page has been deleted >>;
  	page = (Page) BufferGetPage(buffer);
  
  	if (XLByteLE(lsn, PageGetLSN(page)))
*************** The standard replay-routine pattern for
*** 528,569 ****
  	UnlockReleaseBuffer(buffer);
  
  As noted above, for a multi-page update you need to be able to determine
! which XLR_BKP_BLOCK(N) flag applies to each page.  If a WAL record reflects
  a combination of fully-rewritable and incremental updates, then the rewritable
! pages don't count for the XLR_BKP_BLOCK(N) numbering.  (XLR_BKP_BLOCK(N) is
! associated with the N'th distinct buffer ID seen in the "rdata" array, and
  per the above discussion, fully-rewritable buffers shouldn't be mentioned in
  "rdata".)
  
- When replaying a WAL record that describes changes on multiple pages, you
- must be careful to lock the pages properly to prevent concurrent Hot Standby
- queries from seeing an inconsistent state.  If this requires that two
- or more buffer locks be held concurrently, the coding pattern shown above
- is too simplistic, since it assumes the routine can exit as soon as it's
- known the current page requires no modification.  Instead, you might have
- something like
- 
- 	if (record->xl_info & XLR_BKP_BLOCK(0))
- 	{
- 		/* apply the change from the full-page image */
- 		buffer0 = RestoreBackupBlock(lsn, record, 0, false, true);
- 	}
- 	else
- 	{
- 		buffer0 = XLogReadBuffer(rnode, blkno, false);
- 		if (BufferIsValid(buffer0))
- 		{
- 			... apply the change if not already done ...
- 			MarkBufferDirty(buffer0);
- 		}
- 	}
- 
- 	... similarly apply the changes for remaining pages ...
- 
- 	/* and now we can release the lock on the first page */
- 	if (BufferIsValid(buffer0))
- 		UnlockReleaseBuffer(buffer0);
- 
  Due to all these constraints, complex changes (such as a multilevel index
  insertion) normally need to be described by a series of atomic-action WAL
  records.  What do you do if the intermediate states are not self-consistent?
--- 520,532 ----
  	UnlockReleaseBuffer(buffer);
  
  As noted above, for a multi-page update you need to be able to determine
! which XLR_BKP_BLOCK_n flag applies to each page.  If a WAL record reflects
  a combination of fully-rewritable and incremental updates, then the rewritable
! pages don't count for the XLR_BKP_BLOCK_n numbering.  (XLR_BKP_BLOCK_n is
! associated with the n'th distinct buffer ID seen in the "rdata" array, and
  per the above discussion, fully-rewritable buffers shouldn't be mentioned in
  "rdata".)
  
  Due to all these constraints, complex changes (such as a multilevel index
  insertion) normally need to be described by a series of atomic-action WAL
  records.  What do you do if the intermediate states are not self-consistent?
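Put together, a complete single-page replay routine following the pattern above looks
roughly like this (a minimal sketch; rnode, blkno, and the page-update step are
placeholders for record-specific logic):

	static void
	rm_xlog_example(XLogRecPtr lsn, XLogRecord *record)
	{
		Buffer		buffer;
		Page		page;

		/* Page was rewritten in full by RestoreBkpBlocks(); nothing to do. */
		if (record->xl_info & XLR_BKP_BLOCK_1)
			return;

		buffer = XLogReadBuffer(rnode, blkno, false);
		if (!BufferIsValid(buffer))
			return;				/* page has been deleted */
		page = (Page) BufferGetPage(buffer);

		if (XLByteLE(lsn, PageGetLSN(page)))
		{
			UnlockReleaseBuffer(buffer);
			return;				/* change already applied */
		}

		/* ... apply the incremental change to the page here ... */

		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
	}
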
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
new file mode 100644
index 1faf666..c541b5a
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
*************** begin:;
*** 835,842 ****
  	 * At the exit of this loop, write_len includes the backup block data.
  	 *
  	 * Also set the appropriate info bits to show which buffers were backed
! 	 * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
! 	 * value (ignoring InvalidBuffer) appearing in the rdata chain.
  	 */
  	rdt_lastnormal = rdt;
  	write_len = len;
--- 835,842 ----
  	 * At the exit of this loop, write_len includes the backup block data.
  	 *
  	 * Also set the appropriate info bits to show which buffers were backed
! 	 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
! 	 * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
  	 */
  	rdt_lastnormal = rdt;
  	write_len = len;
*************** begin:;
*** 848,854 ****
  		if (!dtbuf_bkp[i])
  			continue;
  
! 		info |= XLR_BKP_BLOCK(i);
  
  		bkpb = &(dtbuf_xlg[i]);
  		page = (char *) BufferGetBlock(dtbuf[i]);
--- 848,854 ----
  		if (!dtbuf_bkp[i])
  			continue;
  
! 		info |= XLR_SET_BKP_BLOCK(i);
  
  		bkpb = &(dtbuf_xlg[i]);
  		page = (char *) BufferGetBlock(dtbuf[i]);
*************** CleanupBackupHistory(void)
*** 3080,3095 ****
  }
  
  /*
!  * Restore a full-page image from a backup block attached to an XLOG record.
!  *
!  * lsn: LSN of the XLOG record being replayed
!  * record: the complete XLOG record
!  * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1)
!  * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock
!  * keep_buffer: TRUE to return the buffer still locked and pinned
   *
!  * Returns the buffer number containing the page.  Note this is not terribly
!  * useful unless keep_buffer is specified as TRUE.
   *
   * Note: when a backup block is available in XLOG, we restore it
   * unconditionally, even if the page in the database appears newer.
--- 3080,3088 ----
  }
  
  /*
!  * Restore the backup blocks present in an XLOG record, if any.
   *
!  * We assume all of the record has been read into memory at *record.
   *
   * Note: when a backup block is available in XLOG, we restore it
   * unconditionally, even if the page in the database appears newer.
*************** CleanupBackupHistory(void)
*** 3100,3119 ****
   * modifications of the page that appear in XLOG, rather than possibly
   * ignoring them as already applied, but that's not a huge drawback.
   *
!  * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer,
!  * else a normal exclusive lock is used.  During crash recovery, that's just
!  * pro forma because there can't be any regular backends in the system, but
!  * in hot standby mode the distinction is important.
!  *
!  * If 'keep_buffer' is true, return without releasing the buffer lock and pin;
!  * then caller is responsible for doing UnlockReleaseBuffer() later.  This
!  * is needed in some cases when replaying XLOG records that touch multiple
!  * pages, to prevent inconsistent states from being visible to other backends.
!  * (Again, that's only important in hot standby mode.)
   */
! Buffer
! RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
! 				   bool get_cleanup_lock, bool keep_buffer)
  {
  	Buffer		buffer;
  	Page		page;
--- 3093,3107 ----
   * modifications of the page that appear in XLOG, rather than possibly
   * ignoring them as already applied, but that's not a huge drawback.
   *
!  * If 'cleanup' is true, a cleanup lock is used when restoring blocks.
!  * Otherwise, a normal exclusive lock is used.	During crash recovery, that's
!  * just pro forma because there can't be any regular backends in the system,
!  * but in hot standby mode the distinction is important. The 'cleanup'
!  * argument applies to all backup blocks in the WAL record, that suffices for
!  * argument applies to all backup blocks in the WAL record; that suffices for
   */
! void
! RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
  {
  	Buffer		buffer;
  	Page		page;
*************** RestoreBackupBlock(XLogRecPtr lsn, XLogR
*** 3121,3179 ****
  	char	   *blk;
  	int			i;
  
! 	/* Locate requested BkpBlock in the record */
  	blk = (char *) XLogRecGetData(record) + record->xl_len;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
! 		if (!(record->xl_info & XLR_BKP_BLOCK(i)))
  			continue;
  
  		memcpy(&bkpb, blk, sizeof(BkpBlock));
  		blk += sizeof(BkpBlock);
  
! 		if (i == block_index)
! 		{
! 			/* Found it, apply the update */
! 			buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
! 											RBM_ZERO);
! 			Assert(BufferIsValid(buffer));
! 			if (get_cleanup_lock)
! 				LockBufferForCleanup(buffer);
! 			else
! 				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 
! 			page = (Page) BufferGetPage(buffer);
! 
! 			if (bkpb.hole_length == 0)
! 			{
! 				memcpy((char *) page, blk, BLCKSZ);
! 			}
! 			else
! 			{
! 				memcpy((char *) page, blk, bkpb.hole_offset);
! 				/* must zero-fill the hole */
! 				MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
! 				memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
! 					   blk + bkpb.hole_offset,
! 					   BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
! 			}
! 
! 			PageSetLSN(page, lsn);
! 			PageSetTLI(page, ThisTimeLineID);
! 			MarkBufferDirty(buffer);
  
! 			if (!keep_buffer)
! 				UnlockReleaseBuffer(buffer);
  
! 			return buffer;
  		}
  
  		blk += BLCKSZ - bkpb.hole_length;
  	}
- 
- 	/* Caller specified a bogus block_index */
- 	elog(ERROR, "failed to restore block_index %d", block_index);
- 	return InvalidBuffer;		/* keep compiler quiet */
  }
  
  /*
--- 3109,3157 ----
  	char	   *blk;
  	int			i;
  
! 	if (!(record->xl_info & XLR_BKP_BLOCK_MASK))
! 		return;
! 
  	blk = (char *) XLogRecGetData(record) + record->xl_len;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
! 		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
  			continue;
  
  		memcpy(&bkpb, blk, sizeof(BkpBlock));
  		blk += sizeof(BkpBlock);
  
! 		buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
! 										RBM_ZERO);
! 		Assert(BufferIsValid(buffer));
! 		if (cleanup)
! 			LockBufferForCleanup(buffer);
! 		else
! 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
! 		page = (Page) BufferGetPage(buffer);
  
! 		if (bkpb.hole_length == 0)
! 		{
! 			memcpy((char *) page, blk, BLCKSZ);
! 		}
! 		else
! 		{
! 			memcpy((char *) page, blk, bkpb.hole_offset);
! 			/* must zero-fill the hole */
! 			MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
! 			memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
! 				   blk + bkpb.hole_offset,
! 				   BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
  		}
  
+ 		PageSetLSN(page, lsn);
+ 		PageSetTLI(page, ThisTimeLineID);
+ 		MarkBufferDirty(buffer);
+ 		UnlockReleaseBuffer(buffer);
+ 
  		blk += BLCKSZ - bkpb.hole_length;
  	}
  }
  
  /*
*************** RecordIsValid(XLogRecord *record, XLogRe
*** 3215,3221 ****
  	{
  		uint32		blen;
  
! 		if (!(record->xl_info & XLR_BKP_BLOCK(i)))
  			continue;
  
  		if (remaining < sizeof(BkpBlock))
--- 3193,3199 ----
  	{
  		uint32		blen;
  
! 		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
  			continue;
  
  		if (remaining < sizeof(BkpBlock))
*************** xlog_outrec(StringInfo buf, XLogRecord *
*** 8103,8110 ****
  	int			i;
  
  	appendStringInfo(buf, "prev %X/%X; xid %u",
! 					 (uint32) (record->xl_prev >> 32),
! 					 (uint32) record->xl_prev,
  					 record->xl_xid);
  
  	appendStringInfo(buf, "; len %u",
--- 8081,8087 ----
  	int			i;
  
  	appendStringInfo(buf, "prev %X/%X; xid %u",
! 					 (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
  					 record->xl_xid);
  
  	appendStringInfo(buf, "; len %u",
*************** xlog_outrec(StringInfo buf, XLogRecord *
*** 8112,8119 ****
  
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
! 		if (record->xl_info & XLR_BKP_BLOCK(i))
! 			appendStringInfo(buf, "; bkpb%d", i);
  	}
  
  	appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
--- 8089,8096 ----
  
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
! 		if (record->xl_info & XLR_SET_BKP_BLOCK(i))
! 			appendStringInfo(buf, "; bkpb%d", i + 1);
  	}
  
  	appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
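The backup-block "hole" handling in RestoreBkpBlocks() above is self-contained enough to
demonstrate in isolation.  A toy version (assuming 8 kB pages; the names are local to
this sketch) reconstructs a page exactly the way the real code does:

	#include <string.h>

	#define BLCKSZ 8192

	/*
	 * Rebuild a full page image from a backup block whose all-zeroes
	 * "hole" was elided from the WAL record to save space.
	 */
	static void
	restore_page(char *page, const char *blk,
				 unsigned hole_offset, unsigned hole_length)
	{
		if (hole_length == 0)
			memcpy(page, blk, BLCKSZ);		/* no hole: verbatim copy */
		else
		{
			memcpy(page, blk, hole_offset);
			memset(page + hole_offset, 0, hole_length);	/* zero-fill hole */
			memcpy(page + hole_offset + hole_length,
				   blk + hole_offset,
				   BLCKSZ - (hole_offset + hole_length));
		}
	}
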
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
new file mode 100644
index 1e8eabd..52877ae
*** a/src/include/access/gist_private.h
--- b/src/include/access/gist_private.h
*************** typedef GISTScanOpaqueData *GISTScanOpaq
*** 167,173 ****
  #define XLOG_GIST_PAGE_SPLIT		0x30
   /* #define XLOG_GIST_INSERT_COMPLETE	 0x40 */	/* not used anymore */
  #define XLOG_GIST_CREATE_INDEX		0x50
!  /* #define XLOG_GIST_PAGE_DELETE		 0x60 */	/* not used anymore */
  
  typedef struct gistxlogPageUpdate
  {
--- 167,173 ----
  #define XLOG_GIST_PAGE_SPLIT		0x30
   /* #define XLOG_GIST_INSERT_COMPLETE	 0x40 */	/* not used anymore */
  #define XLOG_GIST_CREATE_INDEX		0x50
! #define XLOG_GIST_PAGE_DELETE		0x60
  
  typedef struct gistxlogPageUpdate
  {
*************** typedef struct gistxlogPage
*** 211,216 ****
--- 211,222 ----
  	int			num;			/* number of index tuples following */
  } gistxlogPage;
  
+ typedef struct gistxlogPageDelete
+ {
+ 	RelFileNode node;
+ 	BlockNumber blkno;
+ } gistxlogPageDelete;
+ 
  /* SplitedPageLayout - gistSplit function result */
  typedef struct SplitedPageLayout
  {
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
new file mode 100644
index 32c2e40..2893f3b
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
*************** typedef struct XLogRecord
*** 71,77 ****
   */
  #define XLR_BKP_BLOCK_MASK		0x0F	/* all info bits used for bkp blocks */
  #define XLR_MAX_BKP_BLOCKS		4
! #define XLR_BKP_BLOCK(iblk)		(0x08 >> (iblk))		/* iblk in 0..3 */
  
  /* Sync methods */
  #define SYNC_METHOD_FSYNC		0
--- 71,81 ----
   */
  #define XLR_BKP_BLOCK_MASK		0x0F	/* all info bits used for bkp blocks */
  #define XLR_MAX_BKP_BLOCKS		4
! #define XLR_SET_BKP_BLOCK(iblk) (0x08 >> (iblk))
! #define XLR_BKP_BLOCK_1			XLR_SET_BKP_BLOCK(0)	/* 0x08 */
! #define XLR_BKP_BLOCK_2			XLR_SET_BKP_BLOCK(1)	/* 0x04 */
! #define XLR_BKP_BLOCK_3			XLR_SET_BKP_BLOCK(2)	/* 0x02 */
! #define XLR_BKP_BLOCK_4			XLR_SET_BKP_BLOCK(3)	/* 0x01 */
  
  /* Sync methods */
  #define SYNC_METHOD_FSYNC		0
*************** extern int	sync_method;
*** 90,102 ****
   * If buffer is valid then XLOG will check if buffer must be backed up
   * (ie, whether this is first change of that page since last checkpoint).
   * If so, the whole page contents are attached to the XLOG record, and XLOG
!  * sets XLR_BKP_BLOCK(N) bit in xl_info.  Note that the buffer must be pinned
   * and exclusive-locked by the caller, so that it won't change under us.
   * NB: when the buffer is backed up, we DO NOT insert the data pointed to by
   * this XLogRecData struct into the XLOG record, since we assume it's present
   * in the buffer.  Therefore, rmgr redo routines MUST pay attention to
!  * XLR_BKP_BLOCK(N) to know what is actually stored in the XLOG record.
!  * The N'th XLR_BKP_BLOCK bit corresponds to the N'th distinct buffer
   * value (ignoring InvalidBuffer) appearing in the rdata chain.
   *
   * When buffer is valid, caller must set buffer_std to indicate whether the
--- 94,106 ----
   * If buffer is valid then XLOG will check if buffer must be backed up
   * (ie, whether this is first change of that page since last checkpoint).
   * If so, the whole page contents are attached to the XLOG record, and XLOG
!  * sets an XLR_BKP_BLOCK_X bit in xl_info.  Note that the buffer must be pinned
   * and exclusive-locked by the caller, so that it won't change under us.
   * NB: when the buffer is backed up, we DO NOT insert the data pointed to by
   * this XLogRecData struct into the XLOG record, since we assume it's present
   * in the buffer.  Therefore, rmgr redo routines MUST pay attention to
!  * XLR_BKP_BLOCK_X to know what is actually stored in the XLOG record.
!  * The i'th XLR_BKP_BLOCK bit corresponds to the i'th distinct buffer
   * value (ignoring InvalidBuffer) appearing in the rdata chain.
   *
   * When buffer is valid, caller must set buffer_std to indicate whether the
*************** extern int	XLogFileOpen(XLogSegNo segno)
*** 270,278 ****
  extern void XLogGetLastRemoved(XLogSegNo *segno);
  extern void XLogSetAsyncXactLSN(XLogRecPtr record);
  
! extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
! 				   int block_index,
! 				   bool get_cleanup_lock, bool keep_buffer);
  
  extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
--- 274,280 ----
  extern void XLogGetLastRemoved(XLogSegNo *segno);
  extern void XLogSetAsyncXactLSN(XLogRecPtr record);
  
! extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup);
  
  extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
