From 1ba019a026c66c71bafdc9c71393bef4251917b2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Tue, 25 Apr 2023 15:49:10 +0900
Subject: [PATCH v29 2/3] In-place table persistence change

Previously, the command caused a large amount of file I/O due to heap
rewrites, even though ALTER TABLE SET UNLOGGED does not require any
data rewrites.  This patch eliminates the need for
rewrites. Additionally, ALTER TABLE SET LOGGED is updated to emit
XLOG_FPI records instead of numerous HEAP_INSERTs when wal_level >
minimal, reducing resource consumption.
---
 src/backend/access/rmgrdesc/smgrdesc.c |  12 +
 src/backend/catalog/storage.c          | 295 ++++++++++++++++++++++++-
 src/backend/commands/tablecmds.c       | 270 ++++++++++++++++++----
 src/backend/storage/buffer/bufmgr.c    |  84 +++++++
 src/backend/storage/file/reinit.c      |  51 ++++-
 src/bin/pg_rewind/parsexlog.c          |   6 +
 src/include/catalog/storage_xlog.h     |   8 +
 src/include/storage/bufmgr.h           |   2 +
 src/tools/pgindent/typedefs.list       |   1 +
 9 files changed, 675 insertions(+), 54 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index f8187385c4..e2998a3ee4 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -71,6 +71,15 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "%s %s", action, path);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+		char	   *path = relpathperm(xlrec->rlocator, MAIN_FORKNUM);
+
+		appendStringInfoString(buf, path);
+		appendStringInfo(buf, " persistence %d", xlrec->persistence);
+		pfree(path);
+	}
 }
 
 const char *
@@ -92,6 +101,9 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_MARK:
 			id = "MARK";
 			break;
+		case XLOG_SMGR_BUFPERSISTENCE:
+			id = "BUFPERSISTENCE";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index fe06c3c31d..6106376525 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -69,11 +69,13 @@ typedef struct PendingRelDelete
 
 #define	PCOP_UNLINK_FORK		(1 << 0)
 #define	PCOP_UNLINK_MARK		(1 << 1)
