From 05d1971d0f4f0f42899f5d6857892128487eeb40 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v3 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, it currently runs a heap rewrite, which causes a large amount
of file I/O.  This patch makes the command run without a heap rewrite.
In addition, SET LOGGED while wal_level > minimal emits WAL using
XLOG_FPI instead of a massive number of HEAP_INSERTs, which should be
smaller.
---
 src/backend/access/rmgrdesc/smgrdesc.c |  23 ++
 src/backend/catalog/storage.c          | 355 +++++++++++++++++++++++--
 src/backend/commands/tablecmds.c       | 217 ++++++++++++---
 src/backend/storage/buffer/bufmgr.c    |  88 ++++++
 src/backend/storage/file/reinit.c      | 206 ++++++++------
 src/backend/storage/smgr/smgr.c        |   6 +
 src/common/relpath.c                   |   3 +-
 src/include/catalog/storage.h          |   2 +
 src/include/catalog/storage_xlog.h     |  16 ++
 src/include/common/relpath.h           |   5 +-
 src/include/storage/bufmgr.h           |   4 +
 src/include/storage/smgr.h             |   1 +
 12 files changed, 784 insertions(+), 142 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index a7c0cb1bc3..097dacfee6 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,23 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec->blkno, xlrec->flags);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+		char	   *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+		appendStringInfoString(buf, path);
+		pfree(path);
+	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+		char	   *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+		appendStringInfoString(buf, path);
+		appendStringInfo(buf, " persistence %d", xlrec->persistence);
+		pfree(path);
+	}
 }
 
 const char *
@@ -55,6 +72,12 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_TRUNCATE:
 			id = "TRUNCATE";
 			break;
+		case XLOG_SMGR_UNLINK:
+			id = "UNLINK";
+			break;
+		case XLOG_SMGR_BUFPERSISTENCE:
+			id = "BUFPERSISTENCE";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index d538f25726..0f1649758f 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -57,9 +58,19 @@ int			wal_skip_threshold = 2048;	/* in kilobytes */
  * but I'm being paranoid.
  */
 
