From f7b85aea60b06eb7019befec38566b5e014bea12 Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 30 Oct 2021 12:07:35 -0700
Subject: [PATCH] Logical wal.

---
 contrib/test_decoding/expected/messages.out   | 148 +++++++
 contrib/test_decoding/sql/messages.sql        |  58 +++
 src/backend/access/heap/heapam.c              |   4 +-
 src/backend/access/rmgrdesc/Makefile          |   2 +-
 .../{logicalmsgdesc.c => logicaldesc.c}       |  45 +-
 src/backend/access/transam/rmgr.c             |   2 +-
 src/backend/replication/logical/Makefile      |   2 +-
 src/backend/replication/logical/decode.c      | 275 ++++++++++--
 .../replication/logical/logical_xlog.c        | 399 ++++++++++++++++++
 .../replication/logical/logicalfuncs.c        | 165 +++++++-
 src/backend/replication/logical/message.c     |  89 ----
 src/bin/pg_waldump/.gitignore                 |   2 +-
 src/bin/pg_waldump/rmgrdesc.c                 |   2 +-
 src/include/access/heapam.h                   |   2 +
 src/include/access/heapam_xlog.h              |   3 +
 src/include/access/rmgrlist.h                 |   2 +-
 src/include/catalog/pg_proc.dat               |  17 +
 src/include/replication/decode.h              |   2 +-
 src/include/replication/logical_xlog.h        | 124 ++++++
 src/include/replication/message.h             |  41 --
 20 files changed, 1211 insertions(+), 173 deletions(-)
 rename src/backend/access/rmgrdesc/{logicalmsgdesc.c => logicaldesc.c} (59%)
 create mode 100644 src/backend/replication/logical/logical_xlog.c
 delete mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/logical_xlog.h
 delete mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b6..aa284bc37c2 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -91,6 +91,154 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
 ------
 (0 rows)
 
+-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it
+CREATE TABLE dummy(i int, t text, n numeric, primary key(t));
+SELECT pg_logical_emit_insert('dummy', row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                                 data                                  
+-----------------------------------------------------------------------
+ BEGIN
+ table public.dummy: INSERT: i[integer]:1 t[text]:'one' n[numeric]:0.1
+ COMMIT
+ BEGIN
+ table public.dummy: INSERT: i[integer]:2 t[text]:'two' n[numeric]:0.2
+ COMMIT
+(6 rows)
+
+SELECT * FROM dummy;
+ i | t | n 
+---+---+---
+(0 rows)
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                                                       data                                                        
+-------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.dummy: DELETE: t[text]:'twelve'
+ COMMIT
+ BEGIN
+ table public.dummy: UPDATE: old-key: t[text]:'fifteen' new-tuple: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16
+ COMMIT
+(6 rows)
+
+ALTER TABLE dummy REPLICA IDENTITY NOTHING;
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                                    data                                     
+-----------------------------------------------------------------------------
+ BEGIN
+ table public.dummy: DELETE: (no-tuple-data)
+ COMMIT
+ BEGIN
+ table public.dummy: UPDATE: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16
+ COMMIT
+(6 rows)
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                 data                  
+---------------------------------------
+ BEGIN
+ table public.dummy: TRUNCATE: cascade
+ COMMIT
+(3 rows)
+
+CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t));
+-- return invalid
+SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+ pg_logical_emit_insert 
+------------------------
+ 0/0
+(1 row)
+
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+ pg_logical_emit_update 
+------------------------
+ 0/0
+(1 row)
+
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+ pg_logical_emit_update 
+------------------------
+ 0/0
+(1 row)
+
+-- error
+SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+ERROR:  replica identity column is NULL
+-- return invalid
+SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+ pg_logical_emit_delete 
+------------------------
+ 0/0
+(1 row)
+
+-- error
+SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u);
+ERROR:  replica identity column is NULL
+-- return invalid
+SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false);
+ pg_logical_emit_truncate 
+--------------------------
+ 0/0
+(1 row)
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+ERROR:  could not open relation with OID 1663
+DROP TABLE dummy;
+DROP TABLE dummy_u;
 SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
  ?column? 
 ----------
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e57..c04a2e1a0c8 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -31,4 +31,62 @@ SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
 \c :prevdb
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
 
+-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it
+CREATE TABLE dummy(i int, t text, n numeric, primary key(t));
+
+SELECT pg_logical_emit_insert('dummy', row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn;
+SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+SELECT * FROM dummy;
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+ALTER TABLE dummy REPLICA IDENTITY NOTHING;
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t));
+-- return invalid
+SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+-- error
+SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+-- error
+SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u);
+
+-- return invalid
+SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false);
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+DROP TABLE dummy;
+
+DROP TABLE dummy_u;
+
 SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 74ad445e59b..9d70598dbde 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -108,8 +108,6 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status
 static void index_delete_sort(TM_IndexDeleteOp *delstate);
 static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