+#define	PCOP_SET_PERSISTENCE	(1 << 2)
 
 typedef struct PendingCleanup
 {
 	RelFileLocator rlocator;	/* relation that need a cleanup */
 	int			op;				/* operation mask */
+	bool		bufpersistence; /* buffer persistence to set */
 	ForkNumber	unlink_forknum; /* forknum to unlink */
 	StorageMarks unlink_mark;	/* mark to unlink */
 	BackendId	backend;		/* InvalidBackendId if not a temp rel */
@@ -223,6 +225,202 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 	return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *		Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+	RelFileLocator rlocator = rel->rd_locator;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	SMgrRelation srel;
+	bool		create = true;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), false, false);
+
+	/*
+	 * If a pending-unlink exists for this relation's init-fork, it indicates
+	 * the init-fork's existed before the current transaction; this function
+	 * reverts the pending-unlink by removing the entry. See
+	 * RelationDropInitFork.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileLocatorEquals(rlocator, pending->rlocator) &&
+			pending->unlink_forknum == INIT_FORKNUM)
+		{
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+			/* prev does not change */
+
+			create = false;
+		}
+		else
+			prev = pending;
+	}
+
+	if (!create)
+		return;
+
+	/* create the init fork, along with the mark file */
+	srel = smgropen(rlocator, InvalidBackendId);
+	log_smgrcreatemark(&rlocator, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+	smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+	/* We don't have existing init fork, create it. */
+	smgrcreate(srel, INIT_FORKNUM, false);
+
+	/*
+	 * For index relations, WAL-logging and file sync are handled by
+	 * ambuildempty. In contrast, for heap relations, these tasks are performed
+	 * directly.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_INDEX)
+		rel->rd_indam->ambuildempty(rel);
+	else
+	{
+		log_smgrcreate(&rlocator, INIT_FORKNUM);
+		smgrimmedsync(srel, INIT_FORKNUM);
+	}
+
+	/* drop the init fork, mark file then revert persistence at abort */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_FORK | PCOP_UNLINK_MARK | PCOP_SET_PERSISTENCE;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->bufpersistence = true;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* drop mark file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_MARK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *		Delete physical storage for the init fork of a relation.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+	RelFileLocator rlocator = rel->rd_locator;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	bool		inxact_created = false;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), true, false);
+
+	/*
+	 * Search for a pending-unlink associated with the init-fork of the
+	 * relation. Its presence indicates that the init-fork was created within
+	 * the current transaction.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileLocatorEquals(rlocator, pending->rlocator) &&
+			pending->unlink_forknum != INIT_FORKNUM)
+		{
+			/* unlink list entry */
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+
+			/* prev does not change */
+
+			inxact_created = true;
+		}
+		else
+			prev = pending;
+	}
+
+	/*
+	 * If the init-fork was created in this transaction, remove both the
+	 * init-fork and mark file. Otherwise, register an at-commit pending-unlink
+	 * for the existing init-fork. See RelationCreateInitFork.
+	 */
+	if (inxact_created)
+	{
+		SMgrRelation srel = smgropen(rlocator, InvalidBackendId);
+		ForkNumber	 forknum = INIT_FORKNUM;
+		BlockNumber	 firstblock = 0;
+
+		/*
+		 * Some AMs initialize init-fork via the buffer manager. To properly
+		 * drop the init-fork, first drop all buffers for the init-fork, then
+		 * unlink the init-fork and the mark file.
+		 */
+		DropRelationBuffers(srel, &forknum, 1, &firstblock);
+		log_smgrunlinkmark(&rlocator, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+		smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+		log_smgrunlink(&rlocator, INIT_FORKNUM);
+		smgrunlink(srel, INIT_FORKNUM, false);
+		return;
+	}
+
+	/* register drop of this init fork file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_FORK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* revert buffer-persistence changes at abort */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_SET_PERSISTENCE;
+	pending->bufpersistence = false;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -305,6 +503,25 @@ log_smgrunlinkmark(const RelFileLocator *rlocator, ForkNumber forkNum,
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileLocator rlocator, bool persistence)
+{
+	xl_smgr_bufpersistence xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the change of buffer persistence.
+	 */
+	xlrec.rlocator = rlocator;
+	xlrec.persistence = persistence;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -858,10 +1075,29 @@ smgrDoPendingCleanups(bool isCommit)
 				srel = smgropen(pending->rlocator, pending->backend);
 
 				Assert((pending->op &
-						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK)) == 0);
+						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK |
+						  PCOP_SET_PERSISTENCE)) == 0);
+
+				if (pending->op & PCOP_SET_PERSISTENCE)
+				{
+					SetRelationBuffersPersistence(srel, pending->bufpersistence,
+												  InRecovery);
+				}
 
 				if (pending->op & PCOP_UNLINK_FORK)
 				{
+					BlockNumber firstblock = 0;
+
+					/*
+					 * Unlink the fork file. Currently this operation is
+					 * applied only to init-forks. As it is not ceratin that
+					 * the init-fork is not loaded on shared buffers, drop all
+					 * buffers for it.
+					 */
+					Assert(pending->unlink_forknum == INIT_FORKNUM);
+					DropRelationBuffers(srel, &pending->unlink_forknum, 1,
+										&firstblock);
+
 					/* Don't emit wal while recovery. */
 					if (!InRecovery)
 						log_smgrunlink(&pending->rlocator,
@@ -1286,8 +1522,8 @@ smgr_redo(XLogReaderState *record)
 		else
 		{
 			/*
-			 * Delete pending action for this mark file if any. We should have
-			 * at most one entry for this action.
+			 * Delete any pending action for this mark file, if present. There
+			 * should be at most one entry for this action.
 			 */
 			PendingCleanup *prev = NULL;
 
@@ -1311,6 +1547,59 @@ smgr_redo(XLogReaderState *record)
 			}
 		}
 	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec =
+		(xl_smgr_bufpersistence *) XLogRecGetData(record);
+		SMgrRelation reln;
+		PendingCleanup *pending;
+		PendingCleanup *prev = NULL;
+
+		reln = smgropen(xlrec->rlocator, InvalidBackendId);
+		SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+		/*
+		 * Delete any pending action for persistence change, if present. There
+		 * should be at most one entry for this action.
+		 */
+		for (pending = pendingCleanups; pending != NULL;
+			 pending = pending->next)
+		{
+			if (RelFileLocatorEquals(xlrec->rlocator, pending->rlocator) &&
+				(pending->op & PCOP_SET_PERSISTENCE) != 0)
+			{
+				Assert(pending->bufpersistence == xlrec->persistence);
+
+				if (prev)
+					prev->next = pending->next;
+				else
+					pendingCleanups = pending->next;
+
+				pfree(pending);
+				break;
+			}
+
+			prev = pending;
+		}
+
+		/*
+		 * During abort, revert any changes to buffer persistence made made in
+		 * this transaction.
+		 */
+		if (!pending)
+		{
+			pending = (PendingCleanup *)
+				MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+			pending->rlocator = xlrec->rlocator;
+			pending->op = PCOP_SET_PERSISTENCE;
+			pending->bufpersistence = !xlrec->persistence;
+			pending->backend = InvalidBackendId;
+			pending->atCommit = false;
+			pending->nestLevel = GetCurrentTransactionNestLevel();
+			pending->next = pendingCleanups;
+			pendingCleanups = pending;
+		}
+	}
 	else
 		elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 727f151750..3192a97b0e 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -55,6 +55,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5464,6 +5465,188 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	return newcmd;
 }
 
