From 62799e4546aa0a15b2a09f6b14900d785d64f42f Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH] Extensible rmgr.

Allow extensions to specify a new custom rmgr, which allows
specialized WAL. This is meant to be used by a custom Table Access
Method, which would not otherwise be able to offer logical
decoding/replication. It may also be used by new Index Access Methods.

Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.

Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c        | 91 ++++++++++++++++++++++++
 src/backend/access/transam/xlog.c        | 22 +++---
 src/backend/access/transam/xlogreader.c  |  2 +-
 src/backend/replication/logical/decode.c |  4 +-
 src/backend/utils/misc/guc.c             |  6 +-
 src/include/access/rmgr.h                | 14 ++++
 src/include/access/xlog_internal.h       |  7 ++
 7 files changed, 129 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..e04492a9507 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,12 +24,19 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+typedef struct CustomRmgrEntry {
+	RmgrId rmid;
+	RmgrData *rmgr;
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, redo, desc, identify, startup, cleanup, mask, decode },
@@ -37,3 +44,87 @@
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;
+static int NumCustomRmgrs = 0;
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(ERROR,
+				(errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries")));
+
+	ereport(LOG,
+			(errmsg("registering custom rmgr \"%s\" with ID %d",
+					rmgr->rm_name, rmid)));
+
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(
+			TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* check for existing builtin rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+							existing_rmgr->rm_name)));
+	}
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+
+		if (entry.rmid == rmid)
+			ereport(PANIC,
+					(errmsg("custom rmgr ID %d already registered with name \"%s\"",
+							rmid, entry.rmgr->rm_name)));
+
+		if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" already registered with ID %d",
+							rmgr->rm_name, entry.rmid)));
+	}
+
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1);
+
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * This is an O(N) list traversal because the expected size is very small.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+		if (entry.rmid == rmid)
+			return *entry.rmgr;
+	}
+
+	ereport(PANIC, errmsg("custom rmgr with ID %d not found!", rmid));
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dfe2a0bcce9..b56c7927194 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1531,10 +1531,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7493,8 +7493,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7716,7 +7716,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7826,8 +7826,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10739,16 +10739,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 35029cf97d6..612b1b37233 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3fb5a92a1a1..52257a06882 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -115,8 +115,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 
 	rmid = XLogRecGetRmid(record);
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b3fd42e0f18..68971b0f1ea 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11715,7 +11715,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11727,8 +11727,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..d387530c6f5 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -31,5 +31,19 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID		128
+#define RM_CUSTOM_MAX_ID		UINT8_MAX
+
+StaticAssertDecl(RM_MAX_ID < RM_CUSTOM_MIN_ID,
+				 "RM_MAX_ID >= RM_CUSTOM_MIN_ID");
+StaticAssertDecl(RM_CUSTOM_MIN_ID < RM_CUSTOM_MAX_ID,
+				 "RM_CUSTOM_MIN_ID >= RM_CUSTOM_MAX_ID");
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 849954a8e5a..61f421c98c2 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -321,6 +321,9 @@ typedef struct RmgrData
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -338,4 +341,8 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) (((rmid) < RM_CUSTOM_MIN_ID) ? \
+					   GetBuiltinRmgr((rmid)) : GetCustomRmgr((rmid)))
+
 #endif							/* XLOG_INTERNAL_H */
-- 
2.17.1