+
+/* These are bit flags (PDOP_DELETE has no bit set), not ordinal numbers */
+#define	PDOP_DELETE				0x00
+#define	PDOP_UNLINK_FORK		0x01
+#define	PDOP_SET_PERSISTENCE	0x02
+
+
 typedef struct PendingRelDelete
 {
 	RelFileNode relnode;		/* relation that may need to be deleted */
+	int			op;				/* operation mask */
+	bool		bufpersistence;	/* buffer persistence to set */
+	int			unlink_forknum;	/* forknum to unlink */
 	BackendId	backend;		/* InvalidBackendId if not a temp rel */
 	bool		atCommit;		/* T=delete at commit; F=delete at abort */
 	int			nestLevel;		/* xact nesting level of request */
@@ -153,6 +164,7 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
 	pending = (PendingRelDelete *)
 		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
 	pending->relnode = rnode;
+	pending->op = PDOP_DELETE;
 	pending->backend = backend;
 	pending->atCommit = false;	/* delete if abort */
 	pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -168,6 +180,209 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
 	return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *		Create physical storage for the init fork of a relation.
+ *
+ * The relation's buffers are first switched to non-permanent.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+	RelFileNode rnode = rel->rd_node;
+	PendingRelDelete *pending;
+	SMgrRelation srel;
+	PendingRelDelete *prev;
+	PendingRelDelete *next;
+	bool			  create = true;
+
+	/* switch this relation's buffers to non-permanent */
+	SetRelationBuffersPersistence(rel->rd_smgr, false, false);
+
+	/*
+	 * If there are pending entries for an init-fork operation on this
+	 * relation, the current transaction has already registered the drop of
+	 * an init fork that existed before the transaction started.  Revert
+	 * that change simply by removing those entries.
+	 */
+	prev = NULL;
+	for (pending = pendingDeletes; pending != NULL; pending = next)
+	{
+		next = pending->next;
+		if (RelFileNodeEquals(rnode, pending->relnode) &&
+			pending->op != PDOP_DELETE)
+		{
+			if (prev)
+				prev->next = next;
+			else
+				pendingDeletes = next;
+			pfree(pending);
+
+			create = false;
+		}
+		else
+		{
+			/* unrelated entry, don't touch it */
+			prev = pending;
+		}
+	}
+
+	if (!create)
+		return;
+
+	/* There is no existing init fork, so create it. */
+	srel = smgropen(rnode, InvalidBackendId);
+	smgrcreate(srel, INIT_FORKNUM, false);
+
+	/*
+	 * An index's init fork needs further initialization.  ambuildempty should
+	 * WAL-log and sync the file by itself; otherwise we do that ourselves.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_INDEX)
+		rel->rd_indam->ambuildempty(rel);
+	else
+	{
+		log_smgrcreate(&rnode, INIT_FORKNUM);
+		smgrimmedsync(srel, INIT_FORKNUM);
+	}
+
+	/*
+	 * We have created the init fork.  If the server crashes before the current
+	 * transaction ends, the leftover init fork would corrupt data at recovery.
+	 * The inittmp fork works as the sentinel to identify that situation.
+	 */
+	smgrcreate(srel, INITTMP_FORKNUM, false);
+	log_smgrcreate(&rnode, INITTMP_FORKNUM);
+	smgrimmedsync(srel, INITTMP_FORKNUM);
+
+	/* at abort: drop this init fork file and make buffers permanent again */
+	pending = (PendingRelDelete *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending->relnode = rnode;
+	pending->op = PDOP_UNLINK_FORK | PDOP_SET_PERSISTENCE;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->bufpersistence = true;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingDeletes;
+	pendingDeletes = pending;
+
+	/* drop inittmp fork at abort */
+	pending = (PendingRelDelete *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending->relnode = rnode;
+	pending->op = PDOP_UNLINK_FORK;
+	pending->unlink_forknum = INITTMP_FORKNUM;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingDeletes;
+	pendingDeletes = pending;
+
+	/* drop inittmp fork at commit */
+	pending = (PendingRelDelete *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending->relnode = rnode;
+	pending->op = PDOP_UNLINK_FORK;
+	pending->unlink_forknum = INITTMP_FORKNUM;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingDeletes;
+	pendingDeletes = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *		Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingDeletes at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion doesn't happen and the buffers are made non-permanent again.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+	RelFileNode rnode = rel->rd_node;
+	PendingRelDelete *pending;
+	PendingRelDelete *prev;
+	PendingRelDelete *next;
+	bool			  inxact_created = false;
+
+	/* switch this relation's buffers to permanent */
+	SetRelationBuffersPersistence(rel->rd_smgr, true, false);
+
+	/*
+	 * If we have entries for an init-fork operation on this relation, that
+	 * means we created the init fork in the current transaction.  In that
+	 * case remove the init and inittmp forks immediately.  Otherwise just
+	 * register a pending-delete for the existing init fork.
+	 */
+	prev = NULL;
+	for (pending = pendingDeletes; pending != NULL; pending = next)
+	{
+		next = pending->next;
+		if (RelFileNodeEquals(rnode, pending->relnode) &&
+			pending->op != PDOP_DELETE)
+		{
+			/* unlink list entry */
+			if (prev)
+				prev->next = next;
+			else
+				pendingDeletes = next;
+			pfree(pending);
+
+			inxact_created = true;
+		}
+		else
+		{
+			/* unrelated entry, don't touch it */
+			prev = pending;
+		}
+	}
+
+	if (inxact_created)
+	{
+		SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+		smgrclose(srel);
+		log_smgrunlink(&rnode, INIT_FORKNUM);
+		smgrunlink(srel, INIT_FORKNUM, false);
+		log_smgrunlink(&rnode, INITTMP_FORKNUM);
+		smgrunlink(srel, INITTMP_FORKNUM, false);
+		return;
+	}
+
+	/* register drop of this init fork file at commit */
+	pending = (PendingRelDelete *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending->relnode = rnode;
+	pending->op = PDOP_UNLINK_FORK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingDeletes;
+	pendingDeletes = pending;
+
+	/* revert buffer-persistence changes at abort */
+	pending = (PendingRelDelete *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending->relnode = rnode;
+	pending->op = PDOP_SET_PERSISTENCE;
+	pending->bufpersistence = false;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;	/* must fire at abort, not at commit */
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingDeletes;
+	pendingDeletes = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +402,44 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+	xl_smgr_unlink xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the unlink of one fork of the relation.
+	 */
+	xlrec.rnode = *rnode;
+	xlrec.forkNum = forkNum;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+	xl_smgr_bufpersistence xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the new buffer persistence of the relation.
+	 */
+	xlrec.rnode = *rnode;
+	xlrec.persistence = persistence;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -200,6 +453,7 @@ RelationDropStorage(Relation rel)
 	pending = (PendingRelDelete *)
 		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
 	pending->relnode = rel->rd_node;
+	pending->op = PDOP_DELETE;
 	pending->backend = rel->rd_backend;
 	pending->atCommit = true;	/* delete if commit */
 	pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -606,43 +860,68 @@ smgrDoPendingDeletes(bool isCommit)
 	prev = NULL;
 	for (pending = pendingDeletes; pending != NULL; pending = next)
 	{
+		SMgrRelation srel;
+
 		next = pending->next;
 		if (pending->nestLevel < nestLevel)
 		{
 			/* outer-level entries should not be processed yet */
 			prev = pending;
+			continue;
 		}
+
+		/* unlink list entry first, so we don't retry on failure */
+		if (prev)
+			prev->next = next;
 		else
+			pendingDeletes = next;
+
+		if (pending->atCommit != isCommit)
 		{
-			/* unlink list entry first, so we don't retry on failure */
-			if (prev)
-				prev->next = next;
-			else
-				pendingDeletes = next;
-			/* do deletion if called for */
-			if (pending->atCommit == isCommit)
-			{
-				SMgrRelation srel;
-
-				srel = smgropen(pending->relnode, pending->backend);
-
-				/* allocate the initial array, or extend it, if needed */
-				if (maxrels == 0)
-				{
-					maxrels = 8;
-					srels = palloc(sizeof(SMgrRelation) * maxrels);
-				}
-				else if (maxrels <= nrels)
-				{
-					maxrels *= 2;
-					srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
-				}
-
-				srels[nrels++] = srel;
-			}
 			/* must explicitly free the list entry */
 			pfree(pending);
 			/* prev does not change */
+			continue;
+		}
+
+		srel = smgropen(pending->relnode, pending->backend);
+
+		if (pending->op != PDOP_DELETE)
+		{
+			if (pending->op & PDOP_UNLINK_FORK)
+			{
+				BlockNumber block = 0;
+				RelFileNodeBackend rbnode;
+
+				rbnode.node = pending->relnode;
+				rbnode.backend = InvalidBackendId;
+
+				DropRelFileNodeBuffers(rbnode, &pending->unlink_forknum, 1,
+									   &block);
+				smgrclose(srel);
+				log_smgrunlink(&pending->relnode, pending->unlink_forknum);
+				smgrunlink(srel, pending->unlink_forknum, false);
+			}
+
+			if (pending->op & PDOP_SET_PERSISTENCE)
+				SetRelationBuffersPersistence(srel, pending->bufpersistence,
+											  false);
+		}
+		else
+		{
+			/* allocate the initial array, or extend it, if needed */
+			if (maxrels == 0)
+			{
+				maxrels = 8;
+				srels = palloc(sizeof(SMgrRelation) * maxrels);
+			}
+			else if (maxrels <= nrels)
+			{
+				maxrels *= 2;
+				srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+			}
+
+			srels[nrels++] = srel;
 		}
 	}
 
@@ -824,7 +1103,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 	for (pending = pendingDeletes; pending != NULL; pending = pending->next)
 	{
 		if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-			&& pending->backend == InvalidBackendId)
+			&& pending->backend == InvalidBackendId &&
+			pending->op == PDOP_DELETE)
 			nrels++;
 	}
 	if (nrels == 0)
@@ -837,7 +1117,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 	for (pending = pendingDeletes; pending != NULL; pending = pending->next)
 	{
 		if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-			&& pending->backend == InvalidBackendId)
+			&& pending->backend == InvalidBackendId &&
+			pending->op == PDOP_DELETE)
 		{
 			*rptr = pending->relnode;
 			rptr++;
@@ -917,6 +1198,15 @@ smgr_redo(XLogReaderState *record)
 		reln = smgropen(xlrec->rnode, InvalidBackendId);
 		smgrcreate(reln, xlrec->forkNum, true);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+		SMgrRelation reln;
+
+		reln = smgropen(xlrec->rnode, InvalidBackendId);
+		smgrclose(reln);
+		smgrunlink(reln, xlrec->forkNum, true);
+	}
 	else if (info == XLOG_SMGR_TRUNCATE)
 	{
 		xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1005,6 +1295,15 @@ smgr_redo(XLogReaderState *record)
 
 		FreeFakeRelcacheEntry(rel);
 	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec =
+			(xl_smgr_bufpersistence *) XLogRecGetData(record);
+		SMgrRelation reln;
+
+		reln = smgropen(xlrec->rnode, InvalidBackendId);
+		SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+	}
 	else
 		elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index e3cfaf8b07..29f786142a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4916,6 +4916,142 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	tab->afterStmts = list_concat(tab->afterStmts, afterStmts);
 
 	return newcmd;
+}
+
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+						  LOCKMODE lockmode)
+{
+	Relation 	rel;
+	Relation	classRel;
+	HeapTuple	tuple,
+				newtuple;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	int			i;
+	List	   *relids;
+	ListCell   *lc_oid;
+
+	Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+	Assert(lockmode == AccessExclusiveLock);
+
+	/*
+	 * If any of the following were set we would have to call ATRewriteTable;
+	 * that cannot be the case when only AT_REWRITE_ALTER_PERSISTENCE is set.
+	 */
+	Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+		   tab->newvals == NULL && !tab->verify_new_notnull);
+
+	rel = table_open(tab->relid, lockmode);
+
+	Assert(rel->rd_rel->relpersistence != persistence);
+
+	elog(DEBUG1, "perform in-place persistence change");
+
+	RelationOpenSmgr(rel);
+
+	/*
+	 * First we collect all relations whose persistence we need to change.
+	 */
+
+	/* Collect OIDs of the relation itself and its indexes */
+	relids = RelationGetIndexList(rel);
+	relids = lcons_oid(rel->rd_id, relids);
+
+	/* Add toast relation and its indexes, if any */
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		List	*toastidx;
+		Relation toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+		RelationOpenSmgr(toastrel);
+		relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+		toastidx = RelationGetIndexList(toastrel);
+		relids = list_concat(relids, toastidx);
+		pfree(toastidx);
+		table_close(toastrel, NoLock);
+	}
+
+	table_close(rel, lockmode);
+
+	/* Make changes in storage */
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+	foreach (lc_oid, relids)
+	{
+		Oid reloid = lfirst_oid(lc_oid);
+		Relation r = relation_open(reloid, lockmode);
+
+		RelationOpenSmgr(r);
+
+		/* Create or drop init fork */
+		if (persistence == RELPERSISTENCE_UNLOGGED)
+			RelationCreateInitFork(r);
+		else
+			RelationDropInitFork(r);
+
+		/*
+		 * When this relation gets WAL-logged, immediately sync all files but
+		 * the init fork to establish the initial state on storage.  Buffers
+		 * have already been flushed out by RelationCreate(Drop)InitFork called
+		 * just above. The init fork should have been synced as needed.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT)
+		{
+			for (i = 0 ; i < INIT_FORKNUM ; i++)
+			{
+				if (smgrexists(r->rd_smgr, i))
+					smgrimmedsync(r->rd_smgr, i);
+			}
+		}
+
+		/* Update catalog */
+		tuple = SearchSysCacheCopy1(RELOID,	ObjectIdGetDatum(reloid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+		memset(new_val, 0, sizeof(new_val));
+		memset(new_null, false, sizeof(new_null));
+		memset(new_repl, false, sizeof(new_repl));
+
+		new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+		new_null[Anum_pg_class_relpersistence - 1] = false;
+		new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+									 new_val, new_null, new_repl);
+
+		CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+		heap_freetuple(newtuple);
+		heap_freetuple(tuple);	/* the syscache copy is ours to free */
+
+		/*
+		 * While wal_level >= replica, switching to LOGGED requires the
+		 * relation content to be WAL-logged so the table can be recovered.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+		{
+			ForkNumber fork;
+
+			for (fork = 0; fork < INIT_FORKNUM ; fork++)
+			{
+				if (smgrexists(r->rd_smgr, fork))
+					log_newpage_range(r, fork,
+									  0, smgrnblocks(r->rd_smgr, fork), false);
+			}
+		}
+
+		table_close(r, NoLock);
+	}
+
+	table_close(classRel, NoLock);
+
+
+
 }
 
 /*
@@ -5038,45 +5174,52 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 										 tab->relid,
 										 tab->rewrite);
 
-			/*
-			 * Create transient table that will receive the modified data.
-			 *
-			 * Ensure it is marked correctly as logged or unlogged.  We have
-			 * to do this here so that buffers for the new relfilenode will
-			 * have the right persistence set, and at the same time ensure
-			 * that the original filenode's buffers will get read in with the
-			 * correct setting (i.e. the original one).  Otherwise a rollback
-			 * after the rewrite would possibly result with buffers for the
-			 * original filenode having the wrong persistence setting.
-			 *
-			 * NB: This relies on swap_relation_files() also swapping the
-			 * persistence. That wouldn't work for pg_class, but that can't be
-			 * unlogged anyway.
-			 */
-			OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
-									   lockmode);
+			if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+				RelationChangePersistence(tab, persistence, lockmode);
+			else
+			{
+				/*
+				 * Create transient table that will receive the modified data.
+				 *
+				 * Ensure it is marked correctly as logged or unlogged.  We
+				 * have to do this here so that buffers for the new relfilenode
+				 * will have the right persistence set, and at the same time
+				 * ensure that the original filenode's buffers will get read in
+				 * with the correct setting (i.e. the original one).  Otherwise
+				 * a rollback after the rewrite would possibly result with
+				 * buffers for the original filenode having the wrong
+				 * persistence setting.
+				 *
+				 * NB: This relies on swap_relation_files() also swapping the
+				 * persistence. That wouldn't work for pg_class, but that can't
+				 * be unlogged anyway.
+				 */
+				OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
+										   lockmode);
 
-			/*
-			 * Copy the heap data into the new table with the desired
-			 * modifications, and test the current data within the table
-			 * against new constraints generated by ALTER TABLE commands.
-			 */
-			ATRewriteTable(tab, OIDNewHeap, lockmode);
+				/*
+				 * Copy the heap data into the new table with the desired
+				 * modifications, and test the current data within the table
+				 * against new constraints generated by ALTER TABLE commands.
+				 */
+				ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-			/*
-			 * Swap the physical files of the old and new heaps, then rebuild
-			 * indexes and discard the old heap.  We can use RecentXmin for
-			 * the table's new relfrozenxid because we rewrote all the tuples
-			 * in ATRewriteTable, so no older Xid remains in the table.  Also,
-			 * we never try to swap toast tables by content, since we have no
-			 * interest in letting this code work on system catalogs.
-			 */
-			finish_heap_swap(tab->relid, OIDNewHeap,
-							 false, false, true,
-							 !OidIsValid(tab->newTableSpace),
-							 RecentXmin,
-							 ReadNextMultiXactId(),
-							 persistence);
+				/*
+				 * Swap the physical files of the old and new heaps, then
+				 * rebuild indexes and discard the old heap.  We can use
+				 * RecentXmin for the table's new relfrozenxid because we
+				 * rewrote all the tuples in ATRewriteTable, so no older Xid
+				 * remains in the table.  Also, we never try to swap toast
+				 * tables by content, since we have no interest in letting this
+				 * code work on system catalogs.
+				 */
+				finish_heap_swap(tab->relid, OIDNewHeap,
+								 false, false, true,
+								 !OidIsValid(tab->newTableSpace),
+								 RecentXmin,
+								 ReadNextMultiXactId(),
+								 persistence);
+			}
 		}
 		else
 		{
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ad0d1a9abc..ddd0133cdf 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -3033,6 +3034,93 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
 	}
 }
 
