Extensible Rmgr for Table AMs

Started by Jeff Davisabout 4 years ago30 messages
#1Jeff Davis
pgsql@j-davis.com
1 attachment(s)

Motivation:

I'm working on a columnar compression AM[0]/messages/by-id/20ee0b0ae6958804a88fe9580157587720faf664.camel@j-davis.com. Currently, it uses generic
xlog, which works for crash recovery and physical replication, but not
logical decoding/replication.

Extensible rmgr would enable the table AM to support its own
redo/decode hooks and WAL format, so that it could support crash
recovery, physical replication, and logical replication.

Background:

I submitted another patch[0]/messages/by-id/20ee0b0ae6958804a88fe9580157587720faf664.camel@j-davis.com to add new logical records, which could be
used to support logical decoding directly, without the need for
extensible rmgr and without any assumptions about the table AM. This
was designed to be easy to use, but inefficient. Amit raised
concerns[1]/messages/by-id/CAA4eK1JVDnbQ80ULdZuhzQkzr_yYhfON-tg=d1U7aWjK_R1ixQ@mail.gmail.com about whether it could meet the needs of zheap. Andres
suggested (off-list) that it would be better to just tackle the
extensible rmgr problem.

The idea for extensible rmgr has been proposed before[3]/messages/by-id/1229541840.4793.79.camel@ebony.2ndQuadrant. The biggest
argument against it seemed to be that there was no complete use
case[4]/messages/by-id/20992.1232667957@sss.pgh.pa.us, so the worry was that something would be left out. Columnar is
complete enough that I think it qualifies as a good use case.

A subsequent proposal[5]/messages/by-id/1266774840.7341.29872.camel@ebony was shot down because of a (potential?) need
for catalog access[6]/messages/by-id/26134.1266776040@sss.pgh.pa.us. The attached patch does not use the catalog;
instead, it relies on table AM authors choosing IDs that don't conflict
with each other. This seems like a reasonable answer, considering that
there will likely be very few table AMs that go far enough to fully
support WAL including decoding.

Are there any other major arguments/objections that I missed?

Proposal:

The attached patch (against v14, so it's easier to test columnar) is
somewhat like a simplified version of [3]/messages/by-id/1229541840.4793.79.camel@ebony.2ndQuadrant combined with refactoring to
make decoding a part of the rmgr.

* adds a new RmgrData method rm_decode
* refactors decode.c to use
the new method
* add a layer of indirection GetRmgr to find an rmgr

* fast path to find builtin rmgr in RmgrTable
* to find a custom
rmgr, traverses list of custom rmgrs that
are currently loaded
(unlikely to ever be more than a few)
* rmgr IDs from 0-127 are
reserved for builtin rmgrs
* rmgr IDs from 128-255 are reserved for
custom rmgrs
* table AM authors need to avoid collisions between
rmgr IDs

I have tested with columnar using a simple WAL format for logical
decoding only, and I'm still using generic xlog for recovery and
physical replication. I haven't tested the redo path, or how easy it
might be to do something like generic xlog.

Questions:

0. Do we want to go this route, or something simpler like my other
proposal, which introduces new logical record types[0]/messages/by-id/20ee0b0ae6958804a88fe9580157587720faf664.camel@j-davis.com?

1. I am allocating the custom rmgr list in TopMemoryContext, and it
only works when loading as a part of shared_preload_libraries. This
avoids the need for shared memory in Simon's patch[3]/messages/by-id/1229541840.4793.79.camel@ebony.2ndQuadrant. Is that the
right thing to do?

2. If we go this route, what do we do with generic xlog? It seems like
a half feature, since it doesn't work with logical decoding.

3. If the custom rmgr throws an error during redo, the server won't
start. Should we have a GUC to turn non-builtin redo into a no-op to
reduce the impact of bugs in the implementation of a custom rmgr?

4. Do we want to encourage index AMs to use this mechanism as well? I
didn't really look into how suitable it is, but at a high level it
seems reasonable.

Regards,
Jeff Davis

[0]: /messages/by-id/20ee0b0ae6958804a88fe9580157587720faf664.camel@j-davis.com
/messages/by-id/20ee0b0ae6958804a88fe9580157587720faf664.camel@j-davis.com
[1]: /messages/by-id/CAA4eK1JVDnbQ80ULdZuhzQkzr_yYhfON-tg=d1U7aWjK_R1ixQ@mail.gmail.com
/messages/by-id/CAA4eK1JVDnbQ80ULdZuhzQkzr_yYhfON-tg=d1U7aWjK_R1ixQ@mail.gmail.com
[2]: https://github.com/citusdata/citus/tree/master/src/backend/columnar
[3]: /messages/by-id/1229541840.4793.79.camel@ebony.2ndQuadrant
[4]: /messages/by-id/20992.1232667957@sss.pgh.pa.us
[5]: /messages/by-id/1266774840.7341.29872.camel@ebony
[6]: /messages/by-id/26134.1266776040@sss.pgh.pa.us

Attachments:

extensible-rmgr.difftext/x-patch; charset=UTF-8; name=extensible-rmgr.diffDownload
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..3e4a3c5b675 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,15 +24,94 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+typedef struct CustomRmgrEntry {
+	RmgrId rmid;
+	RmgrData *rmgr;
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
-	{ name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+	{ name, redo, desc, identify, startup, cleanup, mask, decode },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;
+static int NumCustomRmgrs = 0;
+
+/*
+ * Register a new custom rmgr.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	elog(LOG, "registering customer rmgr \"%s\" with ID %d",
+		 rmgr->rm_name, rmid);
+
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(
+			TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* check for existing builtin rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr",
+				 existing_rmgr->rm_name);
+	}
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+
+		if (entry.rmid == rmid)
+			elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"",
+				 rmid, entry.rmgr->rm_name);
+
+		if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" already registered with ID %d",
+				 rmgr->rm_name, entry.rmid);
+	}
+
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1);
+
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * This is an O(N) list traversal because the expected size is very small.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+		if (entry.rmid == rmid)
+			return *entry.rmgr;
+	}
+
+	elog(PANIC, "custom rmgr with ID %d not found!", rmid);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6208e123e5d..07f715f0d9f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1505,10 +1505,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7292,8 +7292,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7518,7 +7518,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7628,8 +7628,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10663,16 +10663,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f01aea6ddad..6e2d40695ac 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -735,7 +735,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index daf2efb0d83..67e0107b191 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -43,21 +43,6 @@
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
 
-typedef struct XLogRecordBuffer
-{
-	XLogRecPtr	origptr;
-	XLogRecPtr	endptr;
-	XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
 	XLogRecordBuffer buf;
 	TransactionId txid;
+	RmgrId rmid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 								 buf.origptr);
 	}
 
-	/* cast so we get a warning when new rmgrs are added */
-	switch ((RmgrId) XLogRecGetRmid(record))
-	{
-			/*
-			 * Rmgrs we care about for logical decoding. Add new rmgrs in
-			 * rmgrlist.h's order.
-			 */
-		case RM_XLOG_ID:
-			DecodeXLogOp(ctx, &buf);
-			break;
-
-		case RM_XACT_ID:
-			DecodeXactOp(ctx, &buf);
-			break;
+	rmid = XLogRecGetRmid(record);
 
-		case RM_STANDBY_ID:
-			DecodeStandbyOp(ctx, &buf);
-			break;
-
-		case RM_HEAP2_ID:
-			DecodeHeap2Op(ctx, &buf);
-			break;
-
-		case RM_HEAP_ID:
-			DecodeHeapOp(ctx, &buf);
-			break;
-
-		case RM_LOGICALMSG_ID:
-			DecodeLogicalMsgOp(ctx, &buf);
-			break;
-
-			/*
-			 * Rmgrs irrelevant for logical decoding; they describe stuff not
-			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
-			 * order.
-			 */
-		case RM_SMGR_ID:
-		case RM_CLOG_ID:
-		case RM_DBASE_ID:
-		case RM_TBLSPC_ID:
-		case RM_MULTIXACT_ID:
-		case RM_RELMAP_ID:
-		case RM_BTREE_ID:
-		case RM_HASH_ID:
-		case RM_GIN_ID:
-		case RM_GIST_ID:
-		case RM_SEQ_ID:
-		case RM_SPGIST_ID:
-		case RM_BRIN_ID:
-		case RM_COMMIT_TS_ID:
-		case RM_REPLORIGIN_ID:
-		case RM_GENERIC_ID:
-			/* just deal with xid, and done */
-			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
-									buf.origptr);
-			break;
-		case RM_NEXT_ID:
-			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
+	else
+	{
+		/* just deal with xid, and done */
+		ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+								buf.origptr);
 	}
 }
 
 /*
  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	ReorderBuffer *reorder = ctx->reorder;
@@ -392,8 +329,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
@@ -438,8 +375,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -498,8 +435,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -620,8 +557,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6652a60ec31..c7b640befb8 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11599,7 +11599,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11611,8 +11611,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 59ebac7d6aa..9fc1729d84c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,7 +28,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..6a4ebd1310b 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index c9b5c56a4c6..e433ec7ecfd 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	symname,
 
 typedef enum RmgrIds
@@ -31,5 +31,7 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID		128
+#define RM_CUSTOM_MAX_ID		UINT8_MAX
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index f582cf535f6..b1ffa2728b5 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -25,25 +25,25 @@
  */
 
 /* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index dcf41e9257c..ee7422097c6 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -287,6 +287,9 @@ typedef enum
 	RECOVERY_TARGET_ACTION_SHUTDOWN
 }			RecoveryTargetAction;
 
+struct LogicalDecodingContext;
+struct XLogRecordBuffer;
+
 /*
  * Method table for resource managers.
  *
@@ -312,10 +315,15 @@ typedef struct RmgrData
 	void		(*rm_startup) (void);
 	void		(*rm_cleanup) (void);
 	void		(*rm_mask) (char *pagedata, BlockNumber blkno);
+	void		(*rm_decode) (struct LogicalDecodingContext *ctx,
+							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -333,4 +341,8 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \
+					   GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))
+
 #endif							/* XLOG_INTERNAL_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 69918080bb5..9cc4783c3ba 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -14,7 +14,21 @@
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 
-void		LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+typedef struct XLogRecordBuffer
+{
+	XLogRecPtr	origptr;
+	XLogRecPtr	endptr;
+	XLogReaderState *record;
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
 
 #endif
#2Jeff Davis
pgsql@j-davis.com
In reply to: Jeff Davis (#1)
1 attachment(s)
Re: Extensible Rmgr for Table AMs

On Mon, 2021-11-08 at 15:36 -0800, Jeff Davis wrote:

The attached patch (against v14, so it's easier to test columnar) is
somewhat like a simplified version of [3] combined with refactoring
to
make decoding a part of the rmgr.

I created a wiki page here:
https://wiki.postgresql.org/wiki/ExtensibleRmgr

To coordinate reservation of RmgrIds, to avoid conflicts. I don't
expect it to be a practical problem given how much work it takes to
create a new table AM that needs full WAL support, but might as well
have some transparency on how to choose a new RmgrId.

I also updated the patch to point to the wiki page in the comments, and
added in a new RM_EXPERIMENTAL_ID that can be used while an extension
is still in development. Hopefully this will prevent people reserving
lots of RmgrIds for extensions that never get released.

Regards,
Jeff Davis

Attachments:

extensible-rmgr-pg14-v2.difftext/x-patch; charset=UTF-8; name=extensible-rmgr-pg14-v2.diffDownload
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..354f1033bf7 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,15 +24,102 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
+#include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+typedef struct CustomRmgrEntry {
+	RmgrId rmid;
+	RmgrData *rmgr;
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
-	{ name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+	{ name, redo, desc, identify, startup, cleanup, mask, decode },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;
+static int NumCustomRmgrs = 0;
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (!process_shared_preload_libraries_in_progress)
+		elog(ERROR, "custom rmgr must be registered while initializing extensions in shared_preload_libraries");
+
+	elog(LOG, "registering customer rmgr \"%s\" with ID %d",
+		 rmgr->rm_name, rmid);
+
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(
+			TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* check for existing builtin rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr",
+				 existing_rmgr->rm_name);
+	}
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+
+		if (entry.rmid == rmid)
+			elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"",
+				 rmid, entry.rmgr->rm_name);
+
+		if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" already registered with ID %d",
+				 rmgr->rm_name, entry.rmid);
+	}
+
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1);
+
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * This is an O(N) list traversal because the expected size is very small.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+		if (entry.rmid == rmid)
+			return *entry.rmgr;
+	}
+
+	elog(PANIC, "custom rmgr with ID %d not found!", rmid);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6208e123e5d..07f715f0d9f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1505,10 +1505,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7292,8 +7292,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7518,7 +7518,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7628,8 +7628,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10663,16 +10663,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f01aea6ddad..6e2d40695ac 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -735,7 +735,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index daf2efb0d83..67e0107b191 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -43,21 +43,6 @@
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
 
-typedef struct XLogRecordBuffer
-{
-	XLogRecPtr	origptr;
-	XLogRecPtr	endptr;
-	XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
 	XLogRecordBuffer buf;
 	TransactionId txid;
+	RmgrId rmid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 								 buf.origptr);
 	}
 
-	/* cast so we get a warning when new rmgrs are added */
-	switch ((RmgrId) XLogRecGetRmid(record))
-	{
-			/*
-			 * Rmgrs we care about for logical decoding. Add new rmgrs in
-			 * rmgrlist.h's order.
-			 */
-		case RM_XLOG_ID:
-			DecodeXLogOp(ctx, &buf);
-			break;
-
-		case RM_XACT_ID:
-			DecodeXactOp(ctx, &buf);
-			break;
+	rmid = XLogRecGetRmid(record);
 
-		case RM_STANDBY_ID:
-			DecodeStandbyOp(ctx, &buf);
-			break;
-
-		case RM_HEAP2_ID:
-			DecodeHeap2Op(ctx, &buf);
-			break;
-
-		case RM_HEAP_ID:
-			DecodeHeapOp(ctx, &buf);
-			break;
-
-		case RM_LOGICALMSG_ID:
-			DecodeLogicalMsgOp(ctx, &buf);
-			break;
-
-			/*
-			 * Rmgrs irrelevant for logical decoding; they describe stuff not
-			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
-			 * order.
-			 */
-		case RM_SMGR_ID:
-		case RM_CLOG_ID:
-		case RM_DBASE_ID:
-		case RM_TBLSPC_ID:
-		case RM_MULTIXACT_ID:
-		case RM_RELMAP_ID:
-		case RM_BTREE_ID:
-		case RM_HASH_ID:
-		case RM_GIN_ID:
-		case RM_GIST_ID:
-		case RM_SEQ_ID:
-		case RM_SPGIST_ID:
-		case RM_BRIN_ID:
-		case RM_COMMIT_TS_ID:
-		case RM_REPLORIGIN_ID:
-		case RM_GENERIC_ID:
-			/* just deal with xid, and done */
-			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
-									buf.origptr);
-			break;
-		case RM_NEXT_ID:
-			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
+	else
+	{
+		/* just deal with xid, and done */
+		ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+								buf.origptr);
 	}
 }
 
 /*
  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	ReorderBuffer *reorder = ctx->reorder;
@@ -392,8 +329,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
@@ -438,8 +375,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -498,8 +435,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -620,8 +557,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6652a60ec31..c7b640befb8 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11599,7 +11599,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11611,8 +11611,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 59ebac7d6aa..9fc1729d84c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,7 +28,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..6a4ebd1310b 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index c9b5c56a4c6..9ad790a5cd0 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	symname,
 
 typedef enum RmgrIds
@@ -31,5 +31,14 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID		128
+#define RM_CUSTOM_MAX_ID		UINT8_MAX
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index f582cf535f6..b1ffa2728b5 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -25,25 +25,25 @@
  */
 
 /* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index dcf41e9257c..ee7422097c6 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -287,6 +287,9 @@ typedef enum
 	RECOVERY_TARGET_ACTION_SHUTDOWN
 }			RecoveryTargetAction;
 
+struct LogicalDecodingContext;
+struct XLogRecordBuffer;
+
 /*
  * Method table for resource managers.
  *
@@ -312,10 +315,15 @@ typedef struct RmgrData
 	void		(*rm_startup) (void);
 	void		(*rm_cleanup) (void);
 	void		(*rm_mask) (char *pagedata, BlockNumber blkno);
+	void		(*rm_decode) (struct LogicalDecodingContext *ctx,
+							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -333,4 +341,8 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \
+					   GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))
+
 #endif							/* XLOG_INTERNAL_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 69918080bb5..9cc4783c3ba 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -14,7 +14,21 @@
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 
-void		LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+typedef struct XLogRecordBuffer
+{
+	XLogRecPtr	origptr;
+	XLogRecPtr	endptr;
+	XLogReaderState *record;
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
 
 #endif
#3Robert Haas
robertmhaas@gmail.com
In reply to: Jeff Davis (#1)
Re: Extensible Rmgr for Table AMs

On Mon, Nov 8, 2021 at 6:36 PM Jeff Davis <pgsql@j-davis.com> wrote:

Extensible rmgr would enable the table AM to support its own
redo/decode hooks and WAL format, so that it could support crash
recovery, physical replication, and logical replication.

Without taking a position on your implementation, which I have not
studied, I like this idea in concept and I think it's an important
goal.

Are there any other major arguments/objections that I missed?

ISTR some discussion of the fact that uninstalling the extension that
uses this facility, or failing to install it on your standby, will
lead to an unusable database. Personally, I don't see that as a big
problem: we should just document that if you choose to use an
extension like this, then (1) it needs to be installed on all standbys
and (2) if you ever want to get rid of it, you need to stop using it,
drop all the objects created with it, and then wait until all the WAL
previously generated by that extension is gone not only from pg_wal
but from any archived WAL files or backups that you intend to use with
that cluster before you actually nuke it. Users who don't want to
abide by those restrictions need not install such extensions. Users
who don't read the documentation might end up sad, but it doesn't seem
particularly likely, and it's hardly the only part of the
documentation that users shouldn't ignore.

--
Robert Haas
EDB: http://www.enterprisedb.com

#4Jeff Davis
pgsql@j-davis.com
In reply to: Jeff Davis (#1)
3 attachment(s)
Re: Extensible Rmgr for Table AMs

On Mon, 2021-11-08 at 15:36 -0800, Jeff Davis wrote:

The attached patch (against v14, so it's easier to test columnar) is
somewhat like a simplified version of [3] combined with refactoring
to
make decoding a part of the rmgr.

New patches attached (v3). Essentially the content as v2, but split
into 3 patches and rebased.

Review on patch 0001 would be helpful. It makes decoding just another
method of rmgr, which makes a lot of sense to me from a code
organization standpoint regardless of the other patches. Is there any
reason not to do that?

The other patches then make rmgr extensible, which in turn makes
decoding extensible and solves the logical replication problem for
custom table AMs. The most obvious way to make rmgr extensible would be
to expand the rmgr table, but I was concerned about making that dynamic
(right now the structure is entirely constant and perhaps that's
important for some optimizations?). So, at the cost of complexity I
made a separate, dynamic rmgr table to hold the custom entries.

The custom rmgr API would probably be marked "experimental" for a
while, and I don't expect lots of people to use it given the challenges
of a production-quality table AM. But it's still important, because
without it a table AM has no chance to participate in logical
replication.

Regards,
Jeff Davis

Attachments:

0001-Make-logical-decoding-a-part-of-the-rmgr.patchtext/x-patch; charset=UTF-8; name=0001-Make-logical-decoding-a-part-of-the-rmgr.patchDownload
From b6b6c93513d42223ac2bdcb1cd1522fd845d0af7 Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 12:58:04 -0700
Subject: [PATCH 1/3] Make logical decoding a part of the rmgr.

Add a new rmgr method, rm_decode, and use that rather than a switch
statement.

In preparation for extensible rmgr.

Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c        |   5 +-
 src/backend/replication/logical/decode.c | 105 +++++------------------
 src/bin/pg_rewind/parsexlog.c            |   2 +-
 src/bin/pg_waldump/rmgrdesc.c            |   2 +-
 src/include/access/rmgr.h                |   2 +-
 src/include/access/rmgrlist.h            |  44 +++++-----
 src/include/access/xlog_internal.h       |   5 ++
 src/include/replication/decode.h         |  16 +++-
 8 files changed, 69 insertions(+), 112 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..f8847d5aebf 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,14 +24,15 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
-	{ name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+	{ name, redo, desc, identify, startup, cleanup, mask, decode },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1d22208c1ad..9b450c9f90e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -43,21 +43,6 @@
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
 
-typedef struct XLogRecordBuffer
-{
-	XLogRecPtr	origptr;
-	XLogRecPtr	endptr;
-	XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
 	XLogRecordBuffer buf;
 	TransactionId txid;
+	RmgrId rmid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 								 buf.origptr);
 	}
 
-	/* cast so we get a warning when new rmgrs are added */
-	switch ((RmgrId) XLogRecGetRmid(record))
-	{
-			/*
-			 * Rmgrs we care about for logical decoding. Add new rmgrs in
-			 * rmgrlist.h's order.
-			 */
-		case RM_XLOG_ID:
-			DecodeXLogOp(ctx, &buf);
-			break;
-
-		case RM_XACT_ID:
-			DecodeXactOp(ctx, &buf);
-			break;
+	rmid = XLogRecGetRmid(record);
 
-		case RM_STANDBY_ID:
-			DecodeStandbyOp(ctx, &buf);
-			break;
-
-		case RM_HEAP2_ID:
-			DecodeHeap2Op(ctx, &buf);
-			break;
-
-		case RM_HEAP_ID:
-			DecodeHeapOp(ctx, &buf);
-			break;
-
-		case RM_LOGICALMSG_ID:
-			DecodeLogicalMsgOp(ctx, &buf);
-			break;
-
-			/*
-			 * Rmgrs irrelevant for logical decoding; they describe stuff not
-			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
-			 * order.
-			 */
-		case RM_SMGR_ID:
-		case RM_CLOG_ID:
-		case RM_DBASE_ID:
-		case RM_TBLSPC_ID:
-		case RM_MULTIXACT_ID:
-		case RM_RELMAP_ID:
-		case RM_BTREE_ID:
-		case RM_HASH_ID:
-		case RM_GIN_ID:
-		case RM_GIST_ID:
-		case RM_SEQ_ID:
-		case RM_SPGIST_ID:
-		case RM_BRIN_ID:
-		case RM_COMMIT_TS_ID:
-		case RM_REPLORIGIN_ID:
-		case RM_GENERIC_ID:
-			/* just deal with xid, and done */
-			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
-									buf.origptr);
-			break;
-		case RM_NEXT_ID:
-			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+	if (RmgrTable[rmid].rm_decode != NULL)
+		RmgrTable[rmid].rm_decode(ctx, &buf);
+	else
+	{
+		/* just deal with xid, and done */
+		ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+								buf.origptr);
 	}
 }
 
 /*
  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	ReorderBuffer *reorder = ctx->reorder;
@@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
@@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 91437974584..f6cfee4ce8c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,7 +28,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..6a4ebd1310b 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index c9b5c56a4c6..d9b512630ca 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	symname,
 
 typedef enum RmgrIds
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index ed751aaf039..9a74721c97c 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -25,25 +25,25 @@
  */
 
 /* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index e27fca0cc0e..849954a8e5a 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -287,6 +287,9 @@ typedef enum
 	RECOVERY_TARGET_ACTION_SHUTDOWN
 }			RecoveryTargetAction;
 
+struct LogicalDecodingContext;
+struct XLogRecordBuffer;
+
 /*
  * Method table for resource managers.
  *
@@ -312,6 +315,8 @@ typedef struct RmgrData
 	void		(*rm_startup) (void);
 	void		(*rm_cleanup) (void);
 	void		(*rm_mask) (char *pagedata, BlockNumber blkno);
+	void		(*rm_decode) (struct LogicalDecodingContext *ctx,
+							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
 extern const RmgrData RmgrTable[];
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 1db73f35549..a33c2a718a7 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -14,7 +14,21 @@
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 
-void		LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+typedef struct XLogRecordBuffer
+{
+	XLogRecPtr	origptr;
+	XLogRecPtr	endptr;
+	XLogReaderState *record;
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
 
 #endif
-- 
2.17.1

0002-Add-macro-GetRmgr-in-preparation-for-extensible-rmgr.patchtext/x-patch; charset=UTF-8; name=0002-Add-macro-GetRmgr-in-preparation-for-extensible-rmgr.patchDownload
From cb723a9f77c848784e952a6eeeeaf2d7f939d312 Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH 2/3] Add macro GetRmgr in preparation for extensible rmgr.

---
 src/backend/access/transam/xlog.c        | 22 +++++++++++-----------
 src/backend/access/transam/xlogreader.c  |  2 +-
 src/backend/replication/logical/decode.c |  4 ++--
 src/backend/utils/misc/guc.c             |  6 +++---
 src/include/access/xlog_internal.h       |  2 ++
 5 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c9d4cbf3ff5..84a0d485e99 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1531,10 +1531,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7494,8 +7494,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7717,7 +7717,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7827,8 +7827,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10740,16 +10740,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 35029cf97d6..612b1b37233 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9b450c9f90e..4093b650635 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -115,8 +115,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 
 	rmid = XLogRecGetRmid(record);
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4c94f09c645..f8bb0fabc11 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11676,7 +11676,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11688,8 +11688,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 849954a8e5a..9d4e122e946 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -338,4 +338,6 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetRmgr(rmid) RmgrTable[(rmid)]
+
 #endif							/* XLOG_INTERNAL_H */