-static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_required,
-										bool *copy);
 
 
 /*
@@ -8372,7 +8370,7 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
  * *copy is set to true if the returned tuple is a modified copy rather than
  * the same tuple that was passed in.
  */
-static HeapTuple
+HeapTuple
 ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 					   bool *copy)
 {
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd862..ed6dff179be 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,7 +18,7 @@ OBJS = \
 	gistdesc.o \
 	hashdesc.o \
 	heapdesc.o \
-	logicalmsgdesc.o \
+	logicaldesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
 	relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c
similarity index 59%
rename from src/backend/access/rmgrdesc/logicalmsgdesc.c
rename to src/backend/access/rmgrdesc/logicaldesc.c
index 099e11a84e7..c2b0434a606 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicaldesc.c
@@ -1,22 +1,22 @@
 /*-------------------------------------------------------------------------
  *
- * logicalmsgdesc.c
- *	  rmgr descriptor routines for replication/logical/message.c
+ * logicaldesc.c
+ *	  rmgr descriptor routines for replication/logical/logical_xlog.c
  *
  * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
- *	  src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *	  src/backend/access/rmgrdesc/logicaldesc.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 
 void
-logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+logical_desc(StringInfo buf, XLogReaderState *record)
 {
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
@@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
 			sep = " ";
 		}
 	}
+	else if (info == XLOG_LOGICAL_INSERT)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_UPDATE)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_DELETE)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_TRUNCATE)
+	{
+
+	}
 }
 
 const char *
-logicalmsg_identify(uint8 info)
+logical_identify(uint8 info)
 {
-	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
-		return "MESSAGE";
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+			return "MESSAGE";
+		case XLOG_LOGICAL_INSERT:
+			return "INSERT";
+		case XLOG_LOGICAL_UPDATE:
+			return "UPDATE";
+		case XLOG_LOGICAL_DELETE:
+			return "DELETE";
+		case XLOG_LOGICAL_TRUNCATE:
+			return "TRUNCATE";
+		default:
+			return NULL;
+	}
 
 	return NULL;
 }
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..ca0bd614fcf 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -25,7 +25,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/decode.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb719..9fa281a1dca 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -19,7 +19,7 @@ OBJS = \
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
-	message.o \
+	logical_xlog.o \
 	origin.o \
 	proto.o \
 	relation.o \
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..81feca97eb0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,23 +35,29 @@
 #include "access/xlogrecord.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "commands/sequence.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
-#include "commands/sequence.h"
 
 /* individual record(group)'s handlers */
-static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
+static void DecodeLogicalMsg(LogicalDecodingContext *cxt, XLogRecordBuffer *buf);
+static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						 xl_xact_parsed_commit *parsed, TransactionId xid,
 						 bool two_phase);
@@ -457,7 +463,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	{
 		case XLOG_HEAP_INSERT:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeInsert(ctx, buf);
+				DecodeHeapInsert(ctx, buf);
 			break;
 
 			/*
@@ -468,17 +474,17 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_HEAP_HOT_UPDATE:
 		case XLOG_HEAP_UPDATE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeUpdate(ctx, buf);
+				DecodeHeapUpdate(ctx, buf);
 			break;
 
 		case XLOG_HEAP_DELETE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeDelete(ctx, buf);
+				DecodeHeapDelete(ctx, buf);
 			break;
 
 		case XLOG_HEAP_TRUNCATE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeTruncate(ctx, buf);
+				DecodeHeapTruncate(ctx, buf);
 			break;
 
 		case XLOG_HEAP_INPLACE:
@@ -559,31 +565,71 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
 void
-logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
-	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
+	SnapBuild  *builder = ctx->snapshot_builder;
 	TransactionId xid = XLogRecGetXid(r);
 	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
-	RepOriginId origin_id = XLogRecGetOrigin(r);
-	Snapshot	snapshot;
-	xl_logical_message *message;
-
-	if (info != XLOG_LOGICAL_MESSAGE)
-		elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
 
-	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
 	/*
 	 * If we don't have snapshot or we are just fast-forwarding, there is no
-	 * point in decoding messages.
+	 * point in decoding data changes.
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
 		ctx->fast_forward)
 		return;
 
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+			DecodeLogicalMsg(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalInsert(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_UPDATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalUpdate(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalDelete(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_TRUNCATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalTruncate(ctx, buf);
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info);
+			break;
+	}
+}
+
+static void
+DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	SnapBuild  *builder = ctx->snapshot_builder;
+	TransactionId xid = XLogRecGetXid(r);
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	Snapshot	snapshot;
+	xl_logical_message *message;
+
 	message = (xl_logical_message *) XLogRecGetData(r);
 
+	if (message->transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+
 	if (message->dbId != ctx->slot->data.database ||
 		FilterByOrigin(ctx, origin_id))
 		return;
@@ -603,6 +649,187 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 												 * prefix */
 							  message->message_size,
 							  message->message + message->prefix_size);