+/* ---------------------------------------------------------------------
+ *		SetRelationBuffersPersistence
+ *
+ *		This function changes the persistence of all buffer pages of a relation
+ *		and, when switching to PERMANENT, writes all dirty pages of the relation
+ *		out to disk (or more accurately, out to kernel disk buffers),
+ *		ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *		Generally, the caller should be holding AccessExclusiveLock on the
+ *		target relation to ensure that no other backend is busy dirtying
+ *		more blocks of the relation; the effects can't be expected to last
+ *		after the lock is released.
+ *
+ *		XXX currently it sequentially searches the buffer pool, should be
+ *		changed to more clever ways of searching.  This routine is not
+ *		used in any performance-critical code paths, so it's not worth
+ *		adding additional overhead to normal paths to make it go faster;
+ *		but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+	int			i;
+	RelFileNodeBackend rnode = srel->smgr_rnode;
+
+	Assert (!RelFileNodeBackendIsTemp(rnode));
+
+	if (!isRedo)
+		log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		BufferDesc *bufHdr = GetBufferDescriptor(i);
+		uint32		buf_state;
+
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+			continue;
+
+		ReservePrivateRefCountEntry();
+
+		buf_state = LockBufHdr(bufHdr);
+
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+		{
+			UnlockBufHdr(bufHdr, buf_state);
+			continue;
+		}
+
+		if (permanent)
+		{
+			/* Init fork is being dropped, drop buffers for it. */
+			if (bufHdr->tag.forkNum == INIT_FORKNUM)
+			{
+				InvalidateBuffer(bufHdr);
+				continue;
+			}
+
+			buf_state |= BM_PERMANENT;
+			pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+			/* we flush this buffer when switching to PERMANENT */
+			if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+			{
+				PinBuffer_Locked(bufHdr);
+				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+							  LW_SHARED);
+				FlushBuffer(bufHdr, srel);
+				LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+				UnpinBuffer(bufHdr, true);
+			}
+			else
+				UnlockBufHdr(bufHdr, buf_state);
+		}
+		else
+		{
+			/* init fork is always BM_PERMANENT. See BufferAlloc */
+			if (bufHdr->tag.forkNum != INIT_FORKNUM)
+				buf_state &= ~BM_PERMANENT;
+
+			UnlockBufHdr(bufHdr, buf_state);
+		}
+	}
+}
+
 /* ---------------------------------------------------------------------
  *		DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index 0c2094f766..6524262a74 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -31,6 +31,7 @@ static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 typedef struct
 {
 	char		oid[OIDCHARS + 1];
+	bool		dirty;
 } unlogged_relation_entry;
 
 /*
@@ -151,6 +152,8 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 	DIR		   *dbspace_dir;
 	struct dirent *de;
 	char		rm_path[MAXPGPATH * 2];
+	HTAB	   *hash;
+	HASHCTL		ctl;
 
 	/* Caller must specify at least one operation. */
 	Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