-- 
2.17.1

0003-Custom-Rmgr.patchtext/x-patch; charset=UTF-8; name=0003-Custom-Rmgr.patchDownload
From 22c2f6895039ce39b0aca300a8051eb51a006071 Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 14:39:50 -0700
Subject: [PATCH 3/3] Custom Rmgr

---
 src/backend/access/transam/rmgr.c  | 86 ++++++++++++++++++++++++++++++
 src/include/access/rmgr.h          |  9 ++++
 src/include/access/xlog_internal.h |  7 ++-
 3 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..354f1033bf7 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,12 +24,19 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+typedef struct CustomRmgrEntry {
+	RmgrId rmid;
+	RmgrData *rmgr;
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, redo, desc, identify, startup, cleanup, mask, decode },
@@ -37,3 +44,82 @@
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;
+static int NumCustomRmgrs = 0;
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (!process_shared_preload_libraries_in_progress)
+		elog(ERROR, "custom rmgr must be registered while initializing extensions in shared_preload_libraries");
+
+	elog(LOG, "registering customer rmgr \"%s\" with ID %d",
+		 rmgr->rm_name, rmid);
+
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(
+			TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* check for existing builtin rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr",
+				 existing_rmgr->rm_name);
+	}
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+
+		if (entry.rmid == rmid)
+			elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"",
+				 rmid, entry.rmgr->rm_name);
+
+		if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" already registered with ID %d",
+				 rmgr->rm_name, entry.rmid);
+	}
+
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1);
+
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * This is an O(N) list traversal because the expected size is very small.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+		if (entry.rmid == rmid)
+			return *entry.rmgr;
+	}
+
+	elog(PANIC, "custom rmgr with ID %d not found!", rmid);
+}
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..9ad790a5cd0 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -31,5 +31,14 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID		128
+#define RM_CUSTOM_MAX_ID		UINT8_MAX
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 9d4e122e946..7cb1f6c3893 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -321,6 +321,9 @@ typedef struct RmgrData
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -338,6 +341,8 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
-#define GetRmgr(rmid) RmgrTable[(rmid)]
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \
+					   GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))
 
 #endif							/* XLOG_INTERNAL_H */
-- 
2.17.1

#5Julien Rouhaud
rjuju123@gmail.com
In reply to: Jeff Davis (#4)
Re: Extensible Rmgr for Table AMs

Hi,

On Mon, Jan 17, 2022 at 12:42:06AM -0800, Jeff Davis wrote:

On Mon, 2021-11-08 at 15:36 -0800, Jeff Davis wrote:

The attached patch (against v14, so it's easier to test columnar) is
somewhat like a simplified version of [3] combined with refactoring
to
make decoding a part of the rmgr.

New patches attached (v3). Essentially the content as v2, but split
into 3 patches and rebased.

Review on patch 0001 would be helpful. It makes decoding just another
method of rmgr, which makes a lot of sense to me from a code
organization standpoint regardless of the other patches. Is there any
reason not to do that?

I think that it's a clean and sensible approach, so +1 for me.

There's a bit of 0003 present in 002:

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 35029cf97d..612b1b3723 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
                              (uint32) SizeOfXLogRecord, record->xl_tot_len);
        return false;
    }
-   if (record->xl_rmid > RM_MAX_ID)
+   if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
    {

The other patches then make rmgr extensible, which in turn makes
decoding extensible and solves the logical replication problem for
custom table AMs. The most obvious way to make rmgr extensible would be
to expand the rmgr table, but I was concerned about making that dynamic
(right now the structure is entirely constant and perhaps that's
important for some optimizations?). So, at the cost of complexity I
made a separate, dynamic rmgr table to hold the custom entries.

I'm not sure about performance overhead, but allowing extension to access the
main table directly seems a bit scary. We definitely accepted some overhead
with the various extensible support, and this new GetRmgr() doesn't look like
it's going to cost a lot for the builtin case, especially compared to the cost
of any of the code associated with the rmgr.

Also, to answer your initial email I think it's a better way to go compared to
your previous approach, given the limitations and performance of the generic
xlog infrastructure, and hopefully index AMs should be able to rely on this
too.

A few random comments on 0003:

 #define RM_MAX_ID              (RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID       128
+#define RM_CUSTOM_MAX_ID       UINT8_MAX

It would be a good idea to add a StaticAssertStmt here to make sure that
there's no overlap in the ranges.

+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID     128

I'm a bit dubious about the whole "register your ID in the Wiki" approach. It
might not be a problem since there probably won't be hundreds of users, and I
don't have any better suggestion since it has to be consistent across nodes.

+   elog(LOG, "registering customer rmgr \"%s\" with ID %d",
+        rmgr->rm_name, rmid);

Should it be a DEBUG message instead? Also s/customer/custom/

+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+   if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+       elog(PANIC, "custom rmgr id %d out of range", rmid);

Should this be an assert?

+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \
+                      GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))

rmid should be protected in the macro.

The custom rmgr API would probably be marked "experimental" for a
while, and I don't expect lots of people to use it given the challenges
of a production-quality table AM. But it's still important, because
without it a table AM has no chance to participate in logical
replication.

How you plan to mark it experimental?

#6Jeff Davis
pgsql@j-davis.com
In reply to: Julien Rouhaud (#5)
1 attachment(s)
Re: Extensible Rmgr for Table AMs

On Tue, 2022-01-18 at 17:53 +0800, Julien Rouhaud wrote:

Review on patch 0001 would be helpful. It makes decoding just
another
method of rmgr, which makes a lot of sense to me from a code
organization standpoint regardless of the other patches. Is there
any
reason not to do that?

I think that it's a clean and sensible approach, so +1 for me.

Thank you, committed 0001. Other patches not committed yet.

There's a bit of 0003 present in 002:

I just squashed 0002 and 0003 together. Not large enough to keep
separate.

A few random comments on 0003:

#define RM_MAX_ID              (RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID       128
+#define RM_CUSTOM_MAX_ID       UINT8_MAX

It would be a good idea to add a StaticAssertStmt here to make sure
that
there's no overlap in the ranges.

Done.

+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are
still in
+ * development and have not reserved their own unique RmgrId yet.
See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID     128

I'm a bit dubious about the whole "register your ID in the Wiki"
approach. It
might not be a problem since there probably won't be hundreds of
users, and I
don't have any better suggestion since it has to be consistent across
nodes.

Agree, but I don't see a better approach, either.

I do some sanity checking, which should catch collisions when they
happen.

+   elog(LOG, "registering customer rmgr \"%s\" with ID %d",
+        rmgr->rm_name, rmid);

Should it be a DEBUG message instead? Also s/customer/custom/

It seems like a fairly important thing to have in the log. Only a
couple extensions will ever encounter this message, and only at server
start.

Typo fixed.

+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+   if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+       elog(PANIC, "custom rmgr id %d out of range", rmid);

Should this be an assert?

If we make it an Assert, then it won't be caught in production builds.

+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \
+                      GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))

rmid should be protected in the macro.

Done.

How you plan to mark it experimental?

I suppose it doesn't need to be marked explicitly -- there are other
APIs that change. For instance, the ProcessUtility_hook changed, and
that's used much more widely.

As long as we generally agree that some kind of custom rmgrs are the
way to go, if we change the API or implementation around from version
to version I can easily update my table access method.

Regards,
Jeff Davis

Attachments:

v4-0001-Extensible-rmgr.patchtext/x-patch; charset=UTF-8; name=v4-0001-Extensible-rmgr.patchDownload
From 62799e4546aa0a15b2a09f6b14900d785d64f42f Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH] Extensible rmgr.

