From fa507e1aa7923bb46b907847c2d6555c78a2219c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Fri, 11 Feb 2022 09:43:57 -0800
Subject: [PATCH v4 8/8] Move removal of spilled logical slot data to
 custodian.

If a slot directory contains many spilled files, removing them during
startup can take much longer than necessary.  To handle this, startup
creates a new slot directory, copies the state file into it, and swaps
the new directory with the old one.  The custodian then asynchronously
cleans up the old slot directory.
---
 src/backend/access/transam/xlog.c             |  15 +-
 src/backend/postmaster/custodian.c            |  14 +
 .../replication/logical/reorderbuffer.c       | 292 +++++++++++++++++-
 src/backend/replication/slot.c                |   4 +
 src/include/replication/reorderbuffer.h       |   1 +
 5 files changed, 317 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 07aaee1c07..4d18798387 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7155,18 +7155,21 @@ StartupXLOG(void)
 					 checkPoint.newestCommitTsXid);
 	XLogCtl->ckptFullXid = checkPoint.nextXid;
 
-	/*
-	 * Initialize replication slots, before there's a chance to remove
-	 * required resources.
-	 */
-	StartupReplicationSlots();
-
 	/*
 	 * Startup logical state, needs to be setup now so we have proper data
 	 * during crash recovery.
+	 *
+	 * NB: This also performs some important cleanup that must be done prior to
+	 * other replication slot steps (e.g., StartupReplicationSlots()).
 	 */
 	StartupReorderBuffer();
 
+	/*
+	 * Initialize replication slots, before there's a chance to remove
+	 * required resources.
+	 */
+	StartupReplicationSlots();
+
 	/*
 	 * Startup CLOG. This must be done after ShmemVariableCache->nextXid has
 	 * been initialized and before we accept connections or begin WAL replay.
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 9c5479b5cf..fdc614b1bd 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -41,6 +41,7 @@
 #include "pgstat.h"
 #include "postmaster/custodian.h"
 #include "postmaster/interrupt.h"
+#include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
@@ -209,6 +210,19 @@ CustodianMain(void)
 		 */
 		RemovePgTempFiles(false, false);
 