@@ -160,62 +163,73 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 	 * the files with init forks.  Then, we go through again and nuke
 	 * everything with the same OID except the init fork.
 	 */
+
+	/*
+	 * It's possible that someone could create a ton of unlogged relations
+	 * in the same database & tablespace, so we'd better use a hash table
+	 * rather than an array or linked list to keep track of which files
+	 * need to be reset.  Otherwise, this cleanup operation would be
+	 * O(n^2).
+	 */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(unlogged_relation_entry);
+	ctl.entrysize = sizeof(unlogged_relation_entry);
+	hash = hash_create("unlogged hash", 32, &ctl, HASH_ELEM);
+
+	/* Scan the directory. */
+	dbspace_dir = AllocateDir(dbspacedirname);
+	while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+	{
+		ForkNumber	forkNum;
+		int			oidchars;
+		bool		found;
+		unlogged_relation_entry key;
+		unlogged_relation_entry *ent;
+
+		/* Skip anything that doesn't look like a relation data file. */
+		if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+												 &forkNum))
+			continue;
+
+		/* Also skip it unless this is the init or inittmp fork. */
+		if (forkNum != INIT_FORKNUM && forkNum != INITTMP_FORKNUM)
+			continue;
+
+		/*
+		 * Put the OID portion of the name into the hash table, if it
+		 * isn't already.
+		 */
+		memset(key.oid, 0, sizeof(key.oid));
+		memcpy(key.oid, de->d_name, oidchars);
+		ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+		if (!found)
+			ent->dirty = 0;
+
+		/*
+		 * If we find an inittmp fork, the transaction that created the
+		 * corresponding init fork neither committed nor aborted.  Mark the
+		 * entry as dirty so that we can clean those files up properly.
+		 */
+		if (forkNum == INITTMP_FORKNUM)
+			ent->dirty = true;
+	}
+
+	/* Done with the first pass. */
+	FreeDir(dbspace_dir);
+
+	/*
+	 * If we didn't find any init forks, there's no point in continuing;
+	 * we can bail out now.
+	 */
+	if (hash_get_num_entries(hash) == 0)
+	{
+		hash_destroy(hash);
+		return;
+	}
+
 	if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
 	{
-		HTAB	   *hash;
-		HASHCTL		ctl;
-
-		/*
-		 * It's possible that someone could create a ton of unlogged relations
-		 * in the same database & tablespace, so we'd better use a hash table
-		 * rather than an array or linked list to keep track of which files
-		 * need to be reset.  Otherwise, this cleanup operation would be
-		 * O(n^2).
-		 */
-		memset(&ctl, 0, sizeof(ctl));
-		ctl.keysize = sizeof(unlogged_relation_entry);
-		ctl.entrysize = sizeof(unlogged_relation_entry);
-		hash = hash_create("unlogged hash", 32, &ctl, HASH_ELEM);
-
-		/* Scan the directory. */
-		dbspace_dir = AllocateDir(dbspacedirname);
-		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-		{
-			ForkNumber	forkNum;
-			int			oidchars;
-			unlogged_relation_entry ent;
-
-			/* Skip anything that doesn't look like a relation data file. */
-			if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-													 &forkNum))
-				continue;
-
-			/* Also skip it unless this is the init fork. */
-			if (forkNum != INIT_FORKNUM)
-				continue;
-
-			/*
-			 * Put the OID portion of the name into the hash table, if it
-			 * isn't already.
-			 */
-			memset(ent.oid, 0, sizeof(ent.oid));
-			memcpy(ent.oid, de->d_name, oidchars);
-			hash_search(hash, &ent, HASH_ENTER, NULL);
-		}
-
-		/* Done with the first pass. */
-		FreeDir(dbspace_dir);
-
-		/*
-		 * If we didn't find any init forks, there's no point in continuing;
-		 * we can bail out now.
-		 */
-		if (hash_get_num_entries(hash) == 0)
-		{
-			hash_destroy(hash);
-			return;
-		}
-
 		/*
 		 * Now, make a second pass and remove anything that matches.
 		 */