Allow extensions to specify a new custom rmgr, which allows
specialized WAL. This is meant to be used by a custom Table Access
Method, which would not otherwise be able to offer logical
decoding/replication. It may also be used by new Index Access Methods.

Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.

Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c        | 91 ++++++++++++++++++++++++
 src/backend/access/transam/xlog.c        | 22 +++---
 src/backend/access/transam/xlogreader.c  |  2 +-
 src/backend/replication/logical/decode.c |  4 +-
 src/backend/utils/misc/guc.c             |  6 +-
 src/include/access/rmgr.h                | 14 ++++
 src/include/access/xlog_internal.h       |  7 ++
 7 files changed, 129 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..e04492a9507 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,12 +24,19 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+typedef struct CustomRmgrEntry {
+	RmgrId rmid;
+	RmgrData *rmgr;
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, redo, desc, identify, startup, cleanup, mask, decode },
@@ -37,3 +44,87 @@
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;
+static int NumCustomRmgrs = 0;
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(ERROR,
+				(errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries")));
+
+	ereport(LOG,
+			(errmsg("registering custom rmgr \"%s\" with ID %d",
+					rmgr->rm_name, rmid)));
+
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(
+			TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* check for existing builtin rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+							existing_rmgr->rm_name)));
+	}
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+
+		if (entry.rmid == rmid)
+			ereport(PANIC,
+					(errmsg("custom rmgr ID %d already registered with name \"%s\"",
+							rmid, entry.rmgr->rm_name)));
+
+		if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" already registered with ID %d",
+							rmgr->rm_name, entry.rmid)));
+	}
+
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1);
+
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * This is an O(N) list traversal because the expected size is very small.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+		if (entry.rmid == rmid)
+			return *entry.rmgr;
+	}
+
+	ereport(PANIC, errmsg("custom rmgr with ID %d not found!", rmid));
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dfe2a0bcce9..b56c7927194 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1531,10 +1531,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7493,8 +7493,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7716,7 +7716,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7826,8 +7826,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10739,16 +10739,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 35029cf97d6..612b1b37233 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3fb5a92a1a1..52257a06882 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -115,8 +115,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 
 	rmid = XLogRecGetRmid(record);
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b3fd42e0f18..68971b0f1ea 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11715,7 +11715,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11727,8 +11727,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..d387530c6f5 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -31,5 +31,19 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID		128
+#define RM_CUSTOM_MAX_ID		UINT8_MAX
+
+StaticAssertDecl(RM_MAX_ID < RM_CUSTOM_MIN_ID,
+				 "RM_MAX_ID >= RM_CUSTOM_MIN_ID");
+StaticAssertDecl(RM_CUSTOM_MIN_ID < RM_CUSTOM_MAX_ID,
+				 "RM_CUSTOM_MIN_ID >= RM_CUSTOM_MAX_ID");
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 849954a8e5a..61f421c98c2 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -321,6 +321,9 @@ typedef struct RmgrData
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -338,4 +341,8 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) (((rmid) < RM_CUSTOM_MIN_ID) ? \
+					   GetBuiltinRmgr((rmid)) : GetCustomRmgr((rmid)))
+
 #endif							/* XLOG_INTERNAL_H */
-- 
2.17.1