+/*
+ * RelationChangePersistence: perform in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+						  LOCKMODE lockmode)
+{
+	Relation	rel;
+	Relation	classRel;
+	HeapTuple	tuple,
+				newtuple;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	int			i;
+	List	   *relids;
+	ListCell   *lc_oid;
+
+	Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+	Assert(lockmode == AccessExclusiveLock);
+
+	/*
+	 * Use ATRewriteTable instead of this function under the following
+	 * condition.
+	 */
+	Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+		   tab->newvals == NULL && !tab->verify_new_notnull);
+
+	rel = table_open(tab->relid, lockmode);
+
+	Assert(rel->rd_rel->relpersistence != persistence);
+
+	elog(DEBUG1, "perform in-place persistence change");
+
+	/*
+	 * Initially, gather all relations that require a persistence change.
+	 */
+
+	/* Collect OIDs of indexes and toast relations */
+	relids = RelationGetIndexList(rel);
+	relids = lcons_oid(rel->rd_id, relids);
+
+	/* Add toast relation if any */
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		List	   *toastidx;
+		Relation	toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+		relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+		toastidx = RelationGetIndexList(toastrel);
+		relids = list_concat(relids, toastidx);
+		pfree(toastidx);
+		table_close(toastrel, NoLock);
+	}
+
+	table_close(rel, NoLock);
+
+	/* Make changes in storage */
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+	foreach(lc_oid, relids)
+	{
+		Oid			reloid = lfirst_oid(lc_oid);
+		Relation	r = relation_open(reloid, lockmode);
+
+		/*
+		 * XXXX: Some access methods don't support in-place persistence
+		 * changes. GiST uses page LSNs to figure out whether a block has been
+		 * modified. However, UNLOGGED GiST indexes use fake LSNs, which are
+		 * incompatible with the real LSNs used for LOGGED indexes.
+		 *
+		 * Potentially, if gistGetFakeLSN behaved similarly for both permanent
+		 * and unlogged indexes, we could avoid index rebuilds by emitting
+		 * extra WAL records while the index is unlogged.
+		 *
+		 * Compare relam against a positive list to ensure the hard way is
+		 * taken for unknown AMs.
+		 */
+		if (r->rd_rel->relkind == RELKIND_INDEX &&
+		/* GiST is excluded */
+			r->rd_rel->relam != BTREE_AM_OID &&
+			r->rd_rel->relam != HASH_AM_OID &&
+			r->rd_rel->relam != GIN_AM_OID &&
+			r->rd_rel->relam != SPGIST_AM_OID &&
+			r->rd_rel->relam != BRIN_AM_OID)
+		{
+			int			reindex_flags;
+			ReindexParams params = {0};
+
+			/* reindex doesn't allow concurrent use of the index */
+			table_close(r, NoLock);
+
+			reindex_flags =
+				REINDEX_REL_SUPPRESS_INDEX_USE |
+				REINDEX_REL_CHECK_CONSTRAINTS;
+
+			/* Set the same persistence with the parent relation. */
+			if (persistence == RELPERSISTENCE_UNLOGGED)
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+			else
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+			reindex_index(reloid, reindex_flags, persistence, &params);
+
+			continue;
+		}
+
+		/* Create or drop init fork */
+		if (persistence == RELPERSISTENCE_UNLOGGED)
+			RelationCreateInitFork(r);
+		else
+			RelationDropInitFork(r);
+
+		/*
+		 * If this relation becomes WAL-logged, immediately sync all files
+		 * except the init-fork to establish the initial state on storage.  The
+		 * buffers should have already been flushed out by
+		 * RelationCreate(Drop)InitFork called just above. The init-fork should
+		 * already be synchronized as required.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT)
+		{
+			for (i = 0; i < INIT_FORKNUM; i++)
+			{
+				if (smgrexists(RelationGetSmgr(r), i))
+					smgrimmedsync(RelationGetSmgr(r), i);
+			}
+		}
+
+		/* Update catalog */
+		tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+		memset(new_val, 0, sizeof(new_val));
+		memset(new_null, false, sizeof(new_null));
+		memset(new_repl, false, sizeof(new_repl));
+
+		new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+		new_null[Anum_pg_class_relpersistence - 1] = false;
+		new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+									 new_val, new_null, new_repl);
+
+		CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+		heap_freetuple(newtuple);
+
+		/*
+		 * If wal_level >= replica, switching to LOGGED necessitates WAL-logging
+		 * the relation content for later recovery. This is not emitted when
+		 * wal_level = minimal.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+		{
+			ForkNumber	fork;
+			xl_smgr_truncate xlrec;
+
+			xlrec.blkno = 0;
+			xlrec.rlocator = r->rd_locator;
+			xlrec.flags = SMGR_TRUNCATE_ALL;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+			XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+			for (fork = 0; fork < INIT_FORKNUM; fork++)
+			{
+				if (smgrexists(RelationGetSmgr(r), fork))
+					log_newpage_range(r, fork, 0,
+									  smgrnblocks(RelationGetSmgr(r), fork),
+									  false);
+			}
+		}
+
+		table_close(r, NoLock);
+	}
+
+	table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5594,48 +5777,55 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 										 tab->relid,
 										 tab->rewrite);
 
-			/*
-			 * Create transient table that will receive the modified data.
-			 *
-			 * Ensure it is marked correctly as logged or unlogged.  We have
-			 * to do this here so that buffers for the new relfilenumber will
-			 * have the right persistence set, and at the same time ensure
-			 * that the original filenumbers's buffers will get read in with
-			 * the correct setting (i.e. the original one).  Otherwise a
-			 * rollback after the rewrite would possibly result with buffers
-			 * for the original filenumbers having the wrong persistence
-			 * setting.
-			 *
-			 * NB: This relies on swap_relation_files() also swapping the
-			 * persistence. That wouldn't work for pg_class, but that can't be
-			 * unlogged anyway.
-			 */
-			OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-									   persistence, lockmode);
+			if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+				RelationChangePersistence(tab, persistence, lockmode);
+			else
+			{
+				/*
+				 * Create transient table that will receive the modified data.
+				 *
+				 * Ensure it is marked correctly as logged or unlogged.  We
+				 * have to do this here so that buffers for the new
+				 * relfilenumber will have the right persistence set, and at
+				 * the same time ensure that the original filenumbers's buffers
+				 * will get read in with the correct setting (i.e. the original
+				 * one).  Otherwise a rollback after the rewrite would possibly
+				 * result with buffers for the original filenumbers having the
+				 * wrong persistence setting.
+				 *
+				 * NB: This relies on swap_relation_files() also swapping the
+				 * persistence. That wouldn't work for pg_class, but that
+				 * can't be unlogged anyway.
+				 */
+				OIDNewHeap = make_new_heap(tab->relid, NewTableSpace,
+										   NewAccessMethod,
+										   persistence, lockmode);
 
-			/*
-			 * Copy the heap data into the new table with the desired
-			 * modifications, and test the current data within the table
-			 * against new constraints generated by ALTER TABLE commands.
-			 */
-			ATRewriteTable(tab, OIDNewHeap, lockmode);
+				/*
+				 * Copy the heap data into the new table with the desired
+				 * modifications, and test the current data within the table
+				 * against new constraints generated by ALTER TABLE commands.
+				 */
+				ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-			/*
-			 * Swap the physical files of the old and new heaps, then rebuild
-			 * indexes and discard the old heap.  We can use RecentXmin for
-			 * the table's new relfrozenxid because we rewrote all the tuples
-			 * in ATRewriteTable, so no older Xid remains in the table.  Also,
-			 * we never try to swap toast tables by content, since we have no
-			 * interest in letting this code work on system catalogs.
-			 */
-			finish_heap_swap(tab->relid, OIDNewHeap,
-							 false, false, true,
-							 !OidIsValid(tab->newTableSpace),
-							 RecentXmin,
-							 ReadNextMultiXactId(),
-							 persistence);
-
-			InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+				/*
+				 * Swap the physical files of the old and new heaps, then
+				 * rebuild indexes and discard the old heap.  We can use
+				 * RecentXmin for the table's new relfrozenxid because we
+				 * rewrote all the tuples in ATRewriteTable, so no older Xid
+				 * remains in the table.  Also, we never try to swap toast
+				 * tables by content, since we have no interest in letting
+				 * this code work on system catalogs.
+				 */
+				finish_heap_swap(tab->relid, OIDNewHeap,
+								 false, false, true,
+								 !OidIsValid(tab->newTableSpace),
+								 RecentXmin,
+								 ReadNextMultiXactId(),
+								 persistence);
+
+				InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+			}
 		}
 		else if (tab->rewrite > 0 && tab->relkind == RELKIND_SEQUENCE)
 		{
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index df22aaa1c5..7d21fd5ac5 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3708,6 +3708,90 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	}
 }
 
