From 2a9c103b9ce034647ec878da10c7b194ccebea20 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossartn@amazon.com>
Date: Sun, 12 Dec 2021 22:07:11 -0800
Subject: [PATCH v5 6/8] Move removal of old logical rewrite mapping files to
 custodian.

If there are many such files to remove, checkpoints can take much
longer.  To avoid this, move the removal to the newly introduced
custodian process.  The checkpointer still computes the removal
cutoff and flushes the remaining mappings to disk.
---
 src/backend/access/heap/rewriteheap.c | 83 +++++++++++++++++++++++----
 src/backend/postmaster/checkpointer.c | 33 +++++++++++
 src/backend/postmaster/custodian.c    | 10 ++++
 src/include/access/rewriteheap.h      |  1 +
 src/include/postmaster/bgwriter.h     |  3 +
 5 files changed, 120 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 2a53826736..c5a1103687 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,10 +116,13 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/bgwriter.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
@@ -1182,7 +1185,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * Perform a checkpoint for logical rewrite mappings
  *
  * This serves two tasks:
- * 1) Remove all mappings not needed anymore based on the logical restart LSN
+ * 1) Alert the custodian to remove all mappings not needed anymore based on the
+ *    logical restart LSN
  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
  *	  only has to deal with the parts of a mapping that have been written out
  *	  after the checkpoint started.
@@ -1210,6 +1214,11 @@ CheckPointLogicalRewriteHeap(void)
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
 		cutoff = redo;
 
+	/* let the custodian know what it can remove */
+	CheckPointSetLogicalRewriteCutoff(cutoff);
+	if (ProcGlobal->custodianLatch)
+		SetLatch(ProcGlobal->custodianLatch);
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
@@ -1240,15 +1249,7 @@ CheckPointLogicalRewriteHeap(void)
 
 		lsn = ((uint64) hi) << 32 | lo;
 
-		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
-		{
-			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
-			if (unlink(path) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not remove file \"%s\": %m", path)));
-		}
-		else
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
 		{
 			/* on some operating systems fsyncing a file requires O_RDWR */
 			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
@@ -1286,3 +1287,72 @@ CheckPointLogicalRewriteHeap(void)
 	/* persist directory entries to disk */
 	fsync_fname("pg_logical/mappings", true);
 }