#7Julien Rouhaud
rjuju123@gmail.com
In reply to: Jeff Davis (#6)
Re: Extensible Rmgr for Table AMs

Hi,

On Mon, Jan 31, 2022 at 06:36:59PM -0800, Jeff Davis wrote:

On Tue, 2022-01-18 at 17:53 +0800, Julien Rouhaud wrote:

There's a bit of 0003 present in 002:

I just squashed 0002 and 0003 together. Not large enough to keep
separate.

Agreed.

+   elog(LOG, "registering customer rmgr \"%s\" with ID %d",
+        rmgr->rm_name, rmid);

Should it be a DEBUG message instead? Also s/customer/custom/

It seems like a fairly important thing to have in the log. Only a
couple extensions will ever encounter this message, and only at server
start.

Ok.

+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+   if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+       elog(PANIC, "custom rmgr id %d out of range", rmid);

Should this be an assert?

If we make it an Assert, then it won't be caught in production builds.

Sure, but it seems something so fundamental that it should get done right
during the development phase rather than spending cycles in optimized builds to
check for it.
But even if that happened it would get caught by the final "not found" PANIC
anyway, and for end users I don't think that getting this error instead would
make much difference.

How you plan to mark it experimental?

I suppose it doesn't need to be marked explicitly -- there are other
APIs that change. For instance, the ProcessUtility_hook changed, and
that's used much more widely.
As long as we generally agree that some kind of custom rmgrs are the
way to go, if we change the API or implementation around from version
to version I can easily update my table access method.

Agreed, API breaks happen often and that's not really a problem for third-party
extensions, especially if that comes with hard compile failure.

Other than that the patch looks good to me, as you said we just need a decision
on whether custom rmgrs are wanted or not.

#8Julien Rouhaud
rjuju123@gmail.com
In reply to: Julien Rouhaud (#7)
Re: Extensible Rmgr for Table AMs

On Tue, Feb 01, 2022 at 12:39:38PM +0800, Julien Rouhaud wrote:

Other than that the patch looks good to me, as you said we just need a decision
on whether custom rmgrs are wanted or not.

One last thing, did you do some benchmark with a couple custom rmgr to see how
much the O(n) access is showing up in profiles?

#9Jeff Davis
pgsql@j-davis.com
In reply to: Julien Rouhaud (#8)
Re: Extensible Rmgr for Table AMs

On Tue, 2022-02-01 at 20:45 +0800, Julien Rouhaud wrote:

On Tue, Feb 01, 2022 at 12:39:38PM +0800, Julien Rouhaud wrote:

Other than that the patch looks good to me, as you said we just
need a decision
on whether custom rmgrs are wanted or not.

One last thing, did you do some benchmark with a couple custom rmgr
to see how
much the O(n) access is showing up in profiles?

What kind of a test case would be reasonable there? You mean having a
lot of custom rmgrs?

I was expecting that few people would have more than one custom rmgr
loaded anyway, so a sparse array or hashtable seemed wasteful. If
custom rmgrs become popular we probably need to have a larger ID space
anyway, but it seems like overengineering to do so now.

Regards,
Jeff Davis

#10Julien Rouhaud
rjuju123@gmail.com
In reply to: Jeff Davis (#9)
Re: Extensible Rmgr for Table AMs

Hi,

On Tue, Feb 01, 2022 at 03:38:32PM -0800, Jeff Davis wrote:

On Tue, 2022-02-01 at 20:45 +0800, Julien Rouhaud wrote:

One last thing, did you do some benchmark with a couple custom rmgr
to see how
much the O(n) access is showing up in profiles?

What kind of a test case would be reasonable there? You mean having a
lot of custom rmgrs?

I was expecting that few people would have more than one custom rmgr
loaded anyway, so a sparse array or hashtable seemed wasteful. If
custom rmgrs become popular we probably need to have a larger ID space
anyway, but it seems like overengineering to do so now.

I agree that having dozen of custom rmgrs doesn't seem likely, but I also have
no idea of how much overhead you get by not doing a direct array access. I
think it would be informative to benchmark something like simple OLTP write
workload on a fast storage (or a ramdisk, or with fsync off...), with the used
rmgr being the 1st and the 2nd custom rmgr. Both scenario still seems
plausible and shouldn't degenerate on good hardware.

#11Robert Haas
robertmhaas@gmail.com
In reply to: Julien Rouhaud (#10)
Re: Extensible Rmgr for Table AMs

On Thu, Feb 3, 2022 at 12:34 AM Julien Rouhaud <rjuju123@gmail.com> wrote:

I agree that having dozen of custom rmgrs doesn't seem likely, but I also have
no idea of how much overhead you get by not doing a direct array access. I
think it would be informative to benchmark something like simple OLTP write
workload on a fast storage (or a ramdisk, or with fsync off...), with the used
rmgr being the 1st and the 2nd custom rmgr. Both scenario still seems
plausible and shouldn't degenerate on good hardware.

I think it would be hard to measure the overhead of this approach on a
macrobenchmark. That having been said, I find this a surprising
implementation choice. I think that the approaches that are most worth
considering are:

(1) reallocate the array if needed so that we can continue to just do
RmgrTable[rmid]
(2) have one array for builtins and a second array for extensions and
do rmid < RM_CUSTOM_MIN_ID ? BuiltinRmgrTable[rmid] :
ExtensionRmgrTable[rmid]
(3) change RmgrTable to be an array of pointers to structs rather than
an an array of structs. then the structs don't move around and can be
const, but the pointers can be moved into a larger array if required

I'm not really sure which is best. My intuition for what will be
cheapest on modern hardware is pretty shaky. However, I can't see how
it can be the thing the patch is doing now; a linear search seems like
it has to be the slowest option.

--
Robert Haas
EDB: http://www.enterprisedb.com

#12Julien Rouhaud
rjuju123@gmail.com
In reply to: Robert Haas (#11)
Re: Extensible Rmgr for Table AMs

Hi,

On Fri, Feb 04, 2022 at 09:10:42AM -0500, Robert Haas wrote:

On Thu, Feb 3, 2022 at 12:34 AM Julien Rouhaud <rjuju123@gmail.com> wrote:

I agree that having dozen of custom rmgrs doesn't seem likely, but I also have
no idea of how much overhead you get by not doing a direct array access. I
think it would be informative to benchmark something like simple OLTP write
workload on a fast storage (or a ramdisk, or with fsync off...), with the used
rmgr being the 1st and the 2nd custom rmgr. Both scenario still seems
plausible and shouldn't degenerate on good hardware.

I think it would be hard to measure the overhead of this approach on a
macrobenchmark.

Yeah that's also my initial thought, but I wouldn't be terribly surprised to be
wrong.

That having been said, I find this a surprising
implementation choice. I think that the approaches that are most worth
considering are:

(1) reallocate the array if needed so that we can continue to just do
RmgrTable[rmid]
(2) have one array for builtins and a second array for extensions and
do rmid < RM_CUSTOM_MIN_ID ? BuiltinRmgrTable[rmid] :
ExtensionRmgrTable[rmid]
(3) change RmgrTable to be an array of pointers to structs rather than
an an array of structs. then the structs don't move around and can be
const, but the pointers can be moved into a larger array if required

I'm not really sure which is best. My intuition for what will be
cheapest on modern hardware is pretty shaky. However, I can't see how
it can be the thing the patch is doing now; a linear search seems like
it has to be the slowest option.

I guess the idea was to have a compromise between letting rmgr authors choose
arbitrary ids to avoid any conflicts, especially with private implementations,
without wasting too much memory. But those approaches would be pretty much
incompatible with the current definition:

+#define RM_CUSTOM_MIN_ID       128
+#define RM_CUSTOM_MAX_ID       UINT8_MAX

even if you only allocate up to the max id found, nothing guarantees that you
won't get a quite high id.

#13Robert Haas
robertmhaas@gmail.com
In reply to: Julien Rouhaud (#12)
Re: Extensible Rmgr for Table AMs

On Fri, Feb 4, 2022 at 9:48 AM Julien Rouhaud <rjuju123@gmail.com> wrote:

I guess the idea was to have a compromise between letting rmgr authors choose
arbitrary ids to avoid any conflicts, especially with private implementations,
without wasting too much memory. But those approaches would be pretty much
incompatible with the current definition:

+#define RM_CUSTOM_MIN_ID       128
+#define RM_CUSTOM_MAX_ID       UINT8_MAX

even if you only allocate up to the max id found, nothing guarantees that you
won't get a quite high id.

Right, which I guess raises another question: if the maximum is
UINT8_MAX, which BTW I find perfectly reasonable, why are we not just
defining this as an array of size 256? There's no point in adding code
complexity to save a few kB of memory.

--
Robert Haas
EDB: http://www.enterprisedb.com

#14Julien Rouhaud
rjuju123@gmail.com
In reply to: Robert Haas (#13)
Re: Extensible Rmgr for Table AMs

On Fri, Feb 04, 2022 at 09:53:09AM -0500, Robert Haas wrote:

On Fri, Feb 4, 2022 at 9:48 AM Julien Rouhaud <rjuju123@gmail.com> wrote:

I guess the idea was to have a compromise between letting rmgr authors choose
arbitrary ids to avoid any conflicts, especially with private implementations,
without wasting too much memory. But those approaches would be pretty much
incompatible with the current definition:

+#define RM_CUSTOM_MIN_ID       128
+#define RM_CUSTOM_MAX_ID       UINT8_MAX

even if you only allocate up to the max id found, nothing guarantees that you
won't get a quite high id.

Right, which I guess raises another question: if the maximum is
UINT8_MAX, which BTW I find perfectly reasonable, why are we not just
defining this as an array of size 256? There's no point in adding code
complexity to save a few kB of memory.

Agreed, especially if combined with your suggested approach 3 (array of
pointers).

#15Jeff Davis
pgsql@j-davis.com
In reply to: Julien Rouhaud (#14)
1 attachment(s)
Re: Extensible Rmgr for Table AMs

On Fri, 2022-02-04 at 22:56 +0800, Julien Rouhaud wrote:

Right, which I guess raises another question: if the maximum is
UINT8_MAX, which BTW I find perfectly reasonable, why are we not
just
defining this as an array of size 256? There's no point in adding
code
complexity to save a few kB of memory.

Agreed, especially if combined with your suggested approach 3 (array
of
pointers).

Implemented and attached. I also updated pg_waldump and pg_rewind to do
something reasonable.

Additionally, I now have a reasonably complete implementation of a
custom resource manager now:

https://github.com/citusdata/citus/tree/custom-rmgr-15

(Not committed or intended to actually be used right now -- just a
POC.)

Offline, Andres mentioned that I should test recovery performance if we
take your approach, because making the RmgrTable non-const could impact
optimizations. Not sure if that would be a practical concern compared
to all the other work done at REDO time.

Regards,
Jeff Davis

Attachments:

0001-Extensible-rmgr.patchtext/x-patch; charset=UTF-8; name=0001-Extensible-rmgr.patchDownload
From faa7eb847fab30c0fbe47e703d1c2d64f84ba51d Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH] Extensible rmgr.

Allow extensions to specify a new custom rmgr, which allows
specialized WAL. This is meant to be used by a custom Table Access
Method, which would not otherwise be able to offer logical
decoding/replication. It may also be used by new Index Access Methods.

Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.

Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c         | 87 ++++++++++++++++++++++-
 src/backend/access/transam/xlogreader.c   |  2 +-
 src/backend/access/transam/xlogrecovery.c | 29 +++-----
 src/backend/replication/logical/decode.c  |  4 +-
 src/backend/utils/misc/guc.c              |  6 +-
 src/bin/pg_rewind/parsexlog.c             | 11 +--
 src/bin/pg_waldump/pg_waldump.c           | 27 +++----
 src/bin/pg_waldump/rmgrdesc.c             | 32 ++++++++-
 src/bin/pg_waldump/rmgrdesc.h             |  2 +-
 src/include/access/rmgr.h                 | 11 ++-
 src/include/access/xlog_internal.h        |  5 +-
 11 files changed, 168 insertions(+), 48 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..1805596071e 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -32,8 +33,90 @@
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
-	{ name, redo, desc, identify, startup, cleanup, mask, decode },
+	&(struct RmgrData){ name, redo, desc, identify, startup, cleanup, mask, decode },
 
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+static RmgrData *RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * Start up all resource managers.
+ */
+void
+StartupResourceManagers()
+{
+	for (int i = 0; i < RM_MAX_ID; i++)
+	{
+		if (RmgrTable[i] == NULL)
+			continue;
+
+		if (RmgrTable[i]->rm_startup != NULL)
+			RmgrTable[i]->rm_startup();
+	}
+}
+
+/*
+ * Clean up all resource managers.
+ */
+void
+CleanupResourceManagers()
+{
+	for (int i = 0; i < RM_MAX_ID; i++)
+	{
+		if (RmgrTable[i] == NULL)
+			continue;
+
+		if (RmgrTable[i]->rm_cleanup != NULL)
+			RmgrTable[i]->rm_cleanup();
+	}
+}
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmid < RM_MIN_CUSTOM_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(ERROR,
+				(errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries")));
+
+	ereport(LOG,
+			(errmsg("registering custom rmgr \"%s\" with ID %d",
+					rmgr->rm_name, rmid)));
+
+	if (RmgrTable[rmid] != NULL)
+		ereport(PANIC,
+				(errmsg("custom rmgr ID %d already registered with name \"%s\"",
+						rmid, RmgrTable[rmid]->rm_name)));
+
+	/* check for existing rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = RmgrTable[i];
+
+		if (existing_rmgr == NULL)
+			continue;
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+							existing_rmgr->rm_name)));
+	}
+
+	/* register it */
+	RmgrTable[rmid] = rmgr;
+}
+
+RmgrData *
+GetRmgr(RmgrId rmid)
+{
+	return RmgrTable[rmid];
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e437c429920..e2c4d97b91f 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_BUILTIN_ID && record->xl_rmid < RM_MIN_CUSTOM_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 9feea3e6ec9..e903af95c0a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1531,7 +1531,6 @@ ShutdownWalRecovery(void)
 void
 PerformWalRecovery(void)
 {
-	int			rmid;
 	XLogRecord *record;
 	bool		reachedRecoveryTarget = false;
 	TimeLineID	replayTLI;
@@ -1604,12 +1603,7 @@ PerformWalRecovery(void)
 
 		InRedo = true;
 
-		/* Initialize resource managers */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
-		}
+		StartupResourceManagers();
 
 		ereport(LOG,
 				(errmsg("redo starts at %X/%X",
@@ -1746,12 +1740,7 @@ PerformWalRecovery(void)
 			}
 		}
 
-		/* Allow resource managers to do any required cleanup. */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_cleanup != NULL)
-				RmgrTable[rmid].rm_cleanup();
-		}
+		CleanupResourceManagers();
 
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
@@ -1871,7 +1860,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		xlogrecovery_redo(xlogreader, *replayTLI);
 
 	/* Now apply the WAL record itself */
-	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+	GetRmgr(record->xl_rmid)->rm_redo(xlogreader);
 
 	/*
 	 * After redo, check whether the backup pages associated with the WAL
@@ -2101,16 +2090,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid)->rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid)->rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid)->rm_desc(buf, record);
 }
 
 #ifdef WAL_DEBUG
@@ -2339,10 +2328,10 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid)->rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid)->rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid)->rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..f49e1bbacd1 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -117,8 +117,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 
 	rmid = XLogRecGetRmid(record);
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid)->rm_decode != NULL)
+		GetRmgr(rmid)->rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 932aefc777d..d6e729b51bc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11753,7 +11753,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid) != NULL && GetRmgr(rmid)->rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11765,8 +11765,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid) != NULL && GetRmgr(rmid)->rm_mask != NULL &&
+					pg_strcasecmp(tok, GetRmgr(rmid)->rm_name) == 0)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 7cfa169e9b9..470134248b9 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -25,8 +25,8 @@
 #include "pg_rewind.h"
 
 /*
- * RmgrNames is an array of resource manager names, to make error messages
- * a bit nicer.
+ * RmgrNames is an array of the built-in resource manager names, to make error
+ * messages a bit nicer.
  */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
@@ -35,6 +35,9 @@ static const char *RmgrNames[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
 
+#define RmgrName(rmid) (((rmid) <= RM_MAX_BUILTIN_ID) ? \
+						RmgrNames[rmid] : "custom")
+
 static void extractPageInfo(XLogReaderState *record);
 
 static int	xlogreadfd = -1;
@@ -427,9 +430,9 @@ extractPageInfo(XLogReaderState *record)
 		 * track that change.
 		 */
 		pg_fatal("WAL record modifies a relation, but record type is not recognized: "
-				 "lsn: %X/%X, rmgr: %s, info: %02X",
+				 "lsn: %X/%X, rmid: %d, rmgr: %s, info: %02X",
 				 LSN_FORMAT_ARGS(record->ReadRecPtr),
-				 RmgrNames[rmid], info);
+				 rmid, RmgrName(rmid), info);
 	}
 
 	for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index fc081adfb8c..d424a7afca8 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -51,7 +51,7 @@ typedef struct XLogDumpConfig
 	bool		stats_per_record;
 
 	/* filter options */
-	bool		filter_by_rmgr[RM_MAX_ID + 1];
+	bool		filter_by_rmgr[RM_MAX_BUILTIN_ID + 1];
 	bool		filter_by_rmgr_enabled;
 	TransactionId filter_by_xid;
 	bool		filter_by_xid_enabled;
@@ -71,8 +71,8 @@ typedef struct XLogDumpStats
 	uint64		count;
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
-	Stats		rmgr_stats[RM_NEXT_ID];
-	Stats		record_stats[RM_NEXT_ID][MAX_XLINFO_TYPES];
+	Stats		rmgr_stats[RM_MAX_ID + 1];
+	Stats		record_stats[RM_MAX_ID + 1][MAX_XLINFO_TYPES];
 } XLogDumpStats;
 
 #define fatal_error(...) do { pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0)
@@ -95,9 +95,9 @@ print_rmgr_list(void)
 {
 	int			i;
 
-	for (i = 0; i <= RM_MAX_ID; i++)
+	for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
 	{
-		printf("%s\n", RmgrDescTable[i].rm_name);
+		printf("%s\n", GetRmgrDesc(i)->rm_name);
 	}
 }
 
@@ -473,7 +473,7 @@ static void
 XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
 {
 	const char *id;
-	const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)];
+	const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record));
 	uint32		rec_len;
 	uint32		fpi_len;
 	RelFileNode rnode;
@@ -658,7 +658,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 	 * calculate column totals.
 	 */
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri < RM_MAX_ID; ri++)
 	{
 		total_count += stats->rmgr_stats[ri].count;
 		total_rec_len += stats->rmgr_stats[ri].rec_len;
@@ -679,13 +679,13 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 		   "Type", "N", "(%)", "Record size", "(%)", "FPI size", "(%)", "Combined size", "(%)",
 		   "----", "-", "---", "-----------", "---", "--------", "---", "-------------", "---");
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri <= RM_MAX_ID; ri++)
 	{
 		uint64		count,
 					rec_len,
 					fpi_len,
 					tot_len;
-		const RmgrDescData *desc = &RmgrDescTable[ri];
+		const RmgrDescData *desc = GetRmgrDesc(ri);
 
 		if (!config->stats_per_record)
 		{
@@ -694,6 +694,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 			fpi_len = stats->rmgr_stats[ri].fpi_len;
 			tot_len = rec_len + fpi_len;
 
+			if (ri > RM_MAX_BUILTIN_ID && count == 0)
+				continue;
+
 			XLogDumpStatsRow(desc->rm_name,
 							 count, total_count, rec_len, total_rec_len,
 							 fpi_len, total_fpi_len, tot_len, total_len);
@@ -913,16 +916,16 @@ main(int argc, char **argv)
 						exit(EXIT_SUCCESS);
 					}
 
-					for (i = 0; i <= RM_MAX_ID; i++)
+					for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
 					{
-						if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0)
+						if (pg_strcasecmp(optarg, GetRmgrDesc(i)->rm_name) == 0)
 						{
 							config.filter_by_rmgr[i] = true;
 							config.filter_by_rmgr_enabled = true;
 							break;
 						}
 					}
-					if (i > RM_MAX_ID)
+					if (i > RM_MAX_BUILTIN_ID)
 					{
 						pg_log_error("resource manager \"%s\" does not exist",
 									 optarg);
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310b..dd8b1bcd9bf 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -35,6 +35,36 @@
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
-const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
+static const RmgrDescData RmgrDescTable[RM_MAX_BUILTIN_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * No information on custom resource managers; just print the ID.
+ */
+static void
+default_desc(StringInfo buf, XLogReaderState *record)
+{
+	appendStringInfo(buf, "rmid: %d", XLogRecGetRmid(record));
+}
+
+/*
+ * No information on custom resource managers; just return NULL and let the
+ * caller handle it.
+ */
+static const char *
+default_identify(uint8 info)
+{
+	return NULL;
+}
+
+const RmgrDescData *
+GetRmgrDesc(RmgrId rmid)
+{
+	if (rmid <= RM_MAX_BUILTIN_ID)
+		return &RmgrDescTable[rmid];
+	else
+	{
+		return &(RmgrDescData){ "custom", default_desc, default_identify };
+	}
+}
diff --git a/src/bin/pg_waldump/rmgrdesc.h b/src/bin/pg_waldump/rmgrdesc.h
index 42f8483b482..f733cd467d5 100644
--- a/src/bin/pg_waldump/rmgrdesc.h
+++ b/src/bin/pg_waldump/rmgrdesc.h
@@ -18,6 +18,6 @@ typedef struct RmgrDescData
 	const char *(*rm_identify) (uint8 info);
 } RmgrDescData;
 
-extern const RmgrDescData RmgrDescTable[];
+extern const RmgrDescData *GetRmgrDesc(RmgrId rmid);
 
 #endif							/* RMGRDESC_H */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..13b65567a5f 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -30,6 +30,15 @@ typedef enum RmgrIds
 
 #undef PG_RMGR
 
-#define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_MAX_ID				UINT8_MAX
+#define RM_MAX_BUILTIN_ID		(RM_NEXT_ID - 1)
+#define RM_MIN_CUSTOM_ID		128
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 0e94833129a..1b8a0067036 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -319,7 +319,10 @@ typedef struct RmgrData
 							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
-extern const RmgrData RmgrTable[];
+extern void StartupResourceManagers(void);
+extern void CleanupResourceManagers(void);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+extern RmgrData *GetRmgr(RmgrId rmid);
 
 /*
  * Exported to support xlog switching from checkpointer
-- 
2.17.1

#16Andres Freund
andres@anarazel.de
In reply to: Jeff Davis (#15)
Re: Extensible Rmgr for Table AMs

Hi,

On 2022-03-23 21:43:08 -0700, Jeff Davis wrote:

/* must be kept in sync with RmgrData definition in xlog_internal.h */
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
-	{ name, redo, desc, identify, startup, cleanup, mask, decode },
+	&(struct RmgrData){ name, redo, desc, identify, startup, cleanup, mask, decode },
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+static RmgrData *RmgrTable[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
};

I think this has been discussed before, but to me it's not obvious that it's a
good idea to change RmgrTable from RmgrData to RmgrData *. That adds an
indirection, without obvious benefit.

+
+/*
+ * Start up all resource managers.
+ */
+void
+StartupResourceManagers()

(void)

+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmid < RM_MIN_CUSTOM_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(ERROR,
+				(errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries")));
+
+	ereport(LOG,
+			(errmsg("registering custom rmgr \"%s\" with ID %d",
+					rmgr->rm_name, rmid)));
+
+	if (RmgrTable[rmid] != NULL)
+		ereport(PANIC,
+				(errmsg("custom rmgr ID %d already registered with name \"%s\"",
+						rmid, RmgrTable[rmid]->rm_name)));
+
+	/* check for existing rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = RmgrTable[i];
+
+		if (existing_rmgr == NULL)
+			continue;
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+							existing_rmgr->rm_name)));
+	}
+
+	/* register it */
+	RmgrTable[rmid] = rmgr;
+}

Random idea: Might be worth emitting the id->name mapping just after a redo
location is determined, to make it easier to debug things.

+RmgrData *
+GetRmgr(RmgrId rmid)
+{
+	return RmgrTable[rmid];
+}

Given this is so simple, why incur the cost of a function call? Rather than
continuing to expose RmgrTable and move GetRmgr() into the header, as a static
inline? As-is this also prevent the compiler from optimizing across repeated
GetRmgr() calls (which often won't be possible anyway, but still)..

-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_BUILTIN_ID && record->xl_rmid < RM_MIN_CUSTOM_ID)
{
report_invalid_record(state,
"invalid resource manager ID %u at %X/%X",

Shouldn't this continue to enforce RM_MAX_ID as well?

@@ -1604,12 +1603,7 @@ PerformWalRecovery(void)

InRedo = true;

-		/* Initialize resource managers */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
-		}
+		StartupResourceManagers();

Personally I'd rather name it ResourceManagersStartup() or RmgrStartup().

@@ -1871,7 +1860,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
xlogrecovery_redo(xlogreader, *replayTLI);

/* Now apply the WAL record itself */
-	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+	GetRmgr(record->xl_rmid)->rm_redo(xlogreader);

/*
* After redo, check whether the backup pages associated with the WAL

So we have just added one indirect call and one pointer indirection
(previously RmgrTable could be resolved by the linker, now it needs to be
dereferenced again), that's not too bad. I was afraid there'd be multiple
calls.

@@ -2101,16 +2090,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
uint8 info = XLogRecGetInfo(record);
const char *id;

-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid)->rm_name);
appendStringInfoChar(buf, '/');
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid)->rm_identify(info);
if (id == NULL)
appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
else
appendStringInfo(buf, "%s: ", id);
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid)->rm_desc(buf, record);
}

Like here. It's obviously not as performance critical as replay. But it's
still a shame to add 3 calls to GetRmgr, that each then need to dereference
RmgrTable. The compiler won't be able to optimize any of that away.

@@ -117,8 +117,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor

rmid = XLogRecGetRmid(record);

-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid)->rm_decode != NULL)
+		GetRmgr(rmid)->rm_decode(ctx, &buf);
else
{
/* just deal with xid, and done */

This one might actually matter, overhead wise.

@@ -473,7 +473,7 @@ static void
XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
{
const char *id;
-	const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)];
+	const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record));
uint32		rec_len;
uint32		fpi_len;
RelFileNode rnode;
@@ -658,7 +658,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
* calculate column totals.
*/

- for (ri = 0; ri < RM_NEXT_ID; ri++)
+ for (ri = 0; ri < RM_MAX_ID; ri++)
{
total_count += stats->rmgr_stats[ri].count;
total_rec_len += stats->rmgr_stats[ri].rec_len;

@@ -694,6 +694,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
fpi_len = stats->rmgr_stats[ri].fpi_len;
tot_len = rec_len + fpi_len;

+			if (ri > RM_MAX_BUILTIN_ID && count == 0)
+				continue;
+

Ah, I see. I had written a concerned comment about the previous hunk...

XLogDumpStatsRow(desc->rm_name,
count, total_count, rec_len, total_rec_len,
fpi_len, total_fpi_len, tot_len, total_len);
@@ -913,16 +916,16 @@ main(int argc, char **argv)
exit(EXIT_SUCCESS);
}

-					for (i = 0; i <= RM_MAX_ID; i++)
+					for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
{
-						if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0)
+						if (pg_strcasecmp(optarg, GetRmgrDesc(i)->rm_name) == 0)
{
config.filter_by_rmgr[i] = true;
config.filter_by_rmgr_enabled = true;
break;
}
}
-					if (i > RM_MAX_ID)
+					if (i > RM_MAX_BUILTIN_ID)
{
pg_log_error("resource manager \"%s\" does not exist",
optarg);

So we can't filter by rmgr id for non-builtin rmgr's? That sucks. Maybe add a
custom:<i> syntax? Or generally allow numerical identifiers in addition to the
names?

Greetings,

Andres Freund

#17Jeff Davis
pgsql@j-davis.com
In reply to: Andres Freund (#16)
Re: Extensible Rmgr for Table AMs

On Mon, 2022-03-28 at 11:00 -0700, Andres Freund wrote:

I think this has been discussed before, but to me it's not obvious
that it's a
good idea to change RmgrTable from RmgrData to RmgrData *. That adds
an
indirection, without obvious benefit.

I did some performance tests. I created a narrow table, took a base
backup, loaded 100M records, finished the base backup. Then I recovered
using the different build combinations (patched/unpatched, clang/gcc).

compiler run1 run2
unpatched: gcc 102s 106s
patched: gcc 107s 105s
unpatched: clang 109s 110s
patched: clang 109s 111s

I don't see a clear signal that this patch worsens performance. The
102s run was the very first run, so I suspect it was just due to the
processor starting out cold. Let me know if you think the test is
testing the right paths.

Perhaps I should address your other perf concerns around GetRmgr (make
it static inline, reduce number of calls), and then leave the
indirection for the sake of cleanliness?

If you are still concerned, I can switch back to separate tables to
eliminate the indirection for built-in rmgrs. Separate rmgr tables
still require a branch (to decide which table to access), but it should
be a highly predictable one.

Regards,
Jeff Davis

#18Jeff Davis
pgsql@j-davis.com
In reply to: Andres Freund (#16)
1 attachment(s)
Re: Extensible Rmgr for Table AMs

On Mon, 2022-03-28 at 11:00 -0700, Andres Freund wrote:

+/*
+ * Start up all resource managers.
+ */
+void
+StartupResourceManagers()

(void)

Fixed.

Random idea: Might be worth emitting the id->name mapping just after
a redo
location is determined, to make it easier to debug things.

Not quite sure I understood this idea, do you mean dump all rmgrs and
the IDs when performing recovery?

+RmgrData *
+GetRmgr(RmgrId rmid)
+{
+	return RmgrTable[rmid];
+}

Given this is so simple, why incur the cost of a function call?
Rather than
continuing to expose RmgrTable and move GetRmgr() into the header, as
a static
inline?

Made it static inline.

Shouldn't this continue to enforce RM_MAX_ID as well?

Done.

Personally I'd rather name it ResourceManagersStartup() or
RmgrStartup().

Done.

Like here. It's obviously not as performance critical as replay. But
it's
still a shame to add 3 calls to GetRmgr, that each then need to
dereference
RmgrTable. The compiler won't be able to optimize any of that away.

Changed to only call it once and save it in a variable for each call
site where it makes sense.

So we can't filter by rmgr id for non-builtin rmgr's? That sucks.
Maybe add a
custom:<i> syntax? Or generally allow numerical identifiers in
addition to the
names?

Good idea. I changed it to allow "custom###" to mean the custom rmgr
with ID ### (3 digits).

I still may change it to go back to two RmgrTables (one for builtin and
one for custom) to remove the lingering performance doubts. Other than
that, and some cleanup, this is pretty close to the version I intend to
commit.

Regards,
Jeff Davis

Attachments:

v6-0001-Extensible-rmgr.patchtext/x-patch; charset=UTF-8; name=v6-0001-Extensible-rmgr.patchDownload
From 014027bc982896e7e1b6fafe4b622ed5fa45a08b Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH] Extensible rmgr.

Allow extensions to specify a new custom rmgr, which allows
specialized WAL. This is meant to be used by a custom Table Access
Method, which would not otherwise be able to offer logical
decoding/replication. It may also be used by new Index Access Methods.

Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.

Reviewed-by: Julien Rouhaud, Andres Freund
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c         | 80 ++++++++++++++++++++++-
 src/backend/access/transam/xlogreader.c   |  2 +-
 src/backend/access/transam/xlogrecovery.c | 33 ++++------
 src/backend/replication/logical/decode.c  |  8 +--
 src/backend/utils/misc/guc.c              | 10 ++-
 src/bin/pg_rewind/parsexlog.c             | 11 ++--
 src/bin/pg_waldump/pg_waldump.c           | 65 +++++++++++++-----
 src/bin/pg_waldump/rmgrdesc.c             | 60 ++++++++++++++++-
 src/bin/pg_waldump/rmgrdesc.h             |  2 +-
 src/include/access/rmgr.h                 | 15 ++++-
 src/include/access/xlog_internal.h        | 11 +++-
 11 files changed, 239 insertions(+), 58 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..504d0a5b5ac 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -32,8 +33,83 @@
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
-	{ name, redo, desc, identify, startup, cleanup, mask, decode },
+	&(struct RmgrData){ name, redo, desc, identify, startup, cleanup, mask, decode },
 
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+RmgrData *RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * Start up all resource managers.
+ */
+void
+RmgrStartup(void)
+{
+	for (int i = 0; i < RM_MAX_ID; i++)
+	{
+		if (RmgrTable[i] == NULL)
+			continue;
+
+		if (RmgrTable[i]->rm_startup != NULL)
+			RmgrTable[i]->rm_startup();
+	}
+}
+
+/*
+ * Clean up all resource managers.
+ */
+void
+RmgrCleanup(void)
+{
+	for (int i = 0; i < RM_MAX_ID; i++)
+	{
+		if (RmgrTable[i] == NULL)
+			continue;
+
+		if (RmgrTable[i]->rm_cleanup != NULL)
+			RmgrTable[i]->rm_cleanup();
+	}
+}
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmid < RM_MIN_CUSTOM_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(ERROR,
+				(errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries")));
+
+	if (RmgrTable[rmid] != NULL)
+		ereport(PANIC,
+				(errmsg("custom rmgr ID %d already registered with name \"%s\"",
+						rmid, RmgrTable[rmid]->rm_name)));
+
+	/* check for existing rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = RmgrTable[i];
+
+		if (existing_rmgr == NULL)
+			continue;
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+							existing_rmgr->rm_name)));
+	}
+
+	/* register it */
+	RmgrTable[rmid] = rmgr;
+	ereport(LOG,
+			(errmsg("registered custom rmgr \"%s\" with ID %d",
+					rmgr->rm_name, rmid)));
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e437c429920..1017e913e51 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (!RM_IS_VALID(record->xl_rmid))
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 8d2395dae25..6c661fa6e27 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1531,7 +1531,6 @@ ShutdownWalRecovery(void)
 void
 PerformWalRecovery(void)
 {
-	int			rmid;
 	XLogRecord *record;
 	bool		reachedRecoveryTarget = false;
 	TimeLineID	replayTLI;
@@ -1604,12 +1603,7 @@ PerformWalRecovery(void)
 
 		InRedo = true;
 
-		/* Initialize resource managers */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
-		}
+		RmgrStartup();
 
 		ereport(LOG,
 				(errmsg("redo starts at %X/%X",
@@ -1746,12 +1740,7 @@ PerformWalRecovery(void)
 			}
 		}
 
-		/* Allow resource managers to do any required cleanup. */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_cleanup != NULL)
-				RmgrTable[rmid].rm_cleanup();
-		}
+		RmgrCleanup();
 
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
@@ -1871,7 +1860,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		xlogrecovery_redo(xlogreader, *replayTLI);
 
 	/* Now apply the WAL record itself */
-	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+	GetRmgr(record->xl_rmid)->rm_redo(xlogreader);
 
 	/*
 	 * After redo, check whether the backup pages associated with the WAL
@@ -2101,20 +2090,20 @@ rm_redo_error_callback(void *arg)
 void
 xlog_outdesc(StringInfo buf, XLogReaderState *record)
 {
-	RmgrId		rmid = XLogRecGetRmid(record);
+	RmgrData   *rmgr = GetRmgr(XLogRecGetRmid(record));
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, rmgr->rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = rmgr->rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	rmgr->rm_desc(buf, record);
 }
 
 #ifdef WAL_DEBUG
@@ -2263,7 +2252,7 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
 static void
 verifyBackupPageConsistency(XLogReaderState *record)
 {
-	RmgrId		rmid = XLogRecGetRmid(record);
+	RmgrData   *rmgr = GetRmgr(XLogRecGetRmid(record));
 	RelFileNode rnode;
 	ForkNumber	forknum;
 	BlockNumber blkno;
@@ -2343,10 +2332,10 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (rmgr->rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			rmgr->rm_mask(replay_image_masked, blkno);
+			rmgr->rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..3bdeda6b6a9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -94,7 +94,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
 	XLogRecordBuffer buf;
 	TransactionId txid;
-	RmgrId rmid;
+	RmgrData *rmgr;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
@@ -115,10 +115,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 								 buf.origptr);
 	}
 
-	rmid = XLogRecGetRmid(record);
+	rmgr = GetRmgr(XLogRecGetRmid(record));
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (rmgr->rm_decode != NULL)
+		rmgr->rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9e8ab1420d9..d3e7dbc3bf8 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11775,8 +11775,11 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+			{
+				RmgrData *rmgr = GetRmgr(rmid);
+				if (rmgr != NULL && rmgr->rm_mask != NULL)
 					newwalconsistency[rmid] = true;
+			}
 			found = true;
 		}
 		else
@@ -11787,8 +11790,9 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				RmgrData *rmgr = GetRmgr(rmid);
+				if (rmgr != NULL && rmgr->rm_mask != NULL &&
+					pg_strcasecmp(tok, rmgr->rm_name) == 0)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 49966e7b7fd..dfa836d1561 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -25,8 +25,8 @@
 #include "pg_rewind.h"
 
 /*
- * RmgrNames is an array of resource manager names, to make error messages
- * a bit nicer.
+ * RmgrNames is an array of the built-in resource manager names, to make error
+ * messages a bit nicer.
  */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
@@ -35,6 +35,9 @@ static const char *RmgrNames[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
 
+#define RmgrName(rmid) (((rmid) <= RM_MAX_BUILTIN_ID) ? \
+						RmgrNames[rmid] : "custom")
+
 static void extractPageInfo(XLogReaderState *record);
 
 static int	xlogreadfd = -1;
@@ -436,9 +439,9 @@ extractPageInfo(XLogReaderState *record)
 		 * track that change.
 		 */
 		pg_fatal("WAL record modifies a relation, but record type is not recognized: "
-				 "lsn: %X/%X, rmgr: %s, info: %02X",
+				 "lsn: %X/%X, rmid: %d, rmgr: %s, info: %02X",
 				 LSN_FORMAT_ARGS(record->ReadRecPtr),
-				 RmgrNames[rmid], info);
+				 rmid, RmgrName(rmid), info);
 	}
 
 	for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 4cb40d068a9..afd786fe7bd 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -80,8 +80,8 @@ typedef struct XLogDumpStats
 	uint64		count;
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
-	Stats		rmgr_stats[RM_NEXT_ID];
-	Stats		record_stats[RM_NEXT_ID][MAX_XLINFO_TYPES];
+	Stats		rmgr_stats[RM_MAX_ID + 1];
+	Stats		record_stats[RM_MAX_ID + 1][MAX_XLINFO_TYPES];
 } XLogDumpStats;
 
 #define fatal_error(...) do { pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0)
