diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..354f1033bf7 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,15 +24,102 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
+#include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+typedef struct CustomRmgrEntry {	/* one slot of CustomRmgrTable */
+	RmgrId rmid;		/* ID this rmgr was registered under */
+	RmgrData *rmgr;		/* method table passed to RegisterCustomRmgr (pointer, not a copy) */
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
-	{ name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+	{ name, redo, desc, identify, startup, cleanup, mask, decode },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;	/* allocated and grown by RegisterCustomRmgr */
+static int NumCustomRmgrs = 0;	/* number of entries in use in CustomRmgrTable */
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Only allowed while shared_preload_libraries is being processed. rmid must
+ * be within RM_CUSTOM_MIN_ID..RM_CUSTOM_MAX_ID; a duplicate ID or name PANICs.
+ * The rmgr pointer is stored as-is, not copied, so it must remain valid.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (!process_shared_preload_libraries_in_progress)
+		elog(ERROR, "custom rmgr must be registered while initializing extensions in shared_preload_libraries");
+
+	elog(LOG, "registering custom rmgr \"%s\" with ID %d",
+		 rmgr->rm_name, rmid);
+
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(
+			TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* check for existing builtin rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+		if (strcmp(RmgrTable[i].rm_name, rmgr->rm_name) == 0)
+			elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr",
+				 RmgrTable[i].rm_name);
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		const CustomRmgrEntry *entry = &CustomRmgrTable[i];
+
+		if (entry->rmid == rmid)
+			elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"",
+				 rmid, entry->rmgr->rm_name);
+
+		if (strcmp(entry->rmgr->rm_name, rmgr->rm_name) == 0)
+			elog(PANIC, "custom rmgr \"%s\" already registered with ID %d",
+				 rmgr->rm_name, entry->rmid);
+	}
+
+	/* room for one more entry: (count + 1) elements, not count elements + 1 byte */
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * (NumCustomRmgrs + 1));
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * Linear scan, since few entries are expected; PANICs if rmid is unknown.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	int			idx = 0;
+
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	while (idx < NumCustomRmgrs && CustomRmgrTable[idx].rmid != rmid)
+		idx++;
+	if (idx < NumCustomRmgrs)
+		return *CustomRmgrTable[idx].rmgr;	/* returned by value */
+
+	elog(PANIC, "custom rmgr with ID %d not found!", rmid);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6208e123e5d..07f715f0d9f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1505,10 +1505,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7292,8 +7292,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7518,7 +7518,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7628,8 +7628,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10663,16 +10663,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f01aea6ddad..6e2d40695ac 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -735,7 +735,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index daf2efb0d83..67e0107b191 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -43,21 +43,6 @@
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
 
-typedef struct XLogRecordBuffer
-{
-	XLogRecPtr	origptr;
-	XLogRecPtr	endptr;
-	XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
 	XLogRecordBuffer buf;
 	TransactionId txid;
+	RmgrId rmid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 								 buf.origptr);
 	}
 
-	/* cast so we get a warning when new rmgrs are added */
-	switch ((RmgrId) XLogRecGetRmid(record))
-	{
-			/*
-			 * Rmgrs we care about for logical decoding. Add new rmgrs in
-			 * rmgrlist.h's order.
-			 */
-		case RM_XLOG_ID:
-			DecodeXLogOp(ctx, &buf);
-			break;
-
-		case RM_XACT_ID:
-			DecodeXactOp(ctx, &buf);
-			break;
+	rmid = XLogRecGetRmid(record);
 
-		case RM_STANDBY_ID:
-			DecodeStandbyOp(ctx, &buf);
-			break;
-
-		case RM_HEAP2_ID:
-			DecodeHeap2Op(ctx, &buf);
-			break;
-
-		case RM_HEAP_ID:
-			DecodeHeapOp(ctx, &buf);
-			break;
-
-		case RM_LOGICALMSG_ID:
-			DecodeLogicalMsgOp(ctx, &buf);
-			break;
-
-			/*
-			 * Rmgrs irrelevant for logical decoding; they describe stuff not
-			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
-			 * order.
-			 */
-		case RM_SMGR_ID:
-		case RM_CLOG_ID:
-		case RM_DBASE_ID:
-		case RM_TBLSPC_ID:
-		case RM_MULTIXACT_ID:
-		case RM_RELMAP_ID:
-		case RM_BTREE_ID:
-		case RM_HASH_ID:
-		case RM_GIN_ID:
-		case RM_GIST_ID:
-		case RM_SEQ_ID:
-		case RM_SPGIST_ID:
-		case RM_BRIN_ID:
-		case RM_COMMIT_TS_ID:
-		case RM_REPLORIGIN_ID:
-		case RM_GENERIC_ID:
-			/* just deal with xid, and done */
-			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
-									buf.origptr);
-			break;
-		case RM_NEXT_ID:
-			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
+	else
+	{
+		/* just deal with xid, and done */
+		ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+								buf.origptr);
 	}
 }
 
 /*
  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	ReorderBuffer *reorder = ctx->reorder;
@@ -392,8 +329,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
@@ -438,8 +375,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -498,8 +435,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -620,8 +557,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6652a60ec31..c7b640befb8 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11599,7 +11599,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11611,8 +11611,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 59ebac7d6aa..9fc1729d84c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,7 +28,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..6a4ebd1310b 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index c9b5c56a4c6..9ad790a5cd0 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	symname,
 
 typedef enum RmgrIds
@@ -31,5 +31,14 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID		128	/* lowest ID accepted for a custom rmgr */
+#define RM_CUSTOM_MAX_ID		UINT8_MAX	/* highest ID accepted for a custom rmgr */
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index f582cf535f6..b1ffa2728b5 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -25,25 +25,25 @@
  */
 
-/* symbol name, textual name, redo, desc, identify, startup, cleanup */
+/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index dcf41e9257c..ee7422097c6 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -287,6 +287,9 @@ typedef enum
 	RECOVERY_TARGET_ACTION_SHUTDOWN
 }			RecoveryTargetAction;
 
+struct LogicalDecodingContext;
+struct XLogRecordBuffer;
+
 /*
  * Method table for resource managers.
  *
@@ -312,10 +315,15 @@ typedef struct RmgrData
 	void		(*rm_startup) (void);
 	void		(*rm_cleanup) (void);
 	void		(*rm_mask) (char *pagedata, BlockNumber blkno);
+	void		(*rm_decode) (struct LogicalDecodingContext *ctx,
+							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -333,4 +341,8 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+/* IDs below RM_CUSTOM_MIN_ID index RmgrTable, others use GetCustomRmgr(); rmid is evaluated twice. */
+#define GetRmgr(rmid) (((rmid) < RM_CUSTOM_MIN_ID) ? GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))
+
 #endif							/* XLOG_INTERNAL_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 69918080bb5..9cc4783c3ba 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -14,7 +14,21 @@
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 
-void		LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+typedef struct XLogRecordBuffer	/* argument handed to the rm_decode callbacks */
+{
+	XLogRecPtr	origptr;	/* set from the reader's ReadRecPtr by LogicalDecodingProcessRecord */
+	XLogRecPtr	endptr;		/* set from the reader's EndRecPtr by LogicalDecodingProcessRecord */
+	XLogReaderState *record;	/* reader state the callbacks pass to XLogRecGet* */
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
 
 #endif