@@ -224,39 +238,48 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 		{
 			ForkNumber	forkNum;
 			int			oidchars;
-			bool		found;
-			unlogged_relation_entry ent;
+			unlogged_relation_entry key;
+			unlogged_relation_entry *ent;
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
 													 &forkNum))
 				continue;
 
-			/* We never remove the init fork. */
-			if (forkNum == INIT_FORKNUM)
-				continue;
-
 			/*
 			 * See whether the OID portion of the name shows up in the hash
 			 * table.
 			 */
-			memset(ent.oid, 0, sizeof(ent.oid));
-			memcpy(ent.oid, de->d_name, oidchars);
-			hash_search(hash, &ent, HASH_FIND, &found);
+			memset(key.oid, 0, sizeof(key.oid));
+			memcpy(key.oid, de->d_name, oidchars);
+			ent = hash_search(hash, &key, HASH_FIND, NULL);
 
-			/* If so, nuke it! */
-			if (found)
+			/* Don't remove files if corresponding init fork is not found */
+			if (!ent)
+				continue;
+
+			if (!ent->dirty)
+			{
+				/* Don't remove clean init file */
+				if (forkNum == INIT_FORKNUM)
+					continue;
+			}else
 			{
-				snprintf(rm_path, sizeof(rm_path), "%s/%s",
-						 dbspacedirname, de->d_name);
-				if (unlink(rm_path) < 0)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not remove file \"%s\": %m",
-									rm_path)));
-				else
-					elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+				/* Remove dirty init file, together with inittmp file */
+				if (forkNum != INIT_FORKNUM && forkNum != INITTMP_FORKNUM)
+					continue;
 			}