@@ -104,9 +104,9 @@ print_rmgr_list(void)
 {
 	int			i;
 
-	for (i = 0; i <= RM_MAX_ID; i++)
+	for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
 	{
-		printf("%s\n", RmgrDescTable[i].rm_name);
+		printf("%s\n", GetRmgrDesc(i)->rm_name);
 	}
 }
 
@@ -535,7 +535,7 @@ static void
 XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
 {
 	const char *id;
-	const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)];
+	const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record));
 	uint32		rec_len;
 	uint32		fpi_len;
 	RelFileNode rnode;
@@ -720,7 +720,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 	 * calculate column totals.
 	 */
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri < RM_MAX_ID; ri++)
 	{
 		total_count += stats->rmgr_stats[ri].count;
 		total_rec_len += stats->rmgr_stats[ri].rec_len;
@@ -741,13 +741,18 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 		   "Type", "N", "(%)", "Record size", "(%)", "FPI size", "(%)", "Combined size", "(%)",
 		   "----", "-", "---", "-----------", "---", "--------", "---", "-------------", "---");
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri <= RM_MAX_ID; ri++)
 	{
 		uint64		count,
 					rec_len,
 					fpi_len,
 					tot_len;
-		const RmgrDescData *desc = &RmgrDescTable[ri];
+		const RmgrDescData *desc;
+
+		if (!RM_IS_VALID(ri))
+			continue;
+
+		desc = GetRmgrDesc(ri);
 
 		if (!config->stats_per_record)
 		{
@@ -756,6 +761,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 			fpi_len = stats->rmgr_stats[ri].fpi_len;
 			tot_len = rec_len + fpi_len;
 
+			if (RM_IS_CUSTOM(ri) && count == 0)
+				continue;
+
 			XLogDumpStatsRow(desc->rm_name,
 							 count, total_count, rec_len, total_rec_len,
 							 fpi_len, total_fpi_len, tot_len, total_len);
@@ -1000,7 +1008,7 @@ main(int argc, char **argv)
 				break;
 			case 'r':
 				{
-					int			i;
+					int			rmid;
 
 					if (pg_strcasecmp(optarg, "list") == 0)
 					{
@@ -1008,20 +1016,41 @@ main(int argc, char **argv)
 						exit(EXIT_SUCCESS);
 					}
 
-					for (i = 0; i <= RM_MAX_ID; i++)
+					/*
+					 * First look for the generated name of a custom rmgr, of
+					 * the form "custom###". We accept this form, because the
+					 * custom rmgr module is not loaded, so there's no way to
+					 * compare with the real name.
+					 */
+					if (sscanf(optarg, "custom%03d", &rmid) == 1)
 					{
-						if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0)
+						if (!RM_IS_CUSTOM(rmid))
 						{
-							config.filter_by_rmgr[i] = true;
-							config.filter_by_rmgr_enabled = true;
-							break;
+							pg_log_error("custom resource manager \"%s\" does not exist",
+										 optarg);
+							goto bad_argument;
 						}
+						config.filter_by_rmgr[rmid] = true;
+						config.filter_by_rmgr_enabled = true;
 					}
-					if (i > RM_MAX_ID)
+					else
 					{
-						pg_log_error("resource manager \"%s\" does not exist",
-									 optarg);
-						goto bad_argument;
+						/* then look for builtin rmgrs */
+						for (rmid = 0; rmid <= RM_MAX_BUILTIN_ID; rmid++)
+						{
+							if (pg_strcasecmp(optarg, GetRmgrDesc(rmid)->rm_name) == 0)
+							{
+								config.filter_by_rmgr[rmid] = true;
+								config.filter_by_rmgr_enabled = true;
+								break;
+							}
+						}
+						if (rmid > RM_MAX_BUILTIN_ID)
+						{
+							pg_log_error("resource manager \"%s\" does not exist",
+										 optarg);
+							goto bad_argument;
+						}
 					}
 				}
 				break;
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310b..0f4a4b30827 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -35,6 +35,64 @@
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
-const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
+static const RmgrDescData RmgrDescTable[RM_MAX_BUILTIN_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * We are unable to get the real name of a custom rmgr because the module is
+ * not loaded. Generate a table of numeric names of the form "custom###" where
+ * "###" is the 3-digit resource manager ID.
+ */
+#define CUSTOM_NUMERIC_NAME_LEN sizeof("custom###")
+
+static bool CustomNumericNamesInitialized = false;
+static char CustomNumericNames[RM_N_CUSTOM_IDS][CUSTOM_NUMERIC_NAME_LEN] = {0};
+
+/*
+ * No information on custom resource managers; just print the ID.
+ */
+static void
+default_desc(StringInfo buf, XLogReaderState *record)
+{
+	appendStringInfo(buf, "rmid: %d", XLogRecGetRmid(record));
+}
+
+/*
+ * No information on custom resource managers; just return NULL and let the
+ * caller handle it.
+ */
+static const char *
+default_identify(uint8 info)
+{
+	return NULL;
+}
+
+const RmgrDescData *
+GetRmgrDesc(RmgrId rmid)
+{
+	Assert(RM_IS_VALID(rmid));
+
+	if (RM_IS_BUILTIN(rmid))
+	{
+		return &RmgrDescTable[rmid];
+	}
+	else
+	{
+		if (!CustomNumericNamesInitialized)
+		{
+			for (int i = 0; i < RM_N_CUSTOM_IDS; i++)
+			{
+				snprintf(CustomNumericNames[i], CUSTOM_NUMERIC_NAME_LEN,
+						 "custom%03d", i + RM_MIN_CUSTOM_ID);
+			}
+			CustomNumericNamesInitialized = true;
+		}
+
+		return &(RmgrDescData) {
+				CustomNumericNames[rmid - RM_MIN_CUSTOM_ID],
+				default_desc,
+				default_identify
+				};
+	}
+}
diff --git a/src/bin/pg_waldump/rmgrdesc.h b/src/bin/pg_waldump/rmgrdesc.h
index 42f8483b482..f733cd467d5 100644
--- a/src/bin/pg_waldump/rmgrdesc.h
+++ b/src/bin/pg_waldump/rmgrdesc.h
@@ -18,6 +18,6 @@ typedef struct RmgrDescData
 	const char *(*rm_identify) (uint8 info);
 } RmgrDescData;
 
-extern const RmgrDescData RmgrDescTable[];
+extern const RmgrDescData *GetRmgrDesc(RmgrId rmid);
 
 #endif							/* RMGRDESC_H */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..6e407f5df66 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -30,6 +30,19 @@ typedef enum RmgrIds
 
 #undef PG_RMGR
 
-#define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_MAX_ID				UINT8_MAX
+#define RM_MAX_BUILTIN_ID		(RM_NEXT_ID - 1)
+#define RM_MIN_CUSTOM_ID		128
+#define RM_N_CUSTOM_IDS			(RM_MAX_ID - RM_MIN_CUSTOM_ID + 1)
+#define RM_IS_BUILTIN(rmid)		((rmid) <= RM_MAX_BUILTIN_ID)
+#define RM_IS_CUSTOM(rmid)		((rmid) >= RM_MIN_CUSTOM_ID && (rmid) < RM_MAX_ID)
+#define RM_IS_VALID(rmid)		(RM_IS_BUILTIN((rmid)) || RM_IS_CUSTOM((rmid)))
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 0e94833129a..ecbe0fa1343 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -319,7 +319,16 @@ typedef struct RmgrData
 							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
-extern const RmgrData RmgrTable[];
+extern RmgrData *RmgrTable[];
+extern void RmgrStartup(void);
+extern void RmgrCleanup(void);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
+static inline RmgrData *
+GetRmgr(RmgrId rmid)
+{
+	return RmgrTable[rmid];
+}
 
 /*
  * Exported to support xlog switching from checkpointer
-- 
2.17.1

#19Andres Freund
andres@anarazel.de
In reply to: Jeff Davis (#17)
Re: Extensible Rmgr for Table AMs

Hi,

On 2022-03-31 14:20:51 -0700, Jeff Davis wrote:

If you are still concerned, I can switch back to separate tables to
eliminate the indirection for built-in rmgrs. Separate rmgr tables
still require a branch (to decide which table to access), but it should
be a highly predictable one.

I still think the easiest and fastest would be to just make RmgrTable longer,
and not const. When registering, copy the caller provided struct into the
respective RmgrData element. Yes, we'd waste a bit of space at the end of the
array, but it's typically not going to be touched and thus not be backed by
"actual" memory.

Greetings,

Andres Freund

#20Bharath Rupireddy
bharath.rupireddyforpostgres@gmail.com
In reply to: Jeff Davis (#18)
Re: Extensible Rmgr for Table AMs

On Mon, Apr 4, 2022 at 8:33 AM Jeff Davis <pgsql@j-davis.com> wrote:

I still may change it to go back to two RmgrTables (one for builtin and
one for custom) to remove the lingering performance doubts. Other than
that, and some cleanup, this is pretty close to the version I intend to
commit.

I just had a quick look over the v6 patch, few comments:

1) Why can't rmid be chosen by the extensions in sequential order from
(129 till 255), say, to start with a columnar extension can choose
129, another extension can register 130 and so on right? This way, the
RmgrStartup, RmgrCleanup, and XLogDumpDisplayRecord can avoid looping
over all the 256 entries, with a global variable like
current_max_rmid(representing the current number of rmgers)? Is there
any specific reason to allow them to choose rmgrid randomly between
129 till 255? Am I missing some discussion here?
2) RM_MAX_ID is now UINT8_MAX - do we need to make it configurable? or
do we think that only a few (127) custom rmgrs can exist?
3) Do we need to talk about extensible rmgrs and
https://wiki.postgresql.org/wiki/ExtensibleRmgr in documentation
somewhere? This will help extension developers to refer when needed.
4) Do we need to change this description?
       <para>
        The default value of this setting is the empty string, which disables
        the feature.  It can be set to <literal>all</literal> to check all
        records, or to a comma-separated list of resource managers to check
        only records originating from those resource managers.  Currently,
        the supported resource managers are <literal>heap</literal>,
        <literal>heap2</literal>, <literal>btree</literal>,
<literal>hash</literal>,
        <literal>gin</literal>, <literal>gist</literal>,
<literal>sequence</literal>,
        <literal>spgist</literal>, <literal>brin</literal>, and
<literal>generic</literal>. Onl
        superusers can change this setting.
5) Do we need to add a PGDLLIMPORT qualifier to RegisterCustomRmgr()
(possibly for RmgrStartup, RmgrTable, RmgrCleanup as well?)  so that
the Windows versions of extensions can use this feature?
6) Should the below strcmp pg_strcmp? The custom rmgrs can still use
rm_name with different cases and below code fail to catch it?
+ if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+ ereport(PANIC,
+ (errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+ existing_rmgr->rm_name)));
7) What's the intention of the below code? It seems like below it
checks if there's already a rmgr with the given name (RM_MAX_ID). Had
it been RM_MAX_BUILTIN_ID instead of  RM_MAX_ID, the error message
does make sense. Is the intention here to not have duplicate rmgrs in
the entire RM_MAX_ID(both builtin and custom)?
+ /* check for existing rmgr with the same name */
+ for (int i = 0; i <= RM_MAX_ID; i++)
+ {
+ const RmgrData *existing_rmgr = RmgrTable[i];
+
+ if (existing_rmgr == NULL)
+ continue;
+
+ if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+ ereport(PANIC,
+ (errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+ existing_rmgr->rm_name)));
8) Per https://wiki.postgresql.org/wiki/ExtensibleRmgr, 128 is
reserved, can we have it as a macro? Or something like (RM_MAX_ID/2+1)
+#define RM_MIN_CUSTOM_ID 128
9) Thinking if there's a way to test the code, maybe exposing
RegisterCustomRmgr as a function? I think we need to have a dummy
extension that will be used for testing this kind of patches/features
but that's a separate discussion IMO.

