WIP: relation metapages

Started by Robert Haasover 13 years ago3 messages
#1Robert Haas
robertmhaas@gmail.com
1 attachment(s)

Here's a WIP patch implementing metapages for all relations, somewhat
along lines previously discussed:

http://archives.postgresql.org/pgsql-hackers/2012-05/msg00860.php

It turns out that doing this for indexes was pretty easy and didn't
obviously break anything; doing it for heaps was harder and broke a
lot of stuff. If you apply the patch as attached here, you'll find
that we fail a whole bunch of regression tests, mostly due to plan
changes. It seems that having N+1 pages in the heap changes the
optimal way to do... everything. Of course, the extra page need not
be included in seq-scans, so you'd think this was mostly a matter of
adjusting the costing functions to reduce the number of pages by 1 for
costing purposes. However, so far I haven't been able to hack the
costing to make the plan changes go away, though, which may be a sign
that I've broken something else. I can't seem to make Merge Append
work at all, which is maybe a better sign that I've broken something.
If you want to see the patch pass regression tests, hack
heap_create_storage not to emit a metapage for heaps and all the
regression test failures disappear.

What I'm really looking for at this stage of the game is feedback on
the design decisions I made. The intention here is that it should be
possible to read old-format heaps and indexes transparently, but that
when we create or rewrite a relation, we add a new-style metapage.
For all index types except gist, this is really just a format change
for the metapage that already existed: the new data that gets stored
for all relation types is added at the beginning of the page, just
following the page header, and then the AM-specific stuff is moved
further down the page. For GiST, it means adding a metapage that
wasn't there before, but that went smoothly too. For some AMs, I had
to rejigger the WAL-logging a little; review of those changes would be
good. The basic idea is that we don't want to have to try to
reconstruct what the metapage should have been during recovery
(indeed, we can't) so we just log an image of the page instead.

For heaps, I refactored things so that heap_create() is no longer used
for indexes. Instead, index_create() calls RelationBuildLocalRelation
directly. This required moving a little bit of logic from
heap_create() into RelationBuildLocalRelation(), but it seems like it
may fit better there anyway. That means that heap_create() can now
assume that it's creating a heap and not an index. This refactoring
might be worth pulling out of the patch and committing separately,
since I think the result is actually simpler and cleaner than what
we're doing now; but it's a minor point in any case.

I put the new metapage code in src/backend/access/common/metapage.c,
but I don't have a lot of confidence that that's the appropriate
location for it. Suggestions are appreciated.

I am pretty sure that clustering a relation will cause it to end up
with the wrong relation ID in its metapage afterwards. Since nothing
relies on that information at this point, this shouldn't break
anything, but it needs to be fixed eventually.

I think the thing I'm most worried about is the plan changes that
result from adding heap metapages. Suggestions on what to do about
that from a costing perspective would be particularly appreciated.

Thanks,

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

Attachments:

metapage-v1.patchapplication/octet-stream; name=metapage-v1.patchDownload
*** a/src/backend/access/common/Makefile
--- b/src/backend/access/common/Makefile
***************
*** 12,18 **** subdir = src/backend/access/common
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heaptuple.o indextuple.o printtup.o reloptions.o scankey.o \
  	tupconvert.o tupdesc.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,18 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heaptuple.o indextuple.o metapage.o printtup.o reloptions.o scankey.o \
  	tupconvert.o tupdesc.o
  
  include $(top_srcdir)/src/backend/common.mk
*** /dev/null
--- b/src/backend/access/common/metapage.c
***************
*** 0 ****
--- 1,138 ----
+ /*-------------------------------------------------------------------------
+  *
+  * metapage.c
+  *	  Generic relation metapage support.
+  *
+  * Prior to PostgreSQL 9.3, some relation types (heap, gist) had no
+  * metapage at all, while others (btree, gin, hash, spgist) had only an
+  * access-method specific metapage.  Beginning in PostgreSQL 9.3, all
+  * relations have a metapage.  We reserve the first 512 bytes of each
+  * metablock for the page header and generic metadata that applies to all
+  * relation types; the remainder of the page may be used by each individual
+  * access method for its own purposes.
+  *
+  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  src/backend/access/common/heaptuple.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "postgres.h"
+ 
+ #include <time.h>
+ 
+ #include "access/metapage.h"
+ #include "storage/buf.h"
+ #include "storage/bufmgr.h"
+ #include "utils/memutils.h"
+ 
+ /*
+  * Initialize a relation metapage.
+  */
+ void
+ MetapageInit(Relation relation, Page page)
+ {
+ 	RelationMetapage	meta;
+ 
+ 	if (relation->rd_metapage == NULL)
+ 		RelationBuildMetadata(relation);
+ 	PageSetRelationMetapage(page);
+ 	meta = BlindGetRelationMeta(page);
+ 	memcpy(meta, relation->rd_metapage, sizeof(RelationMetapageData));
+ }
+ 
+ /*
+  * Build metadata for a new relation.
+  */
+ void
+ RelationBuildMetadata(Relation relation)
+ {
+ 	RelationMetapage	meta;
+ 
+ 	/* The metadata shouldn't already be initialized. */
+ 	Assert(relation->rd_metapage == NULL);
+ 
+ 	/* Allocate memory for the data. */
+ 	meta = (RelationMetapage) MemoryContextAllocZero(CacheMemoryContext,
+ 		sizeof(RelationMetapageData));
+ 
+ 	/* Initialize. */
+ 	meta->rmp_magic = METAPAGE_MAGIC;
+ 	meta->rmp_version = METAPAGE_VERSION;
+ 	meta->rmp_dboid = relation->rd_node.dbNode;
+ 	meta->rmp_tsoid = relation->rd_node.spcNode;
+ 	meta->rmp_reloid = RelationGetRelid(relation);
+ 	meta->rmp_relfilenode = relation->rd_node.relNode;
+ 	meta->rmp_flags = 0;
+ 	meta->rmp_minlayout = PG_PAGE_LAYOUT_VERSION;
+ 	meta->rmp_maxlayout = PG_PAGE_LAYOUT_VERSION;
+ 	meta->rmp_relfilenode_time = (pg_time_t) time(NULL);
+ 
+ 	/* Save it. */
+ 	relation->rd_metapage = meta;
+ }
+ 
+ /*
+  * Get metadata for a relation.
+  */
+ RelationMetapage
+ RelationGetMetadata(Relation relation)
+ {
+ 	Buffer		buffer;
+ 	Page		page;
+ 	RelationMetapage	meta;
+ 
+ 	/* If data is already cached, just return it. */
+ 	if (relation->rd_metapage != NULL)
+ 		return relation->rd_metapage;
+ 
+ 	/* Allocate memory for the data. */
+ 	meta = (RelationMetapage) MemoryContextAllocZero(CacheMemoryContext,
+ 		sizeof(RelationMetapageData));
+ 
+ 	/*
+ 	 * If the relation has a metapage, read it.
+ 	 *
+ 	 * XXX: It's pretty annoying to have to call RelationGetNumberOfBlocks.
+ 	 * Can't we have an RBM_FAIL option for ReadBufferExtended?
+ 	 */
+ 	if (RelationGetNumberOfBlocks(relation) > METAPAGE_BLKNO)
+ 	{
+ 		buffer = ReadBuffer(relation, METAPAGE_BLKNO);
+ 		page = BufferGetPage(buffer);
+ 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ 
+ 		/* If it's a metapage, copy it, cache it, and return it. */
+ 		if (PageIsRelationMetapage(page))
+ 		{
+ 			memcpy(meta, BlindGetRelationMeta(page),
+ 				   sizeof(RelationMetapageData));
+ 			UnlockReleaseBuffer(buffer);
+ 			relation->rd_metapage = meta;
+ 			return meta;
+ 		}
+ 
+ 		/* It wasn't actually a metapage, so let it go. */
+ 		UnlockReleaseBuffer(buffer);
+ 	}
+ 
+ 	/* There's no real metapage, so create a fake one. */
+ 	meta->rmp_magic = METAPAGE_MAGIC;
+ 	meta->rmp_version = 0;				/* fake metapage */
+ 	meta->rmp_dboid = relation->rd_node.dbNode;
+ 	meta->rmp_tsoid = relation->rd_node.spcNode;
+ 	meta->rmp_reloid = RelationGetRelid(relation);
+ 	meta->rmp_relfilenode = relation->rd_node.relNode;
+ 	meta->rmp_flags = 0;
+ 	meta->rmp_minlayout = PG_PAGE_LAYOUT_VERSION;
+ 	meta->rmp_maxlayout = PG_PAGE_LAYOUT_VERSION;
+ 	meta->rmp_relfilenode_time = 0;		/* unknown creation time */
+ 
+ 	/* Cache result for next time, and return it. */
+ 	relation->rd_metapage = meta;
+ 	return meta;
+ }
*** a/src/backend/access/gin/gininsert.c
--- b/src/backend/access/gin/gininsert.c
***************
*** 410,440 **** ginbuild(PG_FUNCTION_ARGS)
  	RootBuffer = GinNewBuffer(index);
  
  	START_CRIT_SECTION();
! 	GinInitMetabuffer(MetaBuffer);
  	MarkBufferDirty(MetaBuffer);
  	GinInitBuffer(RootBuffer, GIN_LEAF);
  	MarkBufferDirty(RootBuffer);
  
  	if (RelationNeedsWAL(index))
  	{
! 		XLogRecPtr	recptr;
! 		XLogRecData rdata;
! 		Page		page;
! 
! 		rdata.buffer = InvalidBuffer;
! 		rdata.data = (char *) &(index->rd_node);
! 		rdata.len = sizeof(RelFileNode);
! 		rdata.next = NULL;
! 
! 		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX, &rdata);
! 
! 		page = BufferGetPage(RootBuffer);
! 		PageSetLSN(page, recptr);
! 		PageSetTLI(page, ThisTimeLineID);
! 
! 		page = BufferGetPage(MetaBuffer);
! 		PageSetLSN(page, recptr);
! 		PageSetTLI(page, ThisTimeLineID);
  	}
  
  	UnlockReleaseBuffer(MetaBuffer);
--- 410,424 ----
  	RootBuffer = GinNewBuffer(index);
  
  	START_CRIT_SECTION();
! 	GinInitMetabuffer(index, MetaBuffer);
  	MarkBufferDirty(MetaBuffer);
  	GinInitBuffer(RootBuffer, GIN_LEAF);
  	MarkBufferDirty(RootBuffer);
  
  	if (RelationNeedsWAL(index))
  	{
! 		log_newpage_buffer(MetaBuffer);
! 		log_newpage_buffer(RootBuffer);
  	}
  
  	UnlockReleaseBuffer(MetaBuffer);
***************
*** 522,528 **** ginbuildempty(PG_FUNCTION_ARGS)
  
  	/* Initialize and xlog metabuffer and root buffer. */
  	START_CRIT_SECTION();
! 	GinInitMetabuffer(MetaBuffer);
  	MarkBufferDirty(MetaBuffer);
  	log_newpage_buffer(MetaBuffer);
  	GinInitBuffer(RootBuffer, GIN_LEAF);
--- 506,512 ----
  
  	/* Initialize and xlog metabuffer and root buffer. */
  	START_CRIT_SECTION();
! 	GinInitMetabuffer(index, MetaBuffer);
  	MarkBufferDirty(MetaBuffer);
  	log_newpage_buffer(MetaBuffer);
  	GinInitBuffer(RootBuffer, GIN_LEAF);
*** a/src/backend/access/gin/ginutil.c
--- b/src/backend/access/gin/ginutil.c
***************
*** 253,264 **** GinInitBuffer(Buffer b, uint32 f)
  }
  
  void
! GinInitMetabuffer(Buffer b)
  {
  	GinMetaPageData *metadata;
  	Page		page = BufferGetPage(b);
  
  	GinInitPage(page, GIN_META, BufferGetPageSize(b));
  
  	metadata = GinPageGetMeta(page);
  
--- 253,265 ----
  }
  
  void
! GinInitMetabuffer(Relation rel, Buffer b)
  {
  	GinMetaPageData *metadata;
  	Page		page = BufferGetPage(b);
  
  	GinInitPage(page, GIN_META, BufferGetPageSize(b));
+ 	MetapageInit(rel, page);
  
  	metadata = GinPageGetMeta(page);
  
*** a/src/backend/access/gin/ginxlog.c
--- b/src/backend/access/gin/ginxlog.c
***************
*** 70,107 **** forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updat
  }
  
  static void
- ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
- {
- 	RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
- 	Buffer		RootBuffer,
- 				MetaBuffer;
- 	Page		page;
- 
- 	MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
- 	Assert(BufferIsValid(MetaBuffer));
- 	page = (Page) BufferGetPage(MetaBuffer);
- 
- 	GinInitMetabuffer(MetaBuffer);
- 
- 	PageSetLSN(page, lsn);
- 	PageSetTLI(page, ThisTimeLineID);
- 	MarkBufferDirty(MetaBuffer);
- 
- 	RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
- 	Assert(BufferIsValid(RootBuffer));
- 	page = (Page) BufferGetPage(RootBuffer);
- 
- 	GinInitBuffer(RootBuffer, GIN_LEAF);
- 
- 	PageSetLSN(page, lsn);
- 	PageSetTLI(page, ThisTimeLineID);
- 	MarkBufferDirty(RootBuffer);
- 
- 	UnlockReleaseBuffer(RootBuffer);
- 	UnlockReleaseBuffer(MetaBuffer);
- }
- 
- static void
  ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
  {
  	ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
--- 70,75 ----
***************
*** 683,691 **** gin_redo(XLogRecPtr lsn, XLogRecord *record)
  	topCtx = MemoryContextSwitchTo(opCtx);
  	switch (info)
  	{
- 		case XLOG_GIN_CREATE_INDEX:
- 			ginRedoCreateIndex(lsn, record);
- 			break;
  		case XLOG_GIN_CREATE_PTREE:
  			ginRedoCreatePTree(lsn, record);
  			break;
--- 651,656 ----
***************
*** 731,740 **** gin_desc(StringInfo buf, uint8 xl_info, char *rec)
  
  	switch (info)
  	{
- 		case XLOG_GIN_CREATE_INDEX:
- 			appendStringInfo(buf, "Create index, ");
- 			desc_node(buf, *(RelFileNode *) rec, GIN_ROOT_BLKNO);
- 			break;
  		case XLOG_GIN_CREATE_PTREE:
  			appendStringInfo(buf, "Create posting tree, ");
  			desc_node(buf, ((ginxlogCreatePostingTree *) rec)->node, ((ginxlogCreatePostingTree *) rec)->blkno);
--- 696,701 ----
*** a/src/backend/access/gist/gist.c
--- b/src/backend/access/gist/gist.c
***************
*** 158,163 **** gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
--- 158,164 ----
  				List **splitinfo,
  				bool markfollowright)
  {
+ 	GistMetaPageData   *meta = GistMetaData(rel);
  	Page		page = BufferGetPage(buffer);
  	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
  	XLogRecPtr	recptr;
***************
*** 202,208 **** gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
  		BlockNumber blkno = BufferGetBlockNumber(buffer);
  		bool		is_rootsplit;
  
! 		is_rootsplit = (blkno == GIST_ROOT_BLKNO);
  
  		/*
  		 * Form index tuples vector to split. If we're replacing an old tuple,
--- 203,209 ----
  		BlockNumber blkno = BufferGetBlockNumber(buffer);
  		bool		is_rootsplit;
  
! 		is_rootsplit = (blkno == meta->gist_root);
  
  		/*
  		 * Form index tuples vector to split. If we're replacing an old tuple,
***************
*** 287,293 **** gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
  			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
  				downlinks[i++] = ptr->itup;
  
! 			rootpg.block.blkno = GIST_ROOT_BLKNO;
  			rootpg.block.num = ndownlinks;
  			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
  										  &(rootpg.lenlist));
--- 288,294 ----
  			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
  				downlinks[i++] = ptr->itup;
  
! 			rootpg.block.blkno = meta->gist_root;
  			rootpg.block.num = ndownlinks;
  			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
  										  &(rootpg.lenlist));
***************
*** 325,331 **** gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
  			}
  
  			/* Set up rightlinks */
! 			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
  				GistPageGetOpaque(ptr->page)->rightlink =
  					ptr->next->block.blkno;
  			else
--- 326,332 ----
  			}
  
  			/* Set up rightlinks */
! 			if (ptr->next && ptr->block.blkno != meta->gist_root)
  				GistPageGetOpaque(ptr->page)->rightlink =
  					ptr->next->block.blkno;
  			else
***************
*** 475,480 **** gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
--- 476,482 ----
  void
  gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
  {
+ 	GistMetaPageData   *meta = GistMetaData(r);
  	ItemId		iid;
  	IndexTuple	idxtuple;
  	GISTInsertStack firststack;
***************
*** 487,493 **** gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
  	state.r = r;
  
  	/* Start from the root */
! 	firststack.blkno = GIST_ROOT_BLKNO;
  	firststack.lsn.xrecoff = 0;
  	firststack.parent = NULL;
  	firststack.downlinkoffnum = InvalidOffsetNumber;
--- 489,495 ----
  	state.r = r;
  
  	/* Start from the root */
! 	firststack.blkno = meta->gist_root;
  	firststack.lsn.xrecoff = 0;
  	firststack.parent = NULL;
  	firststack.downlinkoffnum = InvalidOffsetNumber;
***************
*** 542,548 **** gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
  			continue;
  		}
  
! 		if (stack->blkno != GIST_ROOT_BLKNO &&
  			XLByteLT(stack->parent->lsn,
  					 GistPageGetOpaque(stack->page)->nsn))
  		{
--- 544,550 ----
  			continue;
  		}
  
! 		if (stack->blkno != meta->gist_root &&
  			XLByteLT(stack->parent->lsn,
  					 GistPageGetOpaque(stack->page)->nsn))
  		{
***************
*** 628,634 **** gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
  					 * child pages, so we just need to retry from the root
  					 * page.
  					 */
! 					if (stack->blkno != GIST_ROOT_BLKNO)
  					{
  						UnlockReleaseBuffer(stack->buffer);
  						xlocked = false;
--- 630,636 ----
  					 * child pages, so we just need to retry from the root
  					 * page.
  					 */
! 					if (stack->blkno != meta->gist_root)
  					{
  						UnlockReleaseBuffer(stack->buffer);
  						xlocked = false;
***************
*** 667,673 **** gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
  				stack->page = (Page) BufferGetPage(stack->buffer);
  				stack->lsn = PageGetLSN(stack->page);
  
! 				if (stack->blkno == GIST_ROOT_BLKNO)
  				{
  					/*
  					 * the only page that can become inner instead of leaf is
--- 669,675 ----
  				stack->page = (Page) BufferGetPage(stack->buffer);
  				stack->lsn = PageGetLSN(stack->page);
  
! 				if (stack->blkno == meta->gist_root)
  				{
  					/*
  					 * the only page that can become inner instead of leaf is
***************
*** 730,735 **** gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
--- 732,738 ----
  static GISTInsertStack *
  gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
  {
+ 	GistMetaPageData   *meta = GistMetaData(r);
  	Page		page;
  	Buffer		buffer;
  	OffsetNumber i,
***************
*** 742,748 **** gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
  	BlockNumber blkno;
  
  	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
! 	top->blkno = GIST_ROOT_BLKNO;
  	top->downlinkoffnum = InvalidOffsetNumber;
  
  	fifo = list_make1(top);
--- 745,751 ----
  	BlockNumber blkno;
  
  	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
! 	top->blkno = meta->gist_root;
  	top->downlinkoffnum = InvalidOffsetNumber;
  
  	fifo = list_make1(top);
*** a/src/backend/access/gist/gistbuild.c
--- b/src/backend/access/gist/gistbuild.c
***************
*** 73,78 **** typedef struct
--- 73,79 ----
  } GISTBuildState;
  
  /* prototypes for private functions */
+ static void GistInitMetabuffer(Relation index, Buffer buffer, BlockNumber root);
  static void gistInitBuffering(GISTBuildState *buildstate);
  static int	calculatePagesPerBuffer(GISTBuildState *buildstate, int levelStep);
  static void gistBuildCallback(Relation index,
***************
*** 117,124 **** gistbuild(PG_FUNCTION_ARGS)
  	IndexBuildResult *result;
  	double		reltuples;
  	GISTBuildState buildstate;
! 	Buffer		buffer;
! 	Page		page;
  	MemoryContext oldcxt = CurrentMemoryContext;
  	int			fillfactor;
  
--- 118,128 ----
  	IndexBuildResult *result;
  	double		reltuples;
  	GISTBuildState buildstate;
! 	Buffer		metabuffer,
! 				rootbuffer;
! 	Page		metapage,
! 				rootpage;
! 	BlockNumber	rootblock;
  	MemoryContext oldcxt = CurrentMemoryContext;
  	int			fillfactor;
  
***************
*** 178,212 **** gistbuild(PG_FUNCTION_ARGS)
  	 */
  	buildstate.giststate->tempCxt = createTempGistContext();
  
! 	/* initialize the root page */
! 	buffer = gistNewBuffer(index);
! 	Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
! 	page = BufferGetPage(buffer);
  
  	START_CRIT_SECTION();
  
! 	GISTInitBuffer(buffer, F_LEAF);
  
! 	MarkBufferDirty(buffer);
  
  	if (RelationNeedsWAL(index))
  	{
  		XLogRecPtr	recptr;
  		XLogRecData rdata;
  
! 		rdata.data = (char *) &(index->rd_node);
  		rdata.len = sizeof(RelFileNode);
  		rdata.buffer = InvalidBuffer;
  		rdata.next = NULL;
  
  		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
! 		PageSetLSN(page, recptr);
! 		PageSetTLI(page, ThisTimeLineID);
  	}
  	else
! 		PageSetLSN(page, GetXLogRecPtrForTemp());
  
! 	UnlockReleaseBuffer(buffer);
  
  	END_CRIT_SECTION();
  
--- 182,225 ----
  	 */
  	buildstate.giststate->tempCxt = createTempGistContext();
  
! 	/* initialize the metabuffer and root buffer */
! 	metabuffer = gistNewBuffer(index);
! 	metapage = BufferGetPage(metabuffer);
! 	Assert(BufferGetBlockNumber(metabuffer) == GIST_METAPAGE_BLKNO);
! 	rootbuffer = gistNewBuffer(index);
! 	rootpage = BufferGetPage(rootbuffer);
! 	rootblock = BufferGetBlockNumber(rootbuffer);
  
  	START_CRIT_SECTION();
  
! 	GistInitMetabuffer(index, metabuffer, rootblock);
! 	GISTInitBuffer(rootbuffer, F_LEAF | F_ROOT);
  
! 	MarkBufferDirty(rootbuffer);
  
  	if (RelationNeedsWAL(index))
  	{
  		XLogRecPtr	recptr;
  		XLogRecData rdata;
+ 		gistxlogCreateIndex	xlrec;
  
! 		xlrec.node = index->rd_node;
! 		xlrec.blkno = rootblock;
! 
! 		rdata.data = (char *) &xlrec;
  		rdata.len = sizeof(RelFileNode);
  		rdata.buffer = InvalidBuffer;
  		rdata.next = NULL;
  
  		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
! 		PageSetLSN(rootpage, recptr);
! 		PageSetTLI(rootpage, ThisTimeLineID);
  	}
  	else
! 		PageSetLSN(rootpage, GetXLogRecPtrForTemp());
  
! 	UnlockReleaseBuffer(rootbuffer);
! 	UnlockReleaseBuffer(metabuffer);
  
  	END_CRIT_SECTION();
  
***************
*** 249,254 **** gistbuild(PG_FUNCTION_ARGS)
--- 262,285 ----
  }
  
  /*
+  * Initialize metabuffer.
+  */
+ static void
+ GistInitMetabuffer(Relation index, Buffer buffer, BlockNumber root)
+ {
+ 	GistMetaPageData   *meta;
+ 	Page		page = BufferGetPage(buffer);
+ 
+ 	GISTInitBuffer(buffer, 0);
+ 	MetapageInit(index, page);
+ 
+ 	meta = GistPageGetMeta(page);
+ 	meta->gist_magic = GIST_MAGIC;
+ 	meta->gist_version = GIST_VERSION;
+ 	meta->gist_root = root;
+ }
+ 
+ /*
   * Validator for "buffering" reloption on GiST indexes. Allows "on", "off"
   * and "auto" values.
   */
***************
*** 684,694 **** gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
  						  IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
  						  BlockNumber parentblk, OffsetNumber downlinkoffnum)
  {
  	GISTBuildBuffers *gfbb = buildstate->gfbb;
  	List	   *splitinfo;
  	bool		is_split;
  
! 	is_split = gistplacetopage(buildstate->indexrel,
  							   buildstate->freespace,
  							   buildstate->giststate,
  							   buffer,
--- 715,727 ----
  						  IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
  						  BlockNumber parentblk, OffsetNumber downlinkoffnum)
  {
+ 	Relation	r = buildstate->indexrel;
+ 	GistMetaPageData *meta = GistMetaData(r);
  	GISTBuildBuffers *gfbb = buildstate->gfbb;
  	List	   *splitinfo;
  	bool		is_split;
  
! 	is_split = gistplacetopage(r,
  							   buildstate->freespace,
  							   buildstate->giststate,
  							   buffer,
***************
*** 703,709 **** gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
  	 * nodes up to the root. That simplifies the algorithm to re-find correct
  	 * parent.
  	 */
! 	if (is_split && BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
  	{
  		Page		page = BufferGetPage(buffer);
  		OffsetNumber off;
--- 736,742 ----
  	 * nodes up to the root. That simplifies the algorithm to re-find correct
  	 * parent.
  	 */
! 	if (is_split && BufferGetBlockNumber(buffer) == meta->gist_root)
  	{
  		Page		page = BufferGetPage(buffer);
  		OffsetNumber off;
***************
*** 727,733 **** gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
  				ItemId		iid = PageGetItemId(page, off);
  				IndexTuple	idxtuple = (IndexTuple) PageGetItem(page, iid);
  				BlockNumber childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
! 				Buffer		childbuf = ReadBuffer(buildstate->indexrel, childblkno);
  
  				LockBuffer(childbuf, GIST_SHARE);
  				gistMemorizeAllDownlinks(buildstate, childbuf);
--- 760,766 ----
  				ItemId		iid = PageGetItemId(page, off);
  				IndexTuple	idxtuple = (IndexTuple) PageGetItem(page, iid);
  				BlockNumber childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
! 				Buffer		childbuf = ReadBuffer(r, childblkno);
  
  				LockBuffer(childbuf, GIST_SHARE);
  				gistMemorizeAllDownlinks(buildstate, childbuf);
***************
*** 737,743 **** gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
  				 * Also remember that the parent of the new child page is the
  				 * root block.
  				 */
! 				gistMemorizeParent(buildstate, childblkno, GIST_ROOT_BLKNO);
  			}
  		}
  	}
--- 770,776 ----
  				 * Also remember that the parent of the new child page is the
  				 * root block.
  				 */
! 				gistMemorizeParent(buildstate, childblkno, meta->gist_root);
  			}
  		}
  	}
***************
*** 773,779 **** gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
  		 */
  		gistRelocateBuildBuffersOnSplit(gfbb,
  										buildstate->giststate,
! 										buildstate->indexrel,
  										level,
  										buffer, splitinfo);
  
--- 806,812 ----
  		 */
  		gistRelocateBuildBuffersOnSplit(gfbb,
  										buildstate->giststate,
! 										r,
  										level,
  										buffer, splitinfo);
  
***************
*** 1057,1063 **** gistGetMaxLevel(Relation index)
  	 * level.
  	 */
  	maxLevel = 0;
! 	blkno = GIST_ROOT_BLKNO;
  	while (true)
  	{
  		Buffer		buffer;
--- 1090,1096 ----
  	 * level.
  	 */
  	maxLevel = 0;
! 	blkno = GistMetaData(index)->gist_root;
  	while (true)
  	{
  		Buffer		buffer;
*** a/src/backend/access/gist/gistbuildbuffers.c
--- b/src/backend/access/gist/gistbuildbuffers.c
***************
*** 576,582 **** gistRelocateBuildBuffersOnSplit(GISTBuildBuffers *gfbb, GISTSTATE *giststate,
  	 * described by Arge et al did, but it's of no use, as you might as well
  	 * read the tuples straight from the heap instead of the root buffer.
  	 */
! 	Assert(blocknum != GIST_ROOT_BLKNO);
  	memcpy(&oldBuf, nodeBuffer, sizeof(GISTNodeBuffer));
  	oldBuf.isTemp = true;
  
--- 576,582 ----
  	 * described by Arge et al did, but it's of no use, as you might as well
  	 * read the tuples straight from the heap instead of the root buffer.
  	 */
! 	Assert(blocknum != GistMetaData(r)->gist_root);
  	memcpy(&oldBuf, nodeBuffer, sizeof(GISTNodeBuffer));
  	oldBuf.isTemp = true;
  
*** a/src/backend/access/gist/gistget.c
--- b/src/backend/access/gist/gistget.c
***************
*** 489,495 **** gistgettuple(PG_FUNCTION_ARGS)
  		so->curTreeItem = NULL;
  		so->curPageData = so->nPageData = 0;
  
! 		fakeItem.blkno = GIST_ROOT_BLKNO;
  		memset(&fakeItem.data.parentlsn, 0, sizeof(GistNSN));
  		gistScanPage(scan, &fakeItem, NULL, NULL, NULL);
  	}
--- 489,495 ----
  		so->curTreeItem = NULL;
  		so->curPageData = so->nPageData = 0;
  
! 		fakeItem.blkno = GistMetaData(scan->indexRelation)->gist_root;
  		memset(&fakeItem.data.parentlsn, 0, sizeof(GistNSN));
  		gistScanPage(scan, &fakeItem, NULL, NULL, NULL);
  	}
***************
*** 560,566 **** gistgetbitmap(PG_FUNCTION_ARGS)
  	so->curTreeItem = NULL;
  	so->curPageData = so->nPageData = 0;
  
! 	fakeItem.blkno = GIST_ROOT_BLKNO;
  	memset(&fakeItem.data.parentlsn, 0, sizeof(GistNSN));
  	gistScanPage(scan, &fakeItem, NULL, tbm, &ntids);
  
--- 560,566 ----
  	so->curTreeItem = NULL;
  	so->curPageData = so->nPageData = 0;
  
! 	fakeItem.blkno = GistMetaData(scan->indexRelation)->gist_root;
  	memset(&fakeItem.data.parentlsn, 0, sizeof(GistNSN));
  	gistScanPage(scan, &fakeItem, NULL, tbm, &ntids);
  
*** a/src/backend/access/gist/gistutil.c
--- b/src/backend/access/gist/gistutil.c
***************
*** 29,34 **** static Datum attrS[INDEX_MAX_KEYS];
--- 29,83 ----
  static bool isnullS[INDEX_MAX_KEYS];
  
  /*
+  * Fetch local cache of AM-specific info about the index, initializing it
+  * if necessary
+  */
+ GistMetaPageData *
+ GistMetaData(Relation index)
+ {
+ 	Buffer			metabuffer;
+ 	Page			metapage;
+ 	GistMetaPageData   *cache;
+ 
+ 	if (index->rd_amcache != NULL)
+ 		return (GistMetaPageData *) index->rd_amcache;
+ 
+ 	/* Allocate cache memory. */
+ 	cache = MemoryContextAllocZero(index->rd_indexcxt,
+ 								   sizeof(GistMetaPageData));
+ 
+ 	/* Read the metapage. */
+ 	metabuffer = ReadBuffer(index, GIST_METAPAGE_BLKNO);
+ 	LockBuffer(metabuffer, GIST_SHARE);
+ 
+ 	/*
+ 	 * If this index was inherited from PostgreSQL < 9.3 via pg_upgrade, it
+ 	 * might not have a metapage.  In that case, we fake up some suitable
+ 	 * metapage data.  Otherwise, we just copy the data that exists on the
+ 	 * page.
+ 	 */
+ 	metapage = BufferGetPage(metabuffer);
+ 	if (PageIsRelationMetapage(metapage))
+ 	{
+ 		GistMetaPageData   *meta;
+ 
+ 		meta = GistPageGetMeta(metapage);
+ 		memcpy(cache, meta, sizeof(GistMetaPageData));
+ 	}
+ 	else
+ 	{
+ 		cache->gist_magic = GIST_MAGIC;
+ 		cache->gist_version = 0;			/* no real metapage */
+ 		cache->gist_root = GIST_OLD_ROOT_BLKNO;
+ 	}
+ 	index->rd_amcache = (char *) cache;
+ 
+ 	UnlockReleaseBuffer(metabuffer);
+ 
+ 	return cache;
+ }
+ 
+ /*
   * Write itup vector to page, has no control of free space.
   */
  void
*** a/src/backend/access/gist/gistvacuum.c
--- b/src/backend/access/gist/gistvacuum.c
***************
*** 35,40 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
--- 35,41 ----
  				blkno;
  	BlockNumber totFreePages;
  	bool		needLock;
+ 	GistMetaPageData *meta = GistMetaData(rel);
  
  	/* No-op in ANALYZE ONLY mode */
  	if (info->analyze_only)
***************
*** 67,78 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
  		UnlockRelationForExtension(rel, ExclusiveLock);
  
  	totFreePages = 0;
! 	for (blkno = GIST_ROOT_BLKNO + 1; blkno < npages; blkno++)
  	{
  		Buffer		buffer;
  		Page		page;
  
  		vacuum_delay_point();
  
  		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
  									info->strategy);
--- 68,88 ----
  		UnlockRelationForExtension(rel, ExclusiveLock);
  
  	totFreePages = 0;
! 
! 	/*
! 	 * Block 0 might be the metapage, or it might be the root page, both
! 	 * of which we want to skip.  As of PostgreSQL 9.3, the metapage might
! 	 * also be somewhere else in the index; we must skip it no matter where
! 	 * it is.
! 	 */
! 	for (blkno = 1; blkno < npages; blkno++)
  	{
  		Buffer		buffer;
  		Page		page;
  
  		vacuum_delay_point();
+ 		if (blkno == meta->gist_root)
+ 			continue;
  
  		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
  									info->strategy);
***************
*** 109,119 **** typedef struct GistBDItem
  } GistBDItem;
  
  static void
! pushStackIfSplited(Page page, GistBDItem *stack)
  {
  	GISTPageOpaque opaque = GistPageGetOpaque(page);
  
! 	if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
  		(GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
  		opaque->rightlink != InvalidBlockNumber /* sanity check */ )
  	{
--- 119,129 ----
  } GistBDItem;
  
  static void
! pushStackIfSplited(GistMetaPageData *meta, Page page, GistBDItem *stack)
  {
  	GISTPageOpaque opaque = GistPageGetOpaque(page);
  
! 	if (stack->blkno != meta->gist_root && !XLogRecPtrIsInvalid(stack->parentlsn) &&
  		(GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
  		opaque->rightlink != InvalidBlockNumber /* sanity check */ )
  	{
***************
*** 147,152 **** gistbulkdelete(PG_FUNCTION_ARGS)
--- 157,163 ----
  	Relation	rel = info->index;
  	GistBDItem *stack,
  			   *ptr;
+ 	GistMetaPageData *meta = GistMetaData(rel);
  
  	/* first time through? */
  	if (stats == NULL)
***************
*** 156,162 **** gistbulkdelete(PG_FUNCTION_ARGS)
  	stats->num_index_tuples = 0;
  
  	stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
! 	stack->blkno = GIST_ROOT_BLKNO;
  
  	while (stack)
  	{
--- 167,173 ----
  	stats->num_index_tuples = 0;
  
  	stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
! 	stack->blkno = meta->gist_root;
  
  	while (stack)
  	{
***************
*** 182,188 **** gistbulkdelete(PG_FUNCTION_ARGS)
  			LockBuffer(buffer, GIST_EXCLUSIVE);
  
  			page = (Page) BufferGetPage(buffer);
! 			if (stack->blkno == GIST_ROOT_BLKNO && !GistPageIsLeaf(page))
  			{
  				/* only the root can become non-leaf during relock */
  				UnlockReleaseBuffer(buffer);
--- 193,199 ----
  			LockBuffer(buffer, GIST_EXCLUSIVE);
  
  			page = (Page) BufferGetPage(buffer);
! 			if (stack->blkno == meta->gist_root && !GistPageIsLeaf(page))
  			{
  				/* only the root can become non-leaf during relock */
  				UnlockReleaseBuffer(buffer);
***************
*** 194,200 **** gistbulkdelete(PG_FUNCTION_ARGS)
  			 * check for split proceeded after look at parent, we should check
  			 * it after relock
  			 */
! 			pushStackIfSplited(page, stack);
  
  			/*
  			 * Remove deletable tuples from page
--- 205,211 ----
  			 * check for split proceeded after look at parent, we should check
  			 * it after relock
  			 */
! 			pushStackIfSplited(meta, page, stack);
  
  			/*
  			 * Remove deletable tuples from page
***************
*** 247,253 **** gistbulkdelete(PG_FUNCTION_ARGS)
  		else
  		{
  			/* check for split proceeded after look at parent */
! 			pushStackIfSplited(page, stack);
  
  			maxoff = PageGetMaxOffsetNumber(page);
  
--- 258,264 ----
  		else
  		{
  			/* check for split proceeded after look at parent */
! 			pushStackIfSplited(meta, page, stack);
  
  			maxoff = PageGetMaxOffsetNumber(page);
  
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 140,152 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
  			GistClearTuplesDeleted(page);
  	}
  
! 	if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
! 
  		/*
  		 * all links on non-leaf root page was deleted by vacuum full, so root
  		 * page becomes a leaf
  		 */
  		GistPageSetLeaf(page);
  
  	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
  	PageSetLSN(page, lsn);
--- 140,155 ----
  			GistClearTuplesDeleted(page);
  	}
  
! 	if (!GistPageIsLeaf(page) &&
! 		PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
! 		(xldata->blkno == GIST_OLD_ROOT_BLKNO || GistPageIsRoot(page)))
! 	{
  		/*
  		 * all links on non-leaf root page was deleted by vacuum full, so root
  		 * page becomes a leaf
  		 */
  		GistPageSetLeaf(page);
+ 	}
  
  	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
  	PageSetLSN(page, lsn);
***************
*** 230,247 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
  		NewPage    *newpage = xlrec.page + i;
  		int			flags;
  
! 		if (newpage->header->blkno == GIST_ROOT_BLKNO)
  		{
  			Assert(i == 0);
  			isrootsplit = true;
  		}
  
- 		buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
- 		Assert(BufferIsValid(buffer));
- 		page = (Page) BufferGetPage(buffer);
- 
  		/* ok, clear buffer */
! 		if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
  			flags = F_LEAF;
  		else
  			flags = 0;
--- 233,258 ----
  		NewPage    *newpage = xlrec.page + i;
  		int			flags;
  
! 		buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
! 		Assert(BufferIsValid(buffer));
! 		page = (Page) BufferGetPage(buffer);
! 
! 		/*
! 		 * If this index was inherited from PostgreSQL < 9.3 via pg_upgrade,
! 		 * the root will be at block zero.  Otherwise, the metapage will be at
! 		 * that offset, and the root page will be flagged as such.
! 		 */
! 		if (newpage->header->blkno == GIST_OLD_ROOT_BLKNO
! 			|| GistPageIsRoot(page))
  		{
  			Assert(i == 0);
  			isrootsplit = true;
  		}
  
  		/* ok, clear buffer */
! 		if (isrootsplit)
! 			flags = F_ROOT;
! 		else if (xlrec.data->origleaf)
  			flags = F_LEAF;
  		else
  			flags = 0;
***************
*** 250,256 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
  		/* and fill it */
  		gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
  
! 		if (newpage->header->blkno == GIST_ROOT_BLKNO)
  		{
  			GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
  			GistPageGetOpaque(page)->nsn = xldata->orignsn;
--- 261,267 ----
  		/* and fill it */
  		gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
  
! 		if (isrootsplit)
  		{
  			GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
  			GistPageGetOpaque(page)->nsn = xldata->orignsn;
***************
*** 280,294 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
  static void
  gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
! 	RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
  	Buffer		buffer;
  	Page		page;
  
! 	buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
  	Assert(BufferIsValid(buffer));
  	page = (Page) BufferGetPage(buffer);
  
! 	GISTInitBuffer(buffer, F_LEAF);
  
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
--- 291,305 ----
  static void
  gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
! 	gistxlogCreateIndex *xldata = (gistxlogCreateIndex *) XLogRecGetData(record);
  	Buffer		buffer;
  	Page		page;
  
! 	buffer = XLogReadBuffer(xldata->node, xldata->blkno, true);
  	Assert(BufferIsValid(buffer));
  	page = (Page) BufferGetPage(buffer);
  
! 	GISTInitBuffer(buffer, F_LEAF|F_ROOT);
  
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
*** a/src/backend/access/hash/hashpage.c
--- b/src/backend/access/hash/hashpage.c
***************
*** 382,387 **** _hash_metapinit(Relation rel, double num_tuples, ForkNumber forkNum)
--- 382,388 ----
  	 */
  	metabuf = _hash_getnewbuf(rel, HASH_METAPAGE, forkNum);
  	pg = BufferGetPage(metabuf);
+ 	MetapageInit(rel, pg);
  
  	pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
  	pageopaque->hasho_prevblkno = InvalidBlockNumber;
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 42,47 ****
--- 42,48 ----
  
  #include "access/heapam.h"
  #include "access/hio.h"
+ #include "access/metapage.h"
  #include "access/multixact.h"
  #include "access/relscan.h"
  #include "access/sysattr.h"
***************
*** 162,172 **** initscan(HeapScanDesc scan, ScanKey key, bool is_rescan)
  	{
  		scan->rs_syncscan = true;
  		scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks);
  	}
  	else
  	{
  		scan->rs_syncscan = false;
! 		scan->rs_startblock = 0;
  	}
  
  	scan->rs_inited = false;
--- 163,175 ----
  	{
  		scan->rs_syncscan = true;
  		scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+ 		if (scan->rs_startblock < scan->rs_lowpage)
+ 			scan->rs_startblock = scan->rs_lowpage;
  	}
  	else
  	{
  		scan->rs_syncscan = false;
! 		scan->rs_startblock = scan->rs_lowpage;
  	}
  
  	scan->rs_inited = false;
***************
*** 213,218 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
--- 216,222 ----
  	ItemId		lpp;
  	bool		all_visible;
  
+ 	Assert(page >= scan->rs_lowpage);
  	Assert(page < scan->rs_nblocks);
  
  	/* release previous scan buffer, if any */
***************
*** 348,354 **** heapgettup(HeapScanDesc scan,
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == 0)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
--- 352,358 ----
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == scan->rs_lowpage)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
***************
*** 382,388 **** heapgettup(HeapScanDesc scan,
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == 0)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
--- 386,392 ----
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == scan->rs_lowpage)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
***************
*** 397,403 **** heapgettup(HeapScanDesc scan,
  			 */
  			scan->rs_syncscan = false;
  			/* start from last page of the scan */
! 			if (scan->rs_startblock > 0)
  				page = scan->rs_startblock - 1;
  			else
  				page = scan->rs_nblocks - 1;
--- 401,407 ----
  			 */
  			scan->rs_syncscan = false;
  			/* start from last page of the scan */
! 			if (scan->rs_startblock > scan->rs_lowpage)
  				page = scan->rs_startblock - 1;
  			else
  				page = scan->rs_nblocks - 1;
***************
*** 522,528 **** heapgettup(HeapScanDesc scan,
  		if (backward)
  		{
  			finished = (page == scan->rs_startblock);
! 			if (page == 0)
  				page = scan->rs_nblocks;
  			page--;
  		}
--- 526,532 ----
  		if (backward)
  		{
  			finished = (page == scan->rs_startblock);
! 			if (page == scan->rs_lowpage)
  				page = scan->rs_nblocks;
  			page--;
  		}
***************
*** 530,536 **** heapgettup(HeapScanDesc scan,
  		{
  			page++;
  			if (page >= scan->rs_nblocks)
! 				page = 0;
  			finished = (page == scan->rs_startblock);
  
  			/*
--- 534,540 ----
  		{
  			page++;
  			if (page >= scan->rs_nblocks)
! 				page = scan->rs_lowpage;
  			finished = (page == scan->rs_startblock);
  
  			/*
***************
*** 623,629 **** heapgettup_pagemode(HeapScanDesc scan,
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == 0)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
--- 627,633 ----
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == scan->rs_lowpage)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
***************
*** 654,660 **** heapgettup_pagemode(HeapScanDesc scan,
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == 0)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
--- 658,664 ----
  			/*
  			 * return null immediately if relation is empty
  			 */
! 			if (scan->rs_nblocks == scan->rs_lowpage)
  			{
  				Assert(!BufferIsValid(scan->rs_cbuf));
  				tuple->t_data = NULL;
***************
*** 669,675 **** heapgettup_pagemode(HeapScanDesc scan,
  			 */
  			scan->rs_syncscan = false;
  			/* start from last page of the scan */
! 			if (scan->rs_startblock > 0)
  				page = scan->rs_startblock - 1;
  			else
  				page = scan->rs_nblocks - 1;
--- 673,679 ----
  			 */
  			scan->rs_syncscan = false;
  			/* start from last page of the scan */
! 			if (scan->rs_startblock > scan->rs_lowpage)
  				page = scan->rs_startblock - 1;
  			else
  				page = scan->rs_nblocks - 1;
***************
*** 783,789 **** heapgettup_pagemode(HeapScanDesc scan,
  		if (backward)
  		{
  			finished = (page == scan->rs_startblock);
! 			if (page == 0)
  				page = scan->rs_nblocks;
  			page--;
  		}
--- 787,793 ----
  		if (backward)
  		{
  			finished = (page == scan->rs_startblock);
! 			if (page == scan->rs_lowpage)
  				page = scan->rs_nblocks;
  			page--;
  		}
***************
*** 791,797 **** heapgettup_pagemode(HeapScanDesc scan,
  		{
  			page++;
  			if (page >= scan->rs_nblocks)
! 				page = 0;
  			finished = (page == scan->rs_startblock);
  
  			/*
--- 795,801 ----
  		{
  			page++;
  			if (page >= scan->rs_nblocks)
! 				page = scan->rs_lowpage;
  			finished = (page == scan->rs_startblock);
  
  			/*
***************
*** 1202,1207 **** heap_beginscan_internal(Relation relation, Snapshot snapshot,
--- 1206,1212 ----
  	scan->rs_strategy = NULL;	/* set in initscan */
  	scan->rs_allow_strat = allow_strat;
  	scan->rs_allow_sync = allow_sync;
+ 	scan->rs_lowpage = RelationFirstNonMetapage(relation);
  
  	/*
  	 * we can use page-at-a-time mode if it's an MVCC-safe snapshot
*** a/src/backend/access/heap/hio.c
--- b/src/backend/access/heap/hio.c
***************
*** 17,22 ****
--- 17,23 ----
  
  #include "access/heapam.h"
  #include "access/hio.h"
+ #include "access/metapage.h"
  #include "access/visibilitymap.h"
  #include "storage/bufmgr.h"
  #include "storage/freespace.h"
***************
*** 290,302 **** RelationGetBufferForTuple(Relation relation, Size len,
  		{
  			BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
  
! 			if (nblocks > 0)
  				targetBlock = nblocks - 1;
  		}
  	}
  
  	while (targetBlock != InvalidBlockNumber)
  	{
  		/*
  		 * Read and exclusive-lock the target block, as well as the other
  		 * block if one was given, taking suitable care with lock ordering and
--- 291,306 ----
  		{
  			BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
  
! 			if (nblocks > RelationFirstNonMetapage(relation))
  				targetBlock = nblocks - 1;
  		}
  	}
  
  	while (targetBlock != InvalidBlockNumber)
  	{
+ 		/* Make sure we don't insert into the metapage. */
+ 		Assert(targetBlock != METAPAGE_BLKNO || !RelationHasMetapage(relation));
+ 
  		/*
  		 * Read and exclusive-lock the target block, as well as the other
  		 * block if one was given, taking suitable care with lock ordering and
*** a/src/backend/access/nbtree/nbtpage.c
--- b/src/backend/access/nbtree/nbtpage.c
***************
*** 36,47 ****
   *	_bt_initmetapage() -- Fill a page buffer with a correct metapage image
   */
  void
! _bt_initmetapage(Page page, BlockNumber rootbknum, uint32 level)
  {
  	BTMetaPageData *metad;
  	BTPageOpaque metaopaque;
  
  	_bt_pageinit(page, BLCKSZ);
  
  	metad = BTPageGetMeta(page);
  	metad->btm_magic = BTREE_MAGIC;
--- 36,48 ----
   *	_bt_initmetapage() -- Fill a page buffer with a correct metapage image
   */
  void
! _bt_initmetapage(Relation rel, Page page, BlockNumber rootbknum, uint32 level)
  {
  	BTMetaPageData *metad;
  	BTPageOpaque metaopaque;
  
  	_bt_pageinit(page, BLCKSZ);
+ 	MetapageInit(rel, page);
  
  	metad = BTPageGetMeta(page);
  	metad->btm_magic = BTREE_MAGIC;
*** a/src/backend/access/nbtree/nbtree.c
--- b/src/backend/access/nbtree/nbtree.c
***************
*** 213,219 **** btbuildempty(PG_FUNCTION_ARGS)
  
  	/* Construct metapage. */
  	metapage = (Page) palloc(BLCKSZ);
! 	_bt_initmetapage(metapage, P_NONE, 0);
  
  	/* Write the page.	If archiving/streaming, XLOG it. */
  	smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
--- 213,219 ----
  
  	/* Construct metapage. */
  	metapage = (Page) palloc(BLCKSZ);
! 	_bt_initmetapage(index, metapage, P_NONE, 0);
  
  	/* Write the page.	If archiving/streaming, XLOG it. */
  	smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
*** a/src/backend/access/nbtree/nbtsort.c
--- b/src/backend/access/nbtree/nbtsort.c
***************
*** 658,664 **** _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
  	 * by filling in a valid magic number in the metapage.
  	 */
  	metapage = (Page) palloc(BLCKSZ);
! 	_bt_initmetapage(metapage, rootblkno, rootlevel);
  	_bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
  }
  
--- 658,664 ----
  	 * by filling in a valid magic number in the metapage.
  	 */
  	metapage = (Page) palloc(BLCKSZ);
! 	_bt_initmetapage(wstate->index, metapage, rootblkno, rootlevel);
  	_bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
  }
  
*** a/src/backend/access/spgist/spginsert.c
--- b/src/backend/access/spgist/spginsert.c
***************
*** 82,88 **** spgbuild(PG_FUNCTION_ARGS)
  
  	START_CRIT_SECTION();
  
! 	SpGistInitMetapage(BufferGetPage(metabuffer));
  	MarkBufferDirty(metabuffer);
  	SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
  	MarkBufferDirty(rootbuffer);
--- 82,88 ----
  
  	START_CRIT_SECTION();
  
! 	SpGistInitMetapage(index, BufferGetPage(metabuffer));
  	MarkBufferDirty(metabuffer);
  	SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
  	MarkBufferDirty(rootbuffer);
***************
*** 94,100 **** spgbuild(PG_FUNCTION_ARGS)
  		XLogRecPtr	recptr;
  		XLogRecData rdata;
  
! 		/* WAL data is just the relfilenode */
  		rdata.data = (char *) &(index->rd_node);
  		rdata.len = sizeof(RelFileNode);
  		rdata.buffer = InvalidBuffer;
--- 94,103 ----
  		XLogRecPtr	recptr;
  		XLogRecData rdata;
  
! 		/* log image of metabuffer */
! 		log_newpage_buffer(metabuffer);
! 
! 		/* WAL data for root and null buffers is just the relfilenode */
  		rdata.data = (char *) &(index->rd_node);
  		rdata.len = sizeof(RelFileNode);
  		rdata.buffer = InvalidBuffer;
***************
*** 102,109 **** spgbuild(PG_FUNCTION_ARGS)
  
  		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX, &rdata);
  
- 		PageSetLSN(BufferGetPage(metabuffer), recptr);
- 		PageSetTLI(BufferGetPage(metabuffer), ThisTimeLineID);
  		PageSetLSN(BufferGetPage(rootbuffer), recptr);
  		PageSetTLI(BufferGetPage(rootbuffer), ThisTimeLineID);
  		PageSetLSN(BufferGetPage(nullbuffer), recptr);
--- 105,110 ----
***************
*** 152,158 **** spgbuildempty(PG_FUNCTION_ARGS)
  
  	/* Construct metapage. */
  	page = (Page) palloc(BLCKSZ);
! 	SpGistInitMetapage(page);
  
  	/* Write the page.	If archiving/streaming, XLOG it. */
  	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
--- 153,159 ----
  
  	/* Construct metapage. */
  	page = (Page) palloc(BLCKSZ);
! 	SpGistInitMetapage(index, page);
  
  	/* Write the page.	If archiving/streaming, XLOG it. */
  	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
*** a/src/backend/access/spgist/spgutils.c
--- b/src/backend/access/spgist/spgutils.c
***************
*** 471,482 **** SpGistInitBuffer(Buffer b, uint16 f)
   * Initialize metadata page
   */
  void
! SpGistInitMetapage(Page page)
  {
  	SpGistMetaPageData *metadata;
  	int			i;
  
  	SpGistInitPage(page, SPGIST_META);
  	metadata = SpGistPageGetMeta(page);
  	memset(metadata, 0, sizeof(SpGistMetaPageData));
  	metadata->magicNumber = SPGIST_MAGIC_NUMBER;
--- 471,483 ----
   * Initialize metadata page
   */
  void
! SpGistInitMetapage(Relation index, Page page)
  {
  	SpGistMetaPageData *metadata;
  	int			i;
  
  	SpGistInitPage(page, SPGIST_META);
+ 	MetapageInit(index, page);
  	metadata = SpGistPageGetMeta(page);
  	memset(metadata, 0, sizeof(SpGistMetaPageData));
  	metadata->magicNumber = SPGIST_MAGIC_NUMBER;
*** a/src/backend/access/spgist/spgxlog.c
--- b/src/backend/access/spgist/spgxlog.c
***************
*** 75,89 **** spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  	Buffer		buffer;
  	Page		page;
  
- 	buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true);
- 	Assert(BufferIsValid(buffer));
- 	page = (Page) BufferGetPage(buffer);
- 	SpGistInitMetapage(page);
- 	PageSetLSN(page, lsn);
- 	PageSetTLI(page, ThisTimeLineID);
- 	MarkBufferDirty(buffer);
- 	UnlockReleaseBuffer(buffer);
- 
  	buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true);
  	Assert(BufferIsValid(buffer));
  	SpGistInitBuffer(buffer, SPGIST_LEAF);
--- 75,80 ----
*** a/src/backend/bootstrap/bootparse.y
--- b/src/backend/bootstrap/bootparse.y
***************
*** 217,223 **** Boot_CreateStmt:
  												   PG_CATALOG_NAMESPACE,
  												   shared_relation ? GLOBALTABLESPACE_OID : 0,
  												   $3,
- 												   InvalidOid,
  												   tupdesc,
  												   RELKIND_RELATION,
  												   RELPERSISTENCE_PERMANENT,
--- 217,222 ----
*** a/src/backend/catalog/heap.c
--- b/src/backend/catalog/heap.c
***************
*** 29,34 ****
--- 29,35 ----
   */
  #include "postgres.h"
  
+ #include "access/metapage.h"
  #include "access/sysattr.h"
  #include "access/transam.h"
  #include "access/xact.h"
***************
*** 57,62 ****
--- 58,64 ----
  #include "parser/parse_collate.h"
  #include "parser/parse_expr.h"
  #include "parser/parse_relation.h"
+ #include "storage/bufmgr.h"
  #include "storage/predicate.h"
  #include "storage/smgr.h"
  #include "utils/acl.h"
***************
*** 224,233 **** SystemAttributeByName(const char *attname, bool relhasoids)
  /* ----------------------------------------------------------------
   *		heap_create		- Create an uncataloged heap relation
   *
-  *		Note API change: the caller must now always provide the OID
-  *		to use for the relation.  The relfilenode may (and, normally,
-  *		should) be left unspecified.
-  *
   *		rel->rd_rel is initialized by RelationBuildLocalRelation,
   *		and is mostly zeroes at return.
   * ----------------------------------------------------------------
--- 226,231 ----
***************
*** 237,243 **** heap_create(const char *relname,
  			Oid relnamespace,
  			Oid reltablespace,
  			Oid relid,
- 			Oid relfilenode,
  			TupleDesc tupDesc,
  			char relkind,
  			char relpersistence,
--- 235,240 ----
***************
*** 250,255 **** heap_create(const char *relname,
--- 247,255 ----
  	/* The caller must have provided an OID for the relation. */
  	Assert(OidIsValid(relid));
  
+ 	/* Use index_create() for indexes. */
+ 	Assert(relkind != RELKIND_INDEX);
+ 
  	/*
  	 * Decide if we need storage or not, and handle a couple other special
  	 * cases for particular relkinds.
***************
*** 282,316 **** heap_create(const char *relname,
  	}
  
  	/*
- 	 * Unless otherwise requested, the physical ID (relfilenode) is initially
- 	 * the same as the logical ID (OID).  When the caller did specify a
- 	 * relfilenode, it already exists; do not attempt to create it.
- 	 */
- 	if (OidIsValid(relfilenode))
- 		create_storage = false;
- 	else
- 		relfilenode = relid;
- 
- 	/*
- 	 * Never allow a pg_class entry to explicitly specify the database's
- 	 * default tablespace in reltablespace; force it to zero instead. This
- 	 * ensures that if the database is cloned with a different default
- 	 * tablespace, the pg_class entry will still match where CREATE DATABASE
- 	 * will put the physically copied relation.
- 	 *
- 	 * Yes, this is a bit of a hack.
- 	 */
- 	if (reltablespace == MyDatabaseTableSpace)
- 		reltablespace = InvalidOid;
- 
- 	/*
  	 * build the relcache entry.
  	 */
  	rel = RelationBuildLocalRelation(relname,
  									 relnamespace,
  									 tupDesc,
  									 relid,
! 									 relfilenode,
  									 reltablespace,
  									 shared_relation,
  									 mapped_relation,
--- 282,294 ----
  	}
  
  	/*
  	 * build the relcache entry.
  	 */
  	rel = RelationBuildLocalRelation(relname,
  									 relnamespace,
  									 tupDesc,
  									 relid,
! 									 relid,
  									 reltablespace,
  									 shared_relation,
  									 mapped_relation,
***************
*** 327,332 **** heap_create(const char *relname,
--- 305,311 ----
  	{
  		RelationOpenSmgr(rel);
  		RelationCreateStorage(rel->rd_node, relpersistence);
+ 		heap_create_storage(rel);
  	}
  
  	return rel;
***************
*** 1106,1112 **** heap_create_with_catalog(const char *relname,
  							   relnamespace,
  							   reltablespace,
  							   relid,
- 							   InvalidOid,
  							   tupdesc,
  							   relkind,
  							   relpersistence,
--- 1085,1090 ----
***************
*** 1292,1303 **** heap_create_with_catalog(const char *relname,
  	if (oncommit != ONCOMMIT_NOOP)
  		register_on_commit_action(relid, oncommit);
  
- 	if (relpersistence == RELPERSISTENCE_UNLOGGED)
- 	{
- 		Assert(relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE);
- 		heap_create_init_fork(new_rel_desc);
- 	}
- 
  	/*
  	 * ok, the relation has been cataloged, so close our relations and return
  	 * the OID of the newly created relation.
--- 1270,1275 ----
***************
*** 1309,1328 **** heap_create_with_catalog(const char *relname,
  }
  
  /*
!  * Set up an init fork for an unlogged table so that it can be correctly
!  * reinitialized on restart.  Since we're going to do an immediate sync, we
!  * only need to xlog this if archiving or streaming is enabled.  And the
!  * immediate sync is required, because otherwise there's no guarantee that
!  * this will hit the disk before the next checkpoint moves the redo pointer.
   */
  void
! heap_create_init_fork(Relation rel)
  {
! 	RelationOpenSmgr(rel);
! 	smgrcreate(rel->rd_smgr, INIT_FORKNUM, false);
! 	if (XLogIsNeeded())
! 		log_smgrcreate(&rel->rd_smgr->smgr_rnode.node, INIT_FORKNUM);
! 	smgrimmedsync(rel->rd_smgr, INIT_FORKNUM);
  }
  
  /*
--- 1281,1331 ----
  }
  
  /*
!  * Create storage for a new heap relation.
!  *
!  * This creates the physical file and inserts the metapage.  If the relation
!  * is unlogged, we do the same for the init fork as well.
   */
  void
! heap_create_storage(Relation rel)
  {
! 	Buffer		metabuffer;
! 	Page		metapage;
! 
! 	/* Write metapage. */
! 	metabuffer = ReadBufferExtended(rel, MAIN_FORKNUM, P_NEW, RBM_ZERO, NULL);
! 	metapage = BufferGetPage(metabuffer);
! 	Assert(BufferGetBlockNumber(metabuffer) == METAPAGE_BLKNO);
! 	LockBuffer(metabuffer, BUFFER_LOCK_EXCLUSIVE);
! 	START_CRIT_SECTION();
! 	PageInit(metapage, BLCKSZ, 0);
! 	MetapageInit(rel, metapage);
! 	MarkBufferDirty(metabuffer);
! 	if (RelationNeedsWAL(rel))
! 		log_newpage_buffer(metabuffer);
! 	END_CRIT_SECTION();
! 	UnlockReleaseBuffer(metabuffer);
! 
! 	/*
! 	 * If this relation is unlogged, set up an init fork so that it can be
! 	 * correctly reinitialized on restart.
! 	 */
! 	if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
! 	{
! 		smgrcreate(rel->rd_smgr, INIT_FORKNUM, false);
! 		metabuffer = ReadBufferExtended(rel, INIT_FORKNUM, P_NEW, RBM_ZERO,
! 										NULL);
! 		metapage = BufferGetPage(metabuffer);
! 		Assert(BufferGetBlockNumber(metabuffer) == METAPAGE_BLKNO);
! 		LockBuffer(metabuffer, BUFFER_LOCK_EXCLUSIVE);
! 		START_CRIT_SECTION();
! 		PageInit(metapage, BLCKSZ, 0);
! 		MetapageInit(rel, metapage);
! 		MarkBufferDirty(metabuffer);
! 		log_newpage_buffer(metabuffer);
! 		END_CRIT_SECTION();
! 		UnlockReleaseBuffer(metabuffer);
! 	}
  }
  
  /*
*** a/src/backend/catalog/index.c
--- b/src/backend/catalog/index.c
***************
*** 803,826 **** index_create(Relation heapRelation,
  	}
  
  	/*
! 	 * create the index relation's relcache entry and physical disk file. (If
! 	 * we fail further down, it's the smgr's responsibility to remove the disk
! 	 * file again.)
! 	 */
! 	indexRelation = heap_create(indexRelationName,
! 								namespaceId,
! 								tableSpaceId,
! 								indexRelationId,
! 								relFileNode,
! 								indexTupDesc,
! 								RELKIND_INDEX,
! 								relpersistence,
! 								shared_relation,
! 								mapped_relation);
  
  	Assert(indexRelationId == RelationGetRelid(indexRelation));
  
  	/*
  	 * Obtain exclusive lock on it.  Although no other backends can see it
  	 * until we commit, this prevents deadlock-risk complaints from lock
  	 * manager in cases such as CLUSTER.
--- 803,835 ----
  	}
  
  	/*
! 	 * create the index relation's relcache entry.
! 	 */
! 	indexRelation = RelationBuildLocalRelation(indexRelationName,
! 											   namespaceId,
! 											   indexTupDesc,
! 											   indexRelationId,
! 			OidIsValid(relFileNode) ? relFileNode : indexRelationId,
! 											   tableSpaceId,
! 											   shared_relation,
! 											   mapped_relation,
! 											   relpersistence,
! 											   RELKIND_INDEX);
  
  	Assert(indexRelationId == RelationGetRelid(indexRelation));
  
  	/*
+ 	 * create the physical disk file, unless the caller provided an existing
+ 	 * relFileNode for reuse. (If we fail further down, it's the smgr's
+ 	 * responsibility to remove the disk file again.)
+ 	 */
+ 	if (!OidIsValid(relFileNode))
+ 	{
+ 		RelationOpenSmgr(indexRelation);
+ 		RelationCreateStorage(indexRelation->rd_node, relpersistence);
+ 	}
+ 
+ 	/*
  	 * Obtain exclusive lock on it.  Although no other backends can see it
  	 * until we commit, this prevents deadlock-risk complaints from lock
  	 * manager in cases such as CLUSTER.
*** a/src/backend/commands/sequence.c
--- b/src/backend/commands/sequence.c
***************
*** 14,19 ****
--- 14,20 ----
   */
  #include "postgres.h"
  
+ #include "access/metapage.h"
  #include "access/transam.h"
  #include "access/xlogutils.h"
  #include "catalog/dependency.h"
***************
*** 314,320 **** fill_seq_with_data(Relation rel, HeapTuple tuple)
  	/* Initialize first page of relation with special magic number */
  
  	buf = ReadBuffer(rel, P_NEW);
! 	Assert(BufferGetBlockNumber(buf) == 0);
  
  	page = BufferGetPage(buf);
  
--- 315,321 ----
  	/* Initialize first page of relation with special magic number */
  
  	buf = ReadBuffer(rel, P_NEW);
! 	Assert(BufferGetBlockNumber(buf) == RelationFirstNonMetapage(rel));
  
  	page = BufferGetPage(buf);
  
***************
*** 323,329 **** fill_seq_with_data(Relation rel, HeapTuple tuple)
  	sm->magic = SEQ_MAGIC;
  
  	/* hack: ensure heap_insert will insert on the just-created page */
! 	RelationSetTargetBlock(rel, 0);
  
  	/* Now insert sequence tuple */
  	simple_heap_insert(rel, tuple);
--- 324,330 ----
  	sm->magic = SEQ_MAGIC;
  
  	/* hack: ensure heap_insert will insert on the just-created page */
! 	RelationSetTargetBlock(rel, BufferGetBlockNumber(buf));
  
  	/* Now insert sequence tuple */
  	simple_heap_insert(rel, tuple);
***************
*** 1076,1082 **** read_info(SeqTable elm, Relation rel, Buffer *buf)
  	sequence_magic *sm;
  	Form_pg_sequence seq;
  
! 	*buf = ReadBuffer(rel, 0);
  	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
  
  	page = BufferGetPage(*buf);
--- 1077,1083 ----
  	sequence_magic *sm;
  	Form_pg_sequence seq;
  
! 	*buf = ReadBuffer(rel, RelationFirstNonMetapage(rel));
  	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
  
  	page = BufferGetPage(*buf);
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 1155,1162 **** ExecuteTruncate(TruncateStmt *stmt)
  			 * deletion at commit.
  			 */
  			RelationSetNewRelfilenode(rel, RecentXmin);
! 			if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
! 				heap_create_init_fork(rel);
  
  			heap_relid = RelationGetRelid(rel);
  			toast_relid = rel->rd_rel->reltoastrelid;
--- 1155,1161 ----
  			 * deletion at commit.
  			 */
  			RelationSetNewRelfilenode(rel, RecentXmin);
! 			heap_create_storage(rel);
  
  			heap_relid = RelationGetRelid(rel);
  			toast_relid = rel->rd_rel->reltoastrelid;
***************
*** 1168,1175 **** ExecuteTruncate(TruncateStmt *stmt)
  			{
  				rel = relation_open(toast_relid, AccessExclusiveLock);
  				RelationSetNewRelfilenode(rel, RecentXmin);
! 				if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
! 					heap_create_init_fork(rel);
  				heap_close(rel, NoLock);
  			}
  
--- 1167,1173 ----
  			{
  				rel = relation_open(toast_relid, AccessExclusiveLock);
  				RelationSetNewRelfilenode(rel, RecentXmin);
! 				heap_create_storage(rel);
  				heap_close(rel, NoLock);
  			}
  
*** a/src/backend/commands/vacuumlazy.c
--- b/src/backend/commands/vacuumlazy.c
***************
*** 39,44 ****
--- 39,45 ----
  
  #include "access/genam.h"
  #include "access/heapam.h"
+ #include "access/metapage.h"
  #include "access/transam.h"
  #include "access/visibilitymap.h"
  #include "catalog/storage.h"
***************
*** 360,366 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  			   Relation *Irel, int nindexes, bool scan_all)
  {
  	BlockNumber nblocks,
! 				blkno;
  	HeapTupleData tuple;
  	char	   *relname;
  	BlockNumber empty_pages,
--- 361,368 ----
  			   Relation *Irel, int nindexes, bool scan_all)
  {
  	BlockNumber nblocks,
! 				blkno,
! 				startblock;
  	HeapTupleData tuple;
  	char	   *relname;
  	BlockNumber empty_pages,
***************
*** 398,403 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 400,407 ----
  
  	lazy_space_alloc(vacrelstats, nblocks);
  
+ 	startblock = RelationFirstNonMetapage(onerel);
+ 
  	/*
  	 * We want to skip pages that don't require vacuuming according to the
  	 * visibility map, but only when we can skip at least SKIP_PAGES_THRESHOLD
***************
*** 429,435 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  	 * them.  If we make the reverse mistake and vacuum a page unnecessarily,
  	 * it'll just be a no-op.
  	 */
! 	for (next_not_all_visible_block = 0;
  		 next_not_all_visible_block < nblocks;
  		 next_not_all_visible_block++)
  	{
--- 433,439 ----
  	 * them.  If we make the reverse mistake and vacuum a page unnecessarily,
  	 * it'll just be a no-op.
  	 */
! 	for (next_not_all_visible_block = startblock;
  		 next_not_all_visible_block < nblocks;
  		 next_not_all_visible_block++)
  	{
***************
*** 442,448 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  	else
  		skipping_all_visible_blocks = false;
  
! 	for (blkno = 0; blkno < nblocks; blkno++)
  	{
  		Buffer		buf;
  		Page		page;
--- 446,452 ----
  	else
  		skipping_all_visible_blocks = false;
  
! 	for (blkno = startblock; blkno < nblocks; blkno++)
  	{
  		Buffer		buf;
  		Page		page;
*** a/src/backend/storage/freespace/freespace.c
--- b/src/backend/storage/freespace/freespace.c
***************
*** 24,29 ****
--- 24,30 ----
  #include "postgres.h"
  
  #include "access/htup.h"
+ #include "access/metapage.h"
  #include "access/xlogutils.h"
  #include "miscadmin.h"
  #include "storage/freespace.h"
***************
*** 182,187 **** RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
--- 183,191 ----
  	FSMAddress	addr;
  	uint16		slot;
  
+ 	/* No free space in the metablock! */
+ 	Assert(heapBlk != METAPAGE_BLKNO || !RelationHasMetapage(rel));
+ 
  	/* Get the location of the FSM byte representing the heap block */
  	addr = fsm_get_location(heapBlk, &slot);
  
*** a/src/backend/utils/cache/relcache.c
--- b/src/backend/utils/cache/relcache.c
***************
*** 30,35 ****
--- 30,36 ----
  #include <fcntl.h>
  #include <unistd.h>
  
+ #include "access/metapage.h"
  #include "access/reloptions.h"
  #include "access/sysattr.h"
  #include "access/transam.h"
***************
*** 1121,1126 **** RelationInitIndexAccessInfo(Relation relation)
--- 1122,1128 ----
  	relation->rd_exclprocs = NULL;
  	relation->rd_exclstrats = NULL;
  	relation->rd_amcache = NULL;
+ 	relation->rd_metapage = NULL;
  }
  
  /*
***************
*** 1675,1680 **** RelationReloadIndexInfo(Relation relation)
--- 1677,1687 ----
  		pfree(relation->rd_amcache);
  	relation->rd_amcache = NULL;
  
+ 	/* Must free any metapage upon relcache flush */
+ 	if (relation->rd_metapage)
+ 		pfree(relation->rd_metapage);
+ 	relation->rd_metapage = NULL;
+ 
  	/*
  	 * If it's a shared index, we might be called before backend startup has
  	 * finished selecting a database, in which case we have no way to read
***************
*** 1774,1779 **** RelationDestroyRelation(Relation relation)
--- 1781,1788 ----
  	list_free(relation->rd_indexlist);
  	bms_free(relation->rd_indexattr);
  	FreeTriggerDesc(relation->trigdesc);
+ 	if (relation->rd_metapage)
+ 		pfree(relation->rd_metapage);
  	if (relation->rd_options)
  		pfree(relation->rd_options);
  	if (relation->rd_indextuple)
***************
*** 2420,2425 **** RelationBuildLocalRelation(const char *relname,
--- 2429,2447 ----
  	bool		nailit;
  
  	AssertArg(natts >= 0);
+ 	Assert(OidIsValid(relfilenode));
+ 
+ 	/*
+ 	 * Never allow a pg_class entry to explicitly specify the database's
+ 	 * default tablespace in reltablespace; force it to zero instead. This
+ 	 * ensures that if the database is cloned with a different default
+ 	 * tablespace, the pg_class entry will still match where CREATE DATABASE
+ 	 * will put the physically copied relation.
+ 	 *
+ 	 * Yes, this is a bit of a hack.
+ 	 */
+ 	if (reltablespace == MyDatabaseTableSpace)
+ 		reltablespace = InvalidOid;
  
  	/*
  	 * check for creation of a rel that must be nailed in cache.
***************
*** 4194,4199 **** load_relcache_init_file(bool shared)
--- 4216,4222 ----
  		rel->rd_createSubid = InvalidSubTransactionId;
  		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
  		rel->rd_amcache = NULL;
+ 		rel->rd_metapage = NULL;
  		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
  
  		/*
*** a/src/include/access/gin_private.h
--- b/src/include/access/gin_private.h
***************
*** 13,18 ****
--- 13,19 ----
  #include "access/genam.h"
  #include "access/gin.h"
  #include "access/itup.h"
+ #include "access/metapage.h"
  #include "fmgr.h"
  #include "storage/bufmgr.h"
  #include "utils/rbtree.h"
***************
*** 50,56 **** typedef GinPageOpaqueData *GinPageOpaque;
  #define GIN_LIST_FULLROW  (1 << 5)		/* makes sense only on GIN_LIST page */
  
  /* Page numbers of fixed-location pages */
! #define GIN_METAPAGE_BLKNO	(0)
  #define GIN_ROOT_BLKNO		(1)
  
  typedef struct GinMetaPageData
--- 51,57 ----
  #define GIN_LIST_FULLROW  (1 << 5)		/* makes sense only on GIN_LIST page */
  
  /* Page numbers of fixed-location pages */
! #define GIN_METAPAGE_BLKNO	METAPAGE_BLKNO
  #define GIN_ROOT_BLKNO		(1)
  
  typedef struct GinMetaPageData
***************
*** 99,105 **** typedef struct GinMetaPageData
  #define GIN_CURRENT_VERSION		1
  
  #define GinPageGetMeta(p) \
! 	((GinMetaPageData *) PageGetContents(p))
  
  /*
   * Macros for accessing a GIN index page's opaque data
--- 100,106 ----
  #define GIN_CURRENT_VERSION		1
  
  #define GinPageGetMeta(p) \
! 	((GinMetaPageData *) GetAccessMethodMeta(p))
  
  /*
   * Macros for accessing a GIN index page's opaque data
***************
*** 314,320 **** typedef struct GinState
  
  /* XLog stuff */
  
! #define XLOG_GIN_CREATE_INDEX  0x00
  
  #define XLOG_GIN_CREATE_PTREE  0x10
  
--- 315,321 ----
  
  /* XLog stuff */
  
! /* 0x00 is free, was XLOG_GIN_CREATE_INDEX */
  
  #define XLOG_GIN_CREATE_PTREE  0x10
  
***************
*** 433,439 **** extern void initGinState(GinState *state, Relation index);
  extern Buffer GinNewBuffer(Relation index);
  extern void GinInitBuffer(Buffer b, uint32 f);
  extern void GinInitPage(Page page, uint32 f, Size pageSize);
! extern void GinInitMetabuffer(Buffer b);
  extern int ginCompareEntries(GinState *ginstate, OffsetNumber attnum,
  				  Datum a, GinNullCategory categorya,
  				  Datum b, GinNullCategory categoryb);
--- 434,440 ----
  extern Buffer GinNewBuffer(Relation index);
  extern void GinInitBuffer(Buffer b, uint32 f);
  extern void GinInitPage(Page page, uint32 f, Size pageSize);
! extern void GinInitMetabuffer(Relation rel, Buffer b);
  extern int ginCompareEntries(GinState *ginstate, OffsetNumber attnum,
  				  Datum a, GinNullCategory categorya,
  				  Datum b, GinNullCategory categoryb);
*** a/src/include/access/gist.h
--- b/src/include/access/gist.h
***************
*** 62,67 ****
--- 62,68 ----
  #define F_DELETED			(1 << 1)	/* the page has been deleted */
  #define F_TUPLES_DELETED	(1 << 2)	/* some tuples on the page are dead */
  #define F_FOLLOW_RIGHT		(1 << 3)	/* page to the right has no downlink */
+ #define F_ROOT				(1 << 4)	/* root page - not set prior to 9.3 */
  
  typedef XLogRecPtr GistNSN;
  
***************
*** 138,143 **** typedef struct GISTENTRY
--- 139,153 ----
  #define GistClearFollowRight(page)	( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT)
  
  /*
+  * Prior to PostgreSQL 9.3, the F_ROOT flag did not exist; the root page was
+  * at a fixed location, GIST_OLD_ROOT_BLKNO.  This block now contains the
+  * metapage.  To accommodate pg_upgrade from earlier server versions, the
+  * correct test for whether you've got the root block is therefore blkno == 0
+  * || GistPageIsRoot(page).
+  */
+ #define GistPageIsRoot(page)	( GistPageGetOpaque(page)->flags & F_ROOT)
+ 
+ /*
   * Vector of GISTENTRY structs; user-defined methods union and picksplit
   * take it as one of their arguments
   */
*** a/src/include/access/gist_private.h
--- b/src/include/access/gist_private.h
***************
*** 16,21 ****
--- 16,22 ----
  
  #include "access/gist.h"
  #include "access/itup.h"
+ #include "access/metapage.h"
  #include "fmgr.h"
  #include "storage/bufmgr.h"
  #include "storage/buffile.h"
***************
*** 211,216 **** typedef struct gistxlogPage
--- 212,223 ----
  	int			num;			/* number of index tuples following */
  } gistxlogPage;
  
+ typedef struct gistxlogCreateIndex
+ {
+ 	RelFileNode node;
+ 	BlockNumber blkno;
+ } gistxlogCreateIndex;
+ 
  typedef struct gistxlogPageDelete
  {
  	RelFileNode node;
***************
*** 278,285 **** typedef struct
  	GISTInsertStack *stack;
  } GISTInsertState;
  
! /* root page of a gist index */
! #define GIST_ROOT_BLKNO				0
  
  /*
   * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner
--- 285,296 ----
  	GISTInsertStack *stack;
  } GISTInsertState;
  
! /*
!  * Before PostgreSQL 9.3, the root block was always at block 0.  Now, the
!  * metapage is at block 0, and it contains the location of the root block.
!  */
! #define GIST_OLD_ROOT_BLKNO				0
! #define GIST_METAPAGE_BLKNO				METAPAGE_BLKNO
  
  /*
   * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner
***************
*** 461,472 **** extern Datum gistgetbitmap(PG_FUNCTION_ARGS);
--- 472,502 ----
  
  /* gistutil.c */
  
+ #define	GIST_MAGIC			0x23c1eb95
+ #define	GIST_VERSION		1
+ 
+ typedef struct GistMetaPageData
+ {
+ 	uint32          gist_magic;			/* should contain GIST_MAGIC */
+ 	uint32          gist_version;		/* should contain GIST_VERSION */
+ 	BlockNumber		gist_root;			/* current root location */
+ } GistMetaPageData;
+ 
+ /*
+  * Prior to PostgreSQL 9.3, GiST indexes had no metapage.  Thus, this should
+  * only ever get called on a relation metapage.
+  */
+ #define GistPageGetMeta(p) \
+         (AssertMacro(PageIsRelationMetapage(p)), \
+ 		 (GistMetaPageData *) BlindGetAccessMethodMeta(p))
+ 
  #define GiSTPageSize   \
  	( BLCKSZ - SizeOfPageHeaderData - MAXALIGN(sizeof(GISTPageOpaqueData)) )
  
  #define GIST_MIN_FILLFACTOR			10
  #define GIST_DEFAULT_FILLFACTOR		90
  
+ extern GistMetaPageData *GistMetaData(Relation index);
  extern Datum gistoptions(PG_FUNCTION_ARGS);
  extern bool gistfitpage(IndexTuple *itvec, int len);
  extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
*** a/src/include/access/hash.h
--- b/src/include/access/hash.h
***************
*** 19,24 ****
--- 19,25 ----
  
  #include "access/genam.h"
  #include "access/itup.h"
+ #include "access/metapage.h"
  #include "access/sdir.h"
  #include "access/xlog.h"
  #include "fmgr.h"
***************
*** 113,119 **** typedef HashScanOpaqueData *HashScanOpaque;
   * Definitions for metapage.
   */
  
! #define HASH_METAPAGE	0		/* metapage is always block 0 */
  
  #define HASH_MAGIC		0x6440640
  #define HASH_VERSION	2		/* 2 signifies only hash key value is stored */
--- 114,120 ----
   * Definitions for metapage.
   */
  
! #define HASH_METAPAGE	METAPAGE_BLKNO	/* metapage is always block 0 */
  
  #define HASH_MAGIC		0x6440640
  #define HASH_VERSION	2		/* 2 signifies only hash key value is stored */
***************
*** 204,210 **** typedef HashMetaPageData *HashMetaPage;
  	 (MAXALIGN(SizeOfPageHeaderData) + MAXALIGN(sizeof(HashPageOpaqueData))))
  
  #define HashPageGetMeta(page) \
! 	((HashMetaPage) PageGetContents(page))
  
  /*
   * The number of bits in an ovflpage bitmap word.
--- 205,211 ----
  	 (MAXALIGN(SizeOfPageHeaderData) + MAXALIGN(sizeof(HashPageOpaqueData))))
  
  #define HashPageGetMeta(page) \
! 	((HashMetaPage) GetAccessMethodMeta(page))
  
  /*
   * The number of bits in an ovflpage bitmap word.
*** /dev/null
--- b/src/include/access/metapage.h
***************
*** 0 ****
--- 1,86 ----
+ /*-------------------------------------------------------------------------
+  *
+  * metapage.h
+  *	  Generic relation metapage support.
+  *
+  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/include/access/metapage.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef METAPAGE_H
+ #define METAPAGE_H
+ 
+ #include "pgtime.h"
+ #include "storage/bufpage.h"
+ #include "utils/rel.h"
+ 
+ #define METAPAGE_MAGIC				0x518f912a
+ #define METAPAGE_VERSION			1
+ #define METAPAGE_BLKNO				0
+ 
+ /*
+  * Metadata that is common to all relation types.  This information is stored
+  * at the beginning of each page, following the page-header, at the address
+  * returned by PageGetContents().
+  */
+ typedef struct RelationMetapageData
+ {
+ 	uint32          rmp_magic;			/* should contain METAPAGE_MAGIC */
+ 	uint32          rmp_version;		/* should contain METAPAGE_VERSION */
+ 	Oid				rmp_dboid;			/* database OID */
+ 	Oid				rmp_tsoid;			/* tablespace OID */
+ 	Oid				rmp_reloid;			/* relation OID */
+ 	Oid				rmp_relfilenode;	/* relation relfilenode */
+ 	uint32			rmp_flags;			/* relation-level flag bits */
+ 	uint16			rmp_minlayout;		/* oldest page layout version in rel */
+ 	uint16			rmp_maxlayout;		/* newest page layout version in rel */
+ 	pg_time_t		rmp_relfilenode_time; /* time relfilenode created */
+ } RelationMetapageData;
+ 
+ typedef RelationMetapageData *RelationMetapage;
+ 
+ /*
+  * Metadata that is specific to a particular access method is stored later
+  * in the page, following the common metadata.  In order to allow for future
+  * expansion of the common metadata, we start the access-method specific
+  * metadata 512 bytes from the beginning of the page.  That way, future
+  * versions of RelationMetapageData can be larger without affecting the
+  * placement of data on the page.
+  *
+  * Prior to the introduction of metapages for all relations, access methods
+  * that had metapages stored their private metadata at the offset where
+  * common metadata is now stored.  Code that needs to read both old and new
+  * metapages can use GetAccessMethodMeta() to locate their data at whichever
+  * address it's stored, while code that knows it is looking at a new-format
+  * metapage can use BlindGetAccessMethodMeta() for efficiency.
+  */
+ #define BlindGetRelationMeta(page) \
+ 	((RelationMetapage) PageGetContents((page)))
+ #define ACCESS_METHOD_META_OFFSET 512
+ #define BlindGetAccessMethodMeta(page) \
+ 	(((char *) (page)) + ACCESS_METHOD_META_OFFSET)
+ #define GetRelationMeta(page) \
+ 	(PageIsRelationMetapage((page)) ? BlindGetRelationMeta((page)) : NULL)
+ #define GetAccessMethodMeta(page) \
+ 	(PageIsRelationMetapage((page)) ? BlindGetAccessMethodMeta((page)) \
+ 		: PageGetContents((page)))
+ 
+ /*
+  * Function prototypes.
+  */
+ extern void MetapageInit(Relation relation, Page page);
+ extern void RelationBuildMetadata(Relation relation);
+ extern RelationMetapage RelationGetMetadata(Relation rel);
+ 
+ /*
+  * Useful macros.
+  */
+ #define RelationHasMetapage(relation) \
+ 	(RelationGetMetadata((relation))->rmp_version != 0)
+ #define RelationFirstNonMetapage(relation) \
+ 	(RelationHasMetapage((relation)) ? 1 : 0)
+ 
+ #endif   /* METAPAGE_H */
*** a/src/include/access/nbtree.h
--- b/src/include/access/nbtree.h
***************
*** 16,21 ****
--- 16,22 ----
  
  #include "access/genam.h"
  #include "access/itup.h"
+ #include "access/metapage.h"
  #include "access/sdir.h"
  #include "access/xlog.h"
  #include "access/xlogutils.h"
***************
*** 102,110 **** typedef struct BTMetaPageData
  } BTMetaPageData;
  
  #define BTPageGetMeta(p) \
! 	((BTMetaPageData *) PageGetContents(p))
  
! #define BTREE_METAPAGE	0		/* first page is meta */
  #define BTREE_MAGIC		0x053162	/* magic number of btree pages */
  #define BTREE_VERSION	2		/* current version number */
  
--- 103,111 ----
  } BTMetaPageData;
  
  #define BTPageGetMeta(p) \
! 	((BTMetaPageData *) GetAccessMethodMeta(p))
  
! #define BTREE_METAPAGE	METAPAGE_BLKNO
  #define BTREE_MAGIC		0x053162	/* magic number of btree pages */
  #define BTREE_VERSION	2		/* current version number */
  
***************
*** 622,628 **** extern void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
  /*
   * prototypes for functions in nbtpage.c
   */
! extern void _bt_initmetapage(Page page, BlockNumber rootbknum, uint32 level);
  extern Buffer _bt_getroot(Relation rel, int access);
  extern Buffer _bt_gettrueroot(Relation rel);
  extern void _bt_checkpage(Relation rel, Buffer buf);
--- 623,630 ----
  /*
   * prototypes for functions in nbtpage.c
   */
! extern void _bt_initmetapage(Relation rel, Page page, BlockNumber rootbknum,
! 				 uint32 level);
  extern Buffer _bt_getroot(Relation rel, int access);
  extern Buffer _bt_gettrueroot(Relation rel);
  extern void _bt_checkpage(Relation rel, Buffer buf);
*** a/src/include/access/relscan.h
--- b/src/include/access/relscan.h
***************
*** 33,39 **** typedef struct HeapScanDescData
  	bool		rs_allow_sync;	/* allow or disallow use of syncscan */
  
  	/* state set up at initscan time */
! 	BlockNumber rs_nblocks;		/* number of blocks to scan */
  	BlockNumber rs_startblock;	/* block # to start at */
  	BufferAccessStrategy rs_strategy;	/* access strategy for reads */
  	bool		rs_syncscan;	/* report location to syncscan logic? */
--- 33,40 ----
  	bool		rs_allow_sync;	/* allow or disallow use of syncscan */
  
  	/* state set up at initscan time */
! 	BlockNumber	rs_lowpage;		/* lowest page number to scan */
! 	BlockNumber rs_nblocks;		/* highest page to scan, plus one */
  	BlockNumber rs_startblock;	/* block # to start at */
  	BufferAccessStrategy rs_strategy;	/* access strategy for reads */
  	bool		rs_syncscan;	/* report location to syncscan logic? */
*** a/src/include/access/spgist_private.h
--- b/src/include/access/spgist_private.h
***************
*** 15,27 ****
  #define SPGIST_PRIVATE_H
  
  #include "access/itup.h"
  #include "access/spgist.h"
  #include "nodes/tidbitmap.h"
  #include "utils/rel.h"
  
  
  /* Page numbers of fixed-location pages */
! #define SPGIST_METAPAGE_BLKNO	 (0)	/* metapage */
  #define SPGIST_ROOT_BLKNO		 (1)	/* root for normal entries */
  #define SPGIST_NULL_BLKNO		 (2)	/* root for null-value entries */
  #define SPGIST_LAST_FIXED_BLKNO  SPGIST_NULL_BLKNO
--- 15,28 ----
  #define SPGIST_PRIVATE_H
  
  #include "access/itup.h"
+ #include "access/metapage.h"
  #include "access/spgist.h"
  #include "nodes/tidbitmap.h"
  #include "utils/rel.h"
  
  
  /* Page numbers of fixed-location pages */
! #define SPGIST_METAPAGE_BLKNO	 METAPAGE_BLKNO	/* metapage */
  #define SPGIST_ROOT_BLKNO		 (1)	/* root for normal entries */
  #define SPGIST_NULL_BLKNO		 (2)	/* root for null-value entries */
  #define SPGIST_LAST_FIXED_BLKNO  SPGIST_NULL_BLKNO
***************
*** 97,103 **** typedef struct SpGistMetaPageData
  #define SPGIST_MAGIC_NUMBER (0xBA0BABEE)
  
  #define SpGistPageGetMeta(p) \
! 	((SpGistMetaPageData *) PageGetContents(p))
  
  /*
   * Private state of index AM.  SpGistState is common to both insert and
--- 98,104 ----
  #define SPGIST_MAGIC_NUMBER (0xBA0BABEE)
  
  #define SpGistPageGetMeta(p) \
! 	((SpGistMetaPageData *) GetAccessMethodMeta(p))
  
  /*
   * Private state of index AM.  SpGistState is common to both insert and
***************
*** 624,630 **** extern Buffer SpGistGetBuffer(Relation index, int flags,
  extern void SpGistSetLastUsedPage(Relation index, Buffer buffer);
  extern void SpGistInitPage(Page page, uint16 f);
  extern void SpGistInitBuffer(Buffer b, uint16 f);
! extern void SpGistInitMetapage(Page page);
  extern unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum);
  extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state,
  				 ItemPointer heapPtr,
--- 625,631 ----
  extern void SpGistSetLastUsedPage(Relation index, Buffer buffer);
  extern void SpGistInitPage(Page page, uint16 f);
  extern void SpGistInitBuffer(Buffer b, uint16 f);
! extern void SpGistInitMetapage(Relation index, Page page);
  extern unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum);
  extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state,
  				 ItemPointer heapPtr,
*** a/src/include/catalog/heap.h
--- b/src/include/catalog/heap.h
***************
*** 41,47 **** extern Relation heap_create(const char *relname,
  			Oid relnamespace,
  			Oid reltablespace,
  			Oid relid,
- 			Oid relfilenode,
  			TupleDesc tupDesc,
  			char relkind,
  			char relpersistence,
--- 41,46 ----
***************
*** 68,74 **** extern Oid heap_create_with_catalog(const char *relname,
  						 bool use_user_acl,
  						 bool allow_system_table_mods);
  
! extern void heap_create_init_fork(Relation rel);
  
  extern void heap_drop_with_catalog(Oid relid);
  
--- 67,73 ----
  						 bool use_user_acl,
  						 bool allow_system_table_mods);
  
! extern void heap_create_storage(Relation rel);
  
  extern void heap_drop_with_catalog(Oid relid);
  
*** a/src/include/storage/bufpage.h
--- b/src/include/storage/bufpage.h
***************
*** 154,161 **** typedef PageHeaderData *PageHeader;
  										 * tuple? */
  #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
  										 * everyone */
  
! #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
--- 154,162 ----
  										 * tuple? */
  #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
  										 * everyone */
+ #define PD_RELATION_METAPAGE 0x0008		/* page is a relation metapage */
  
! #define PD_VALID_FLAG_BITS	0x000F		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
***************
*** 345,350 **** typedef PageHeaderData *PageHeader;
--- 346,356 ----
  #define PageClearAllVisible(page) \
  	(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
  
+ #define PageIsRelationMetapage(page) \
+ 	(((PageHeader) (page))->pd_flags & PD_RELATION_METAPAGE)
+ #define PageSetRelationMetapage(page) \
+ 	(((PageHeader) (page))->pd_flags |= PD_RELATION_METAPAGE)
+ 
  #define PageIsPrunable(page, oldestxmin) \
  ( \
  	AssertMacro(TransactionIdIsNormal(oldestxmin)), \
*** a/src/include/utils/rel.h
--- b/src/include/utils/rel.h
***************
*** 109,114 **** typedef struct RelationData
--- 109,116 ----
  	RuleLock   *rd_rules;		/* rewrite rules */
  	MemoryContext rd_rulescxt;	/* private memory cxt for rd_rules, if any */
  	TriggerDesc *trigdesc;		/* Trigger info, or NULL if rel has none */
+ 	/* use "struct" here to avoid needing to include metapage.h: */
+ 	struct RelationMetapageData *rd_metapage;	/* Relation metapage data */
  
  	/*
  	 * rd_options is set whenever rd_rel is loaded into the relcache entry.
#2Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Robert Haas (#1)
Re: WIP: relation metapages

On 14.06.2012 18:01, Robert Haas wrote:

What I'm really looking for at this stage of the game is feedback on
the design decisions I made. The intention here is that it should be
possible to read old-format heaps and indexes transparently, but that
when we create or rewrite a relation, we add a new-style metapage.

That dodges the problem of pg_upgrade, but eventually, we will want to
use the metapage for something important, and it can't be optional at
that point anymore. So we will eventually need to deal with pg_upgrade
somehow.

I put the new metapage code in src/backend/access/common/metapage.c,
but I don't have a lot of confidence that that's the appropriate
location for it. Suggestions are appreciated.

That seems good to me.

It would be nice to have the oid of the access method in the metapage
(or some other way to identify it).

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#3Robert Haas
robertmhaas@gmail.com
In reply to: Heikki Linnakangas (#2)
Re: WIP: relation metapages

On Thu, Jun 14, 2012 at 5:34 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

On 14.06.2012 18:01, Robert Haas wrote:

What I'm really looking for at this stage of the game is feedback on
the design decisions I made.  The intention here is that it should be
possible to read old-format heaps and indexes transparently, but that
when we create or rewrite a relation, we add a new-style metapage.

That dodges the problem of pg_upgrade, but eventually, we will want to use
the metapage for something important, and it can't be optional at that point
anymore. So we will eventually need to deal with pg_upgrade somehow.

Well, the code as I've written deals with pg_upgrade just fine: you
can move your old relation files over and they still work. What's
missing at present is an efficient way to convert them to the new
format. If you don't mind the inefficiency, you can use VACUUM FULL
or CLUSTER, and you're there, but obviously we'll want something
better before we start relying on this for anything too critical. For
indexes, it should be pretty trivial to reduce the requirement from
"rewrite while holding AccessExclusiveLock" to "brief
AccessExclusiveLock". Everything except GiST already has a metapage,
so you just need to rewrite that page in the new format, which is a
SMOP. GiST requires moving the existing metapage out to a free page
(or a new page) and writing a metapage pointing to it into block 0,
which is again pretty simple. Heaps are a bit harder. We could adopt
your idea of storing a block number in the metablock; any index TIDs
that point to block 0 will be interpreted as pointing to that block
instead. Then we can basically just relocate block to any free block,
or a new one, as with GiST. Alternatively, we can read the tuples in
block 0 and reinsert them; vacuum; and then repurpose block 0. Taking
it even further, we could try to do better than "brief
AccessExclusiveLock". That might be possible too, especially for
indexes, but I'm not sure that it's necessary, and I haven't thought
through the details.

Even if we just get it down to "brief AccessExclusiveLock", I think we
might also be able to improve the experience by making autovacuum do
conversions automatically. So, if you pg_upgrade, the first
autovacuum worker that spins through the database will
ConditionalLockAcquire an AccessExclusiveLock on each relation in turn
and try to do the conversion. If it can't get the lock it'll keep
retrying until it succeeds. Odds are good that for most people this
would make the addition of the metapage completely transparent. On
the other hand, if metapages exist mostly to support operations like
making an unlogged table logged or visca versa, that's not really
necessary: we can add the metapage when someone performs a DDL
operation that requires it. There is a lot of optimization possible
on top of the basic mechanism, but how much of it makes sense to do,
and which parts, depends on exactly which of the many things we could
do with this we end up deciding to actually do. I'm trying to start
with the basics, which means getting the basic infrastructure in place
and working.

It would be nice to have the oid of the access method in the metapage (or
some other way to identify it).

Yeah, I was thinking about that. I think a magic number might be
preferable to an OID, and we actually already have that as the first
4-bytes of the access method metadata for all index types except GIN.
I'm thinking about trying to fix up GIN so that it adds one as well;
the trick is to do it in a way that is backward-compatible, which I
have an idea how to do but haven't tackled yet. We can add a magic
number for heaps as well.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company