+
+/*
+ * Remove all mappings not needed anymore based on the logical restart LSN saved
+ * by the checkpointer.  We use this saved value instead of calling
+ * ReplicationSlotsComputeLogicalRestartLSN() so that we don't interfere with an
+ * ongoing call to CheckPointLogicalRewriteHeap() that is flushing mappings to
+ * disk.
+ *
+ * A mapping file is removed when the LSN encoded in its name is older than the
+ * saved cutoff, or unconditionally when the cutoff is InvalidXLogRecPtr.  The
+ * scan stops early if a shutdown has been requested.  ERROR is raised if a
+ * mapping file name cannot be parsed or a file cannot be removed.
+ */
+void
+RemoveOldLogicalRewriteMappings(void)
+{
+	XLogRecPtr	cutoff;
+	DIR		   *mappings_dir;
+	struct dirent *mapping_de;
+	char		path[MAXPGPATH + 20];
+	bool		value_set = false;
+
+	/* nothing to do if no checkpoint has provided a cutoff */
+	cutoff = CheckPointGetLogicalRewriteCutoff(&value_set);
+	if (!value_set)
+		return;
+
+	mappings_dir = AllocateDir("pg_logical/mappings");
+	while (!ShutdownRequestPending &&
+		   (mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
+	{
+		struct stat statbuf;
+		Oid			dboid;
+		Oid			relid;
+		XLogRecPtr	lsn;
+		TransactionId rewrite_xid;
+		TransactionId create_xid;
+		uint32		hi,
+					lo;
+
+		if (strcmp(mapping_de->d_name, ".") == 0 ||
+			strcmp(mapping_de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
+		if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
+			continue;
+
+		/* Skip over files that cannot be ours. */
+		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
+			continue;
+
+		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
+				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
+			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
+
+		/* keep files at or past the cutoff; an invalid cutoff removes all */
+		lsn = ((uint64) hi) << 32 | lo;
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
+			continue;
+
+		elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
+		if (unlink(path) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+	}
+	FreeDir(mappings_dir);
+}
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 4488e3a443..666f2a0368 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -128,6 +128,9 @@ typedef struct
 	uint32		num_backend_writes; /* counts user backend buffer writes */
 	uint32		num_backend_fsync;	/* counts user backend fsync calls */
 
+	XLogRecPtr	logical_rewrite_mappings_cutoff;	/* can remove older mappings */
+	bool		logical_rewrite_mappings_cutoff_set;	/* cutoff to act on? */
+
 	int			num_requests;	/* current # of requests */
 	int			max_requests;	/* allocated array size */
 	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
@@ -1342,3 +1345,40 @@ FirstCallSinceLastCheckpoint(void)
 
 	return FirstCall;
 }
+
+/*
+ * Used by CheckPointLogicalRewriteHeap() to tell the custodian which logical
+ * rewrite mapping files it can remove.
+ */
+void
+CheckPointSetLogicalRewriteCutoff(XLogRecPtr cutoff)
+{
+	SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
+	CheckpointerShmem->logical_rewrite_mappings_cutoff = cutoff;
+	CheckpointerShmem->logical_rewrite_mappings_cutoff_set = true;
+	SpinLockRelease(&CheckpointerShmem->ckpt_lck);
+}
+
+/*
+ * Used by the custodian to determine which logical rewrite mapping files it can
+ * remove.  *value_set reports whether a cutoff was available.
+ *
+ * Reading the cutoff consumes it: the shared flag is cleared, so each cutoff
+ * published by a checkpoint is acted upon at most once.  Otherwise the
+ * custodian would reapply the last published cutoff every time it runs until
+ * the next checkpoint; with an InvalidXLogRecPtr cutoff, that would remove
+ * every mapping file, including ones written after that checkpoint.
+ */
+XLogRecPtr
+CheckPointGetLogicalRewriteCutoff(bool *value_set)
+{
+	XLogRecPtr	cutoff;
+
+	SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
+	cutoff = CheckpointerShmem->logical_rewrite_mappings_cutoff;
+	*value_set = CheckpointerShmem->logical_rewrite_mappings_cutoff_set;
+	CheckpointerShmem->logical_rewrite_mappings_cutoff_set = false;
+	SpinLockRelease(&CheckpointerShmem->ckpt_lck);
+
+	return cutoff;
+}
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 8591c5db9b..7f914a617f 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -36,6 +36,7 @@
 
 #include <time.h>
 
+#include "access/rewriteheap.h"
 #include "libpq/pqsignal.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
@@ -219,6 +220,15 @@ CustodianMain(void)
 		 */
 		RemoveOldSerializedSnapshots();
 
+		/*
+		 * Remove logical rewrite mapping files that are no longer needed.
+		 *
+		 * It is not important for these to be removed in single-user mode, so
+		 * we don't need any extra handling outside of the custodian process for
+		 * this.
+		 */
+		RemoveOldLogicalRewriteMappings();
+
 		/* Calculate how long to sleep */
 		end_time = (pg_time_t) time(NULL);
 		elapsed_secs = end_time - start_time;
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index aa5c48f219..f493094557 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -53,5 +53,6 @@ typedef struct LogicalRewriteMappingData
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
 void		CheckPointLogicalRewriteHeap(void);
+void		RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 2882efd67b..051e6732cb 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -42,4 +42,7 @@ extern void CheckpointerShmemInit(void);
 
 extern bool FirstCallSinceLastCheckpoint(void);
 
+extern void CheckPointSetLogicalRewriteCutoff(XLogRecPtr cutoff);
+extern XLogRecPtr CheckPointGetLogicalRewriteCutoff(bool *value_set);
+
 #endif							/* _BGWRITER_H */
-- 
2.25.1