Regards,
Bharath Rupireddy.

#21Jeff Davis
pgsql@j-davis.com
In reply to: Andres Freund (#19)
Re: Extensible Rmgr for Table AMs

On Sun, 2022-04-03 at 21:33 -0700, Andres Freund wrote:

I still think the easiest and fastest would be to just make RmgrTable
longer,
and not const. When registering, copy the caller provided struct into
the
respective RmgrData element. Yes, we'd waste a bit of space at the
end of the
array, but it's typically not going to be touched and thus not be
backed by
"actual" memory.

Sounds good to me. I tried to break down the performance between these
approaches and didn't get a clear signal, so I'll go with your
intuition here.

Posting new patch in response to Bharath's review, which will include
this change.

Note that GetRmgr() also has an unlikely branch where it tests for the
validity of the rmgr before using it, so that it can throw a nice error
message if someone forgot to include the module in
shared_preload_libraries. I expect this will be highly predictable and
therefore not a problem.

Regards,
Jeff Davis

#22Jeff Davis
pgsql@j-davis.com
In reply to: Bharath Rupireddy (#20)
1 attachment(s)
Re: Extensible Rmgr for Table AMs

On Mon, 2022-04-04 at 11:15 +0530, Bharath Rupireddy wrote:

1) Why can't rmid be chosen by the extensions in sequential order
from
(129 till 255), say, to start with a columnar extension can choose
129, another extension can register 130 and so on right?

I'm not sure what you mean by "chosen by the extensions in sequential
order". If you mean:

(a) that it would depend on the library load order, and that postgres
would assign the ID; then that won't work because the rmgr IDs need to
be stable across restarts and config changes

(b) that there should be a convention where people reserve IDs on the
wiki page in sequential order; then that's fine with me

2) RM_MAX_ID is now UINT8_MAX - do we need to make it configurable?
or
do we think that only a few (127) custom rmgrs can exist?

127 should be plenty for quite a long time. If we need more, we can
come up with a different solution (which is OK because the WAL format
changes in new major versions).

3) Do we need to talk about extensible rmgrs and
https://wiki.postgresql.org/wiki/ExtensibleRmgr in documentation
somewhere? This will help extension developers to refer when needed.

We don't have user-facing documentation for every extension hook, and I
don't think it would be a good idea to document this one (at least in
the user-facing docs). Otherwise, we'd have to document the whole
resource manager interface, and that seems like way too much of the
internals.

4) Do we need to change this description?
<para>
The default value of this setting is the empty string, which
disables
the feature. It can be set to <literal>all</literal> to
check all
records, or to a comma-separated list of resource managers to
check
only records originating from those resource
managers. Currently,
the supported resource managers are <literal>heap</literal>,
<literal>heap2</literal>, <literal>btree</literal>,
<literal>hash</literal>,
<literal>gin</literal>, <literal>gist</literal>,
<literal>sequence</literal>,
<literal>spgist</literal>, <literal>brin</literal>, and
<literal>generic</literal>. Onl
superusers can change this setting.

Yes, you're right, I updated the docs for pg_waldump and the
wal_consistency_checking GUC.

5) Do we need to add a PGDLLIMPORT qualifier to RegisterCustomRmgr()
(possibly for RmgrStartup, RmgrTable, RmgrCleanup as well?) so that
the Windows versions of extensions can use this feature?

That seems to only be required for variables, not functions.

I don't think we want to mark RmgrTable this way, because it's not
intended to be accessed by extensions directly. It's accessed by
GetRmgr(), which is 'static inline', but that's also not intended to be
used by extensions.

6) Should the below strcmp pg_strcmp? The custom rmgrs can still use
rm_name with different cases and below code fail to catch it?
+ if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+ ereport(PANIC,
+ (errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+ existing_rmgr->rm_name)));

There are already a lot of places where users can choose identifiers
that differ only in uppercase/lowercase. I don't see a reason to make
an exception here.

7) What's the intention of the below code? It seems like below it
checks if there's already a rmgr with the given name (RM_MAX_ID). Had
it been RM_MAX_BUILTIN_ID instead of RM_MAX_ID, the error message
does make sense. Is the intention here to not have duplicate rmgrs in
the entire RM_MAX_ID(both builtin and custom)?

Thank you. I redid a lot of the error messages and checked them out to
make sure they are helpful.

8) Per https://wiki.postgresql.org/wiki/ExtensibleRmgr, 128 is
reserved, can we have it as a macro? Or something like
(RM_MAX_ID/2+1)

It's already defined as RM_EXPERIMENTAL_ID. Is that what you meant or
did I misunderstand?

I don't see the point in defining it as RM_MAX_ID/2+1.

9) Thinking if there's a way to test the code, maybe exposing
RegisterCustomRmgr as a function? I think we need to have a dummy
extension that will be used for testing this kind of patches/features
but that's a separate discussion IMO.

It's already exposed as a C function in xlog_internal.h.

You mean as a SQL function? I don't think that makes sense. It can only
be called while processing shared_preload_libraries (on server startup)
anyway.

Other changes this version:

* implemented Andres's proposed performance change[1]/messages/by-id/20220404043337.ocjnni7hknjsibhg@alap3.anarazel.de
* fix wal_consistency_checking
* general cleanup

Regards,
Jeff Davis

[1]: /messages/by-id/20220404043337.ocjnni7hknjsibhg@alap3.anarazel.de
/messages/by-id/20220404043337.ocjnni7hknjsibhg@alap3.anarazel.de

Attachments:

v7-0001-Extensible-rmgr.patchtext/x-patch; charset=UTF-8; name=v7-0001-Extensible-rmgr.patchDownload
From 7f6acaf93d279baa464ae3ba2be173a01d969402 Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH] Extensible rmgr.

Allow extensions to specify a new custom rmgr, which allows
specialized WAL. This is meant to be used by a custom Table Access
Method, which would not otherwise be able to offer logical
decoding/replication. It may also be used by new Index Access Methods.

Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.

Reviewed-by: Julien Rouhaud, Andres Freund
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 doc/src/sgml/config.sgml                  |  9 +++
 doc/src/sgml/ref/pg_waldump.sgml          |  8 ++
 src/backend/access/transam/rmgr.c         | 91 ++++++++++++++++++++++-
 src/backend/access/transam/xlogreader.c   |  2 +-
 src/backend/access/transam/xlogrecovery.c | 33 +++-----
 src/backend/replication/logical/decode.c  |  8 +-
 src/backend/utils/misc/guc.c              | 26 +++++--
 src/bin/pg_rewind/parsexlog.c             | 11 ++-
 src/bin/pg_waldump/pg_waldump.c           | 66 +++++++++++-----
 src/bin/pg_waldump/rmgrdesc.c             | 67 ++++++++++++++++-
 src/bin/pg_waldump/rmgrdesc.h             |  2 +-
 src/include/access/rmgr.h                 | 15 +++-
 src/include/access/xlog_internal.h        | 23 +++++-
 13 files changed, 300 insertions(+), 61 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 43e4ade83e0..c80327af67a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11148,6 +11148,15 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
         <literal>spgist</literal>, <literal>brin</literal>, and <literal>generic</literal>. Only
         superusers can change this setting.
        </para>
+
+       <para>
+        Extensions may define additional resource managers with modules loaded
+        in <xref linkend="guc-shared-preload-libraries"/>. However, the names
+        of the custom resource managers are not available at the time the
+        configuration file is processed, so they must be referred to
+        numerically with the form <literal>custom###</literal>, where
+        "<literal>###</literal>" is the three-digit resource manager ID.
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/doc/src/sgml/ref/pg_waldump.sgml b/doc/src/sgml/ref/pg_waldump.sgml
index 1a05af5d972..57746d9421f 100644
--- a/doc/src/sgml/ref/pg_waldump.sgml
+++ b/doc/src/sgml/ref/pg_waldump.sgml
@@ -173,6 +173,14 @@ PostgreSQL documentation
         If <literal>list</literal> is passed as name, print a list of valid resource manager
         names, and exit.
        </para>
+       <para>
+        Extensions may define custom resource managers, but pg_waldump does
+        not load the extension module and therefore does not recognize custom
+        resource managers by name. Instead, you can specify the custom
+        resource managers as <literal>custom###</literal> where
+        "<literal>###</literal>" is the three-digit resource manager ID. Names
+        of this form will always be considered valid.
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..d215116a912 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -34,6 +35,94 @@
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, redo, desc, identify, startup, cleanup, mask, decode },
 
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * Start up all resource managers.
+ */
+void
+RmgrStartup(void)
+{
+	for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+	{
+		if (!RmgrExists(rmid))
+			continue;
+
+		if (RmgrTable[rmid].rm_startup != NULL)
+			RmgrTable[rmid].rm_startup();
+	}
+}
+
+/*
+ * Clean up all resource managers.
+ */
+void
+RmgrCleanup(void)
+{
+	for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+	{
+		if (!RmgrExists(rmid))
+			continue;
+
+		if (RmgrTable[rmid].rm_cleanup != NULL)
+			RmgrTable[rmid].rm_cleanup();
+	}
+}
+
+/*
+ * Emit PANIC message when we encounter a record with an RmgrId we don't recognize.
+ */
+void
+RmgrNotFound(RmgrId rmid)
+{
+	ereport(PANIC, (errmsg("resource manager with ID %d not registered", rmid),
+					errhint("Include the extension module that implements this resource manager in shared_preload_libraries.")));
+}
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmgr->rm_name == NULL)
+		ereport(PANIC, errmsg("custom resource manager is invalid"));
+
+	if (rmid < RM_MIN_CUSTOM_ID)
+		ereport(PANIC, errmsg("custom resource manager ID %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(PANIC,
+				(errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+				 errdetail("Custom resource manager must be registered while initializing modules in shared_preload_libraries.")));
+
+	if (RmgrTable[rmid].rm_name != NULL)
+		ereport(PANIC,
+				(errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+				 errdetail("Custom resource manager \"%s\" already registered with the same ID.",
+						   RmgrTable[rmid].rm_name)));
+
+	/* check for existing rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		if (!RmgrExists(i))
+			continue;
+
+		if (!strcmp(RmgrTable[i].rm_name, rmgr->rm_name))
+			ereport(PANIC,
+				(errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+				 errdetail("Existing resource manager with ID %d has the same name.", i)));
+	}
+
+	/* register it */
+	RmgrTable[rmid] = *rmgr;
+	ereport(LOG,
+			(errmsg("registered custom resource manager \"%s\" with ID %d",
+					rmgr->rm_name, rmid)));
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e437c429920..161cf13fed2 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (!RMID_IS_VALID(record->xl_rmid))
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 8d2395dae25..999e24fcf2c 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1531,7 +1531,6 @@ ShutdownWalRecovery(void)
 void
 PerformWalRecovery(void)
 {
-	int			rmid;
 	XLogRecord *record;
 	bool		reachedRecoveryTarget = false;
 	TimeLineID	replayTLI;
@@ -1604,12 +1603,7 @@ PerformWalRecovery(void)
 
 		InRedo = true;
 
-		/* Initialize resource managers */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
-		}
+		RmgrStartup();
 
 		ereport(LOG,
 				(errmsg("redo starts at %X/%X",
@@ -1746,12 +1740,7 @@ PerformWalRecovery(void)
 			}
 		}
 
-		/* Allow resource managers to do any required cleanup. */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_cleanup != NULL)
-				RmgrTable[rmid].rm_cleanup();
-		}
+		RmgrCleanup();
 
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
@@ -1871,7 +1860,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		xlogrecovery_redo(xlogreader, *replayTLI);
 
 	/* Now apply the WAL record itself */
-	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+	GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 	/*
 	 * After redo, check whether the backup pages associated with the WAL
@@ -2101,20 +2090,20 @@ rm_redo_error_callback(void *arg)
 void
 xlog_outdesc(StringInfo buf, XLogReaderState *record)
 {
-	RmgrId		rmid = XLogRecGetRmid(record);
+	RmgrData	rmgr = GetRmgr(XLogRecGetRmid(record));
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, rmgr.rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = rmgr.rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	rmgr.rm_desc(buf, record);
 }
 
 #ifdef WAL_DEBUG
@@ -2263,7 +2252,7 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
 static void
 verifyBackupPageConsistency(XLogReaderState *record)
 {
-	RmgrId		rmid = XLogRecGetRmid(record);
+	RmgrData	rmgr = GetRmgr(XLogRecGetRmid(record));
 	RelFileNode rnode;
 	ForkNumber	forknum;
 	BlockNumber blkno;
@@ -2343,10 +2332,10 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (rmgr.rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			rmgr.rm_mask(replay_image_masked, blkno);
+			rmgr.rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..c6ea7c98e15 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -94,7 +94,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
 	XLogRecordBuffer buf;
 	TransactionId txid;
-	RmgrId rmid;
+	RmgrData rmgr;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
@@ -115,10 +115,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 								 buf.origptr);
 	}
 
-	rmid = XLogRecGetRmid(record);
+	rmgr = GetRmgr(XLogRecGetRmid(record));
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (rmgr.rm_decode != NULL)
+		rmgr.rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9e8ab1420d9..5a08769e497 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11769,26 +11769,40 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 	{
 		char	   *tok = (char *) lfirst(l);
 		bool		found = false;
-		RmgrId		rmid;
+		int			rmid;
 
 		/* Check for 'all'. */
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
-			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+			for (rmid = 0; rmid <= RM_MAX_BUILTIN_ID; rmid++)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
+		else if (sscanf(tok, "custom%03d", &rmid) == 1)
+		{
+			/*
+			 * The server hasn't loaded the shared_preload_libraries yet, so
+			 * we don't have access to the custom resource manager
+			 * names. Allow the form "custom###", where "###" is the 3-digit
+			 * resource manager ID.
+			 */
+			if (RMID_IS_CUSTOM(rmid))
+			{
+				newwalconsistency[rmid] = true;
+				found = true;
+			}
+		}
 		else
 		{
 			/*
 			 * Check if the token matches with any individual resource
 			 * manager.
 			 */
-			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+			for (rmid = 0; rmid <= RM_MAX_BUILTIN_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL &&
+					pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 49966e7b7fd..dfa836d1561 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -25,8 +25,8 @@
 #include "pg_rewind.h"
 
 /*
- * RmgrNames is an array of resource manager names, to make error messages
- * a bit nicer.
+ * RmgrNames is an array of the built-in resource manager names, to make error
+ * messages a bit nicer.
  */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
@@ -35,6 +35,9 @@ static const char *RmgrNames[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
 
+#define RmgrName(rmid) (((rmid) <= RM_MAX_BUILTIN_ID) ? \
+						RmgrNames[rmid] : "custom")
+
 static void extractPageInfo(XLogReaderState *record);
 
 static int	xlogreadfd = -1;
@@ -436,9 +439,9 @@ extractPageInfo(XLogReaderState *record)
 		 * track that change.
 		 */
 		pg_fatal("WAL record modifies a relation, but record type is not recognized: "
-				 "lsn: %X/%X, rmgr: %s, info: %02X",
+				 "lsn: %X/%X, rmid: %d, rmgr: %s, info: %02X",
 				 LSN_FORMAT_ARGS(record->ReadRecPtr),
-				 RmgrNames[rmid], info);
+				 rmid, RmgrName(rmid), info);
 	}
 
 	for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 4cb40d068a9..4f47449a6cb 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -80,8 +80,8 @@ typedef struct XLogDumpStats
 	uint64		count;
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
-	Stats		rmgr_stats[RM_NEXT_ID];
-	Stats		record_stats[RM_NEXT_ID][MAX_XLINFO_TYPES];
+	Stats		rmgr_stats[RM_MAX_ID + 1];
+	Stats		record_stats[RM_MAX_ID + 1][MAX_XLINFO_TYPES];
 } XLogDumpStats;
 
 #define fatal_error(...) do { pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0)