+/* ---------------------------------------------------------------------
+ *		SetRelationBuffersPersistence
+ *
+ *		This function changes the persistence of all buffer pages of a relation
+ *		then writes all dirty pages to disk (or kernel disk buffers) when
+ *		switching to PERMANENT, ensuring the kernel has an up-to-date view of
+ *		the relation.
+ *
+ *		The caller must be holding AccessExclusiveLock on the target relation
+ *		to ensure no other backend is busy dirtying more blocks.
+ *
+ *		XXX currently it sequentially searches the buffer pool; consider
+ *		implementing more efficient search methods.  This routine isn't used in
+ *		performance-critical code paths, so it's not worth additional overhead
+ *		to make it go faster; see also DropRelationBuffers.
+ *		--------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+	int			i;
+	RelFileLocatorBackend rlocator = srel->smgr_rlocator;
+
+	Assert(!RelFileLocatorBackendIsTemp(rlocator));
+
+	if (!isRedo)
+		log_smgrbufpersistence(srel->smgr_rlocator.locator, permanent);
+
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		BufferDesc *bufHdr = GetBufferDescriptor(i);
+		uint32		buf_state;
+
+		if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag),
+								  rlocator.locator))
+			continue;
+
+		ReservePrivateRefCountEntry();
+
+		buf_state = LockBufHdr(bufHdr);
+
+		if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag),
+								  rlocator.locator))
+		{
+			UnlockBufHdr(bufHdr, buf_state);
+			continue;
+		}
+
+		if (permanent)
+		{
+			/* The init fork is being dropped, drop buffers for it. */
+			if (BufTagGetForkNum(&bufHdr->tag) == INIT_FORKNUM)
+			{
+				InvalidateBuffer(bufHdr);
+				continue;
+			}
+
+			buf_state |= BM_PERMANENT;
+			pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+			/* flush this buffer when switching to PERMANENT */
+			if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+			{
+				PinBuffer_Locked(bufHdr);
+				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+							  LW_SHARED);
+				FlushBuffer(bufHdr, srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+				LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+				UnpinBuffer(bufHdr);
+			}
+			else
+				UnlockBufHdr(bufHdr, buf_state);
+		}
+		else
+		{
+			/* There shouldn't be an init fork */
+			Assert(BufTagGetForkNum(&bufHdr->tag) != INIT_FORKNUM);
+			UnlockBufHdr(bufHdr, buf_state);
+		}
+	}
+}
+
 /* ---------------------------------------------------------------------
  *		DropRelationsAllBuffers
  *
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index e84fcbf884..a5d8763e15 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -38,6 +38,7 @@ typedef struct
 {
 	RelFileNumber	relNumber;		/* hash key */
 	bool			has_init;		/* has INIT fork */
