Logical insert/update/delete WAL records for custom table AMs
Attached is a WIP patch to add new WAL records to represent a logical
insert, update, or delete. These records do not do anything at REDO
time, they are only processed during logical decoding/replication.
These are intended to be used by a custom table AM, like my columnar
compression extension[0], which currently supports physical replication
but can't support logical decoding/replication because decoding is not
extensible. Using these new logical records would be redundant, making
inserts/updates/deletes less efficient, but at least logical decoding
would work (the lack of which is columnar's biggest weakness).
Alternatively, we could support extensible WAL with extensible
decoding. I also like this approach, but it takes more work for an AM
like columnar to get that right -- it needs to keep additional state in
the walsender to track and assemble the compressed columns stored
across many blocks. It also requires a lot of care, because mistakes
can get you into serious trouble.
This proposal, for new logical records without WAL extensibility,
provides a more shallow ramp to get a table AM working (including
logical replication/decoding) without the need to invest in the WAL
design. Later, of course I'd like the option for extensible WAL as well
(to be more efficient), but right now I'd prefer it just worked
(inefficiently).
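As a rough illustration of the idea (a standalone toy model, not code from the attached patch), the sketch below shows records that REDO accepts but ignores, while a decoding pass turns them into row changes. The XLOG_LOGICAL_* values and XLR_INFO_MASK match the definitions used by the patch and by PostgreSQL; the functions toy_logical_redo and toy_count_decoded_changes are hypothetical names invented for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Record-type values as defined in the patch's logical_xlog.h. */
#define XLOG_LOGICAL_MESSAGE   0x00
#define XLOG_LOGICAL_INSERT    0x10
#define XLOG_LOGICAL_UPDATE    0x20
#define XLOG_LOGICAL_DELETE    0x30
#define XLOG_LOGICAL_TRUNCATE  0x40

/* The low four bits of the info byte are reserved by the WAL machinery. */
#define XLR_INFO_MASK          0x0F

/*
 * Toy REDO routine (hypothetical): accepts every known logical record type
 * and changes no state. Returns 0 for a known type and -1 for an unknown
 * one (the patch raises PANIC in that case).
 */
static int
toy_logical_redo(uint8_t info)
{
    switch (info & ~XLR_INFO_MASK)
    {
        case XLOG_LOGICAL_MESSAGE:
        case XLOG_LOGICAL_INSERT:
        case XLOG_LOGICAL_UPDATE:
        case XLOG_LOGICAL_DELETE:
        case XLOG_LOGICAL_TRUNCATE:
            return 0;           /* nothing to replay */
        default:
            return -1;
    }
}

/*
 * Toy decoding pass (hypothetical): counts the records that would become
 * row changes for an output plugin. Messages and unknown types are not
 * counted here.
 */
static size_t
toy_count_decoded_changes(const uint8_t *infos, size_t n)
{
    size_t      nchanges = 0;

    assert(infos != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        switch (infos[i] & ~XLR_INFO_MASK)
        {
            case XLOG_LOGICAL_INSERT:
            case XLOG_LOGICAL_UPDATE:
            case XLOG_LOGICAL_DELETE:
            case XLOG_LOGICAL_TRUNCATE:
                nchanges++;
                break;
            default:
                break;
        }
    }
    return nchanges;
}
```

In this model the same info byte reaches both paths: recovery only validates the type, and all of the useful work happens on the decoding side.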
The patch is still very rough, but I tried it on simple insert cases in my
columnar[0] extension (which only supports insert, not update/delete).
I'm looking for some review on the approach and structure before I
polish and test it. Note that my main test case is columnar, which
doesn't support update/delete. Also note that the patch is against v14
(for now).
Regards,
Jeff Davis
[0]: https://github.com/citusdata/citus/tree/master/src/backend/columnar
Attachments:
logical-xlog.diff (text/x-patch; charset=UTF-8)
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd862..ed6dff179be 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,7 +18,7 @@ OBJS = \
gistdesc.o \
hashdesc.o \
heapdesc.o \
- logicalmsgdesc.o \
+ logicaldesc.o \
mxactdesc.o \
nbtdesc.o \
relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c
similarity index 59%
rename from src/backend/access/rmgrdesc/logicalmsgdesc.c
rename to src/backend/access/rmgrdesc/logicaldesc.c
index d64ce2e7eff..079ab5a847e 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicaldesc.c
@@ -1,22 +1,22 @@
/*-------------------------------------------------------------------------
*
- * logicalmsgdesc.c
- * rmgr descriptor routines for replication/logical/message.c
+ * logicaldesc.c
+ * rmgr descriptor routines for replication/logical/logical_xlog.c
*
* Portions Copyright (c) 2015-2021, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
- * src/backend/access/rmgrdesc/logicalmsgdesc.c
+ * src/backend/access/rmgrdesc/logicaldesc.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
void
-logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+logical_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
@@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
sep = " ";
}
}
+ else if (info == XLOG_LOGICAL_INSERT)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_UPDATE)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_DELETE)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_TRUNCATE)
+ {
+
+ }
}
const char *
-logicalmsg_identify(uint8 info)
+logical_identify(uint8 info)
{
- if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
- return "MESSAGE";
+ switch (info & ~XLR_INFO_MASK)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ return "MESSAGE";
+ case XLOG_LOGICAL_INSERT:
+ return "INSERT";
+ case XLOG_LOGICAL_UPDATE:
+ return "UPDATE";
+ case XLOG_LOGICAL_DELETE:
+ return "DELETE";
+ case XLOG_LOGICAL_TRUNCATE:
+ return "TRUNCATE";
+ default:
+ return NULL;
+ }
return NULL;
}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..f31f7187ac4 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,7 +24,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb719..9fa281a1dca 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -19,7 +19,7 @@ OBJS = \
launcher.o \
logical.o \
logicalfuncs.o \
- message.o \
+ logical_xlog.o \
origin.o \
proto.o \
relation.o \
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 453efc51e16..fb9be9c71ae 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -37,7 +37,7 @@
#include "catalog/pg_control.h"
#include "replication/decode.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
@@ -56,16 +56,19 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* individual record(group)'s handlers */
-static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsg(LogicalDecodingContext *cxt, XLogRecordBuffer *buf);
+static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid,
bool two_phase);
@@ -154,8 +157,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
DecodeHeapOp(ctx, &buf);
break;
- case RM_LOGICALMSG_ID:
- DecodeLogicalMsgOp(ctx, &buf);
+ case RM_LOGICAL_ID:
+ DecodeLogicalOp(ctx, &buf);
break;
/*
@@ -518,7 +521,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
case XLOG_HEAP_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
- DecodeInsert(ctx, buf);
+ DecodeHeapInsert(ctx, buf);
break;
/*
@@ -616,35 +619,22 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
-/*
- * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
- */
static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
TransactionId xid = XLogRecGetXid(r);
- uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
RepOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot;
xl_logical_message *message;
- if (info != XLOG_LOGICAL_MESSAGE)
- elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
-
- ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+ message = (xl_logical_message *) XLogRecGetData(r);
- /*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
return;
- message = (xl_logical_message *) XLogRecGetData(r);
-
if (message->dbId != ctx->slot->data.database ||
FilterByOrigin(ctx, origin_id))
return;
@@ -664,6 +654,115 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* prefix */
message->message_size,
message->message + message->prefix_size);
+
+}
+
+/*
+ * Parse XLOG_LOGICAL_INSERT records into tuplebufs.
+ *
+ * Deletes can contain the new tuple.
+ */
+static void
+DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ char *tupledata;
+ Size tuplelen;
+ XLogReaderState *r = buf->record;
+ xl_logical_insert *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_logical_insert *) XLogRecGetData(r);
+ tupledata = XLogRecGetData(r) + SizeOfLogicalInsert;
+
+ /*
+ * Ignore insert records without new tuples (this does happen when
+ * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+ */
+ if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
+ return;
+
+ /* only interested in our database */
+ if (xlrec->node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+ else
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+ tuplelen = xlrec->datalen - SizeOfHeapHeader;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(tupledata, xlrec->datalen, change->data.tp.newtuple);
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change,
+ xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
+}
+
+/*
+ * Handle rmgr LOGICAL_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding data changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ switch (info)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ DecodeLogicalMsg(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_INSERT:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeLogicalInsert(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_UPDATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeUpdate(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_DELETE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeDelete(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_TRUNCATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeTruncate(ctx, buf);
+ break;
+
+ default:
+ elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info);
+ break;
+ }
}
/*
@@ -900,7 +999,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* Deletes can contain the new tuple.
*/
static void
-DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
Size datalen;
char *tupledata;
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/logical_xlog.c
similarity index 50%
rename from src/backend/replication/logical/message.c
rename to src/backend/replication/logical/logical_xlog.c
index 93bd372421a..36bbfc8f17e 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/logical_xlog.c
@@ -1,14 +1,14 @@
/*-------------------------------------------------------------------------
*
- * message.c
- * Generic logical messages.
+ * logical_xlog.c
+ * Logical xlog records.
*
* Copyright (c) 2013-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/backend/replication/logical/message.c
+ * src/backend/replication/logical/logical_xlog.c
*
- * NOTES
+ * Logical Messages
*
* Generic logical messages allow XLOG logging of arbitrary binary blobs that
* get passed to the logical decoding plugin. In normal XLOG processing they
@@ -26,16 +26,27 @@
* plugins. The plugin authors must take extra care to use unique prefix,
* good options seems to be for example to use the name of the extension.
*
+ * Logical Insert/Update/Delete/Truncate
+ *
+ * These records are intended to be used by non-heap table access methods that
+ * wish to support logical decoding and replication. They are treated
+ * similarly to the analogous heap records, but are not tied to physical pages
+ * or other details of the heap. These records are not processed during redo,
+ * so do not contribute to durability or physical replication; use generic WAL
+ * records for that. Note that using both logical WAL records and generic WAL
+ * records is redundant compared with the heap.
+ *
* ---------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "access/heapam_xlog.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "utils/memutils.h"
/*
@@ -70,19 +81,75 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
/* allow origin filtering */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
- return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+ return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE);
}
+XLogRecPtr
+LogLogicalInsert(Relation relation, TupleTableSlot *slot)
+{
+ bool shouldFree;
+ HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+ xl_logical_insert xlrec;
+ xl_heap_header xlhdr;
+ XLogRecPtr recptr;
+
+ /* force xid to be allocated */
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+
+ tuple->t_tableOid = slot->tts_tableOid;
+ ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+ xlrec.node = relation->rd_node;
+ xlrec.datalen = tuple->t_len;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
+ xlrec.flags = 0;
+ xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert);
+
+ xlhdr.t_infomask2 = tuple->t_data->t_infomask2;
+ xlhdr.t_infomask = tuple->t_data->t_infomask;
+ xlhdr.t_hoff = tuple->t_data->t_hoff;
+
+ XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+ /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+ XLogRegisterData((char *) tuple->t_data + SizeofHeapTupleHeader,
+ tuple->t_len - SizeofHeapTupleHeader);
+
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT);
+
+ if (shouldFree)
+ pfree(tuple);
+
+ return recptr;
+}
+
+
/*
* Redo is basically just noop for logical decoding messages.
*/
void
-logicalmsg_redo(XLogReaderState *record)
+logical_redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (info != XLOG_LOGICAL_MESSAGE)
- elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+ switch (info)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ case XLOG_LOGICAL_INSERT:
+ case XLOG_LOGICAL_UPDATE:
+ case XLOG_LOGICAL_DELETE:
+ case XLOG_LOGICAL_TRUNCATE:
+ break;
+ default:
+ elog(PANIC, "logical_redo: unknown op code %u", info);
+ }
/* This is only interesting for logical decoding, see decode.c. */
}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 1f38c5b33ea..b84e3971e5e 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -28,7 +28,7 @@
#include "nodes/makefuncs.h"
#include "replication/decode.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "storage/fd.h"
#include "utils/array.h"
#include "utils/builtins.h"
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index 3be00a8b61f..567655fa626 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -10,7 +10,7 @@
/gistdesc.c
/hashdesc.c
/heapdesc.c
-/logicalmsgdesc.c
+/logicaldesc.c
/mxactdesc.c
/nbtdesc.c
/relmapdesc.c
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..c7db22e70f1 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,7 +26,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "rmgrdesc.h"
#include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index f582cf535f6..48cba42f561 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL)
diff --git a/src/include/replication/message.h b/src/include/replication/logical_xlog.h
similarity index 54%
rename from src/include/replication/message.h
rename to src/include/replication/logical_xlog.h
index d3fb324c816..72c572eaae5 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/logical_xlog.h
@@ -1,10 +1,10 @@
/*-------------------------------------------------------------------------
- * message.h
- * Exports from replication/logical/message.c
+ * logical_xlog.h
+ * Exports from replication/logical/logical_xlog.c
*
* Copyright (c) 2013-2021, PostgreSQL Global Development Group
*
- * src/include/replication/message.h
+ * src/include/replication/logical_xlog.h
*-------------------------------------------------------------------------
*/
#ifndef PG_LOGICAL_MESSAGE_H
@@ -13,6 +13,7 @@
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
+#include "storage/off.h"
/*
* Generic logical decoding message wal record.
@@ -29,13 +30,34 @@ typedef struct xl_logical_message
#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
+/* This is what we need to know about insert */
+typedef struct xl_logical_insert
+{
+ RelFileNode node;
+ OffsetNumber offnum; /* inserted tuple's offset */
+ Size datalen;
+ uint8 flags;
+
+ /* xl_heap_header & TUPLE DATA in backup block 0 */
+} xl_logical_insert;
+
+#define SizeOfLogicalInsert (offsetof(xl_logical_insert, flags) + sizeof(uint8))
+
+struct TupleTableSlot;
+
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
size_t size, bool transactional);
+extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot);
/* RMGR API*/
#define XLOG_LOGICAL_MESSAGE 0x00
-void logicalmsg_redo(XLogReaderState *record);
-void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
-const char *logicalmsg_identify(uint8 info);
+#define XLOG_LOGICAL_INSERT 0x10
+#define XLOG_LOGICAL_UPDATE 0x20
+#define XLOG_LOGICAL_DELETE 0x30
+#define XLOG_LOGICAL_TRUNCATE 0x40
+
+void logical_redo(XLogReaderState *record);
+void logical_desc(StringInfo buf, XLogReaderState *record);
+const char *logical_identify(uint8 info);
#endif /* PG_LOGICAL_MESSAGE_H */
On Sun, Oct 31, 2021 at 11:40 PM Jeff Davis <pgsql@j-davis.com> wrote:
Attached is a WIP patch to add new WAL records to represent a logical
insert, update, or delete. These records do not do anything at REDO
time, they are only processed during logical decoding/replication.
These are intended to be used by a custom table AM, like my columnar
compression extension[0], which currently supports physical replication
but can't support logical decoding/replication because decoding is not
extensible. Using these new logical records would be redundant, making
inserts/updates/deletes less efficient, but at least logical decoding
would work (the lack of which is columnar's biggest weakness).
Alternatively, we could support extensible WAL with extensible
decoding. I also like this approach, but it takes more work for an AM
like columnar to get that right -- it needs to keep additional state in
the walsender to track and assemble the compressed columns stored
across many blocks. It also requires a lot of care, because mistakes
can get you into serious trouble.
This proposal, for new logical records without WAL extensibility,
provides a more shallow ramp to get a table AM working (including
logical replication/decoding) without the need to invest in the WAL
design. Later, of course I'd like the option for extensible WAL as well
(to be more efficient), but right now I'd prefer it just worked
(inefficiently).
You have modeled DecodeLogicalInsert based on current DecodeInsert and
it generates the same change message, so not sure how exactly these
new messages will be different from current heap_insert/update/delete
messages? Also, we have code to deal with different types of messages
at various places during decoding, so if they are different, we
probably need to handle them at those places as well.
--
With Regards,
Amit Kapila.
On Wed, 2021-11-03 at 11:25 +0530, Amit Kapila wrote:
You have modeled DecodeLogicalInsert based on current DecodeInsert
and
it generates the same change message, so not sure how exactly these
new messages will be different from current heap_insert/update/delete
messages?
Is there a reason you think the messages should be different for heap
versus another table AM? Isn't the table AM a physical implementation
detail?
Regards,
Jeff Davis
On Thu, Nov 4, 2021 at 7:09 AM Jeff Davis <pgsql@j-davis.com> wrote:
On Wed, 2021-11-03 at 11:25 +0530, Amit Kapila wrote:
You have modeled DecodeLogicalInsert based on current DecodeInsert
and
it generates the same change message, so not sure how exactly these
new messages will be different from current heap_insert/update/delete
messages?
Is there a reason you think the messages should be different for heap
versus another table AM? Isn't the table AM a physical implementation
detail?
We have special handling for speculative insertions and toast
insertions. Can't different table AMs have different representations
for toast, or maybe some different concept like speculative
insertions? Similarly, I remember that for zheap we didn't have
TransactionIds for subtransactions, so we needed to make some changes in
logical decoding to compensate for that. I guess similar stuff could
be required for other table AMs. Then a different table AM can have
a different tuple format, which won't work for current change records
unless we convert it to heap tuple format before writing WAL for it.
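To make the conversion concern concrete, here is a minimal standalone sketch of gathering one row out of a column-oriented batch into a row-oriented buffer before it could be logged. It is only a toy: ToyColumnarBatch and toy_form_row are hypothetical names, the values are plain int32, and the output is not the heap tuple format.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical columnar layout: each column is a separate array of int32
 * values, and every array has nrows entries.
 */
typedef struct ToyColumnarBatch
{
    size_t      ncols;
    size_t      nrows;
    const int32_t *const *cols; /* cols[c][r] is column c of row r */
} ToyColumnarBatch;

/*
 * Gather one logical row into a row-oriented buffer, which is the extra
 * step a columnar AM would need before writing a row-based record.
 * The caller must provide room for batch->ncols values in out.
 * Returns the number of values written, or 0 if the row is out of range.
 */
static size_t
toy_form_row(const ToyColumnarBatch *batch, size_t row, int32_t *out)
{
    assert(batch != NULL && out != NULL);

    if (row >= batch->nrows)
        return 0;

    for (size_t c = 0; c < batch->ncols; c++)
        out[c] = batch->cols[c][row];

    return batch->ncols;
}
```

The per-row gather is the cost in question: it touches every column array once for each row that is logged.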
--
With Regards,
Amit Kapila.
On Thu, 2021-11-04 at 14:33 +0530, Amit Kapila wrote:
Can't different table AMs have different representations
for toast, or maybe some different concept like speculative
insertions?
The decoding plugin should just use the common interfaces to toast, and
if the table AM supports toast, everything should work fine. The only
special thing it needs to do is check VARATT_IS_EXTERNAL_ONDISK(),
because on-disk toast data can't be decoded (which is true for heap,
too).
I haven't looked as much into speculative insertions, but I don't think
those are a problem either. Shouldn't they be handled before they make
it into the change stream that the plugin sees?
Similarly, I remember that for zheap we didn't have
TransactionIds for subtransactions, so we needed to make some changes in
logical decoding to compensate for that. I guess similar stuff could
be required for other table AMs. Then a different table AM can have
a different tuple format, which won't work for current change records
unless we convert it to heap tuple format before writing WAL for it.
Columnar certainly has a different format. That makes me wonder whether
ReorderBufferChange and/or ReorderBufferTupleBuf are too low-level, and
we should have a higher-level representation of a change that is based
on slots.
Can you tell me more about the changes you made for zheap? I still
don't understand why the decoding plugin would have to know what table
AM the change came from.
Regards,
Jeff Davis
On Fri, Nov 5, 2021 at 4:53 AM Jeff Davis <pgsql@j-davis.com> wrote:
On Thu, 2021-11-04 at 14:33 +0530, Amit Kapila wrote:
Can't different table AMs have different representations
for toast, or maybe some different concept like speculative
insertions?
The decoding plugin should just use the common interfaces to toast, and
if the table AM supports toast, everything should work fine.
I think that is true only if the table AM uses the same format as heap
for toast. It also seems to rely on the heap tuple format.
The only
special thing it needs to do is check VARATT_IS_EXTERNAL_ONDISK(),
because on-disk toast data can't be decoded (which is true for heap,
too).
I haven't looked as much into speculative insertions, but I don't think
those are a problem either. Shouldn't they be handled before they make
it into the change stream that the plugin sees?
They will be handled before the plugin sees them, but I was asking
what happens if the table AM has some other WAL, similar to the WAL
for speculative insertions, that needs special handling.
Similarly, I remember that for zheap we didn't have
TransactionIds for subtransactions, so we needed to make some changes in
logical decoding to compensate for that. I guess similar stuff could
be required for other table AMs. Then a different table AM can have
a different tuple format, which won't work for current change records
unless we convert it to heap tuple format before writing WAL for it.
Columnar certainly has a different format. That makes me wonder whether
ReorderBufferChange and/or ReorderBufferTupleBuf are too low-level, and
we should have a higher-level representation of a change that is based
on slots.
Can you tell me more about the changes you made for zheap? I still
don't understand why the decoding plugin would have to know what table
AM the change came from.
I am not talking about decoding plugin but rather decoding itself,
basically, the work we do in reorderbuffer.c, decode.c, etc. The two
things I remember were tuple format and transaction ids as mentioned
in my previous email. I think we should try to find a solution for
tuple format as the current decoding code relies on it if we want
decoding to deal with other table AMs transparently.
--
With Regards,
Amit Kapila.
On Fri, 2021-11-05 at 10:00 +0530, Amit Kapila wrote:
I am not talking about decoding plugin but rather decoding itself,
basically, the work we do in reorderbuffer.c, decode.c, etc. The two
things I remember were tuple format and transaction ids as mentioned
in my previous email.
If it's difficult to come up with something that will work for all
table AMs, then it suggests that we might want to go towards fully
extensible rmgr, and have a decoding method in the rmgr.
I started a thread (with a patch) here:
/messages/by-id/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel@j-davis.com
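For readers unfamiliar with the idea, "a decoding method in the rmgr" can be pictured as one more callback in each resource manager's entry, next to redo. The following is a standalone toy model under that assumption; ToyRmgr, toy_register_rmgr, toy_decode_record, and the id 128 are all hypothetical and do not reflect the API proposed in the linked thread.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_RM_MAX_ID       255
#define TOY_RM_COLUMNAR_ID  128 /* hypothetical custom rmgr id */

typedef struct ToyRecord
{
    uint8_t     rmid;           /* which resource manager owns the record */
    uint8_t     info;           /* record type within that rmgr */
} ToyRecord;

typedef struct ToyRmgr
{
    const char *name;
    void        (*redo) (const ToyRecord *rec);
    /* Returns the number of logical changes produced; may be NULL. */
    int         (*decode) (const ToyRecord *rec);
} ToyRmgr;

static const ToyRmgr *toy_rmgr_table[TOY_RM_MAX_ID + 1];

/* Register an rmgr under an id; returns -1 if the id is already taken. */
static int
toy_register_rmgr(uint8_t rmid, const ToyRmgr *rmgr)
{
    assert(rmgr != NULL);

    if (toy_rmgr_table[rmid] != NULL)
        return -1;
    toy_rmgr_table[rmid] = rmgr;
    return 0;
}

/* Dispatch a record to its rmgr's decode callback, if it has one. */
static int
toy_decode_record(const ToyRecord *rec)
{
    const ToyRmgr *rmgr = toy_rmgr_table[rec->rmid];

    if (rmgr == NULL || rmgr->decode == NULL)
        return 0;               /* nothing decodable for this rmgr */
    return rmgr->decode(rec);
}

/* Example callbacks for a made-up columnar rmgr. */
static void
toy_columnar_redo(const ToyRecord *rec)
{
    (void) rec;                 /* physical replay would go here */
}

static int
toy_columnar_decode(const ToyRecord *rec)
{
    /* Treat type 0x10 as an insert that yields one logical change. */
    return ((rec->info & 0xF0) == 0x10) ? 1 : 0;
}

static const ToyRmgr toy_columnar_rmgr = {
    "ToyColumnar", toy_columnar_redo, toy_columnar_decode
};
```

With this shape, the core decoding loop no longer needs to know about any particular table AM's record layout; it only looks up the owning rmgr and calls its callback.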
I think we should try to find a solution for
tuple format as the current decoding code relies on it if we want
decoding to deal with other table AMs transparently.
Agreed, but it seems like we need basic extensibility first. For now,
we'll need to convert to a heap tuple, but later I'd like to support
other formats for the decoding plugin to work with.
Regards,
Jeff Davis
On Tue, Nov 9, 2021 at 5:12 AM Jeff Davis <pgsql@j-davis.com> wrote:
On Fri, 2021-11-05 at 10:00 +0530, Amit Kapila wrote:
I am not talking about decoding plugin but rather decoding itself,
basically, the work we do in reorderbuffer.c, decode.c, etc. The two
things I remember were tuple format and transaction ids as mentioned
in my previous email.
If it's difficult to come up with something that will work for all
table AMs, then it suggests that we might want to go towards fully
extensible rmgr, and have a decoding method in the rmgr.
I started a thread (with a patch) here:
/messages/by-id/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel@j-davis.com
I think we should try to find a solution for
tuple format as the current decoding code relies on it if we want
decoding to deal with other table AMs transparently.
Agreed, but it seems like we need basic extensibility first. For now,
we'll need to convert to a heap tuple,
Okay, but that might have a cost because we need to convert it before
WAL logging it, and then we probably also need to convert back to the
original table AM format during recovery/standby apply.
--
With Regards,
Amit Kapila.
On Wed, 10 Nov 2021 at 03:17, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Tue, Nov 9, 2021 at 5:12 AM Jeff Davis <pgsql@j-davis.com> wrote:
On Fri, 2021-11-05 at 10:00 +0530, Amit Kapila wrote:
I am not talking about decoding plugin but rather decoding itself,
basically, the work we do in reorderbuffer.c, decode.c, etc. The two
things I remember were tuple format and transaction ids as mentioned
in my previous email.
If it's difficult to come up with something that will work for all
table AMs, then it suggests that we might want to go towards fully
extensible rmgr, and have a decoding method in the rmgr.
I started a thread (with a patch) here:
/messages/by-id/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel@j-davis.com
I think we should try to find a solution for
tuple format as the current decoding code relies on it if we want
decoding to deal with other table AMs transparently.
Agreed, but it seems like we need basic extensibility first. For now,
we'll need to convert to a heap tuple,
Okay, but that might have a cost because we need to convert it before
WAL logging it, and then we probably also need to convert back to the
original table AM format during recovery/standby apply.
I spoke with Jeff in detail about this patch in NYC Dec 21, and I now
think it is worth pursuing. It seems much more likely that this would
be acceptable than fully extensible rmgr.
Amit asks a good question: should we be writing a message that seems
to presume the old heap tuple format? My answer is that we clearly
need it to be written in *some* common format, and the heap format
works today, so de facto it is the format we should use.
Right now, TAMs have to reformat back into this same format, so it is
the way the APIs work. Put another way: I don't see any other
format that makes sense to use now, but that could change in the
future.
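To make the "reformat into a common format" step concrete, here is a
minimal sketch of a column-oriented AM gathering one logical row before
it would hand that row to the logging code. Everything in it
(ColumnBatch, CommonRow, batch_to_common_row) is hypothetical and only
illustrates the shape of the conversion; it is not the heap tuple format
and not code from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_NCOLS 3

/* Hypothetical columnar batch: each column is stored contiguously. */
typedef struct ColumnBatch
{
    size_t         nrows;
    const int64_t *cols[SKETCH_NCOLS];
} ColumnBatch;

/* Hypothetical row-oriented "common format" used for logical logging. */
typedef struct CommonRow
{
    int64_t values[SKETCH_NCOLS];
} CommonRow;

/*
 * Gather row number rowno from the per-column arrays into a single row.
 * Returns 0 on success, -1 if rowno is out of range.
 */
static int
batch_to_common_row(const ColumnBatch *batch, size_t rowno, CommonRow *out)
{
    if (rowno >= batch->nrows)
        return -1;

    for (size_t c = 0; c < SKETCH_NCOLS; c++)
        out->values[c] = batch->cols[c][rowno];

    return 0;
}
```

The cost Amit mentions is visible here: the gather runs once per row
that is logged, in addition to whatever the AM does for its own storage.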
So I'm signing up as a reviewer and we'll see if this is good to go.
--
Simon Riggs http://www.EnterpriseDB.com/
On Sun, 31 Oct 2021 at 18:10, Jeff Davis <pgsql@j-davis.com> wrote:
I'm looking for some review on the approach and structure before I
polish and test it.
Repurposing the logical msg rmgr into a general purpose logical rmgr
seems right.
Structure looks obvious, which is good.
Please pursue this and I will review with you as you go.
--
Simon Riggs http://www.EnterpriseDB.com/
On Wed, 2022-01-05 at 20:19 +0000, Simon Riggs wrote:
I spoke with Jeff in detail about this patch in NYC Dec 21, and I now
think it is worth pursuing. It seems much more likely that this would
be acceptable than fully extensible rmgr.
Thank you. I had some conversations with others who felt this approach
is wasteful, which it is. But if this patch still has potential, I'm
happy to pursue it along with the extensible rmgr approach.
So I'm signing up as a reviewer and we'll see if this is good to go.
Great, I attached a rebased version.
Regards,
Jeff Davis
Attachments:
logical-xlog-v2.diff (text/x-patch; charset=UTF-8)
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd862..ed6dff179be 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,7 +18,7 @@ OBJS = \
gistdesc.o \
hashdesc.o \
heapdesc.o \
- logicalmsgdesc.o \
+ logicaldesc.o \
mxactdesc.o \
nbtdesc.o \
relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c
similarity index 59%
rename from src/backend/access/rmgrdesc/logicalmsgdesc.c
rename to src/backend/access/rmgrdesc/logicaldesc.c
index 099e11a84e7..c2b0434a606 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicaldesc.c
@@ -1,22 +1,22 @@
/*-------------------------------------------------------------------------
*
- * logicalmsgdesc.c
- * rmgr descriptor routines for replication/logical/message.c
+ * logicaldesc.c
+ * rmgr descriptor routines for replication/logical/logical_xlog.c
*
* Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
- * src/backend/access/rmgrdesc/logicalmsgdesc.c
+ * src/backend/access/rmgrdesc/logicaldesc.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
void
-logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+logical_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
@@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
sep = " ";
}
}
+ else if (info == XLOG_LOGICAL_INSERT)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_UPDATE)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_DELETE)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_TRUNCATE)
+ {
+
+ }
}
const char *
-logicalmsg_identify(uint8 info)
+logical_identify(uint8 info)
{
- if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
- return "MESSAGE";
+ switch (info & ~XLR_INFO_MASK)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ return "MESSAGE";
+ case XLOG_LOGICAL_INSERT:
+ return "INSERT";
+ case XLOG_LOGICAL_UPDATE:
+ return "UPDATE";
+ case XLOG_LOGICAL_DELETE:
+ return "DELETE";
+ case XLOG_LOGICAL_TRUNCATE:
+ return "TRUNCATE";
+ default:
+ return NULL;
+ }
return NULL;
}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..f31f7187ac4 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,7 +24,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb719..9fa281a1dca 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -19,7 +19,7 @@ OBJS = \
launcher.o \
logical.o \
logicalfuncs.o \
- message.o \
+ logical_xlog.o \
origin.o \
proto.o \
relation.o \
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1d22208c1ad..1abccd7190b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -37,7 +37,7 @@
#include "catalog/pg_control.h"
#include "replication/decode.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
@@ -56,16 +56,19 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* individual record(group)'s handlers */
-static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid,
bool two_phase);
@@ -154,8 +157,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
DecodeHeapOp(ctx, &buf);
break;
- case RM_LOGICALMSG_ID:
- DecodeLogicalMsgOp(ctx, &buf);
+ case RM_LOGICAL_ID:
+ DecodeLogicalOp(ctx, &buf);
break;
/*
@@ -518,7 +521,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
case XLOG_HEAP_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
- DecodeInsert(ctx, buf);
+ DecodeHeapInsert(ctx, buf);
break;
/*
@@ -616,35 +619,22 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
-/*
- * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
- */
static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
TransactionId xid = XLogRecGetXid(r);
- uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
RepOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot;
xl_logical_message *message;
- if (info != XLOG_LOGICAL_MESSAGE)
- elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
-
- ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+ message = (xl_logical_message *) XLogRecGetData(r);
- /*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
return;
- message = (xl_logical_message *) XLogRecGetData(r);
-
if (message->dbId != ctx->slot->data.database ||
FilterByOrigin(ctx, origin_id))
return;
@@ -664,6 +654,115 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* prefix */
message->message_size,
message->message + message->prefix_size);
+
+}
+
+/*
+ * Parse XLOG_LOGICAL_INSERT records into tuplebufs.
+ *
+ * The new tuple follows the fixed-size xl_logical_insert header.
+ */
+static void
+DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ char *tupledata;
+ Size tuplelen;
+ XLogReaderState *r = buf->record;
+ xl_logical_insert *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_logical_insert *) XLogRecGetData(r);
+ tupledata = XLogRecGetData(r) + SizeOfLogicalInsert;
+
+ /*
+ * Ignore insert records without new tuples (this does happen when
+ * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+ */
+ if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
+ return;
+
+ /* only interested in our database */
+ if (xlrec->node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+ else
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+ tuplelen = xlrec->datalen - SizeOfHeapHeader;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(tupledata, xlrec->datalen, change->data.tp.newtuple);
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change,
+ xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
+}
+
+/*
+ * Handle rmgr LOGICAL_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding data changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ switch (info)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ DecodeLogicalMsg(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_INSERT:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeLogicalInsert(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_UPDATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeUpdate(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_DELETE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeDelete(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_TRUNCATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeTruncate(ctx, buf);
+ break;
+
+ default:
+ elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info);
+ break;
+ }
}
/*
@@ -900,7 +999,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* Deletes can contain the new tuple.
*/
static void
-DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
Size datalen;
char *tupledata;
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/logical_xlog.c
similarity index 50%
rename from src/backend/replication/logical/message.c
rename to src/backend/replication/logical/logical_xlog.c
index b02363f0bda..efec90b3619 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/logical_xlog.c
@@ -1,14 +1,14 @@
/*-------------------------------------------------------------------------
*
- * message.c
- * Generic logical messages.
+ * logical_xlog.c
+ * Logical xlog records.
*
* Copyright (c) 2013-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/backend/replication/logical/message.c
+ * src/backend/replication/logical/logical_xlog.c
*
- * NOTES
+ * Logical Messages
*
* Generic logical messages allow XLOG logging of arbitrary binary blobs that
* get passed to the logical decoding plugin. In normal XLOG processing they
@@ -26,16 +26,27 @@
* plugins. The plugin authors must take extra care to use unique prefix,
* good options seems to be for example to use the name of the extension.
*
+ * Logical Insert/Update/Delete/Truncate
+ *
+ * These records are intended to be used by non-heap table access methods that
+ * wish to support logical decoding and replication. They are treated
+ * similarly to the analogous heap records, but are not tied to physical pages
+ * or other details of the heap. These records are not processed during redo,
+ * so do not contribute to durability or physical replication; use generic WAL
+ * records for that. Note that using both logical WAL records and generic WAL
+ * records is redundant compared with the heap.
+ *
* ---------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "access/heapam_xlog.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "utils/memutils.h"
/*
@@ -70,19 +81,75 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
/* allow origin filtering */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
- return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+ return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE);
}
+XLogRecPtr
+LogLogicalInsert(Relation relation, TupleTableSlot *slot)
+{
+ bool shouldFree;
+ HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+ xl_logical_insert xlrec;
+ xl_heap_header xlhdr;
+ XLogRecPtr recptr;
+
+ /* force xid to be allocated */
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+
+ tuple->t_tableOid = slot->tts_tableOid;
+ ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+ xlrec.node = relation->rd_node;
+ xlrec.datalen = tuple->t_len;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
+ xlrec.flags = 0;
+ xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert);
+
+ xlhdr.t_infomask2 = tuple->t_data->t_infomask2;
+ xlhdr.t_infomask = tuple->t_data->t_infomask;
+ xlhdr.t_hoff = tuple->t_data->t_hoff;
+
+ XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+ /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+ XLogRegisterData((char *) tuple->t_data + SizeofHeapTupleHeader,
+ tuple->t_len - SizeofHeapTupleHeader);
+
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT);
+
+ if (shouldFree)
+ pfree(tuple);
+
+ return recptr;
+}
+
+
/*
* Redo is basically just noop for logical decoding messages.
*/
void
-logicalmsg_redo(XLogReaderState *record)
+logical_redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (info != XLOG_LOGICAL_MESSAGE)
- elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+ switch (info)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ case XLOG_LOGICAL_INSERT:
+ case XLOG_LOGICAL_UPDATE:
+ case XLOG_LOGICAL_DELETE:
+ case XLOG_LOGICAL_TRUNCATE:
+ break;
+ default:
+ elog(PANIC, "logical_redo: unknown op code %u", info);
+ }
/* This is only interesting for logical decoding, see decode.c. */
}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 4f633888b4f..edb06b86d2d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -28,7 +28,7 @@
#include "nodes/makefuncs.h"
#include "replication/decode.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "storage/fd.h"
#include "utils/array.h"
#include "utils/builtins.h"
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index 3be00a8b61f..567655fa626 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -10,7 +10,7 @@
/gistdesc.c
/hashdesc.c
/heapdesc.c
-/logicalmsgdesc.c
+/logicaldesc.c
/mxactdesc.c
/nbtdesc.c
/relmapdesc.c
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..c7db22e70f1 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,7 +26,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "rmgrdesc.h"
#include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index ed751aaf039..5a55b18810e 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL)
diff --git a/src/include/replication/message.h b/src/include/replication/logical_xlog.h
similarity index 54%
rename from src/include/replication/message.h
rename to src/include/replication/logical_xlog.h
index 7d7785292f1..bf20a740326 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/logical_xlog.h
@@ -1,10 +1,10 @@
/*-------------------------------------------------------------------------
- * message.h
- * Exports from replication/logical/message.c
+ * logical_xlog.h
+ * Exports from replication/logical/logical_xlog.c
*
* Copyright (c) 2013-2022, PostgreSQL Global Development Group
*
- * src/include/replication/message.h
+ * src/include/replication/logical_xlog.h
*-------------------------------------------------------------------------
*/
#ifndef PG_LOGICAL_MESSAGE_H
@@ -13,6 +13,7 @@
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
+#include "storage/off.h"
/*
* Generic logical decoding message wal record.
@@ -29,13 +30,34 @@ typedef struct xl_logical_message
#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
+/* This is what we need to know about insert */
+typedef struct xl_logical_insert
+{
+ RelFileNode node;
+ OffsetNumber offnum; /* inserted tuple's offset */
+ Size datalen;
+ uint8 flags;
+
+ /* xl_heap_header & TUPLE DATA follow in the record's main data */
+} xl_logical_insert;
+
+#define SizeOfLogicalInsert (offsetof(xl_logical_insert, flags) + sizeof(uint8))
+
+struct TupleTableSlot;
+
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
size_t size, bool transactional);
+extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot);
/* RMGR API*/
#define XLOG_LOGICAL_MESSAGE 0x00
-void logicalmsg_redo(XLogReaderState *record);
-void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
-const char *logicalmsg_identify(uint8 info);
+#define XLOG_LOGICAL_INSERT 0x10
+#define XLOG_LOGICAL_UPDATE 0x20
+#define XLOG_LOGICAL_DELETE 0x30
+#define XLOG_LOGICAL_TRUNCATE 0x40
+
+void logical_redo(XLogReaderState *record);
+void logical_desc(StringInfo buf, XLogReaderState *record);
+const char *logical_identify(uint8 info);
#endif /* PG_LOGICAL_MESSAGE_H */
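The SizeOfLogicalInsert macro in the header above follows the usual
PostgreSQL idiom of logging a struct only up to its last member, so that
trailing alignment padding is not written to WAL. The sketch below
reproduces that arithmetic outside the server. The *_sketch types are
stand-ins I made up so the file compiles on its own; they assume
RelFileNode is three 32-bit OIDs, OffsetNumber is a uint16, and Size is
size_t.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for RelFileNode (assumed: tablespace, database, relation OIDs). */
typedef struct RelFileNodeSketch
{
    uint32_t spcNode;
    uint32_t dbNode;
    uint32_t relNode;
} RelFileNodeSketch;

/* Stand-in for OffsetNumber. */
typedef uint16_t OffsetNumberSketch;

/* Same field order as xl_logical_insert in the patch. */
typedef struct xl_logical_insert_sketch
{
    RelFileNodeSketch  node;
    OffsetNumberSketch offnum;
    size_t             datalen;
    uint8_t            flags;
} xl_logical_insert_sketch;

/* Bytes that get logged: everything up to and including flags. */
#define SIZE_OF_LOGICAL_INSERT_SKETCH \
    (offsetof(xl_logical_insert_sketch, flags) + sizeof(uint8_t))

/* Trailing padding bytes that the macro keeps out of the WAL record. */
static size_t
trailing_padding_saved(void)
{
    assert(SIZE_OF_LOGICAL_INSERT_SKETCH <= sizeof(xl_logical_insert_sketch));
    return sizeof(xl_logical_insert_sketch) - SIZE_OF_LOGICAL_INSERT_SKETCH;
}

/* Offset within the record's main data where the tuple payload begins. */
static size_t
payload_offset(void)
{
    return SIZE_OF_LOGICAL_INSERT_SKETCH;
}
```

This matches how the decoding side locates the tuple: it takes the start
of the record data and skips SizeOfLogicalInsert bytes. The exact numbers
depend on the platform's alignment rules, so the sketch does not assume
any particular value.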
On Mon, 17 Jan 2022 at 09:05, Jeff Davis <pgsql@j-davis.com> wrote:
On Wed, 2022-01-05 at 20:19 +0000, Simon Riggs wrote:
I spoke with Jeff in detail about this patch in NYC Dec 21, and I now
think it is worth pursuing. It seems much more likely that this would
be acceptable than fully extensible rmgr.
Thank you. I had some conversations with others who felt this approach
is wasteful, which it is. But if this patch still has potential, I'm
happy to pursue it along with the extensible rmgr approach.
It's self-contained and generally useful for a range of applications,
so the code would be long-lived.
Calling it wasteful presumes the way you'd use it. If you make logical
log entries you don't need to make physical ones, so it's actually the
physical log entries that would be wasteful.
Logical log entries don't need to be decoded, so it's actually more
efficient, plus we could skip index entries altogether.
I would argue that this is the way we should be doing WAL, with
occasional divergence to physical records for performance, rather than
the other way around.
So I'm signing up as a reviewer and we'll see if this is good to go.
Great, I attached a rebased version.
The approach is perfectly fine, it just needs to be finished and rebased.
--
Simon Riggs http://www.EnterpriseDB.com/
Hi,
On 2022-01-17 01:05:14 -0800, Jeff Davis wrote:
Great, I attached a rebased version.
Currently this doesn't apply: http://cfbot.cputube.org/patch_37_3394.log
- Andres
On Thu, 2022-02-24 at 20:35 +0000, Simon Riggs wrote:
The approach is perfectly fine, it just needs to be finished and
rebased.
Attached a new version. The scope expanded, so this is likely to slip
past v15 at this late stage. For 15, I'll focus on my extensible rmgr
work, which can serve similar purposes.
The main purpose of this patch is to be able to emit logical events for
a table (insert/update/delete/truncate) without actually modifying a
table or relying on the heap at all. That allows table AMs to support
logical decoding/replication, and perhaps allows a few other
interesting use cases (maybe foreign tables?). There are really two
advantages of this approach over relying on a custom rmgr:
1. Easier for extension authors
2. Doesn't require an extension module to be loaded to start the
server
Those are very nice advantages, but they come at the price of
flexibility and performance. I think there's room for both, and we can
discuss the merits individually.
Changes:
* Support logical messages for INSERT/UPDATE/DELETE/TRUNCATE
(before it only supported INSERT)
* SQL functions pg_logical_emit_insert/update/delete/truncate
(callable by superuser)
* Tests (using test_decoding)
* Use replica identities properly
* In general a lot of cleanup, but still not quite ready
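As a rough illustration of how the four record types plus the existing
message type share one rmgr, here is a standalone sketch of mapping the
info byte to a record name. The opcode values are the ones from the v2
header earlier in the thread; XLR_INFO_MASK_SKETCH is a stand-in for
XLR_INFO_MASK (0x0F in xlogrecord.h), and the function name is made up
for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* The low four bits of the info byte belong to the WAL machinery. */
#define XLR_INFO_MASK_SKETCH   0x0F

/* Opcodes as defined in the patch's logical_xlog.h. */
#define XLOG_LOGICAL_MESSAGE   0x00
#define XLOG_LOGICAL_INSERT    0x10
#define XLOG_LOGICAL_UPDATE    0x20
#define XLOG_LOGICAL_DELETE    0x30
#define XLOG_LOGICAL_TRUNCATE  0x40

/*
 * Return a name for the record type, or NULL if the opcode is unknown.
 * The rmgr-private opcode lives in the high bits, so the low bits are
 * masked off before comparing.
 */
static const char *
logical_identify_sketch(uint8_t info)
{
    switch (info & ~XLR_INFO_MASK_SKETCH)
    {
        case XLOG_LOGICAL_MESSAGE:
            return "MESSAGE";
        case XLOG_LOGICAL_INSERT:
            return "INSERT";
        case XLOG_LOGICAL_UPDATE:
            return "UPDATE";
        case XLOG_LOGICAL_DELETE:
            return "DELETE";
        case XLOG_LOGICAL_TRUNCATE:
            return "TRUNCATE";
        default:
            return NULL;
    }
}
```

Redo treats all five opcodes as no-ops; only decoding needs to tell them
apart, and it can dispatch on the same masked value.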
TODO:
* Should SQL functions be callable by the table owner? I would lean
toward superuser-only, because logical replication is used for
administrative purposes like upgrades, and I don't think we want table
owners to be able to create inconsistencies.
* Support for multi-insert
* Docs for SQL functions, and maybe docs in the section on Generic
WAL
* Try to get away from reliance on heap tuples specifically
Regards,
Jeff Davis
Attachments:
v3-0001-Logical-wal.patch (text/x-patch; charset=UTF-8)
From f7b85aea60b06eb7019befec38566b5e014bea12 Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Sat, 30 Oct 2021 12:07:35 -0700
Subject: [PATCH] Logical wal.
---
contrib/test_decoding/expected/messages.out | 148 +++++++
contrib/test_decoding/sql/messages.sql | 58 +++
src/backend/access/heap/heapam.c | 4 +-
src/backend/access/rmgrdesc/Makefile | 2 +-
.../{logicalmsgdesc.c => logicaldesc.c} | 45 +-
src/backend/access/transam/rmgr.c | 2 +-
src/backend/replication/logical/Makefile | 2 +-
src/backend/replication/logical/decode.c | 275 ++++++++++--
.../replication/logical/logical_xlog.c | 399 ++++++++++++++++++
.../replication/logical/logicalfuncs.c | 165 +++++++-
src/backend/replication/logical/message.c | 89 ----
src/bin/pg_waldump/.gitignore | 2 +-
src/bin/pg_waldump/rmgrdesc.c | 2 +-
src/include/access/heapam.h | 2 +
src/include/access/heapam_xlog.h | 3 +
src/include/access/rmgrlist.h | 2 +-
src/include/catalog/pg_proc.dat | 17 +
src/include/replication/decode.h | 2 +-
src/include/replication/logical_xlog.h | 124 ++++++
src/include/replication/message.h | 41 --
20 files changed, 1211 insertions(+), 173 deletions(-)
rename src/backend/access/rmgrdesc/{logicalmsgdesc.c => logicaldesc.c} (59%)
create mode 100644 src/backend/replication/logical/logical_xlog.c
delete mode 100644 src/backend/replication/logical/message.c
create mode 100644 src/include/replication/logical_xlog.h
delete mode 100644 src/include/replication/message.h
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b6..aa284bc37c2 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -91,6 +91,154 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
------
(0 rows)
+-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it
+CREATE TABLE dummy(i int, t text, n numeric, primary key(t));
+SELECT pg_logical_emit_insert('dummy', row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+ data
+-----------------------------------------------------------------------
+ BEGIN
+ table public.dummy: INSERT: i[integer]:1 t[text]:'one' n[numeric]:0.1
+ COMMIT
+ BEGIN
+ table public.dummy: INSERT: i[integer]:2 t[text]:'two' n[numeric]:0.2
+ COMMIT
+(6 rows)
+
+SELECT * FROM dummy;
+ i | t | n
+---+---+---
+(0 rows)
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+ row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+ data
+-------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.dummy: DELETE: t[text]:'twelve'
+ COMMIT
+ BEGIN
+ table public.dummy: UPDATE: old-key: t[text]:'fifteen' new-tuple: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16
+ COMMIT
+(6 rows)
+
+ALTER TABLE dummy REPLICA IDENTITY NOTHING;
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+ row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+ data
+-----------------------------------------------------------------------------
+ BEGIN
+ table public.dummy: DELETE: (no-tuple-data)
+ COMMIT
+ BEGIN
+ table public.dummy: UPDATE: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16
+ COMMIT
+(6 rows)
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+ data
+---------------------------------------
+ BEGIN
+ table public.dummy: TRUNCATE: cascade
+ COMMIT
+(3 rows)
+
+CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t));
+-- return invalid
+SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+ pg_logical_emit_insert
+------------------------
+ 0/0
+(1 row)
+
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u,
+ row(11, 'eleven', 0.11)::dummy_u);
+ pg_logical_emit_update
+------------------------
+ 0/0
+(1 row)
+
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u,
+ row(11, 'eleven', 0.11)::dummy_u);
+ pg_logical_emit_update
+------------------------
+ 0/0
+(1 row)
+
+-- error
+SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u,
+ row(11, 'eleven', 0.11)::dummy_u);
+ERROR: replica identity column is NULL
+-- return invalid
+SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+ pg_logical_emit_delete
+------------------------
+ 0/0
+(1 row)
+
+-- error
+SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u);
+ERROR: replica identity column is NULL
+-- return invalid
+SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false);
+ pg_logical_emit_truncate
+--------------------------
+ 0/0
+(1 row)
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn;
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+ERROR: could not open relation with OID 1663
+DROP TABLE dummy;
+DROP TABLE dummy_u;
SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e57..c04a2e1a0c8 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -31,4 +31,62 @@ SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
\c :prevdb
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it
+CREATE TABLE dummy(i int, t text, n numeric, primary key(t));
+
+SELECT pg_logical_emit_insert('dummy', row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn;
+SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+SELECT * FROM dummy;
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+ row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+ALTER TABLE dummy REPLICA IDENTITY NOTHING;
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+ row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t));
+-- return invalid
+SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u,
+ row(11, 'eleven', 0.11)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u,
+ row(11, 'eleven', 0.11)::dummy_u);
+-- error
+SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u,
+ row(11, 'eleven', 0.11)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+-- error
+SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u);
+
+-- return invalid
+SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false);
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+DROP TABLE dummy;
+
+DROP TABLE dummy_u;
+
SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 74ad445e59b..9d70598dbde 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -108,8 +108,6 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status
static void index_delete_sort(TM_IndexDeleteOp *delstate);
static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
-static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_required,
- bool *copy);
/*
@@ -8372,7 +8370,7 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
* *copy is set to true if the returned tuple is a modified copy rather than
* the same tuple that was passed in.
*/
-static HeapTuple
+HeapTuple
ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
bool *copy)
{
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd862..ed6dff179be 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,7 +18,7 @@ OBJS = \
gistdesc.o \
hashdesc.o \
heapdesc.o \
- logicalmsgdesc.o \
+ logicaldesc.o \
mxactdesc.o \
nbtdesc.o \
relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c
similarity index 59%
rename from src/backend/access/rmgrdesc/logicalmsgdesc.c
rename to src/backend/access/rmgrdesc/logicaldesc.c
index 099e11a84e7..c2b0434a606 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicaldesc.c
@@ -1,22 +1,22 @@
/*-------------------------------------------------------------------------
*
- * logicalmsgdesc.c
- * rmgr descriptor routines for replication/logical/message.c
+ * logicaldesc.c
+ * rmgr descriptor routines for replication/logical/logical_xlog.c
*
* Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
- * src/backend/access/rmgrdesc/logicalmsgdesc.c
+ * src/backend/access/rmgrdesc/logicaldesc.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
void
-logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+logical_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
@@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
sep = " ";
}
}
+ else if (info == XLOG_LOGICAL_INSERT)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_UPDATE)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_DELETE)
+ {
+
+ }
+ else if (info == XLOG_LOGICAL_TRUNCATE)
+ {
+
+ }
}
const char *
-logicalmsg_identify(uint8 info)
+logical_identify(uint8 info)
{
- if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
- return "MESSAGE";
+ switch (info & ~XLR_INFO_MASK)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ return "MESSAGE";
+ case XLOG_LOGICAL_INSERT:
+ return "INSERT";
+ case XLOG_LOGICAL_UPDATE:
+ return "UPDATE";
+ case XLOG_LOGICAL_DELETE:
+ return "DELETE";
+ case XLOG_LOGICAL_TRUNCATE:
+ return "TRUNCATE";
+ default:
+ return NULL;
+ }
return NULL;
}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..ca0bd614fcf 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -25,7 +25,7 @@
#include "commands/sequence.h"
#include "commands/tablespace.h"
#include "replication/decode.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb719..9fa281a1dca 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -19,7 +19,7 @@ OBJS = \
launcher.o \
logical.o \
logicalfuncs.o \
- message.o \
+ logical_xlog.o \
origin.o \
proto.o \
relation.o \
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..81feca97eb0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,23 +35,29 @@
#include "access/xlogrecord.h"
#include "access/xlogutils.h"
#include "catalog/pg_control.h"
+#include "commands/sequence.h"
#include "replication/decode.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
-#include "commands/sequence.h"
/* individual record(group)'s handlers */
-static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid,
bool two_phase);
@@ -457,7 +463,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
case XLOG_HEAP_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
- DecodeInsert(ctx, buf);
+ DecodeHeapInsert(ctx, buf);
break;
/*
@@ -468,17 +474,17 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_HEAP_HOT_UPDATE:
case XLOG_HEAP_UPDATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
- DecodeUpdate(ctx, buf);
+ DecodeHeapUpdate(ctx, buf);
break;
case XLOG_HEAP_DELETE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
- DecodeDelete(ctx, buf);
+ DecodeHeapDelete(ctx, buf);
break;
case XLOG_HEAP_TRUNCATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
- DecodeTruncate(ctx, buf);
+ DecodeHeapTruncate(ctx, buf);
break;
case XLOG_HEAP_INPLACE:
@@ -559,31 +565,71 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
void
-logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
- RepOriginId origin_id = XLogRecGetOrigin(r);
- Snapshot snapshot;
- xl_logical_message *message;
-
- if (info != XLOG_LOGICAL_MESSAGE)
- elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
- ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
+ * point in decoding data changes.
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
ctx->fast_forward)
return;
+ switch (info)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ DecodeLogicalMsg(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_INSERT:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeLogicalInsert(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_UPDATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeLogicalUpdate(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_DELETE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeLogicalDelete(ctx, buf);
+ break;
+
+ case XLOG_LOGICAL_TRUNCATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeLogicalTruncate(ctx, buf);
+ break;
+
+ default:
+ elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info);
+ break;
+ }
+}
+
+static void
+DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
+ TransactionId xid = XLogRecGetXid(r);
+ RepOriginId origin_id = XLogRecGetOrigin(r);
+ Snapshot snapshot;
+ xl_logical_message *message;
+
message = (xl_logical_message *) XLogRecGetData(r);
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+
if (message->dbId != ctx->slot->data.database ||
FilterByOrigin(ctx, origin_id))
return;
@@ -603,6 +649,187 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* prefix */
message->message_size,
message->message + message->prefix_size);
+
+}
+
+/*
+ * Parse XLOG_LOGICAL_INSERT records into tuplebufs.
+ *
+ * Inserts contain the new tuple.
+ */
+static void
+DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ Size datalen = XLogRecGetDataLen(r) - SizeOfLogicalInsert;
+ char *tupledata = XLogRecGetData(r) + SizeOfLogicalInsert;
+ Size tuplelen = datalen - SizeOfHeapHeader;
+ xl_logical_insert *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_logical_insert *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ if (xlrec->node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_UPDATE from wal into proper tuplebufs.
+ *
+ * Updates can possibly contain a new tuple and the old primary key.
+ */
+static void
+DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_logical_update *xlrec = (xl_logical_update *) XLogRecGetData(r);
+ char *new_tupledata = XLogRecGetData(r) + SizeOfLogicalUpdate;
+ Size new_datalen = xlrec->new_datalen;
+ Size new_tuplelen = new_datalen - SizeOfHeapHeader;
+ ReorderBufferChange *change;
+
+ /* only interested in our database */
+ if (xlrec->node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_UPDATE;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, new_tuplelen);
+
+ DecodeXLogTuple(new_tupledata, new_datalen, change->data.tp.newtuple);
+
+ if (xlrec->flags & XLL_UPDATE_CONTAINS_OLD)
+ {
+ char *old_tupledata = new_tupledata + new_datalen;
+ Size old_datalen;
+ Size old_tuplelen;
+
+ /* remaining data is the old tuple */
+ old_datalen = XLogRecGetDataLen(r) - new_datalen - SizeOfLogicalUpdate;
+ old_tuplelen = old_datalen - SizeOfHeapHeader;
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen);
+
+ DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple);
+ }
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_logical_delete *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_logical_delete *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ if (xlrec->node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_DELETE;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+ if (xlrec->flags & XLL_DELETE_CONTAINS_OLD)
+ {
+ Size old_datalen = XLogRecGetDataLen(r) - SizeOfLogicalDelete;
+ char *old_tupledata = XLogRecGetData(r) + SizeOfLogicalDelete;
+ Size old_tuplelen = old_datalen - SizeOfHeapHeader;
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen);
+
+ DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple);
+ }
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+ change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_TRUNCATE from wal
+ */
+static void
+DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_logical_truncate *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_logical_truncate *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ if (xlrec->dbId != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+ change->origin_id = XLogRecGetOrigin(r);
+ if (xlrec->flags & XLL_TRUNCATE_CASCADE)
+ change->data.truncate.cascade = true;
+ if (xlrec->flags & XLL_TRUNCATE_RESTART_SEQS)
+ change->data.truncate.restart_seqs = true;
+ change->data.truncate.nrelids = xlrec->nrelids;
+ change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
+ xlrec->nrelids);
+ memcpy(change->data.truncate.relids, xlrec->relids,
+ xlrec->nrelids * sizeof(Oid));
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change, false);
}
/*
@@ -839,7 +1066,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* Deletes can contain the new tuple.
*/
static void
-DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
Size datalen;
char *tupledata;
@@ -898,7 +1125,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* Updates can possibly contain a new tuple and the old primary key.
*/
static void
-DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
xl_heap_update *xlrec;
@@ -965,7 +1192,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* Deletes can possibly contain the old primary key.
*/
static void
-DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
xl_heap_delete *xlrec;
@@ -1019,7 +1246,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* Parse XLOG_HEAP_TRUNCATE from wal
*/
static void
-DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
xl_heap_truncate *xlrec;
diff --git a/src/backend/replication/logical/logical_xlog.c b/src/backend/replication/logical/logical_xlog.c
new file mode 100644
index 00000000000..30ae3e73be6
--- /dev/null
+++ b/src/backend/replication/logical/logical_xlog.c
@@ -0,0 +1,399 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical_xlog.c
+ * Logical xlog records.
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/logical_xlog.c
+ *
+ * Logical Messages
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are the same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of the current transaction and will be sent
+ * to the decoding plugin in the same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back, but
+ * non-transactional ones will always be delivered.
+ *
+ * Every message carries a prefix to avoid conflicts between different decoding
+ * plugins. Plugin authors must take extra care to use a unique prefix; a
+ * good option is, for example, the name of the extension.
+ *
+ * Logical Insert/Update/Delete/Truncate
+ *
+ * These records are intended to be used by non-heap table access methods that
+ * wish to support logical decoding and replication. They are treated
+ * similarly to the analogous heap records, but are not tied to physical pages
+ * or other details of the heap. These records are not processed during redo,
+ * so they do not contribute to durability or physical replication; use generic
+ * WAL records for that. Note that writing both logical WAL records and generic
+ * WAL records duplicates data, which the heap avoids.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/catalog.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/logical_xlog.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+
+static void CheckReplicaIdentity(Relation relation, TupleTableSlot *slot);
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+ bool transactional)
+{
+ xl_logical_message xlrec;
+
+ /*
+ * Force xid to be allocated if we're emitting a transactional message.
+ */
+ if (transactional)
+ {
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+ }
+
+ xlrec.dbId = MyDatabaseId;
+ xlrec.transactional = transactional;
+ /* trailing zero is critical; see logical_desc */
+ xlrec.prefix_size = strlen(prefix) + 1;
+ xlrec.message_size = size;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+ XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+ XLogRegisterData(unconstify(char *, message), size);
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Write logical insert into log.
+ */
+XLogRecPtr
+LogLogicalInsert(Relation relation, TupleTableSlot *new_slot)
+{
+ bool free_new_tuple;
+ HeapTuple new_tuple;
+ xl_logical_insert xlrec;
+ xl_heap_header xlhdr;
+ XLogRecPtr recptr;
+
+ if (!equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor))
+ ereport(ERROR, (errmsg("record type must match relation type")));
+
+ if (!RelationIsLogicallyLogged(relation))
+ return InvalidXLogRecPtr;
+
+ new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple);
+
+ /* force xid to be allocated */
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+
+ new_tuple->t_tableOid = new_slot->tts_tableOid;
+ ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid);
+
+ xlrec.node = relation->rd_node;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert);
+
+ xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2;
+ xlhdr.t_infomask = new_tuple->t_data->t_infomask;
+ xlhdr.t_hoff = new_tuple->t_data->t_hoff;
+
+ XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+ XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader,
+ new_tuple->t_len - SizeofHeapTupleHeader);
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT);
+
+ if (free_new_tuple)
+ pfree(new_tuple);
+
+ return recptr;
+}
+
+/*
+ * Write logical update into log.
+ */
+XLogRecPtr
+LogLogicalUpdate(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot *new_slot)
+{
+ HeapTuple old_whole_tuple;
+ HeapTuple old_id_tuple;
+ HeapTuple new_tuple;
+ bool free_old_whole_tuple;
+ bool free_old_id_tuple;
+ bool free_new_tuple;
+ xl_heap_header new_xlhdr;
+ xl_logical_update xlrec;
+ XLogRecPtr recptr;
+
+ if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor) ||
+ !equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor))
+ ereport(ERROR, (errmsg("record types must match relation type")));
+
+ CheckReplicaIdentity(relation, old_slot);
+
+ if (!RelationIsLogicallyLogged(relation))
+ return InvalidXLogRecPtr;
+
+ /* force xid to be allocated */
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+
+ old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true,
+ &free_old_whole_tuple);
+ new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple);
+
+ xlrec.node = relation->rd_node;
+ xlrec.new_datalen = new_tuple->t_len - SizeofHeapTupleHeader +
+ SizeOfHeapHeader;
+ xlrec.flags = 0;
+
+ old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true,
+ &free_old_id_tuple);
+
+ if (old_id_tuple != NULL)
+ {
+ xlrec.flags |= XLL_UPDATE_CONTAINS_OLD;
+ old_id_tuple->t_tableOid = old_slot->tts_tableOid;
+ ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid);
+ }
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalUpdate);
+
+ new_tuple->t_tableOid = new_slot->tts_tableOid;
+ ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid);
+
+ new_xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2;
+ new_xlhdr.t_infomask = new_tuple->t_data->t_infomask;
+ new_xlhdr.t_hoff = new_tuple->t_data->t_hoff;
+
+ /* write new tuple first, then old */
+ XLogRegisterData((char *) &new_xlhdr, SizeOfHeapHeader);
+ XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader,
+ new_tuple->t_len - SizeofHeapTupleHeader);
+
+ if (old_id_tuple != NULL)
+ {
+ xl_heap_header old_xlhdr;
+
+ old_xlhdr.t_infomask2 = old_id_tuple->t_data->t_infomask2;
+ old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask;
+ old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff;
+
+ XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader);
+ XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader,
+ old_id_tuple->t_len - SizeofHeapTupleHeader);
+ }
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_UPDATE);
+
+ if (free_old_whole_tuple)
+ pfree(old_whole_tuple);
+ if (free_old_id_tuple)
+ pfree(old_id_tuple);
+ if (free_new_tuple)
+ pfree(new_tuple);
+
+ return recptr;
+}
+
+/*
+ * Write logical delete into log.
+ */
+XLogRecPtr
+LogLogicalDelete(Relation relation, TupleTableSlot *old_slot)
+{
+ HeapTuple old_whole_tuple;
+ HeapTuple old_id_tuple;
+ bool free_old_whole_tuple;
+ bool free_old_id_tuple;
+ xl_logical_delete xlrec;
+ XLogRecPtr recptr;
+
+ if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor))
+ ereport(ERROR, (errmsg("record type must match relation type")));
+
+ CheckReplicaIdentity(relation, old_slot);
+
+ if (!RelationIsLogicallyLogged(relation))
+ return InvalidXLogRecPtr;
+
+ /* force xid to be allocated */
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+
+ old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true,
+ &free_old_whole_tuple);
+
+ xlrec.node = relation->rd_node;
+ xlrec.flags = 0;
+
+ old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true,
+ &free_old_id_tuple);
+
+ if (old_id_tuple != NULL)
+ {
+ xlrec.flags |= XLL_DELETE_CONTAINS_OLD;
+ old_id_tuple->t_tableOid = old_slot->tts_tableOid;
+ ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid);
+ }
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalDelete);
+
+ if (old_id_tuple != NULL)
+ {
+ xl_heap_header old_xlhdr;
+
+ old_xlhdr.t_infomask2 = old_id_tuple->t_data->t_infomask2;
+ old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask;
+ old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff;
+
+ XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader);
+ XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader,
+ old_id_tuple->t_len - SizeofHeapTupleHeader);
+ }
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_DELETE);
+
+ if (free_old_whole_tuple)
+ pfree(old_whole_tuple);
+ if (free_old_id_tuple)
+ pfree(old_id_tuple);
+
+ return recptr;
+}
+
+/*
+ * Write logical truncate into log.
+ */
+XLogRecPtr
+LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs)
+{
+ xl_logical_truncate xlrec;
+ XLogRecPtr recptr;
+ Oid *logrelids;
+ ListCell *lc;
+ int nrelids = 0;
+
+ /* force xid to be allocated */
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+
+ xlrec.dbId = MyDatabaseId;
+ xlrec.nrelids = list_length(relids);
+ xlrec.flags = 0;
+ if (cascade)
+ xlrec.flags |= XLL_TRUNCATE_CASCADE;
+ if (restart_seqs)
+ xlrec.flags |= XLL_TRUNCATE_RESTART_SEQS;
+
+ logrelids = palloc(list_length(relids) * sizeof(Oid));
+ foreach(lc, relids)
+ {
+ Oid relid = lfirst_oid(lc);
+ Relation rel = relation_open(relid, AccessShareLock);
+
+ if (RelationIsLogicallyLogged(rel))
+ {
+ logrelids[nrelids++] = lfirst_oid(lc);
+ }
+
+ relation_close(rel, NoLock);
+ }
+
+ if (nrelids == 0)
+ return InvalidXLogRecPtr;
+
+ /* record only the relations that are actually logically logged */
+ xlrec.nrelids = nrelids;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalTruncate);
+ XLogRegisterData((char *) logrelids, nrelids * sizeof(Oid));
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_TRUNCATE);
+
+ return recptr;
+}
+
+/*
+ * Redo is basically just a noop for logical records.
+ */
+void
+logical_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_LOGICAL_MESSAGE:
+ case XLOG_LOGICAL_INSERT:
+ case XLOG_LOGICAL_UPDATE:
+ case XLOG_LOGICAL_DELETE:
+ case XLOG_LOGICAL_TRUNCATE:
+ break;
+ default:
+ elog(PANIC, "logical_redo: unknown op code %u", info);
+ }
+
+ /* This is only interesting for logical decoding, see decode.c. */
+}
+
+/*
+ * Check that replica identity columns are non-NULL.
+ */
+static void
+CheckReplicaIdentity(Relation relation, TupleTableSlot *slot)
+{
+ /* check for NULL attributes in the replica identity */
+ Bitmapset *id_attrs = RelationGetIndexAttrBitmap(
+ relation, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ int id_attr = (-1) * FirstLowInvalidHeapAttributeNumber;
+
+ while ((id_attr = bms_next_member(id_attrs, id_attr)) >= 0)
+ {
+ AttrNumber attno = id_attr + FirstLowInvalidHeapAttributeNumber;
+ if (slot_attisnull(slot, attno))
+ ereport(ERROR, (errmsg("replica identity column is NULL")));
+ }
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 6058d36e0d5..2776861ab8d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -17,6 +17,7 @@
#include <unistd.h>
+#include "access/relation.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
@@ -29,7 +30,7 @@
#include "nodes/makefuncs.h"
#include "replication/decode.h"
#include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "storage/fd.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -389,3 +390,165 @@ pg_logical_emit_message_text(PG_FUNCTION_ARGS)
/* bytea and text are compatible */
return pg_logical_emit_message_bytea(fcinfo);
}
+
+/*
+ * SQL function for writing logical insert into WAL.
+ */
+Datum
+pg_logical_emit_insert(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ HeapTupleHeader htup = PG_GETARG_HEAPTUPLEHEADER(1);
+ Relation rel = relation_open(relid, AccessShareLock);
+ HeapTupleData tuple;
+ TupleTableSlot *slot;
+ Oid rel_type;
+ Oid tup_type;
+ XLogRecPtr lsn;
+
+ /* check that tuple matches the type of the relation */
+ rel_type = get_rel_type_id(relid);
+ tup_type = HeapTupleHeaderGetTypeId(htup);
+ if (rel_type != tup_type)
+ ereport(ERROR, (errmsg("record type must match table type")));
+
+ tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+ ItemPointerSetInvalid(&(tuple.t_self));
+ tuple.t_tableOid = relid;
+ tuple.t_data = htup;
+
+ slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+ ExecClearTuple(slot);
+ ExecStoreHeapTuple(&tuple, slot, false);
+
+ lsn = LogLogicalInsert(rel, slot);
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ relation_close(rel, NoLock);
+
+ PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical update into WAL.
+ */
+Datum
+pg_logical_emit_update(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ HeapTupleHeader old_htup = PG_GETARG_HEAPTUPLEHEADER(1);
+ HeapTupleHeader new_htup = PG_GETARG_HEAPTUPLEHEADER(2);
+ Relation rel = relation_open(relid, AccessShareLock);
+ HeapTupleData old_tuple;
+ HeapTupleData new_tuple;
+ TupleTableSlot *old_slot;
+ TupleTableSlot *new_slot;
+ Oid rel_type;
+ Oid old_tup_type;
+ Oid new_tup_type;
+ XLogRecPtr lsn;
+
+ /* check that tuple matches the type of the relation */
+ rel_type = get_rel_type_id(relid);
+ old_tup_type = HeapTupleHeaderGetTypeId(old_htup);
+ new_tup_type = HeapTupleHeaderGetTypeId(new_htup);
+
+ if (rel_type != old_tup_type || rel_type != new_tup_type)
+ ereport(ERROR, (errmsg("record type must match table type")));
+
+ old_tuple.t_len = HeapTupleHeaderGetDatumLength(old_htup);
+ ItemPointerSetInvalid(&(old_tuple.t_self));
+ old_tuple.t_tableOid = relid;
+ old_tuple.t_data = old_htup;
+
+ new_tuple.t_len = HeapTupleHeaderGetDatumLength(new_htup);
+ ItemPointerSetInvalid(&(new_tuple.t_self));
+ new_tuple.t_tableOid = relid;
+ new_tuple.t_data = new_htup;
+
+ old_slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+ new_slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+ ExecClearTuple(old_slot);
+ ExecClearTuple(new_slot);
+ ExecStoreHeapTuple(&old_tuple, old_slot, false);
+ ExecStoreHeapTuple(&new_tuple, new_slot, false);
+
+ lsn = LogLogicalUpdate(rel, old_slot, new_slot);
+
+ ExecDropSingleTupleTableSlot(old_slot);
+ ExecDropSingleTupleTableSlot(new_slot);
+
+ relation_close(rel, NoLock);
+
+ PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical delete into WAL.
+ */
+Datum
+pg_logical_emit_delete(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ HeapTupleHeader htup = PG_GETARG_HEAPTUPLEHEADER(1);
+ Relation rel = relation_open(relid, AccessShareLock);
+ HeapTupleData tuple;
+ TupleTableSlot *slot;
+ Oid rel_type;
+ Oid tup_type;
+ XLogRecPtr lsn;
+
+ /* check that tuple matches the type of the relation */
+ rel_type = get_rel_type_id(relid);
+ tup_type = HeapTupleHeaderGetTypeId(htup);
+ if (rel_type != tup_type)
+ ereport(ERROR, (errmsg("record type must match table type")));
+
+ tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+ ItemPointerSetInvalid(&(tuple.t_self));
+ tuple.t_tableOid = relid;
+ tuple.t_data = htup;
+
+ slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+ ExecClearTuple(slot);
+ ExecStoreHeapTuple(&tuple, slot, false);
+
+ lsn = LogLogicalDelete(rel, slot);
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ relation_close(rel, NoLock);
+
+ PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical truncate into WAL.
+ */
+Datum
+pg_logical_emit_truncate(PG_FUNCTION_ARGS)
+{
+ ArrayType *arr = PG_GETARG_ARRAYTYPE_P(0);
+ bool cascade = PG_GETARG_BOOL(1);
+ bool restart_seqs = PG_GETARG_BOOL(2);
+ Datum *values;
+ bool *nulls;
+ int nrelids;
+ List *relids = NIL;
+ XLogRecPtr lsn;
+
+ deconstruct_array(arr, REGCLASSOID, sizeof(Oid), true, TYPALIGN_INT,
+ &values, &nulls, &nrelids);
+
+ for (int i = 0; i < nrelids; i++)
+ {
+ if (nulls[i])
+ ereport(ERROR, (errmsg("unexpected NULL element")));
+ relids = lappend_oid(relids, DatumGetObjectId(values[i]));
+ }
+
+ lsn = LogLogicalTruncate(relids, cascade, restart_seqs);
+
+ PG_RETURN_LSN(lsn);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
deleted file mode 100644
index 1c34912610e..00000000000
--- a/src/backend/replication/logical/message.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * message.c
- * Generic logical messages.
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/logical/message.c
- *
- * NOTES
- *
- * Generic logical messages allow XLOG logging of arbitrary binary blobs that
- * get passed to the logical decoding plugin. In normal XLOG processing they
- * are same as NOOP.
- *
- * These messages can be either transactional or non-transactional.
- * Transactional messages are part of current transaction and will be sent to
- * decoding plugin using in a same way as DML operations.
- * Non-transactional messages are sent to the plugin at the time when the
- * logical decoding reads them from XLOG. This also means that transactional
- * messages won't be delivered if the transaction was rolled back but the
- * non-transactional one will always be delivered.
- *
- * Every message carries prefix to avoid conflicts between different decoding
- * plugins. The plugin authors must take extra care to use unique prefix,
- * good options seems to be for example to use the name of the extension.
- *
- * ---------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/xact.h"
-#include "access/xloginsert.h"
-#include "miscadmin.h"
-#include "nodes/execnodes.h"
-#include "replication/logical.h"
-#include "replication/message.h"
-#include "utils/memutils.h"
-
-/*
- * Write logical decoding message into XLog.
- */
-XLogRecPtr
-LogLogicalMessage(const char *prefix, const char *message, size_t size,
- bool transactional)
-{
- xl_logical_message xlrec;
-
- /*
- * Force xid to be allocated if we're emitting a transactional message.
- */
- if (transactional)
- {
- Assert(IsTransactionState());
- GetCurrentTransactionId();
- }
-
- xlrec.dbId = MyDatabaseId;
- xlrec.transactional = transactional;
- /* trailing zero is critical; see logicalmsg_desc */
- xlrec.prefix_size = strlen(prefix) + 1;
- xlrec.message_size = size;
-
- XLogBeginInsert();
- XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
- XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
- XLogRegisterData(unconstify(char *, message), size);
-
- /* allow origin filtering */
- XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
-
- return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
-}
-
-/*
- * Redo is basically just noop for logical decoding messages.
- */
-void
-logicalmsg_redo(XLogReaderState *record)
-{
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
- if (info != XLOG_LOGICAL_MESSAGE)
- elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
-
- /* This is only interesting for logical decoding, see decode.c. */
-}
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index 3be00a8b61f..567655fa626 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -10,7 +10,7 @@
/gistdesc.c
/hashdesc.c
/heapdesc.c
-/logicalmsgdesc.c
+/logicaldesc.c
/mxactdesc.c
/nbtdesc.c
/relmapdesc.c
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310b..86804a243bc 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,7 +26,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
#include "replication/origin.h"
#include "rmgrdesc.h"
#include "storage/standbydefs.h"
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b46ab7d7390..77fa0337f6e 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -175,6 +175,8 @@ extern void simple_heap_insert(Relation relation, HeapTuple tup);
extern void simple_heap_delete(Relation relation, ItemPointer tid);
extern void simple_heap_update(Relation relation, ItemPointer otid,
HeapTuple tup);
+extern HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup,
+ bool key_required, bool *copy);
extern TransactionId heap_index_delete_tuples(Relation rel,
TM_IndexDeleteOp *delstate);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 5c47fdcec80..5460f7836f9 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -124,6 +124,9 @@ typedef struct xl_heap_delete
* For truncate we list all truncated relids in an array, followed by all
* sequence relids that need to be restarted, if any.
* All rels are always within the same database, so we just list dbid once.
+ *
+ * Note: identical to xl_logical_truncate, except that for
+ * xl_logical_truncate, no redo is performed.
*/
typedef struct xl_heap_truncate
{
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index cf8b6d48193..d43175088fb 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL, logical_decode)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 01e1dd4d6d1..cd57c2b3aa2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10937,6 +10937,23 @@
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
prosrc => 'pg_logical_emit_message_bytea' },
+{ oid => '9297', descr => 'emit a logical insert',
+ proname => 'pg_logical_emit_insert', provolatile => 'v', proparallel => 'u',
+ prorettype => 'pg_lsn', proargtypes => 'regclass record',
+ prosrc => 'pg_logical_emit_insert' },
+{ oid => '9298', descr => 'emit a logical update',
+ proname => 'pg_logical_emit_update', provolatile => 'v', proparallel => 'u',
+ prorettype => 'pg_lsn', proargtypes => 'regclass record record',
+ prosrc => 'pg_logical_emit_update' },
+{ oid => '9299', descr => 'emit a logical delete',
+ proname => 'pg_logical_emit_delete', provolatile => 'v', proparallel => 'u',
+ prorettype => 'pg_lsn', proargtypes => 'regclass record',
+ prosrc => 'pg_logical_emit_delete' },
+{ oid => '9300', descr => 'emit a logical truncate',
+ proname => 'pg_logical_emit_truncate', provolatile => 'v', proparallel => 'u',
+ prorettype => 'pg_lsn', proargtypes => '_regclass bool bool',
+ prosrc => 'pg_logical_emit_truncate' },
+
# event triggers
{ oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 8e07bb7409a..118590ad61e 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -26,7 +26,7 @@ extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
diff --git a/src/include/replication/logical_xlog.h b/src/include/replication/logical_xlog.h
new file mode 100644
index 00000000000..dce74f71111
--- /dev/null
+++ b/src/include/replication/logical_xlog.h
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ * logical_xlog.h
+ * Exports from replication/logical/logical_xlog.c
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logical_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "storage/off.h"
+#include "utils/rel.h"
+
+/*
+ * xl_logical_update flag values, 8 bits are available.
+ */
+#define XLL_UPDATE_CONTAINS_OLD (1<<0)
+
+/*
+ * xl_logical_delete flag values, 8 bits are available.
+ */
+#define XLL_DELETE_CONTAINS_OLD (1<<0)
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+ Oid dbId; /* database Oid emitted from */
+ bool transactional; /* is message transactional? */
+ Size prefix_size; /* length of prefix */
+ Size message_size; /* size of the message */
+ /* payload, including null-terminated prefix of length prefix_size */
+ char message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_message;
+
+#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
+
+/* This is what we need to know about insert */
+typedef struct xl_logical_insert
+{
+ RelFileNode node;
+
+ /* tuple data follows */
+} xl_logical_insert;
+
+#define SizeOfLogicalInsert (offsetof(xl_logical_insert, node) + sizeof(RelFileNode))
+
+/*
+ * This is what we need to know about update.
+ */
+typedef struct xl_logical_update
+{
+ RelFileNode node;
+ Size new_datalen;
+ uint8 flags;
+
+ /* tuple data follows */
+} xl_logical_update;
+
+#define SizeOfLogicalUpdate (offsetof(xl_logical_update, flags) + sizeof(uint8))
+
+/* This is what we need to know about delete */
+typedef struct xl_logical_delete
+{
+ RelFileNode node;
+ uint8 flags;
+
+ /* tuple data follows */
+} xl_logical_delete;
+
+#define SizeOfLogicalDelete (offsetof(xl_logical_delete, flags) + sizeof(uint8))
+
+/*
+ * For truncate we list all truncated relids in an array, followed by all
+ * sequence relids that need to be restarted, if any.
+ * All rels are always within the same database, so we just list dbid once.
+ *
+ * Note: identical to xl_heap_truncate, except that no redo is performed, only
+ * decoding.
+ */
+typedef struct xl_logical_truncate
+{
+ Oid dbId;
+ uint32 nrelids;
+ uint8 flags;
+ Oid relids[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_truncate;
+
+#define SizeOfLogicalTruncate (offsetof(xl_logical_truncate, relids))
+
+struct TupleTableSlot;
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+ size_t size, bool transactional);
+extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalUpdate(Relation relation, struct TupleTableSlot *old_slot,
+ struct TupleTableSlot *new_slot);
+extern XLogRecPtr LogLogicalDelete(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE 0x00
+#define XLOG_LOGICAL_INSERT 0x10
+#define XLOG_LOGICAL_UPDATE 0x20
+#define XLOG_LOGICAL_DELETE 0x30
+#define XLOG_LOGICAL_TRUNCATE 0x40
+
+/*
+ * xl_logical_truncate flag values, 8 bits are available.
+ */
+#define XLL_TRUNCATE_CASCADE (1<<0)
+#define XLL_TRUNCATE_RESTART_SEQS (1<<1)
+
+void logical_redo(XLogReaderState *record);
+void logical_desc(StringInfo buf, XLogReaderState *record);
+const char *logical_identify(uint8 info);
+
+#endif /* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
deleted file mode 100644
index 7d7785292f1..00000000000
--- a/src/include/replication/message.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*-------------------------------------------------------------------------
- * message.h
- * Exports from replication/logical/message.c
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * src/include/replication/message.h
- *-------------------------------------------------------------------------
- */
-#ifndef PG_LOGICAL_MESSAGE_H
-#define PG_LOGICAL_MESSAGE_H
-
-#include "access/xlog.h"
-#include "access/xlogdefs.h"
-#include "access/xlogreader.h"
-
-/*
- * Generic logical decoding message wal record.
- */
-typedef struct xl_logical_message
-{
- Oid dbId; /* database Oid emitted from */
- bool transactional; /* is message transactional? */
- Size prefix_size; /* length of prefix */
- Size message_size; /* size of the message */
- /* payload, including null-terminated prefix of length prefix_size */
- char message[FLEXIBLE_ARRAY_MEMBER];
-} xl_logical_message;
-
-#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
-
-extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
- size_t size, bool transactional);
-
-/* RMGR API*/
-#define XLOG_LOGICAL_MESSAGE 0x00
-void logicalmsg_redo(XLogReaderState *record);
-void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
-const char *logicalmsg_identify(uint8 info);
-
-#endif /* PG_LOGICAL_MESSAGE_H */
--
2.17.1
On Wed, Mar 30, 2022 at 2:31 PM Jeff Davis <pgsql@j-davis.com> wrote:
Attached a new version. The scope expanded, so this is likely to slip
v15 at this late time. For 15, I'll focus on my extensible rmgr work,
which can serve similar purposes.
The main purpose of this patch is to be able to emit logical events for
a table (insert/update/delete/truncate) without actually modifying a
table or relying on the heap at all. That allows table AMs to support
logical decoding/replication, and perhaps allows a few other
interesting use cases (maybe foreign tables?). There are really two
advantages of this approach over relying on a custom rmgr:
1. Easier for extension authors
2. Doesn't require an extension module to be loaded to start the
server
I'm not sure that I understand exactly how this is intended to be
used. I can think of three possibilities:
1. An AM that doesn't care about having anything happening during
recovery, but wants to be able to get logical decoding to do some
work. Maybe the intention of the AM is that data is available only
when the server is not in recovery and all data is lost on shutdown,
or maybe the AM has its own separate durability mechanism.
2. An AM that wants things to happen during recovery, but handles that
separately. For example, maybe it logs all the data changes via
log_newpage() and then separately emits these log records.
3. An AM that wants things to happen during recovery, and expects
these records to serve that purpose. That would require getting
control during WAL replay as well as during decoding, and would
probably only work for an AM whose data is not block-structured (e.g.
an in-memory btree that is dumped to disk at every checkpoint).
My guess is that this is intended to meet use cases 1 and 2 but not 3.
Is that correct?
--
Robert Haas
EDB: http://www.enterprisedb.com
On Thu, 2022-03-31 at 09:05 -0400, Robert Haas wrote:
1. An AM that doesn't care about having anything happening during
recovery, but wants to be able to get logical decoding to do some
work. Maybe the intention of the AM is that data is available only
when the server is not in recovery and all data is lost on shutdown,
or maybe the AM has its own separate durability mechanism.
This is a speculative use case that is not what I would use it for, but
perhaps someone wants to do that with a table AM or maybe an FDW.
2. An AM that wants things to happen during recovery, but handles that
separately. For example, maybe it logs all the data changes via
log_newpage() and then separately emits these log records.
Yes, or Generic WAL. Generic WAL seems like a half-feature without this
Logical WAL patch, because it's hopeless to support logical
decoding/replication of whatever data you're logging with Generic WAL.
That's probably the strongest argument for this patch.
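To make case 2 concrete, here is a rough stand-alone sketch of the pattern: the AM logs each row twice, once in a physical record that recovery applies and once in a logical record that only decoding looks at. This is a toy model, not PostgreSQL code; every name in it (SketchWal, sketch_am_insert, sketch_replay, sketch_decode) is made up for illustration, and the "physical" record merely stands in for whatever Generic WAL or log_newpage() would write.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical record classes: one replayed at recovery, one only decoded. */
typedef enum { SKETCH_RM_PHYSICAL, SKETCH_RM_LOGICAL } SketchRmgr;

typedef struct
{
    SketchRmgr rmgr;
    int        row_id;      /* stand-in for the tuple payload */
} SketchRecord;

#define SKETCH_WAL_CAPACITY 64

/* Toy in-memory WAL; a real WAL is of course nothing like an array. */
typedef struct
{
    size_t       nrecords;
    SketchRecord records[SKETCH_WAL_CAPACITY];
} SketchWal;

void
sketch_wal_append(SketchWal *wal, SketchRmgr rmgr, int row_id)
{
    assert(wal->nrecords < SKETCH_WAL_CAPACITY);
    wal->records[wal->nrecords].rmgr = rmgr;
    wal->records[wal->nrecords].row_id = row_id;
    wal->nrecords++;
}

/* AM insert path for case 2: log the physical change, then the logical event. */
void
sketch_am_insert(SketchWal *wal, int row_id)
{
    sketch_wal_append(wal, SKETCH_RM_PHYSICAL, row_id);
    sketch_wal_append(wal, SKETCH_RM_LOGICAL, row_id);
}

/* Recovery: only physical records change storage; returns how many applied. */
size_t
sketch_replay(const SketchWal *wal)
{
    size_t applied = 0;

    for (size_t i = 0; i < wal->nrecords; i++)
        if (wal->records[i].rmgr == SKETCH_RM_PHYSICAL)
            applied++;
    return applied;
}

/* Decoding: only logical records are streamed; returns how many were emitted. */
size_t
sketch_decode(const SketchWal *wal, int *out, size_t max_out)
{
    size_t emitted = 0;

    for (size_t i = 0; i < wal->nrecords && emitted < max_out; i++)
        if (wal->records[i].rmgr == SKETCH_RM_LOGICAL)
            out[emitted++] = wal->records[i].row_id;
    return emitted;
}
```

Two inserts produce four records, which is the redundancy mentioned at the start of the thread: recovery applies two of them and decoding streams the other two.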
3. An AM that wants things to happen during recovery, and expects
these records to serve that purpose. That would require getting
control during WAL replay as well as during decoding, and would
probably only work for an AM whose data is not block-structured (e.g.
an in-memory btree that is dumped to disk at every checkpoint).
This patch would not work in this case because the records are ignored
during REDO.
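As a rough illustration of "ignored during REDO", the sketch below models a redo routine that recognizes the new record types and then does nothing. The XLOG_LOGICAL_* values are copied from logical_xlog.h in the patch, and 0x0F is the value of XLR_INFO_MASK in PostgreSQL; the sketch_* functions are hypothetical stand-ins written for this example, not the patch's logical_identify() or logical_redo().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Info codes as defined in the patch's logical_xlog.h. */
#define XLOG_LOGICAL_MESSAGE   0x00
#define XLOG_LOGICAL_INSERT    0x10
#define XLOG_LOGICAL_UPDATE    0x20
#define XLOG_LOGICAL_DELETE    0x30
#define XLOG_LOGICAL_TRUNCATE  0x40

/* Low four bits of the info byte are not rmgr-specific (XLR_INFO_MASK). */
#define SKETCH_XLR_INFO_MASK   0x0F

/* Hypothetical: map an info byte to a record name, or NULL if unknown. */
const char *
sketch_logical_identify(uint8_t info)
{
    switch (info & ~SKETCH_XLR_INFO_MASK)
    {
        case XLOG_LOGICAL_MESSAGE:
            return "MESSAGE";
        case XLOG_LOGICAL_INSERT:
            return "INSERT";
        case XLOG_LOGICAL_UPDATE:
            return "UPDATE";
        case XLOG_LOGICAL_DELETE:
            return "DELETE";
        case XLOG_LOGICAL_TRUNCATE:
            return "TRUNCATE";
        default:
            return NULL;
    }
}

/*
 * Hypothetical redo: returns false for an unknown op code (the real routine
 * would PANIC) and true otherwise.  The counter of modified pages is never
 * touched, because these records carry nothing for recovery to apply.
 */
bool
sketch_logical_redo(uint8_t info, int *pages_modified)
{
    assert(pages_modified != NULL);

    if (sketch_logical_identify(info) == NULL)
        return false;

    /* Only interesting for logical decoding; nothing to replay. */
    return true;
}
```

An AM in case 3 would need the redo step to rebuild its own state, which is exactly what this no-op cannot do.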
Regards,
Jeff Davis
On Mon, 2022-03-21 at 17:43 -0700, Andres Freund wrote:
Currently this doesn't apply:
http://cfbot.cputube.org/patch_37_3394.log
Withdrawn for now. With custom WAL resource managers this is less
important to me.
I still think it has value, and I'm willing to work on it if more use
cases come forward.
Regards,
Jeff Davis