@@ -104,9 +104,9 @@ print_rmgr_list(void)
 {
 	int			i;
 
-	for (i = 0; i <= RM_MAX_ID; i++)
+	for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
 	{
-		printf("%s\n", RmgrDescTable[i].rm_name);
+		printf("%s\n", GetRmgrDesc(i)->rm_name);
 	}
 }
 
@@ -535,7 +535,7 @@ static void
 XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
 {
 	const char *id;
-	const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)];
+	const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record));
 	uint32		rec_len;
 	uint32		fpi_len;
 	RelFileNode rnode;
@@ -720,7 +720,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 	 * calculate column totals.
 	 */
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri < RM_MAX_ID; ri++)
 	{
 		total_count += stats->rmgr_stats[ri].count;
 		total_rec_len += stats->rmgr_stats[ri].rec_len;
@@ -741,13 +741,18 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 		   "Type", "N", "(%)", "Record size", "(%)", "FPI size", "(%)", "Combined size", "(%)",
 		   "----", "-", "---", "-----------", "---", "--------", "---", "-------------", "---");
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri <= RM_MAX_ID; ri++)
 	{
 		uint64		count,
 					rec_len,
 					fpi_len,
 					tot_len;
-		const RmgrDescData *desc = &RmgrDescTable[ri];
+		const RmgrDescData *desc;
+
+		if (!RMID_IS_VALID(ri))
+			continue;
+
+		desc = GetRmgrDesc(ri);
 
 		if (!config->stats_per_record)
 		{
@@ -756,6 +761,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 			fpi_len = stats->rmgr_stats[ri].fpi_len;
 			tot_len = rec_len + fpi_len;
 
+			if (RMID_IS_CUSTOM(ri) && count == 0)
+				continue;
+
 			XLogDumpStatsRow(desc->rm_name,
 							 count, total_count, rec_len, total_rec_len,
 							 fpi_len, total_fpi_len, tot_len, total_len);
@@ -1000,7 +1008,7 @@ main(int argc, char **argv)
 				break;
 			case 'r':
 				{
-					int			i;
+					int			rmid;
 
 					if (pg_strcasecmp(optarg, "list") == 0)
 					{
@@ -1008,20 +1016,42 @@ main(int argc, char **argv)
 						exit(EXIT_SUCCESS);
 					}
 
-					for (i = 0; i <= RM_MAX_ID; i++)
+					/*
+					 * First look for the generated name of a custom rmgr, of
+					 * the form "custom###". We accept this form, because the
+					 * custom rmgr module is not loaded, so there's no way to
+					 * know the real name. This convention should be
+					 * consistent with that in rmgrdesc.c.
+					 */
+					if (sscanf(optarg, "custom%03d", &rmid) == 1)
 					{
-						if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0)
+						if (!RMID_IS_CUSTOM(rmid))
 						{
-							config.filter_by_rmgr[i] = true;
-							config.filter_by_rmgr_enabled = true;
-							break;
+							pg_log_error("custom resource manager \"%s\" does not exist",
+										 optarg);
+							goto bad_argument;
 						}
+						config.filter_by_rmgr[rmid] = true;
+						config.filter_by_rmgr_enabled = true;
 					}
-					if (i > RM_MAX_ID)
+					else
 					{
-						pg_log_error("resource manager \"%s\" does not exist",
-									 optarg);
-						goto bad_argument;
+						/* then look for builtin rmgrs */
+						for (rmid = 0; rmid <= RM_MAX_BUILTIN_ID; rmid++)
+						{
+							if (pg_strcasecmp(optarg, GetRmgrDesc(rmid)->rm_name) == 0)
+							{
+								config.filter_by_rmgr[rmid] = true;
+								config.filter_by_rmgr_enabled = true;
+								break;
+							}
+						}
+						if (rmid > RM_MAX_BUILTIN_ID)
+						{
+							pg_log_error("resource manager \"%s\" does not exist",
+										 optarg);
+							goto bad_argument;
+						}
 					}
 				}
 				break;
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310b..7871a16b08a 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -35,6 +35,71 @@
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
-const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
+static const RmgrDescData RmgrDescTable[RM_MAX_BUILTIN_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * We are unable to get the real name of a custom rmgr because the module is
+ * not loaded. Generate a table of numeric names of the form "custom###" where
+ * "###" is the 3-digit resource manager ID.
+ */
+#define CUSTOM_NUMERIC_NAME_LEN sizeof("custom###")
+
+static char CustomNumericNames[RM_N_CUSTOM_IDS][CUSTOM_NUMERIC_NAME_LEN] = {0};
+static RmgrDescData CustomRmgrDesc[RM_N_CUSTOM_IDS] = {0};
+static bool CustomRmgrDescInitialized = false;
+
+/*
+ * No information on custom resource managers; just print the ID.
+ */
+static void
+default_desc(StringInfo buf, XLogReaderState *record)
+{
+	appendStringInfo(buf, "rmid: %d", XLogRecGetRmid(record));
+}
+
+/*
+ * No information on custom resource managers; just return NULL and let the
+ * caller handle it.
+ */
+static const char *
+default_identify(uint8 info)
+{
+	return NULL;
+}
+
+/*
+ * pg_waldump does not load the modules that register the resource managers;
+ * therefore we are missing the RmgrDesc information. Generate phony data
+ * where the name follows the convention "custom###" where ### is the 3-digit
+ * rmgr ID.
+ */
+static void
+initialize_custom_rmgrs(void)
+{
+	for (int i = 0; i < RM_N_CUSTOM_IDS; i++)
+	{
+		snprintf(CustomNumericNames[i], CUSTOM_NUMERIC_NAME_LEN,
+				 "custom%03d", i + RM_MIN_CUSTOM_ID);
+		CustomRmgrDesc[i].rm_name = CustomNumericNames[i];
+		CustomRmgrDesc[i].rm_desc = default_desc;
+		CustomRmgrDesc[i].rm_identify = default_identify;
+	}
+	CustomRmgrDescInitialized = true;
+}
+
+const RmgrDescData *
+GetRmgrDesc(RmgrId rmid)
+{
+	Assert(RMID_IS_VALID(rmid));
+
+	if (RMID_IS_BUILTIN(rmid))
+		return &RmgrDescTable[rmid];
+	else
+	{
+		if (!CustomRmgrDescInitialized)
+			initialize_custom_rmgrs();
+		return &CustomRmgrDesc[rmid - RM_MIN_CUSTOM_ID];
+	}
+}
diff --git a/src/bin/pg_waldump/rmgrdesc.h b/src/bin/pg_waldump/rmgrdesc.h
index 42f8483b482..f733cd467d5 100644
--- a/src/bin/pg_waldump/rmgrdesc.h
+++ b/src/bin/pg_waldump/rmgrdesc.h
@@ -18,6 +18,6 @@ typedef struct RmgrDescData
 	const char *(*rm_identify) (uint8 info);
 } RmgrDescData;
 
-extern const RmgrDescData RmgrDescTable[];
+extern const RmgrDescData *GetRmgrDesc(RmgrId rmid);
 
 #endif							/* RMGRDESC_H */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..dfa8521997c 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -30,6 +30,19 @@ typedef enum RmgrIds
 
 #undef PG_RMGR
 
-#define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_MAX_ID				UINT8_MAX
+#define RM_MAX_BUILTIN_ID		(RM_NEXT_ID - 1)
+#define RM_MIN_CUSTOM_ID		128
+#define RM_N_CUSTOM_IDS			(RM_MAX_ID - RM_MIN_CUSTOM_ID + 1)
+#define RMID_IS_BUILTIN(rmid)	((rmid) <= RM_MAX_BUILTIN_ID)
+#define RMID_IS_CUSTOM(rmid)	((rmid) >= RM_MIN_CUSTOM_ID && (rmid) < RM_MAX_ID)
+#define RMID_IS_VALID(rmid)		(RMID_IS_BUILTIN((rmid)) || RMID_IS_CUSTOM((rmid)))
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 0e94833129a..9370f346c83 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -304,7 +304,8 @@ struct XLogRecordBuffer;
  * rm_mask takes as input a page modified by the resource manager and masks
  * out bits that shouldn't be flagged by wal_consistency_checking.
  *
- * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h).
+ * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is
+ * NULL, the structure is considered invalid.
  */
 typedef struct RmgrData
 {
@@ -319,7 +320,25 @@ typedef struct RmgrData
 							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
-extern const RmgrData RmgrTable[];
+extern RmgrData RmgrTable[];
+extern void RmgrStartup(void);
+extern void RmgrCleanup(void);
+extern void RmgrNotFound(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
+static inline bool
+RmgrExists(RmgrId rmid)
+{
+	return RmgrTable[rmid].rm_name != NULL;
+}
+
+static inline RmgrData
+GetRmgr(RmgrId rmid)
+{
+	if (unlikely(!RmgrExists(rmid)))
+		RmgrNotFound(rmid);
+	return RmgrTable[rmid];
+}
 
 /*
  * Exported to support xlog switching from checkpointer
-- 
2.17.1

#23Bharath Rupireddy
bharath.rupireddyforpostgres@gmail.com
In reply to: Jeff Davis (#22)
Re: Extensible Rmgr for Table AMs

On Tue, Apr 5, 2022 at 5:55 AM Jeff Davis <pgsql@j-davis.com> wrote:

On Mon, 2022-04-04 at 11:15 +0530, Bharath Rupireddy wrote:

1) Why can't rmid be chosen by the extensions in sequential order
from
(129 till 255), say, to start with a columnar extension can choose
129, another extension can register 130 and so on right?

I'm not sure what you mean by "chosen by the extensions in sequential
order". If you mean:

(b) that there should be a convention where people reserve IDs on the
wiki page in sequential order; then that's fine with me

Yes, I meant this. If we do this, then a global variable
current_max_rmid can be maintained (by RegisterCustomRmgr) and the
other functions RmgrStartup, RmgrCleanup and RegisterCustomRmgr can
avoid for-loops with full range RM_MAX_ID (for (int id = 0; id <
current_max_rmid; id++)).

3) Do we need to talk about extensible rmgrs and
https://wiki.postgresql.org/wiki/ExtensibleRmgr in documentation
somewhere? This will help extension developers to refer when needed.

We don't have user-facing documentation for every extension hook, and I
don't think it would be a good idea to document this one (at least in
the user-facing docs). Otherwise, we'd have to document the whole
resource manager interface, and that seems like way too much of the
internals.

With the custom rmgrs, now it's time to have it in the documentation
(probably not immediately, even after this patch gets in) describing
what rmgrs are, what are built-in and custom rmgrs and how and when an
extension needs to register, load and use rmgrs etc.

4) Do we need to change this description?
<para>
<literal>generic</literal>. Onl
superusers can change this setting.

Yes, you're right, I updated the docs for pg_waldump and the
wal_consistency_checking GUC.

+       <para>
+        Extensions may define additional resource managers with modules loaded
+        in <xref linkend="guc-shared-preload-libraries"/>. However, the names
+        of the custom resource managers are not available at the time the

+ errhint("Include the extension module that implements this resource
manager in shared_preload_libraries.")));

With the above, do you mean to say that only the extensions that load
their library via shared_preload_libraries are allowed to have custom
rmgrs? How about extensions using {session, local}_preload_libraries,
LOAD command? How about extensions that don't load the shared library
via {session, local, shared}_preload_libraries or LOAD command and let
any of its functions load the shared library on first use?

5) Do we need to add a PGDLLIMPORT qualifier to RegisterCustomRmgr()
(possibly for RmgrStartup, RmgrTable, RmgrCleanup as well?) so that
the Windows versions of extensions can use this feature?

That seems to only be required for variables, not functions.

You are right. For instance, the functions DefineCustomXXX are not
using PGDLLIMPORT qualifer, I believe they can be used by extensions
on WINDOWS.

6) Should the below strcmp pg_strcmp? The custom rmgrs can still use
rm_name with different cases and below code fail to catch it?
+ if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+ ereport(PANIC,
+ (errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+ existing_rmgr->rm_name)));

There are already a lot of places where users can choose identifiers
that differ only in uppercase/lowercase. I don't see a reason to make
an exception here.

I think postgres parses the user-specified strings and converts them
to lowercase unless they are double-quoted, that is why we see strcmp,
not pg_strcasecmp. But, the custom rmgr name isn't parsed by postgres
right?

For instance - extension 1, with id 130 and name 'rmgr_foo' and
extension 2 with id 131 and name 'Rmgr_foo'/'RMGR_FOO'...., then the
following code may not catch it right? Do you want rmgr names to be
case-sensitive/insensitive?

+ if (!strcmp(RmgrTable[i].rm_name, rmgr->rm_name))
+ ereport(PANIC,
+ (errmsg("failed to register custom resource manager \"%s\" with ID
%d", rmgr->rm_name, rmid),
+ errdetail("Existing resource manager with ID %d has the same name.", i)));

8) Per https://wiki.postgresql.org/wiki/ExtensibleRmgr, 128 is
reserved, can we have it as a macro? Or something like
(RM_MAX_ID/2+1)

It's already defined as RM_EXPERIMENTAL_ID. Is that what you meant or
did I misunderstand?

+#define RM_MAX_ID UINT8_MAX
+#define RM_MAX_BUILTIN_ID (RM_NEXT_ID - 1)
+#define RM_MIN_CUSTOM_ID 128
+#define RM_N_CUSTOM_IDS (RM_MAX_ID - RM_MIN_CUSTOM_ID + 1)
+#define RMID_IS_BUILTIN(rmid) ((rmid) <= RM_MAX_BUILTIN_ID)
+#define RMID_IS_CUSTOM(rmid) ((rmid) >= RM_MIN_CUSTOM_ID && (rmid) < RM_MAX_ID)
+#define RMID_IS_VALID(rmid) (RMID_IS_BUILTIN((rmid)) || RMID_IS_CUSTOM((rmid)))

To me the above seems complex, why can't we just have the following,
it's okay even if we don't use some of the macros in the *.c files?
total rmgrs = 256
built in = 128 (index 0 - 127)
custom = 128 (index 128 - 255)

#define RM_N_IDS (UINT8_MAX + 1)
#define RM_N_BUILTIN_IDS (RM_N_IDS / 2)
#define RM_N_CUSTOM_IDS (RM_N_IDS / 2)
#define RM_MAX_ID (RM_N_IDS - 1)
#define RM_MAX_BUILTIN_ID (RM_NEXT_ID - 1)
#define RM_MIN_CUSTOM_ID (RM_N_IDS / 2)
#define RM_MAX_CUSTOM_ID RM_MAX_ID
#define RMID_IS_BUILTIN(rmid) ((rmid) <= RM_MAX_BUILTIN_ID)
#define RMID_IS_CUSTOM(rmid) ((rmid) >= RM_MIN_CUSTOM_ID && (rmid) <=
RM_MAX_CUSTOM_ID)
#define RMID_IS_VALID(rmid) (RMID_IS_BUILTIN((rmid)) || RMID_IS_CUSTOM((rmid)))

If okay, you can specify in a comment that out of UINT8_MAX + 1 rmgrs,
first half rmgr ids are supposed to be reserved for built in rmgrs and
second half rmgr ids for extensions.

I don't see the point in defining it as RM_MAX_ID/2+1.

9) Thinking if there's a way to test the code, maybe exposing
RegisterCustomRmgr as a function? I think we need to have a dummy
extension that will be used for testing this kind of patches/features
but that's a separate discussion IMO.

It's already exposed as a C function in xlog_internal.h.

You mean as a SQL function? I don't think that makes sense. It can only
be called while processing shared_preload_libraries (on server startup)
anyway.

Yes, an SQL function showing all of the available rmgrs. With the
custom rmgrs, I think, an SQL function like pg_resource_managers()
returning a set of {name, id, (if possible - loaded_by_extension_name,
rm_redo, rm_desc, rm_identify ... api names)}

Some more comments:

1) Is there any specific reason to have a function to emit a PANIC
message? And it's being used only in one place in GetRmgr.
+void
+RmgrNotFound(RmgrId rmid)
2) How about "If rm_name is NULL, the corresponding RmgrTable[] entry
is considered invalid."?
+ * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is
+ * NULL, the structure is considered invalid.
3) How about adding errdetail something like "Custom resource manager
ID must be between %d and %d (both inclusive)", RM_MAX_BUILTIN_ID,
RM_MAX_ID)?
+ if (rmid < RM_MIN_CUSTOM_ID)
+ ereport(PANIC, errmsg("custom resource manager ID %d is out of range", rmid));
4) How about adding errdetail something like "Custom resource manager
must have a name"
+ if (rmgr->rm_name == NULL)
+ ereport(PANIC, errmsg("custom resource manager is invalid"));

5) How about "successfully registered custom resource manager"?
+ (errmsg("registered custom resource manager \"%s\" with ID %d",

6) Shouldn't this be ri <= RM_NEXT_ID?
- for (ri = 0; ri < RM_NEXT_ID; ri++)
+ for (ri = 0; ri < RM_MAX_ID; ri++)
- for (ri = 0; ri < RM_NEXT_ID; ri++)
+ for (ri = 0; ri <= RM_MAX_ID; ri++)

7) Unrelated to this patch, why is there always an extra slot RM_MAX_ID + 1?

-const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
+static const RmgrDescData RmgrDescTable[RM_MAX_BUILTIN_ID + 1] = {
 #include "access/rmgrlist.h"
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+RmgrData RmgrTable[RM_MAX_ID + 1] = {

Regards,
Bharath Rupireddy.

#24Jeff Davis
pgsql@j-davis.com
In reply to: Bharath Rupireddy (#23)
Re: Extensible Rmgr for Table AMs

On Tue, 2022-04-05 at 15:12 +0530, Bharath Rupireddy wrote:

Yes, I meant this. If we do this, then a global variable
current_max_rmid can be maintained (by RegisterCustomRmgr) and the
other functions RmgrStartup, RmgrCleanup and RegisterCustomRmgr can
avoid for-loops with full range RM_MAX_ID (for (int id = 0; id <
current_max_rmid; id++)).

There are only 255 entries, so the loops in those functions aren't a
significant overhead.

If we expand it, then we can come up a different solution. We can
change it easily enough later, because the WAL format changes between
releases and so do the APIs for any extension that might be using this.

With the custom rmgrs, now it's time to have it in the documentation
(probably not immediately, even after this patch gets in) describing
what rmgrs are, what are built-in and custom rmgrs and how and when
an
extension needs to register, load and use rmgrs etc.

Added a documentation section right after Generic WAL.

With the above, do you mean to say that only the extensions that load
their library via shared_preload_libraries are allowed to have custom
rmgrs? How about extensions using {session, local}_preload_libraries,
LOAD command? How about extensions that don't load the shared library
via {session, local, shared}_preload_libraries or LOAD command and
let
any of its functions load the shared library on first use?

Correct. The call to RegisterCustomRmgr *must* happen in the postmaster
before any other processes (even the startup process) are launched.
Otherwise recovery wouldn't work. The only place to make that happen is
shared_preload_libraries.

For instance - extension 1, with id 130 and name 'rmgr_foo' and
extension 2 with id 131 and name 'Rmgr_foo'/'RMGR_FOO'...., then the
following code may not catch it right? Do you want rmgr names to be
case-sensitive/insensitive?

You convinced me. pg_waldump.c uses pg_strcasecmp(), so I'll use it as
well.

#define RM_N_IDS (UINT8_MAX + 1)
#define RM_N_BUILTIN_IDS (RM_N_IDS / 2)
#define RM_N_CUSTOM_IDS (RM_N_IDS / 2)
#define RM_MAX_ID (RM_N_IDS - 1)
#define RM_MAX_BUILTIN_ID (RM_NEXT_ID - 1)
#define RM_MIN_CUSTOM_ID (RM_N_IDS / 2)
#define RM_MAX_CUSTOM_ID RM_MAX_ID
#define RMID_IS_BUILTIN(rmid) ((rmid) <= RM_MAX_BUILTIN_ID)
#define RMID_IS_CUSTOM(rmid) ((rmid) >= RM_MIN_CUSTOM_ID && (rmid) <=
RM_MAX_CUSTOM_ID)
#define RMID_IS_VALID(rmid) (RMID_IS_BUILTIN((rmid)) ||
RMID_IS_CUSTOM((rmid)))

I improved this a bit. But I didn't use your version that's based on
division, that was more confusing to me.

Yes, an SQL function showing all of the available rmgrs. With the
custom rmgrs, I think, an SQL function like pg_resource_managers()
returning a set of {name, id, (if possible -
loaded_by_extension_name,
rm_redo, rm_desc, rm_identify ... api names)}

Added.

Some more comments:

1) Is there any specific reason to have a function to emit a PANIC
message? And it's being used only in one place in GetRmgr.
+void
+RmgrNotFound(RmgrId rmid)

Good call -- better to raise an error, and it can get escalated if it
happens in the wrong place. Done.

2) How about "If rm_name is NULL, the corresponding RmgrTable[] entry
is considered invalid."?
+ * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If
rm_name is
+ * NULL, the structure is considered invalid.

Done.

3) How about adding errdetail something like "Custom resource manager
ID must be between %d and %d (both inclusive)", RM_MAX_BUILTIN_ID,
RM_MAX_ID)?
+ if (rmid < RM_MIN_CUSTOM_ID)
+ ereport(PANIC, errmsg("custom resource manager ID %d is out of
range", rmid));

Done.

4) How about adding errdetail something like "Custom resource manager
must have a name"
+ if (rmgr->rm_name == NULL)
+ ereport(PANIC, errmsg("custom resource manager is invalid"));

Improved this message.

5) How about "successfully registered custom resource manager"?
+ (errmsg("registered custom resource manager \"%s\" with ID %d",

That's a little much, we don't want to sound too excited that it worked
;-)

6) Shouldn't this be ri <= RM_NEXT_ID?
- for (ri = 0; ri < RM_NEXT_ID; ri++)
+ for (ri = 0; ri < RM_MAX_ID; ri++)
- for (ri = 0; ri < RM_NEXT_ID; ri++)
+ for (ri = 0; ri <= RM_MAX_ID; ri++)

That would leave out the custom rmgrs.

7) Unrelated to this patch, why is there always an extra slot
RM_MAX_ID + 1?

RM_MAX_ID is 255, but we need 256 entries from 0-255 (inclusive).

Good suggestions, thank you for the review!

I'm happy with this patch and I committed it. Changes from last
version:

1. I came up with a better solution (based on a suggestion from
Andres) to handle wal_consistency_checking properly. We can't
understand the custom resource managers when the configuration file is
first loaded, so if we enounter an unknown resource manager, I set a
flag and reprocess it after the shared_preload_libraries are loaded.

2. Added pg_get_wal_resource_managers().

3. Documentation.

Regards,
Jeff Davis

#25Andres Freund
andres@anarazel.de
In reply to: Jeff Davis (#24)
Re: Extensible Rmgr for Table AMs

Hi,

On 2022-04-06 23:15:27 -0700, Jeff Davis wrote:

I'm happy with this patch and I committed it.

That caused breakage with WAL_DEBUG enabled. Fixed that. But it was
half-broken before, at least since 70e81861fad, because 'rmid' didn't refer to
the current record's rmid anymore, but to rmid from "Initialize resource
managers" - a constant.

Causes plenty new warnings here:

In file included from /home/andres/src/postgresql/src/include/access/xlogrecord.h:14,
from /home/andres/src/postgresql/src/include/access/xlogreader.h:41,
from /home/andres/src/postgresql/src/include/access/brin_xlog.h:17,
from /home/andres/src/postgresql/src/backend/access/transam/rmgr.c:10:
/home/andres/src/postgresql/src/backend/access/transam/rmgr.c: In function ‘RegisterCustomRmgr’:
/home/andres/src/postgresql/src/include/access/rmgr.h:42:66: warning: comparison is always true due to limited range of data type [-Wtype-limits]
42 | (rmid) <= RM_MAX_CUSTOM_ID)
| ^~
/home/andres/src/postgresql/src/backend/access/transam/rmgr.c:104:14: note: in expansion of macro ‘RMID_IS_CUSTOM’
104 | if (!RMID_IS_CUSTOM(rmid))
| ^~~~~~~~~~~~~~
In file included from /home/andres/src/postgresql/src/include/c.h:814,
from /home/andres/src/postgresql/src/include/postgres.h:46,
from /home/andres/src/postgresql/src/bin/pg_waldump/rmgrdesc.c:9:
/home/andres/src/postgresql/src/bin/pg_waldump/rmgrdesc.c: In function ‘GetRmgrDesc’:
/home/andres/src/postgresql/src/include/access/rmgr.h:42:66: warning: comparison is always true due to limited range of data type [-Wtype-limits]
42 | (rmid) <= RM_MAX_CUSTOM_ID)
| ^~
/home/andres/src/postgresql/src/bin/pg_waldump/rmgrdesc.c:89:9: note: in expansion of macro ‘Assert’
89 | Assert(RMID_IS_VALID(rmid));
| ^~~~~~
/home/andres/src/postgresql/src/include/access/rmgr.h:43:57: note: in expansion of macro ‘RMID_IS_CUSTOM’
43 | #define RMID_IS_VALID(rmid) (RMID_IS_BUILTIN((rmid)) || RMID_IS_CUSTOM((rmid)))
| ^~~~~~~~~~~~~~
/home/andres/src/postgresql/src/bin/pg_waldump/rmgrdesc.c:89:16: note: in expansion of macro ‘RMID_IS_VALID’
89 | Assert(RMID_IS_VALID(rmid));
| ^~~~~~~~~~~~~
/home/andres/src/postgresql/src/include/access/rmgr.h:42:66: warning: comparison is always true due to limited range of data type [-Wtype-limits]
42 | (rmid) <= RM_MAX_CUSTOM_ID)
| ^~
/home/andres/src/postgresql/src/bin/pg_waldump/rmgrdesc.c:89:9: note: in expansion of macro ‘Assert’
89 | Assert(RMID_IS_VALID(rmid));
| ^~~~~~
/home/andres/src/postgresql/src/include/access/rmgr.h:43:57: note: in expansion of macro ‘RMID_IS_CUSTOM’
43 | #define RMID_IS_VALID(rmid) (RMID_IS_BUILTIN((rmid)) || RMID_IS_CUSTOM((rmid)))
| ^~~~~~~~~~~~~~
/home/andres/src/postgresql/src/bin/pg_waldump/rmgrdesc.c:89:16: note: in expansion of macro ‘RMID_IS_VALID’
89 | Assert(RMID_IS_VALID(rmid));
| ^~~~~~~~~~~~~

Greetings,

Andres Freund

#26Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#25)
Re: Extensible Rmgr for Table AMs

Hi,

On 2022-04-06 23:35:05 -0700, Andres Freund wrote:

Causes plenty new warnings here:

And my machine isn't alone. There's also:
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=lapwing&amp;dt=2022-04-07%2006%3A40%3A14

rmgrdesc.c:44:1: error: missing braces around initializer [-Werror=missing-braces]
rmgrdesc.c:44:1: error: (near initialization for 'CustomNumericNames[0]') [-Werror=missing-braces]
rmgrdesc.c:45:1: error: missing braces around initializer [-Werror=missing-braces]
rmgrdesc.c:45:1: error: (near initialization for 'CustomRmgrDesc[0]') [-Werror=missing-braces]
cc1: all warnings being treated as errors

Greetings,

Andres Freund

#27Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#26)
Re: Extensible Rmgr for Table AMs

Hi,

On 2022-04-06 23:56:40 -0700, Andres Freund wrote:

On 2022-04-06 23:35:05 -0700, Andres Freund wrote:

Causes plenty new warnings here:

And my machine isn't alone. There's also:
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=lapwing&amp;dt=2022-04-07%2006%3A40%3A14

rmgrdesc.c:44:1: error: missing braces around initializer [-Werror=missing-braces]
rmgrdesc.c:44:1: error: (near initialization for 'CustomNumericNames[0]') [-Werror=missing-braces]
rmgrdesc.c:45:1: error: missing braces around initializer [-Werror=missing-braces]
rmgrdesc.c:45:1: error: (near initialization for 'CustomRmgrDesc[0]') [-Werror=missing-braces]
cc1: all warnings being treated as errors

(pushing a fix for this one in a few)

There's also:

https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=wrasse&amp;dt=2022-04-07%2006%3A49%3A14

ccache /opt/developerstudio12.6/bin/cc -m64 -Xa -v -O findtimezone.o initdb.o localtime.o -L../../../src/port -L../../../src/common -L../../../src/fe_utils -lpgfeutils -L../../../src/common -lpgcommon -L../../../src/port -lpgport -L../../../src/interfaces/libpq -lpq -L/home/nm/sw/nopath/uuid-64/lib -L/home/nm/sw/nopath/openldap-64/lib -Wl,--as-needed -Wl,-R'/home/nm/farm/studio64v12_6/HEAD/inst/lib' -lpgcommon -lpgport -lxslt -lxml2 -lpam -lssl -lcrypto -lgssapi_krb5 -lz -lreadline -ltermcap -lnsl -lsocket -lm -o initdb
Undefined first referenced
symbol in file
RmgrTable initdb.o

IIRC sun studio is annoying because it emits references to global variables
when they're mentioned in a static inline, even if that inline is unused.

Greetings,

Andres Freund

#28Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#27)
Re: Extensible Rmgr for Table AMs

Hi,

On 2022-04-07 00:45:34 -0700, Andres Freund wrote:

On 2022-04-06 23:56:40 -0700, Andres Freund wrote:
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=wrasse&amp;dt=2022-04-07%2006%3A49%3A14

ccache /opt/developerstudio12.6/bin/cc -m64 -Xa -v -O findtimezone.o initdb.o localtime.o -L../../../src/port -L../../../src/common -L../../../src/fe_utils -lpgfeutils -L../../../src/common -lpgcommon -L../../../src/port -lpgport -L../../../src/interfaces/libpq -lpq -L/home/nm/sw/nopath/uuid-64/lib -L/home/nm/sw/nopath/openldap-64/lib -Wl,--as-needed -Wl,-R'/home/nm/farm/studio64v12_6/HEAD/inst/lib' -lpgcommon -lpgport -lxslt -lxml2 -lpam -lssl -lcrypto -lgssapi_krb5 -lz -lreadline -ltermcap -lnsl -lsocket -lm -o initdb
Undefined first referenced
symbol in file
RmgrTable initdb.o

IIRC sun studio is annoying because it emits references to global variables
when they're mentioned in a static inline, even if that inline is unused.

Looks like a #ifndef FRONTEND around the inlines should work? Or just go back
to the macros, but without the RM_MAX_CUSTOM_ID check that's implied due to
the type width anyway.

Greetings,

Andres Freund

#29Bharath Rupireddy
bharath.rupireddyforpostgres@gmail.com
In reply to: Jeff Davis (#24)
Re: Extensible Rmgr for Table AMs

On Thu, Apr 7, 2022 at 11:45 AM Jeff Davis <pgsql@j-davis.com> wrote:

I'm happy with this patch and I committed it. Changes from last
version:

Hi Jeff, I think there's a typo [1] in pg_waldump.c, we might miss to
take account of the last custom resource mgr stats. Please fix it.

diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 7d92dcaf87..30ca7684bd 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -720,7 +720,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config,
XLogDumpStats *stats)
         * calculate column totals.
         */
-       for (ri = 0; ri < RM_MAX_ID; ri++)
+       for (ri = 0; ri <= RM_MAX_ID; ri++)
        {
                total_count += stats->rmgr_stats[ri].count;
                total_rec_len += stats->rmgr_stats[ri].rec_len;

Regards,
Bharath Rupireddy.

#30Tom Lane
tgl@sss.pgh.pa.us
In reply to: Jeff Davis (#24)
Re: Extensible Rmgr for Table AMs

Jeff Davis <pgsql@j-davis.com> writes:

I'm happy with this patch and I committed it.

wrasse is less happy:

ccache /opt/developerstudio12.6/bin/cc -m64 -Xa -v -O findtimezone.o initdb.o localtime.o -L../../../src/port -L../../../src/common -L../../../src/fe_utils -lpgfeutils -L../../../src/common -lpgcommon -L../../../src/port -lpgport -L../../../src/interfaces/libpq -lpq -L/home/nm/sw/nopath/uuid-64/lib -L/home/nm/sw/nopath/openldap-64/lib -Wl,--as-needed -Wl,-R'/home/nm/farm/studio64v12_6/HEAD/inst/lib' -lpgcommon -lpgport -lxslt -lxml2 -lpam -lssl -lcrypto -lgssapi_krb5 -lz -lreadline -ltermcap -lnsl -lsocket -lm -o initdb
Undefined first referenced
symbol in file
RmgrTable initdb.o
ld: fatal: symbol referencing errors

I think this means the static inline functions you added in
xlog_internal.h need to be wrapped in #ifndef FRONTEND.

regards, tom lane