+
+}
+
+/*
+ * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
+ *
+ * Deletes can contain the new tuple.
+ */
+static void
+DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	Size		datalen = XLogRecGetDataLen(r) - SizeOfLogicalInsert;
+	char	   *tupledata = XLogRecGetData(r) + SizeOfLogicalInsert;
+	Size		tuplelen = datalen - SizeOfHeapHeader;
+	xl_logical_insert *xlrec;
+	ReorderBufferChange *change;
+
+	xlrec = (xl_logical_insert *) XLogRecGetData(r);
+
+	/* only interested in our database */
+	if (xlrec->node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_UPDATE from wal into proper tuplebufs.
+ *
+ * Updates can possibly contain a new tuple and the old primary key.
+ */
+static void
+DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_logical_update *xlrec = (xl_logical_update *) XLogRecGetData(r);
+	char	   *new_tupledata = XLogRecGetData(r) + SizeOfLogicalUpdate;
+	Size		new_datalen = xlrec->new_datalen;
+	Size		new_tuplelen = new_datalen - SizeOfHeapHeader;
+	ReorderBufferChange *change;
+
+	/* only interested in our database */
+	if (xlrec->node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder, new_tuplelen);
+
+	DecodeXLogTuple(new_tupledata, new_datalen, change->data.tp.newtuple);
+
+	if (xlrec->flags & XLL_UPDATE_CONTAINS_OLD)
+	{
+		char	*old_tupledata = new_tupledata + new_datalen;
+		Size	 old_datalen;
+		Size	 old_tuplelen;
+
+		/* remaining data is the old tuple */
+		old_datalen = XLogRecGetDataLen(r) - new_datalen - SizeOfLogicalUpdate;
+		old_tuplelen = old_datalen - SizeOfHeapHeader;
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen);
+
+		DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_logical_delete *xlrec;
+	ReorderBufferChange *change;
+
+	xlrec = (xl_logical_delete *) XLogRecGetData(r);
+
+	/* only interested in our database */
+	if (xlrec->node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+	if (xlrec->flags & XLL_DELETE_CONTAINS_OLD)
+	{
+		Size	 old_datalen   = XLogRecGetDataLen(r) - SizeOfLogicalDelete;
+		char	*old_tupledata = XLogRecGetData(r) + SizeOfLogicalDelete;
+		Size	 old_tuplelen  = old_datalen - SizeOfHeapHeader;
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen);
+
+		DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_TRUNCATE from wal
+ */
+static void
+DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_logical_truncate *xlrec;
+	ReorderBufferChange *change;
+
+	xlrec = (xl_logical_truncate *) XLogRecGetData(r);
+
+	/* only interested in our database */
+	if (xlrec->dbId != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+	change->origin_id = XLogRecGetOrigin(r);
+	if (xlrec->flags & XLL_TRUNCATE_CASCADE)
+		change->data.truncate.cascade = true;
+	if (xlrec->flags & XLL_TRUNCATE_RESTART_SEQS)
+		change->data.truncate.restart_seqs = true;
+	change->data.truncate.nrelids = xlrec->nrelids;
+	change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
+														  xlrec->nrelids);
+	memcpy(change->data.truncate.relids, xlrec->relids,
+		   xlrec->nrelids * sizeof(Oid));
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+							 buf->origptr, change, false);
 }
 
 /*
@@ -839,7 +1066,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
  * Deletes can contain the new tuple.
  */
 static void
-DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	Size		datalen;
 	char	   *tupledata;
@@ -898,7 +1125,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Updates can possibly contain a new tuple and the old primary key.
  */
 static void
-DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_update *xlrec;
@@ -965,7 +1192,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Deletes can possibly contain the old primary key.
  */
 static void
-DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_delete *xlrec;
@@ -1019,7 +1246,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Parse XLOG_HEAP_TRUNCATE from wal
  */
 static void
-DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_truncate *xlrec;
diff --git a/src/backend/replication/logical/logical_xlog.c b/src/backend/replication/logical/logical_xlog.c
new file mode 100644
index 00000000000..30ae3e73be6
--- /dev/null
+++ b/src/backend/replication/logical/logical_xlog.c
@@ -0,0 +1,399 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical_xlog.c
+ *	  Logical xlog records.
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/logical_xlog.c
+ *
+ * Logical Messages
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional one will always be delivered.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * Logical Insert/Update/Delete/Truncate
+ *
+ * These records are intended to be used by non-heap table access methods that
+ * wish to support logical decoding and replication. They are treated
+ * similarly to the analogous heap records, but are not tied to physical pages
+ * or other details of the heap. These records are not processed during redo,
+ * so do not contribute to durability or physical replication; use generic WAL
+ * records for that. Note that using both logical WAL records and generic WAL
+ * records is redundant compared with the heap.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/catalog.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/logical_xlog.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+
+static void CheckReplicaIdentity(Relation relation, TupleTableSlot *slot);
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+				  bool transactional)
+{
+	xl_logical_message xlrec;
+
+	/*
+	 * Force xid to be allocated if we're emitting a transactional message.
+	 */
+	if (transactional)
+	{
+		Assert(IsTransactionState());
+		GetCurrentTransactionId();
+	}
+
+	xlrec.dbId = MyDatabaseId;
+	xlrec.transactional = transactional;
+	/* trailing zero is critical; see logicalmsg_desc */
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+	XLogRegisterData(unconstify(char *, message), size);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Write logical insert into log.
+ */
+XLogRecPtr
+LogLogicalInsert(Relation relation, TupleTableSlot *new_slot)
+{
+	bool		free_new_tuple;
+	HeapTuple	new_tuple;
+	xl_logical_insert xlrec;
+	xl_heap_header xlhdr;
+	XLogRecPtr recptr;
+
+	if (!equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor))
+		ereport(ERROR, (errmsg("record type must match relation type")));
+
+	if (!RelationIsLogicallyLogged(relation))
+		return InvalidXLogRecPtr;
+
+	new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple);
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	new_tuple->t_tableOid = new_slot->tts_tableOid;
+	ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid);
+
+	xlrec.node = relation->rd_node;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert);
+
+	xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2;
+	xlhdr.t_infomask = new_tuple->t_data->t_infomask;
+	xlhdr.t_hoff = new_tuple->t_data->t_hoff;
+
+	XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+	XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader,
+					 new_tuple->t_len - SizeofHeapTupleHeader);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT);
+
+	if (free_new_tuple)
+		pfree(new_tuple);
+
+	return recptr;
+}
+
+/*
+ * Write logical update into log.
+ */
+XLogRecPtr
+LogLogicalUpdate(Relation relation, TupleTableSlot *old_slot,
+				 TupleTableSlot *new_slot)
+{
+	HeapTuple	old_whole_tuple;
+	HeapTuple	old_id_tuple;
+	HeapTuple	new_tuple;
+	bool		free_old_whole_tuple;
+	bool		free_old_id_tuple;
+	bool		free_new_tuple;
+	xl_heap_header new_xlhdr;
+	xl_logical_update xlrec;
+	XLogRecPtr recptr;
+
+	if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor) ||
+		!equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor))
+		ereport(ERROR, (errmsg("record types must match relation type")));
+
+	CheckReplicaIdentity(relation, old_slot);
+
+	if (!RelationIsLogicallyLogged(relation))
+		return InvalidXLogRecPtr;
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true,
+											 &free_old_whole_tuple);
+	new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple);
+
+	xlrec.node = relation->rd_node;
+	xlrec.new_datalen = new_tuple->t_len - SizeofHeapTupleHeader +
+		SizeOfHeapHeader;
+	xlrec.flags = 0;
+
+	old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true,
+										  &free_old_id_tuple);
+
+	if (old_id_tuple != NULL)
+	{
+		xlrec.flags |= XLL_UPDATE_CONTAINS_OLD;
+		old_id_tuple->t_tableOid = old_slot->tts_tableOid;
+		ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid);
+	}
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalUpdate);
+
+	new_tuple->t_tableOid = new_slot->tts_tableOid;
+	ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid);
+
+	new_xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2;
+	new_xlhdr.t_infomask = new_tuple->t_data->t_infomask;
+	new_xlhdr.t_hoff = new_tuple->t_data->t_hoff;
+
+	/* write new tuple first, then old */
+	XLogRegisterData((char *) &new_xlhdr, SizeOfHeapHeader);
+	XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader,
+					 new_tuple->t_len - SizeofHeapTupleHeader);
+
+	if (old_id_tuple != NULL)
+	{
+		xl_heap_header old_xlhdr;
+
+		old_xlhdr.t_infomask2 = old_id_tuple->t_data->t_infomask2;
+		old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask;
+		old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff;
+
+		XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader);
+		XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader,
+					 old_id_tuple->t_len - SizeofHeapTupleHeader);
+	}
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_UPDATE);
+
+	if (free_old_whole_tuple)
+		pfree(old_whole_tuple);
+	if (free_old_id_tuple)
+		pfree(old_id_tuple);
+	if (free_new_tuple)
+		pfree(new_tuple);
+
+	return recptr;
+}
+
+/*
+ * Write logical update into log.
+ */
+XLogRecPtr
+LogLogicalDelete(Relation relation, TupleTableSlot *old_slot)
+{
+	HeapTuple		 old_whole_tuple;
+	HeapTuple		 old_id_tuple;
+	bool			 free_old_whole_tuple;
+	bool			 free_old_id_tuple;
+	xl_logical_delete xlrec;
+	XLogRecPtr		 recptr;
+
+	if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor))
+		ereport(ERROR, (errmsg("record type must match relation type")));
+
+	CheckReplicaIdentity(relation, old_slot);
+
+	if (!RelationIsLogicallyLogged(relation))
+		return InvalidXLogRecPtr;
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true,
+											 &free_old_whole_tuple);
+
+	xlrec.node = relation->rd_node;
+	xlrec.flags = 0;
+
+	old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true,
+										  &free_old_id_tuple);
+
+	if (old_id_tuple != NULL)
+	{
+		xlrec.flags |= XLL_UPDATE_CONTAINS_OLD;
+		old_id_tuple->t_tableOid = old_slot->tts_tableOid;
+		ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid);
+	}
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalDelete);
+
+	if (old_id_tuple != NULL)
+	{
+		xl_heap_header old_xlhdr;
+
+		old_xlhdr.t_infomask2 = old_id_tuple->t_data->t_infomask2;
+		old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask;
+		old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff;
+
+		XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader);
+		XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader,
+					 old_id_tuple->t_len - SizeofHeapTupleHeader);
+	}
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_DELETE);
+
+	if (free_old_whole_tuple)
+		pfree(old_whole_tuple);
+	if (free_old_id_tuple)
+		pfree(old_id_tuple);
+
+	return recptr;
+}
+
+/*
+ * Write logical truncate into log.
+ */
+XLogRecPtr
+LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs)
+{
+	xl_logical_truncate	 xlrec;
+	XLogRecPtr			 recptr;
+	Oid					*logrelids;
+	ListCell			*lc;
+	int					 nrelids = 0;
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	xlrec.dbId = MyDatabaseId;
+	xlrec.nrelids = list_length(relids);
+	xlrec.flags = 0;
+	if (cascade)
+		xlrec.flags |= XLL_TRUNCATE_CASCADE;
+	if (restart_seqs)
+		xlrec.flags |= XLL_TRUNCATE_RESTART_SEQS;
+
+	logrelids = palloc(list_length(relids) * sizeof(Oid));
+	foreach(lc, relids)
+	{
+		Oid			relid = lfirst_oid(lc);
+		Relation	rel	  = relation_open(relid, AccessShareLock);
+
+		if (RelationIsLogicallyLogged(rel))
+		{
+			logrelids[nrelids++] = lfirst_oid(lc);
+		}
+
+		relation_close(rel, NoLock);
+	}
+
+	if (nrelids == 0)
+		return InvalidXLogRecPtr;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalTruncate);
+	XLogRegisterData((char *) logrelids, nrelids * sizeof(Oid));
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_TRUNCATE);
+
+	return recptr;
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logical_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+		case XLOG_LOGICAL_INSERT:
+		case XLOG_LOGICAL_UPDATE:
+		case XLOG_LOGICAL_DELETE:
+		case XLOG_LOGICAL_TRUNCATE:
+			break;
+		default:
+			elog(PANIC, "logical_redo: unknown op code %u", info);
+	}
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
+
+/*
+ * Check that replica identity columns are non-NULL.
+ */
+static void
+CheckReplicaIdentity(Relation relation, TupleTableSlot *slot)
+{
+	/* check for NULL attributes in the replica identity */
+	Bitmapset *id_attrs = RelationGetIndexAttrBitmap(
+		relation, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+	int id_attr = (-1) * FirstLowInvalidHeapAttributeNumber;
+
+	while ((id_attr = bms_next_member(id_attrs, id_attr)) >= 0)
+	{
+		AttrNumber attno = id_attr + FirstLowInvalidHeapAttributeNumber;
+		if (slot_attisnull(slot, attno))
+			ereport(ERROR, (errmsg("replica identity column is NULL")));
+	}
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 6058d36e0d5..2776861ab8d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -17,6 +17,7 @@
 
 #include <unistd.h>
 
+#include "access/relation.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
@@ -29,7 +30,7 @@
 #include "nodes/makefuncs.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "storage/fd.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -389,3 +390,165 @@ pg_logical_emit_message_text(PG_FUNCTION_ARGS)
 	/* bytea and text are compatible */
 	return pg_logical_emit_message_bytea(fcinfo);
 }
+
+/*
+ * SQL function for writing logical insert into WAL.
+ */
+Datum
+pg_logical_emit_insert(PG_FUNCTION_ARGS)
+{
+	Oid				 relid = PG_GETARG_OID(0);
+	HeapTupleHeader	 htup  = PG_GETARG_HEAPTUPLEHEADER(1);
+	Relation		 rel   = relation_open(relid, AccessShareLock);
+	HeapTupleData	 tuple;
+	TupleTableSlot	*slot;
+	Oid				 rel_type;
+	Oid				 tup_type;
+	XLogRecPtr		 lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	tup_type = HeapTupleHeaderGetTypeId(htup);
+	if (rel_type != tup_type)
+		ereport(ERROR, (errmsg("record type must match table type")));
+
+	tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+	ItemPointerSetInvalid(&(tuple.t_self));
+	tuple.t_tableOid = relid;
+	tuple.t_data = htup;
+
+	slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecClearTuple(slot);
+	ExecStoreHeapTuple(&tuple, slot, false);
+
+	lsn = LogLogicalInsert(rel, slot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical update into WAL.
+ */
+Datum
+pg_logical_emit_update(PG_FUNCTION_ARGS)
+{
+	Oid				 relid = PG_GETARG_OID(0);
+	HeapTupleHeader	 old_htup  = PG_GETARG_HEAPTUPLEHEADER(1);
+	HeapTupleHeader	 new_htup  = PG_GETARG_HEAPTUPLEHEADER(2);
+	Relation		 rel   = relation_open(relid, AccessShareLock);
+	HeapTupleData	 old_tuple;
+	HeapTupleData	 new_tuple;
+	TupleTableSlot	*old_slot;
+	TupleTableSlot	*new_slot;
+	Oid				 rel_type;
+	Oid				 old_tup_type;
+	Oid				 new_tup_type;
+	XLogRecPtr		 lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	old_tup_type = HeapTupleHeaderGetTypeId(old_htup);
+	new_tup_type = HeapTupleHeaderGetTypeId(new_htup);
+
+	if (rel_type != old_tup_type || rel_type != new_tup_type)
+		ereport(ERROR, (errmsg("record type must match table type")));
+
+	old_tuple.t_len = HeapTupleHeaderGetDatumLength(old_htup);
+	ItemPointerSetInvalid(&(old_tuple.t_self));
+	old_tuple.t_tableOid = relid;
+	old_tuple.t_data = old_htup;
+
+	new_tuple.t_len = HeapTupleHeaderGetDatumLength(new_htup);
+	ItemPointerSetInvalid(&(new_tuple.t_self));
+	new_tuple.t_tableOid = relid;
+	new_tuple.t_data = new_htup;
+
+	old_slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	new_slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecClearTuple(old_slot);
+	ExecClearTuple(new_slot);
+	ExecStoreHeapTuple(&old_tuple, old_slot, false);
+	ExecStoreHeapTuple(&new_tuple, new_slot, false);
+
+	lsn = LogLogicalUpdate(rel, old_slot, new_slot);
+
+	ExecDropSingleTupleTableSlot(old_slot);
+	ExecDropSingleTupleTableSlot(new_slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical delete into WAL.
+ */
+Datum
+pg_logical_emit_delete(PG_FUNCTION_ARGS)
+{
+	Oid				 relid = PG_GETARG_OID(0);
+	HeapTupleHeader	 htup  = PG_GETARG_HEAPTUPLEHEADER(1);
+	Relation		 rel   = relation_open(relid, AccessShareLock);
+	HeapTupleData	 tuple;
+	TupleTableSlot	*slot;
+	Oid				 rel_type;
+	Oid				 tup_type;
+	XLogRecPtr		 lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	tup_type = HeapTupleHeaderGetTypeId(htup);
+	if (rel_type != tup_type)
+		ereport(ERROR, (errmsg("record type must match table type")));
+
+	tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+	ItemPointerSetInvalid(&(tuple.t_self));
+	tuple.t_tableOid = relid;
+	tuple.t_data = htup;
+
+	slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecClearTuple(slot);
+	ExecStoreHeapTuple(&tuple, slot, false);
+
+	lsn = LogLogicalDelete(rel, slot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical truncate into WAL.
+ */
+Datum
+pg_logical_emit_truncate(PG_FUNCTION_ARGS)
+{
+	ArrayType	*arr		  = PG_GETARG_ARRAYTYPE_P(0);
+	bool		 cascade	  = PG_GETARG_BOOL(1);
+	bool		 restart_seqs = PG_GETARG_BOOL(2);
+	Datum		*values;
+	bool		*nulls;
+	int			 nrelids;
+	List		*relids		  = NIL;
+	XLogRecPtr	 lsn;
+
+	deconstruct_array(arr, REGCLASSOID, sizeof(Oid), true, TYPALIGN_INT,
+					  &values, &nulls, &nrelids);
+
+	for (int i = 0; i < nrelids; i++)
+	{
+		if (nulls[i])
+			ereport(ERROR, (errmsg("unexpected NULL element")));
+		relids = lappend_oid(relids, DatumGetObjectId(values[i]));
+	}
+
+	lsn = LogLogicalTruncate(relids, cascade, restart_seqs);
+
+	PG_RETURN_LSN(lsn);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
deleted file mode 100644
index 1c34912610e..00000000000
--- a/src/backend/replication/logical/message.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * message.c
- *	  Generic logical messages.
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- *	  src/backend/replication/logical/message.c
- *
- * NOTES
- *
- * Generic logical messages allow XLOG logging of arbitrary binary blobs that
- * get passed to the logical decoding plugin. In normal XLOG processing they
- * are same as NOOP.
- *
- * These messages can be either transactional or non-transactional.
- * Transactional messages are part of current transaction and will be sent to
- * decoding plugin using in a same way as DML operations.
- * Non-transactional messages are sent to the plugin at the time when the
- * logical decoding reads them from XLOG. This also means that transactional
- * messages won't be delivered if the transaction was rolled back but the
- * non-transactional one will always be delivered.
- *
- * Every message carries prefix to avoid conflicts between different decoding
- * plugins. The plugin authors must take extra care to use unique prefix,
- * good options seems to be for example to use the name of the extension.
- *
- * ---------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/xact.h"
-#include "access/xloginsert.h"
-#include "miscadmin.h"
-#include "nodes/execnodes.h"
-#include "replication/logical.h"
-#include "replication/message.h"
-#include "utils/memutils.h"
-
-/*
- * Write logical decoding message into XLog.
- */
-XLogRecPtr
-LogLogicalMessage(const char *prefix, const char *message, size_t size,
-				  bool transactional)
-{
-	xl_logical_message xlrec;
-
-	/*
-	 * Force xid to be allocated if we're emitting a transactional message.
-	 */
-	if (transactional)
-	{
-		Assert(IsTransactionState());
-		GetCurrentTransactionId();
-	}
-
-	xlrec.dbId = MyDatabaseId;
-	xlrec.transactional = transactional;
-	/* trailing zero is critical; see logicalmsg_desc */
-	xlrec.prefix_size = strlen(prefix) + 1;
-	xlrec.message_size = size;
-
-	XLogBeginInsert();
-	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
-	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
-	XLogRegisterData(unconstify(char *, message), size);
-
-	/* allow origin filtering */
-	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
-
-	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
-}
-
-/*
- * Redo is basically just noop for logical decoding messages.
- */
-void
-logicalmsg_redo(XLogReaderState *record)
-{
-	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-	if (info != XLOG_LOGICAL_MESSAGE)
-		elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
-
-	/* This is only interesting for logical decoding, see decode.c. */
-}
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index 3be00a8b61f..567655fa626 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -10,7 +10,7 @@
 /gistdesc.c
 /hashdesc.c
 /heapdesc.c
-/logicalmsgdesc.c
+/logicaldesc.c
 /mxactdesc.c
 /nbtdesc.c
 /relmapdesc.c
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310b..86804a243bc 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,7 +26,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b46ab7d7390..77fa0337f6e 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -175,6 +175,8 @@ extern void simple_heap_insert(Relation relation, HeapTuple tup);
 extern void simple_heap_delete(Relation relation, ItemPointer tid);
 extern void simple_heap_update(Relation relation, ItemPointer otid,
 							   HeapTuple tup);
+extern HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup,
+										bool key_required, bool *copy);
 
 extern TransactionId heap_index_delete_tuples(Relation rel,
 											  TM_IndexDeleteOp *delstate);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 5c47fdcec80..5460f7836f9 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -124,6 +124,9 @@ typedef struct xl_heap_delete
  * For truncate we list all truncated relids in an array, followed by all
  * sequence relids that need to be restarted, if any.
  * All rels are always within the same database, so we just list dbid once.
+ *
+ * Note: identical to xl_logical_truncate, except that for
+ * xl_logical_truncate, no redo is performed.
  */
 typedef struct xl_heap_truncate
 {
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index cf8b6d48193..d43175088fb 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL, logical_decode)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 01e1dd4d6d1..cd57c2b3aa2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10937,6 +10937,23 @@
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
   prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
   prosrc => 'pg_logical_emit_message_bytea' },
+{ oid => '9297', descr => 'emit a logical insert',
+  proname => 'pg_logical_emit_insert', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'regclass record',
+  prosrc => 'pg_logical_emit_insert' },
+{ oid => '9298', descr => 'emit a logical update',
+  proname => 'pg_logical_emit_update', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'regclass record record',
+  prosrc => 'pg_logical_emit_update' },
+{ oid => '9299', descr => 'emit a logical delete',
+  proname => 'pg_logical_emit_delete', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'regclass record',
+  prosrc => 'pg_logical_emit_delete' },
+{ oid => '9300', descr => 'emit a logical truncate',
+  proname => 'pg_logical_emit_truncate', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => '_regclass bool bool',
+  prosrc => 'pg_logical_emit_truncate' },
+
 
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 8e07bb7409a..118590ad61e 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -26,7 +26,7 @@ extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
diff --git a/src/include/replication/logical_xlog.h b/src/include/replication/logical_xlog.h
new file mode 100644
index 00000000000..dce74f71111
--- /dev/null
+++ b/src/include/replication/logical_xlog.h
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ * logical_xlog.h
+ *	   Exports from replication/logical/logical_xlog.c
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logical_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "storage/off.h"
+#include "utils/rel.h"
+
+/*
+ * xl_heap_update flag values, 8 bits are available.
+ */
+#define XLL_UPDATE_CONTAINS_OLD		(1<<0)
+
+/*
+ * xl_heap_delete flag values, 8 bits are available.
+ */
+#define XLL_DELETE_CONTAINS_OLD		(1<<0)
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+	Oid			dbId;			/* database Oid emitted from */
+	bool		transactional;	/* is message transactional? */
+	Size		prefix_size;	/* length of prefix */
+	Size		message_size;	/* size of the message */
+	/* payload, including null-terminated prefix of length prefix_size */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_message;
+
+#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+
+/* This is what we need to know about insert */
+typedef struct xl_logical_insert
+{
+	RelFileNode		node;
+
+	/* tuple data follows */
+} xl_logical_insert;
+
+#define SizeOfLogicalInsert	(offsetof(xl_logical_insert, node) + sizeof(RelFileNode))
+
+/*
+ * This is what we need to know about update.
+ */
+typedef struct xl_logical_update
+{
+	RelFileNode	node;
+	Size		new_datalen;
+	uint8		flags;
+
+	/* tuple data follows */
+} xl_logical_update;
+
+#define SizeOfLogicalUpdate	(offsetof(xl_logical_update, flags) + sizeof(uint8))
+
+/* This is what we need to know about delete */
+typedef struct xl_logical_delete
+{
+	RelFileNode	node;
+	uint8		flags;
+
+	/* tuple data follows */
+} xl_logical_delete;
+
+#define SizeOfLogicalDelete	(offsetof(xl_logical_delete, flags) + sizeof(uint8))
+
+/*
+ * For truncate we list all truncated relids in an array, followed by all
+ * sequence relids that need to be restarted, if any.
+ * All rels are always within the same database, so we just list dbid once.
+ *
+ * Note: identical to xl_logical_truncate, except that no redo is performed, only
+ * decoding.
+ */
+typedef struct xl_logical_truncate
+{
+	Oid			dbId;
+	uint32		nrelids;
+	uint8		flags;
+	Oid			relids[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_truncate;
+
+#define SizeOfLogicalTruncate	(offsetof(xl_logical_truncate, relids))
+
+struct TupleTableSlot;
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+									size_t size, bool transactional);
+extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalUpdate(Relation relation, struct TupleTableSlot *old_slot,
+								   struct TupleTableSlot *new_slot);
+extern XLogRecPtr LogLogicalDelete(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE	0x00
+#define XLOG_LOGICAL_INSERT		0x10
+#define XLOG_LOGICAL_UPDATE		0x20
+#define XLOG_LOGICAL_DELETE		0x30
+#define XLOG_LOGICAL_TRUNCATE	0x40
+
+/*
+ * xl_logical_truncate flag values, 8 bits are available.
+ */
+#define XLL_TRUNCATE_CASCADE					(1<<0)
+#define XLL_TRUNCATE_RESTART_SEQS				(1<<1)
+
+void		logical_redo(XLogReaderState *record);
+void		logical_desc(StringInfo buf, XLogReaderState *record);
+const char *logical_identify(uint8 info);
+
+#endif							/* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
deleted file mode 100644
index 7d7785292f1..00000000000
--- a/src/include/replication/message.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*-------------------------------------------------------------------------
- * message.h
- *	   Exports from replication/logical/message.c
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * src/include/replication/message.h
- *-------------------------------------------------------------------------
- */
-#ifndef PG_LOGICAL_MESSAGE_H
-#define PG_LOGICAL_MESSAGE_H
-
-#include "access/xlog.h"
-#include "access/xlogdefs.h"
-#include "access/xlogreader.h"
-
-/*
- * Generic logical decoding message wal record.
- */
-typedef struct xl_logical_message
-{
-	Oid			dbId;			/* database Oid emitted from */
-	bool		transactional;	/* is message transactional? */
-	Size		prefix_size;	/* length of prefix */
-	Size		message_size;	/* size of the message */
-	/* payload, including null-terminated prefix of length prefix_size */
-	char		message[FLEXIBLE_ARRAY_MEMBER];
-} xl_logical_message;
-
-#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
-
-extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
-									size_t size, bool transactional);
-
-/* RMGR API*/
-#define XLOG_LOGICAL_MESSAGE	0x00
-void		logicalmsg_redo(XLogReaderState *record);
-void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
-const char *logicalmsg_identify(uint8 info);
-
-#endif							/* PG_LOGICAL_MESSAGE_H */
-- 
2.17.1