+	bool			dirty_init;		/* needs to remove INIT fork */
 	bool			dirty_all;		/* needs to remove all forks */
 }  relfile_entry;
 
@@ -45,7 +46,10 @@ typedef struct
  * Clean up and reset relation files from before the last restart.
  *
  * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
- * depending on the existence of mark files.
+ * depending on the existence of the "cleanup" forks.
+ *
+ * If SMGR_MARK_UNCOMMITTED mark file for init fork is present, we remove the
+ * init fork along with the mark file.
  *
  * If SMGR_MARK_UNCOMMITTED mark file for main fork is present we remove the
  * whole relation along with the mark file.
@@ -54,7 +58,7 @@ typedef struct
  * with the "init" fork, except for the "init" fork itself.
  *
  * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
- * relations that are to be cleaned up.
+ * relations that have the "cleanup" and/or the "init" forks.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -241,7 +245,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 			 * Put the OID portion of the name into the hash table,
 			 * if it isn't already.  If it has SMGR_MARK_UNCOMMITTED mark
 			 * files, the storage file is in dirty state, where clean up is
-			 * needed.
+			 * needed.  isn't already.
 			 */
 			key = atooid(de->d_name);
 			ent = hash_search(hash, &key, HASH_ENTER, &found);
@@ -249,10 +253,13 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 			if (!found)
 			{
 				ent->has_init = false;
+				ent->dirty_init = false;
 				ent->dirty_all = false;
 			}
 