+
+			/* so, nuke it! */
+			snprintf(rm_path, sizeof(rm_path), "%s/%s",
+					 dbspacedirname, de->d_name);
+			if (unlink(rm_path) < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not remove file \"%s\": %m",
+								rm_path)));
+			else
+				elog(DEBUG2, "unlinked file \"%s\"", rm_path);
 		}
 
 		/* Cleanup is complete. */
@@ -273,6 +296,9 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 	 */
 	if ((op & UNLOGGED_RELATION_INIT) != 0)
 	{
+		unlogged_relation_entry key;
+		unlogged_relation_entry *ent;
+
 		/* Scan the directory. */
 		dbspace_dir = AllocateDir(dbspacedirname);
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
@@ -288,6 +314,38 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 													 &forkNum))
 				continue;
 
+			/*
+			 * See whether the OID portion of the name shows up in the hash
+			 * table.
+			 */
+			memset(key.oid, 0, sizeof(key.oid));
+			memcpy(key.oid, de->d_name, oidchars);
+			ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+			/* Don't initialize files that don't have an init fork. */
+			if (!ent)
+				continue;
+
+			if (ent->dirty &&
+				(forkNum == INIT_FORKNUM || forkNum == INITTMP_FORKNUM))
+			{
+				/*
+				 * The init file is dirty.  The files were removed once at
+				 * cleanup time, but recovery can create them again, so remove
+				 * both the INIT and INITTMP files here too.
+				 */
+				snprintf(rm_path, sizeof(rm_path), "%s/%s",
+						 dbspacedirname, de->d_name);
+				if (unlink(rm_path) < 0)
+					ereport(ERROR,
+							(errcode_for_file_access(),
+							 errmsg("could not remove file \"%s\": %m",
+									rm_path)));
+				else
+					elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+				continue;
+			}
+
 			/* Also skip it unless this is the init fork. */
 			if (forkNum != INIT_FORKNUM)
 				continue;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index dcc09df0c7..5eb9e97b3d 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -645,6 +645,19 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
 	smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+/*
+ *	smgrunlink() -- Invoke the storage manager's unlink routine for a fork.
+ *
+ * This just forwards reln->smgr_rnode, forknum and isRedo to the
+ * smgr_unlink callback of the storage manager that reln uses; nothing else
+ * is done here.
+ */
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/common/relpath.c b/src/common/relpath.c
index ad733d1363..2a5e5fa990 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -34,7 +34,8 @@ const char *const forkNames[] = {
 	"main",						/* MAIN_FORKNUM */
 	"fsm",						/* FSM_FORKNUM */
 	"vm",						/* VISIBILITYMAP_FORKNUM */
-	"init"						/* INIT_FORKNUM */
+	"init",						/* INIT_FORKNUM */
+	"itmp"						/* INITTMP_FORKNUM */
 };
 
 StaticAssertDecl(lengthof(forkNames) == (MAX_FORKNUM + 1),
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 30c38e0ca6..c2259cd7e3 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int	wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 7b21cab2e0..d48b5288ce 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -29,6 +29,8 @@
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE	0x10
 #define XLOG_SMGR_TRUNCATE	0x20
+#define XLOG_SMGR_UNLINK	0x30
+#define XLOG_SMGR_BUFPERSISTENCE	0x40
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +38,18 @@ typedef struct xl_smgr_create
 	ForkNumber	forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+	RelFileNode rnode;			/* relation the record refers to */
+	ForkNumber	forkNum;		/* fork number; presumably the fork to unlink */
+} xl_smgr_unlink;
+
+typedef struct xl_smgr_bufpersistence
+{
+	RelFileNode rnode;			/* relation the record refers to */
+	bool		persistence;	/* presumably true = permanent; TODO confirm */
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -51,6 +65,8 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index 869cabcc0d..f6e1a74a38 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -43,7 +43,8 @@ typedef enum ForkNumber
 	MAIN_FORKNUM = 0,
 	FSM_FORKNUM,
 	VISIBILITYMAP_FORKNUM,
-	INIT_FORKNUM
+	INIT_FORKNUM,
+	INITTMP_FORKNUM
 
 	/*
 	 * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
@@ -52,7 +53,7 @@ typedef enum ForkNumber
 	 */
 } ForkNumber;
 
-#define MAX_FORKNUM		INIT_FORKNUM
+#define MAX_FORKNUM		INITTMP_FORKNUM
 
 #define FORKNAMECHARS	4		/* max chars for a fork name */
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ee91b8fa26..e2496ed1c8 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -168,6 +168,8 @@ extern PGDLLIMPORT int32 *LocalRefCount;
  */
 #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer))
 
+struct SMgrRelationData;		/* forward declaration for prototypes below */
+
 /*
  * prototypes for functions in bufmgr.c
  */
@@ -205,6 +207,8 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
 								   int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+										  bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index f28a842401..5d74631006 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,6 +86,7 @@ extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-- 
2.18.4