+		/*
+		 * Remove any replication slot directories that have been staged for
+		 * deletion.  Since slot directories can accumulate many files, removing
+		 * all of the files during startup (which we used to do) can take a very
+		 * long time.  To avoid delaying startup, we simply have startup rename
+		 * the slot directories, and we clean them up here.
+		 *
+		 * Replication slot directories are not staged or cleaned in single-user
+		 * mode, so we don't need any extra handling outside of the custodian
+		 * process for this.
+		 */
+		RemoveStagedSlotDirectories();
+
 		/*
 		 * Remove serialized snapshots that are no longer required by any
 		 * logical replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fa..ab51e41229 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -126,15 +126,19 @@
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
 #include "commands/sequence.h"
+#include "common/string.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
+#include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/proc.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -297,12 +301,15 @@ static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									 bool txn_prepared);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
+static void ReorderBufferCleanup(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
 
 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
 									  ReorderBufferTXN *txn, CommandId cid);
+static void StageSlotDirForRemoval(const char *slotname, const char *slotpath);
+static void RemoveStagedSlotDirectory(const char *path);
 
 /*
  * ---------------------------------------
@@ -4835,6 +4842,202 @@ ReorderBufferCleanupSerializedTXNs(const char *slotname)
 	FreeDir(spill_dir);
 }
 
+/*
+ * Discard everything in the slot's directory except for the "state" file by
+ * building a ".new" directory that holds only a copy of "state", staging the
+ * old directory for removal by the custodian, and renaming ".new" into place.
+ * StartupReorderBuffer() has special logic to handle crashes at inconvenient
+ * times during this sequence.
+ *
+ * NB: If anything other than the "state" file ever needs to survive startup,
+ * this will need to be updated.
+ */
+static void
+ReorderBufferCleanup(const char *slotname)
+{
+	char		path[MAXPGPATH];
+	char		newpath[MAXPGPATH];
+	char		statepath[MAXPGPATH];
+	char		newstatepath[MAXPGPATH];
+	struct stat statbuf;
+
+	snprintf(path, sizeof(path), "pg_replslot/%s", slotname);
+	snprintf(newpath, sizeof(newpath), "pg_replslot/%s.new", slotname);
+	snprintf(statepath, sizeof(statepath), "pg_replslot/%s/state", slotname);
+	snprintf(newstatepath, sizeof(newstatepath), "pg_replslot/%s.new/state",
+			 slotname);
+
+	/* we're only handling directories here, skip if it's not ours */
+	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+		return;
+
+	/*
+	 * Build our new slot directory, suffixed with ".new".  The caller should
+	 * have already cleaned up any ".new" directory left over after a crash.
+	 */
+	if (MakePGDirectory(newpath) < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create directory \"%s\": %m", newpath)));
+
+	copy_file(statepath, newstatepath);
+
+	fsync_fname(newstatepath, false);
+	fsync_fname(newpath, true);
+	fsync_fname("pg_replslot", true);
+
+	/*
+	 * Move the slot directory aside for cleanup by the custodian.  Until the
+	 * rename below completes there is no slot directory; StartupReorderBuffer()
+	 * has special logic so that a crash in this window does not lose the slot.
+	 */
+	StageSlotDirForRemoval(slotname, path);
+
+	/* Move our ".new" directory to become our new slot directory. */
+	if (rename(newpath, path) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+						newpath, path)));
+
+	fsync_fname(path, true);
+	fsync_fname("pg_replslot", true);
+}
+
+/*
+ * Rename the given slot directory to a name carrying a special suffix that the
+ * custodian will know to look for: "pg_replslot/<slotname>.to_remove_<n>".
+ * The integer n is appended in case previously staged slot directories have
+ * not yet been removed; we use the lowest n whose name is not yet taken.
+ *
+ * Raises an ERROR if no free name can be found or if the rename fails.
+ */
+static void
+StageSlotDirForRemoval(const char *slotname, const char *slotpath)
+{
+	char		stage_path[MAXPGPATH];
+
+	/*
+	 * Find a name for the stage directory.  We just increment an integer at the
+	 * end of the name until we find one that doesn't exist.  The loop bound is
+	 * exclusive so that incrementing n can never overflow.
+	 */
+	for (int n = 0; n < INT_MAX; n++)
+	{
+		DIR		   *dir;
+
+		snprintf(stage_path, sizeof(stage_path),
+				 "pg_replslot/%s.to_remove_%d", slotname, n);
+
+		dir = AllocateDir(stage_path);
+		if (dir == NULL)
+		{
+			if (errno == ENOENT)
+				break;
+
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open directory \"%s\": %m",
+							stage_path)));
+		}
+		FreeDir(dir);
+
+		stage_path[0] = '\0';
+	}
+
+	/* In the unlikely event that every candidate name is taken, bail out. */
+	if (stage_path[0] == '\0')
+		ereport(ERROR,
+				(errmsg("could not stage \"%s\" for deletion",
+						slotpath)));
+
+	/* Rename the slot directory. */
+	if (rename(slotpath, stage_path) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+						slotpath, stage_path)));
+
+	fsync_fname(stage_path, true);
+	fsync_fname("pg_replslot", true);
+}
+
+/*
+ * Remove slot directories that have been staged for deletion by
+ * ReorderBufferCleanup().  Stops early if a shutdown request is pending.
+ */
+void
+RemoveStagedSlotDirectories(void)
+{
+	DIR		   *dir;
+	struct dirent *de;
+
+	dir = AllocateDir("pg_replslot");
+	while (!ShutdownRequestPending &&
+		   (de = ReadDir(dir, "pg_replslot")) != NULL)
+	{
+		struct stat st;
+		char		path[MAXPGPATH];
+
+		if (strstr(de->d_name, ".to_remove") == NULL)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_replslot/%s", de->d_name);
+		if (lstat(path, &st) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", path)));
+
+		/* only directories are staged for removal, so skip anything else */
+		if (S_ISDIR(st.st_mode))
+			RemoveStagedSlotDirectory(path);
+	}
+	FreeDir(dir);
+}
+
+/*
+ * Removes one slot directory that has been staged for deletion by
+ * ReorderBufferCleanup().  If a shutdown request is pending, exit as soon as
+ * possible and leave whatever remains of the directory in place.
+ */
+static void
+RemoveStagedSlotDirectory(const char *path)
+{
+	DIR		   *dir;
+	struct dirent *de;
+
+	dir = AllocateDir(path);
+	while (!ShutdownRequestPending &&
+		   (de = ReadDir(dir, path)) != NULL)
+	{
+		struct stat st;
+		char		filepath[MAXPGPATH];
+
+		if (strcmp(de->d_name, ".") == 0 ||
+			strcmp(de->d_name, "..") == 0)
+			continue;
+
+		snprintf(filepath, sizeof(filepath), "%s/%s", path, de->d_name);
+		if (lstat(filepath, &st) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", filepath)));
+		else if (S_ISDIR(st.st_mode))
+			RemoveStagedSlotDirectory(filepath);
+		else if (unlink(filepath) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", filepath)));
+	}
+	FreeDir(dir);
+
+	/* if the scan was cut short, the directory may not be empty yet */
+	if (!ShutdownRequestPending && rmdir(path) < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not remove directory \"%s\": %m", path)));
+}
+
 /*
  * Given a replication slot, transaction ID and segment number, fill in the
  * corresponding spill file into 'path', which is a caller-owned buffer of size
@@ -4863,6 +5066,83 @@ StartupReorderBuffer(void)
 	DIR		   *logical_dir;
 	struct dirent *logical_de;
 
+	/*
+	 * First, handle any ".new" directories that were left over after a crash.
+	 * These are created and swapped with the actual replication slot
+	 * directories so that cleanup of spilled data can be done asynchronously by
+	 * the custodian.
+	 */
+	logical_dir = AllocateDir("pg_replslot");
+	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
+	{
+		char		name[NAMEDATALEN];
+		char		path[NAMEDATALEN + 12];
+		char		newpath[NAMEDATALEN + 16];
+		struct stat statbuf;
+
+		if (strcmp(logical_de->d_name, ".") == 0 ||
+			strcmp(logical_de->d_name, "..") == 0)
+			continue;
+
+		/*
+		 * Make sure it's a valid ".new" directory.
+		 */
+		if (!pg_str_endswith(logical_de->d_name, ".new") ||
+			strlen(logical_de->d_name) >= NAMEDATALEN + 4)
+			continue;
+
+		strncpy(name, logical_de->d_name, sizeof(name));
+		name[strlen(logical_de->d_name) - 4] = '\0';
+		if (!ReplicationSlotValidateName(name, DEBUG2))
+			continue;
+
+		sprintf(path, "pg_replslot/%s", name);
+		sprintf(newpath, "pg_replslot/%s", logical_de->d_name);
+		if (lstat(path, &statbuf) == 0)
+		{
+			if (!S_ISDIR(statbuf.st_mode))
+				continue;
+
+			/*
+			 * If the original directory still exists, just delete the ".new"
+			 * directory (not the original).  We'll try again when we call
+			 * ReorderBufferCleanup() later on.
+			 */
+			if (!rmtree(newpath, true))
+				ereport(ERROR,
+						(errmsg("could not remove directory \"%s\"",
+								newpath)));
+		}
+		else if (errno == ENOENT)
+		{
+			/*
+			 * If the original directory is gone, we need to rename the ".new"
+			 * directory to take its place.  We know that the ".new" directory
+			 * is ready to be the real deal if we previously made it far enough
+			 * to delete the original directory.
+			 */
+			if (rename(newpath, path) != 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not rename file \"%s\" to \"%s\": %m",
+								newpath, path)));
+
+			fsync_fname(path, true);
+		}
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", path)));
+
+		fsync_fname("pg_replslot", true);
+	}
+	FreeDir(logical_dir);
+
+	/*
+	 * Now we can proceed with deleting all spilled data.  (This actually just
+	 * moves the directories aside so that the custodian can clean it up
+	 * asynchronously.)
+	 */
 	logical_dir = AllocateDir("pg_replslot");
 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
 	{
@@ -4875,12 +5155,18 @@ StartupReorderBuffer(void)
 			continue;
 
 		/*
-		 * ok, has to be a surviving logical slot, iterate and delete
-		 * everything starting with xid-*
+		 * ok, has to be a surviving logical slot, delete everything except for
+		 * state
 		 */
-		ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
+		ReorderBufferCleanup(logical_de->d_name);
 	}
 	FreeDir(logical_dir);
+
+	/*
+	 * Wake up the custodian so it cleans up our old slot data.
+	 */
+	if (ProcGlobal->custodianLatch)
+		SetLatch(ProcGlobal->custodianLatch);
 }
 
 /* ---------------------------------------
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e5e0cf8768..c45f8cf94d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1430,6 +1430,10 @@ StartupReplicationSlots(void)
 			continue;
 		}
 
+		/* if it's an old slot directory that's staged for removal, ignore it */
+		if (strstr(replication_de->d_name, ".to_remove") != NULL)
+			continue;
+
 		/* looks like a slot in a normal state, restore */
 		RestoreSlotFromDisk(replication_de->d_name);
 	}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 859424bbd9..ff56ae0b22 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -719,6 +719,7 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
+void		RemoveStagedSlotDirectories(void);
 
 bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
 												 RelFileNode rnode, bool created);
-- 
2.25.1