-			if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+			if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+				ent->dirty_init = true;
+			else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
 				ent->dirty_all = true;
 			else
 			{
@@ -276,11 +283,10 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 	{
 		/*
 		 * When we come here after recovery, smgr object for this file might
-		 * have been created. In that case we need to drop all buffers then the
-		 * smgr object.  Otherwise checkpointer wrongly tries to flush buffers
-		 * for nonexistent relation storage. This is safe as far as no other
-		 * backends have accessed the relation before starting archive
-		 * recovery.
+		 * have been created. In that case we need to drop all buffers then
+		 * the smgr object before initializing the unlogged relation.  This is
+		 * safe as far as no other backends have accessed the relation before
+		 * starting archive recovery.
 		 */
 		HASH_SEQ_STATUS status;
 		relfile_entry *ent;
@@ -296,6 +302,13 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 		{
 			RelFileLocatorBackend rel;
 
+			/*
+			 * The relation is persistent and stays persistent. Don't drop the
+			 * buffers for this relation.
+			 */
+			if (ent->has_init && ent->dirty_init)
+				continue;
+
 			if (maxrels <= nrels)
 			{
 				maxrels *= 2;
@@ -352,8 +365,24 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 				if (!ent->has_init)
 					continue;
 
-				if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
-					continue;
+				if (ent->dirty_init)
+				{
+					/*
+					 * The crashed transaction did SET UNLOGGED. This relation
+					 * is restored to a LOGGED relation.
+					 */
+					if (forkNum != INIT_FORKNUM)
+						continue;
+				}
+				else
+				{
+					/*
+					 * we don't remove the INIT fork of a non-dirty
+					 * relation files.
+					 */
+					if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+						continue;
+				}
 			}
 
 			/* so, nuke it! */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index e9e4bafb01..ddc8014e55 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -434,6 +434,12 @@ extractPageInfo(XLogReaderState *record)
 		 * empty so we don't need to bother the content.
 		 */
 	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		/*
+		 * We can safely ignore these. These don't make any on-disk changes.
+		 */
+	}
 	else if (rmid == RM_XACT_ID &&
 			 ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
 			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index a36646c6ee..847660b6af 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -62,6 +62,12 @@ typedef struct xl_smgr_mark
 	smgr_mark_action action;
 } xl_smgr_mark;
 
+typedef struct xl_smgr_bufpersistence
+{
+	RelFileLocator rlocator;
+	bool		persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -82,6 +88,8 @@ extern void log_smgrcreatemark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
 extern void log_smgrunlinkmark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileLocator rlocator,
+								   bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 0f5fb6be00..e3d9273710 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -222,6 +222,8 @@ extern void DropRelationBuffers(struct SMgrRelationData *smgr_reln,
 								int nforks, BlockNumber *firstDelBlock);
 extern void DropRelationsAllBuffers(struct SMgrRelationData **smgr_reln,
 									int nlocators);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+										  bool permanent, bool isRedo);
 extern void DropDatabaseBuffers(Oid dbid);
 
 #define RelationGetNumberOfBlocks(reln) \
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 057e3d3104..f72c5c8a9e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3939,6 +3939,7 @@ xl_replorigin_set
 xl_restore_point
 xl_running_xacts
 xl_seq_rec
+xl_smgr_bufpersistence
 xl_smgr_create
 xl_smgr_mark
 xl_smgr_truncate
-- 
2.25.1

