Logical decoding for operations on zheap tables

Started by Amit Kapila about 7 years ago · 10 messages
#1Amit Kapila
amit.kapila16@gmail.com
1 attachment(s)

To support logical decoding for zheap operations, we need a way to
ensure zheap tuples can be registered as change streams. One idea
could be that we make ReorderBufferChange aware of another kind of
tuples as well, something like this:

@@ -100,6 +123,20 @@ typedef struct ReorderBufferChange
  ReorderBufferTupleBuf *newtuple;
  } tp;
+ struct
+ {
+ /* relation that has been changed */
+ RelFileNode relnode;
+
+ /* no previously reassembled toast chunks are necessary anymore */
+ bool clear_toast_afterwards;
+
+ /* valid for DELETE || UPDATE */
+ ReorderBufferZTupleBuf *oldtuple;
+ /* valid for INSERT || UPDATE */
+ ReorderBufferZTupleBuf *newtuple;
+ } ztp;
+
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
..
+ /* tuple header, the interesting bit for users of logical decoding */
+ ZHeapTupleData tuple;
..
+} ReorderBufferZTupleBuf;

Apart from this, we need to define different decode functions for
zheap operations as the WAL data is different for heap and zheap, so
the same functions can't be used to decode both.

I have written a very hacky version to support zheap Insert operation
based on the above idea. If we want to go with this approach, we
might need a better way to represent a different type of tuple in
ReorderBufferChange.

Yet another approach could be to convert the zheap tuples to heap
tuples in the decode functions, after forming them from WAL. I have
not tried that, so I am not sure whether it can work, but it seems to
me that it would be good if we can avoid the tuple conversion overhead.

This email is primarily to discuss how logical decoding for the
basic DML operations (Insert/Update/Delete) will work in zheap. We
might need some special mechanism to deal with sub-transactions, as
zheap doesn't generate a transaction id for sub-transactions, but we
can discuss that separately.

Thoughts?

Note - This patch is based on pluggable_zheap branch
(https://github.com/anarazel/postgres-pluggable-storage)

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Attachments:

decode_zinsert_1.patch (application/octet-stream; name=decode_zinsert_1.patch) [Download]
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index f6e77fbda1..e194550643 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -388,6 +388,80 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 	}
 }
 
+/* print the tuple 'tuple' into the StringInfo s */
+static void
+ztuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, ZHeapTuple tuple, bool skip_nulls)
+{
+	int			natt;
+
+	/* print all columns individually */
+	for (natt = 0; natt < tupdesc->natts; natt++)
+	{
+		Form_pg_attribute attr; /* the attribute itself */
+		Oid			typid;		/* type of current attribute */
+		Oid			typoutput;	/* output function */
+		bool		typisvarlena;
+		Datum		origval;	/* possibly toasted Datum */
+		bool		isnull;		/* column is null? */
+
+		attr = TupleDescAttr(tupdesc, natt);
+
+		/*
+		 * don't print dropped columns, we can't be sure everything is
+		 * available for them
+		 */
+		if (attr->attisdropped)
+			continue;
+
+		/*
+		 * Don't print system columns, oid will already have been printed if
+		 * present.
+		 */
+		if (attr->attnum < 0)
+			continue;
+
+		typid = attr->atttypid;
+
+		/* get Datum from tuple */
+		origval = zheap_getattr(tuple, natt + 1, tupdesc, &isnull);
+
+		if (isnull && skip_nulls)
+			continue;
+
+		/* print attribute name */
+		appendStringInfoChar(s, ' ');
+		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
+
+		/* print attribute type */
+		appendStringInfoChar(s, '[');
+		appendStringInfoString(s, format_type_be(typid));
+		appendStringInfoChar(s, ']');
+
+		/* query output function */
+		getTypeOutputInfo(typid,
+			&typoutput, &typisvarlena);
+
+		/* print separator */
+		appendStringInfoChar(s, ':');
+
+		/* print data */
+		if (isnull)
+			appendStringInfoString(s, "null");
+		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
+			appendStringInfoString(s, "unchanged-toast-datum");
+		else if (!typisvarlena)
+			print_literal(s, typid,
+				OidOutputFunctionCall(typoutput, origval));
+		else
+		{
+			Datum		val;	/* definitely detoasted Datum */
+
+			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
+		}
+	}
+}
+
 /*
  * callback for individual changed tuples
  */
@@ -468,6 +542,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									&change->data.tp.oldtuple->tuple,
 									true);
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			appendStringInfoString(ctx->out, " INSERT:");
+			if (change->data.ztp.newtuple == NULL)
+				appendStringInfoString(ctx->out, " (no-tuple-data)");
+			else
+				ztuple_to_stringinfo(ctx->out, tupdesc,
+									 &change->data.ztp.newtuple->tuple,
+									 false);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 95153f4e29..01f32e8fcf 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,9 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -57,6 +60,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +78,10 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +169,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
 			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +518,43 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1113,98 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not ZMULTI_INSERT!) records into tuplebufs.
+ *
+ * Deletes can contain the new tuple.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	Size		tuplelen;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_ZINSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.ztp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	tuplelen = datalen - SizeOfZHeapHeader;
+
+	change->data.ztp.newtuple =
+		ReorderBufferGetZTupleBuf(ctx->reorder, tuplelen);
+
+	DecodeXLogZTuple(tupledata, datalen, change->data.ztp.newtuple);
+
+	change->data.ztp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into a tuplebuf.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ */
+static void
+DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple)
+{
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofZHeapTupleHeader;
+	header = tuple->tuple.t_data;
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 19451714da..525dc2b19d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -347,6 +347,25 @@ logicalrep_read_truncate(StringInfo in,
 	return relids;
 }
 
+/*
+ * Write zheap's INSERT to the output stream.
+ */
+void
+logicalrep_write_zinsert(StringInfo out, Relation rel, ZHeapTuple newtuple)
+{
+	pq_sendbyte(out, 'I');		/* action INSERT */
+
+	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+	/* use Oid as relation identifier */
+	pq_sendint32(out, RelationGetRelid(rel));
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	//logicalrep_write_tuple(out, rel, newtuple);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466bade2..70fb5e2934 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -393,6 +393,19 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 				change->data.tp.oldtuple = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			if (change->data.tp.newtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.newtuple);
+				change->data.ztp.newtuple = NULL;
+			}
+
+			if (change->data.ztp.oldtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.oldtuple);
+				change->data.ztp.oldtuple = NULL;
+			}
+			break;
 		case REORDER_BUFFER_CHANGE_MESSAGE:
 			if (change->data.msg.prefix != NULL)
 				pfree(change->data.msg.prefix);
@@ -456,6 +469,37 @@ ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 	pfree(tuple);
 }
 
+/*
+ * Get a fresh ReorderBufferZTupleBuf fitting at least a tuple of size
+ * tuple_len (excluding header overhead).
+ */
+ReorderBufferZTupleBuf *
+ReorderBufferGetZTupleBuf(ReorderBuffer *rb, Size tuple_len)
+{
+	ReorderBufferZTupleBuf *tuple;
+	Size		alloc_len;
+
+	alloc_len = tuple_len + SizeofZHeapTupleHeader;
+
+	tuple = (ReorderBufferZTupleBuf *)
+		MemoryContextAlloc(rb->tup_context,
+						   sizeof(ReorderBufferZTupleBuf) +
+						   alloc_len);
+	tuple->alloc_tuple_size = alloc_len;
+	tuple->tuple.t_data = ReorderBufferZTupleBufData(tuple);
+
+	return tuple;
+}
+
+/*
+ * Free an ReorderBufferZTupleBuf.
+ */
+void
+ReorderBufferReturnZTupleBuf(ReorderBuffer *rb, ReorderBufferZTupleBuf *tuple)
+{
+	pfree(tuple);
+}
+
 /*
  * Get an array for relids of truncated relations.
  *
@@ -1684,6 +1728,71 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						break;
 					}
 
+				case REORDER_BUFFER_CHANGE_ZINSERT:
+					{
+						Assert(snapshot_now);
+
+						reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+							change->data.tp.relnode.relNode);
+
+						if (reloid == InvalidOid &&
+							change->data.ztp.newtuple == NULL &&
+							change->data.ztp.oldtuple == NULL)
+							goto change_done;
+						else if (reloid == InvalidOid)
+							elog(ERROR, "could not map filenode \"%s\" to relation OID",
+								relpathperm(change->data.ztp.relnode,
+									MAIN_FORKNUM));
+
+						relation = RelationIdGetRelation(reloid);
+
+						if (relation == NULL)
+							elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+								 reloid,
+								 relpathperm(change->data.ztp.relnode,
+											 MAIN_FORKNUM));
+
+						if (!RelationIsLogicallyLogged(relation))
+							goto zchange_done;
+
+						/*
+						 * Ignore temporary heaps created during DDL unless the
+						 * plugin has asked for them.
+						 */
+						if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+							goto zchange_done;
+
+						/*
+						 * For now ignore sequence changes entirely. Most of the
+						 * time they don't log changes using records we
+						 * understand, so it doesn't make sense to handle the few
+						 * cases we do.
+						 */
+						if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+							goto zchange_done;
+
+						/* user-triggered change */
+						if (!IsToastRelation(relation))
+						{
+							rb->apply_change(rb, txn, relation, change);
+						}
+						else if (change->action == REORDER_BUFFER_CHANGE_ZINSERT)
+						{
+							/* toast table implementation for zheap is not done yet. */
+							elog(ERROR,"decoding for toast tables not supported in zheap");
+						}
+
+					zchange_done:
+
+						if (relation != NULL)
+						{
+							RelationClose(relation);
+							relation = NULL;
+						}
+					}
+
+					break;
+
 				case REORDER_BUFFER_CHANGE_MESSAGE:
 					rb->message(rb, txn, change->lsn, true,
 								change->data.msg.prefix,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 86e0951a70..303a3b8303 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -365,6 +365,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			OutputPluginPrepareWrite(ctx, true);
+			logicalrep_write_zinsert(ctx->out, relation,
+									 &change->data.ztp.newtuple->tuple);
+			OutputPluginWrite(ctx, true);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 8192f79ce3..587472ee06 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -106,4 +106,7 @@ extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, Oid typoid);
 extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
 
+extern void logicalrep_write_zinsert(StringInfo out, Relation rel,
+						ZHeapTuple newtuple);
+
 #endif							/* LOGICALREP_PROTO_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7787edf7b6..2544cc3951 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/zhtup.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -36,6 +37,25 @@ typedef struct ReorderBufferTupleBuf
 #define ReorderBufferTupleBufData(p) \
 	((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
+	/* position in preallocated list */
+	slist_node	node;
+
+	/* tuple header, the interesting bit for users of logical decoding */
+	ZHeapTupleData tuple;
+
+	/* pre-allocated size of tuple buffer, different from tuple size */
+	Size		alloc_tuple_size;
+
+	/* actual tuple data follows */
+} ReorderBufferZTupleBuf;
+
+/* pointer to the data stored in a TupleBuf */
+#define ReorderBufferZTupleBufData(p) \
+	((ZHeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferZTupleBuf)))
+
 /*
  * Types of the change passed to a 'change' callback.
  *
@@ -60,7 +80,10 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_ZINSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZCONFIRM
 };
 
 /*
@@ -100,6 +123,20 @@ typedef struct ReorderBufferChange
 			ReorderBufferTupleBuf *newtuple;
 		}			tp;
 
+		struct
+		{
+			/* relation that has been changed */
+			RelFileNode relnode;
+
+			/* no previously reassembled toast chunks are necessary anymore */
+			bool		clear_toast_afterwards;
+
+			/* valid for DELETE || UPDATE */
+			ReorderBufferZTupleBuf *oldtuple;
+			/* valid for INSERT || UPDATE */
+			ReorderBufferZTupleBuf *newtuple;
+		}			ztp;
+
 		/*
 		 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
 		 * set of relations to be truncated.
@@ -399,6 +436,8 @@ void		ReorderBufferFree(ReorderBuffer *);
 
 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
+ReorderBufferZTupleBuf *ReorderBufferGetZTupleBuf(ReorderBuffer *rb, Size tuple_len);
+void		ReorderBufferReturnZTupleBuf(ReorderBuffer *, ReorderBufferZTupleBuf *tuple);
 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
#2Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#1)
1 attachment(s)
Re: Logical decoding for operations on zheap tables

On Mon, Dec 31, 2018 at 9:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

> To support logical decoding for zheap operations, we need a way to
> ensure zheap tuples can be registered as change streams. One idea
> could be that we make ReorderBufferChange aware of another kind of
> tuples as well, something like this:
>
> ..
>
> Apart from this, we need to define different decode functions for
> zheap operations as the WAL data is different for heap and zheap, so
> same functions can't be used to decode.
>
> I have written a very hacky version to support zheap Insert operation
> based on the above idea.

I went ahead and tried to implement decoding for the Delete operation
as well, based on the above approach; the result is attached.

> The yet another approach could be that in the decode functions after
> forming zheap tuples from WAL, we can convert them to heap tuples. I
> have not tried that, so not sure if it can work, but it seems to me if
> we can avoid tuple conversion overhead, it will be good.

While implementing decoding for the delete operation, I noticed that
the main changes required are writing a decode function and logging
additional WAL (such as the old tuple), both of which are required
anyway even if we pursue the tuple-conversion approach. So I think it
might be better to go with the approach where we don't need tuple
conversion (i.e. something similar to what is done in the attached patch).

Note - This patch is based on pluggable-zheap branch
(https://github.com/anarazel/postgres-pluggable-storage)

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Attachments:

decode_zops_2.patch (application/octet-stream; name=decode_zops_2.patch) [Download]
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index f6e77fbda1..3749e06707 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -388,6 +388,80 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 	}
 }
 
+/* print the tuple 'tuple' into the StringInfo s */
+static void
+ztuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, ZHeapTuple tuple, bool skip_nulls)
+{
+	int			natt;
+
+	/* print all columns individually */
+	for (natt = 0; natt < tupdesc->natts; natt++)
+	{
+		Form_pg_attribute attr; /* the attribute itself */
+		Oid			typid;		/* type of current attribute */
+		Oid			typoutput;	/* output function */
+		bool		typisvarlena;
+		Datum		origval;	/* possibly toasted Datum */
+		bool		isnull;		/* column is null? */
+
+		attr = TupleDescAttr(tupdesc, natt);
+
+		/*
+		 * don't print dropped columns, we can't be sure everything is
+		 * available for them
+		 */
+		if (attr->attisdropped)
+			continue;
+
+		/*
+		 * Don't print system columns, oid will already have been printed if
+		 * present.
+		 */
+		if (attr->attnum < 0)
+			continue;
+
+		typid = attr->atttypid;
+
+		/* get Datum from tuple */
+		origval = zheap_getattr(tuple, natt + 1, tupdesc, &isnull);
+
+		if (isnull && skip_nulls)
+			continue;
+
+		/* print attribute name */
+		appendStringInfoChar(s, ' ');
+		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
+
+		/* print attribute type */
+		appendStringInfoChar(s, '[');
+		appendStringInfoString(s, format_type_be(typid));
+		appendStringInfoChar(s, ']');
+
+		/* query output function */
+		getTypeOutputInfo(typid,
+			&typoutput, &typisvarlena);
+
+		/* print separator */
+		appendStringInfoChar(s, ':');
+
+		/* print data */
+		if (isnull)
+			appendStringInfoString(s, "null");
+		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
+			appendStringInfoString(s, "unchanged-toast-datum");
+		else if (!typisvarlena)
+			print_literal(s, typid,
+				OidOutputFunctionCall(typoutput, origval));
+		else
+		{
+			Datum		val;	/* definitely detoasted Datum */
+
+			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
+		}
+	}
+}
+
 /*
  * callback for individual changed tuples
  */
@@ -468,6 +542,27 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									&change->data.tp.oldtuple->tuple,
 									true);
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			appendStringInfoString(ctx->out, " INSERT:");
+			if (change->data.ztp.newtuple == NULL)
+				appendStringInfoString(ctx->out, " (no-tuple-data)");
+			else
+				ztuple_to_stringinfo(ctx->out, tupdesc,
+									 &change->data.ztp.newtuple->tuple,
+									 false);
+			break;
+		case REORDER_BUFFER_CHANGE_ZDELETE:
+			appendStringInfoString(ctx->out, " DELETE:");
+
+			/* if there was no PK, we only know that a delete happened */
+			if (change->data.ztp.oldtuple == NULL)
+				appendStringInfoString(ctx->out, " (no-tuple-data)");
+			/* In DELETE, only the replica identity is present; display that */
+			else
+				ztuple_to_stringinfo(ctx->out, tupdesc,
+									 &change->data.ztp.oldtuple->tuple,
+									 true);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c
index c916bde3fb..ac93c7d37a 100644
--- a/src/backend/access/zheap/zheapam.c
+++ b/src/backend/access/zheap/zheapam.c
@@ -115,6 +115,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup,
 					  TransactionId single_locker_xid, int single_locker_trans_slot,
 					  UndoRecPtr prev_urecptr, CommandId cid,
 					  bool any_multi_locker_member_alive);
+static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp,
+						bool key_changed, bool *copy);
 static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf,
 						 TransactionId tup_xid, int tup_trans_slot,
 						 uint16 old_infomask, TransactionId add_to_xid,
@@ -1259,6 +1261,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	CommandId		tup_cid;
 	ItemId		lp;
 	ZHeapTupleData zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UnpackedUndoRecord	undorecord;
 	Page		page;
 	BlockNumber blkno;
@@ -1280,6 +1283,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	bool		lock_reacquired;
 	bool		hasSubXactLock = false;
 	bool		hasPayload = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta undometa;
 	uint8		vm_status;
 
@@ -1933,6 +1937,13 @@ zheap_tuple_updated:
 	vm_status = visibilitymap_get_status(relation,
 								BufferGetBlockNumber(buffer), &vmbuffer);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true,
+											&old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -1986,6 +1997,7 @@ zheap_tuple_updated:
 		XLogRecPtr	RedoRecPtr;
 		uint32		totalundotuplen = 0;
 		Size		dataoff;
+		int			bufflags = 0;
 		bool		doPageWrites;
 
 		/*
@@ -2006,6 +2018,15 @@ zheap_tuple_updated:
 			xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE;
 		if (hasSubXactLock)
 			xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT;
+		if (old_key_tuple != NULL)
+		{
+			bufflags |= REGBUF_KEEP_DATA;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY;
+		}
 
 		/*
 		 * If full_page_writes is enabled, and the buffer image is not
@@ -2051,7 +2072,27 @@ prepare_xlog:
 							totalundotuplen - SizeofZHeapTupleHeader);
 		}
 
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_zheap_header xlzhdr;
+
+			xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader);
+			XLogRegisterBufData(0,
+								(char *) old_key_tuple->t_data +
+								SizeofZHeapTupleHeader,
+								old_key_tuple->t_len -
+								SizeofZHeapTupleHeader);
+		}
+
 		if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS)
 			(void) RegisterTPDBuffer(page, 1);
 
@@ -2108,6 +2149,9 @@ prepare_xlog:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		zheap_freetuple(old_key_tuple);
+
 	pgstat_count_heap_delete(relation);
 
 	return HeapTupleMayBeUpdated;
@@ -5846,6 +5890,102 @@ prepare_xlog:
 	UnlockReleaseTPDBuffers();
 }
 
+/*
+ * Build a zheap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log an identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. If so, force them to be inlined.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+		{
+			elog(ERROR, "toast tables are not supported with replica identity");
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, force them to be inlined. This is somewhat unlikely
+	 * since there's limits on the size of indexed columns, so we don't
+	 * duplicate toast_flatten_tuple()s functionality in the above loop over
+	 * the indexed columns, even if it would be more efficient.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+	{
+		elog(ERROR, "toast tables are not supported with replica identity");
+	}
+
+	return key_tuple;
+}
+
 /*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 95153f4e29..9bed68a6e1 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,9 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -57,6 +60,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +78,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +170,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
 			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +519,48 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for LogicalDecodingProcessRecord().
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZDelete(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1119,150 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not ZMULTI_INSERT!) records into tuplebufs.
+ *
+ * Inserts can contain the new tuple.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	Size		tuplelen;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_ZINSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.ztp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	tuplelen = datalen - SizeOfZHeapHeader;
+
+	change->data.ztp.newtuple =
+		ReorderBufferGetZTupleBuf(ctx->reorder, tuplelen);
+
+	DecodeXLogZTuple(tupledata, datalen, change->data.ztp.newtuple);
+
+	change->data.ztp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old tuple or the old key.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_ZDELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.ztp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old tuple or old key stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		char	*tupledata;
+		Size	datalen;
+		Size	tuplelen;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+		tuplelen = datalen - SizeOfZHeapHeader;
+
+		change->data.ztp.oldtuple =
+			ReorderBufferGetZTupleBuf(ctx->reorder, tuplelen);
+
+		DecodeXLogZTuple(tupledata,
+						 datalen, change->data.ztp.oldtuple);
+	}
+
+	change->data.ztp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into a tuplebuf.
+ *
+ * The pointer 'data' and the size 'len' are record specific and computed by
+ * the caller; 'len' counts the xl_zheap_header as well as the tuple data.
+ */
+static void
+DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple)
+{
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofZHeapTupleHeader;
+	header = tuple->tuple.t_data;
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 19451714da..525dc2b19d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -347,6 +347,25 @@ logicalrep_read_truncate(StringInfo in,
 	return relids;
 }
 
+/*
+ * Write zheap's INSERT to the output stream.
+ */
+void
+logicalrep_write_zinsert(StringInfo out, Relation rel, ZHeapTuple newtuple)
+{
+	pq_sendbyte(out, 'I');		/* action INSERT */
+
+	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+	/* use Oid as relation identifier */
+	pq_sendint32(out, RelationGetRelid(rel));
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	/* TODO: newtuple is not sent yet: logicalrep_write_tuple(out, rel, newtuple) */
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466bade2..d2ef994899 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -393,6 +393,20 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 				change->data.tp.oldtuple = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+		case REORDER_BUFFER_CHANGE_ZDELETE:
+			if (change->data.ztp.newtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.newtuple);
+				change->data.ztp.newtuple = NULL;
+			}
+
+			if (change->data.ztp.oldtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.oldtuple);
+				change->data.ztp.oldtuple = NULL;
+			}
+			break;
 		case REORDER_BUFFER_CHANGE_MESSAGE:
 			if (change->data.msg.prefix != NULL)
 				pfree(change->data.msg.prefix);
@@ -456,6 +470,37 @@ ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 	pfree(tuple);
 }
 
+/*
+ * Get a fresh ReorderBufferZTupleBuf able to hold a tuple of tuple_len bytes
+ * (excluding the tuple header).  MAXIMUM_ALIGNOF bytes of slack are added as
+ * ReorderBufferZTupleBufData() MAXALIGNs the start of the tuple data.
+ */
+ReorderBufferZTupleBuf *
+ReorderBufferGetZTupleBuf(ReorderBuffer *rb, Size tuple_len)
+{
+	ReorderBufferZTupleBuf *tuple;
+	Size		alloc_len = tuple_len + SizeofZHeapTupleHeader;
+
+	tuple = (ReorderBufferZTupleBuf *)
+		MemoryContextAlloc(rb->tup_context,
+						   sizeof(ReorderBufferZTupleBuf) +
+						   MAXIMUM_ALIGNOF + alloc_len);
+
+	tuple->alloc_tuple_size = alloc_len;
+	tuple->tuple.t_data = ReorderBufferZTupleBufData(tuple);
+
+	return tuple;
+}
+
+/*
+ * Free a ReorderBufferZTupleBuf.
+ */
+void
+ReorderBufferReturnZTupleBuf(ReorderBuffer *rb, ReorderBufferZTupleBuf *tuple)
+{
+	pfree(tuple);
+}
+
 /*
  * Get an array for relids of truncated relations.
  *
@@ -1684,6 +1729,72 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						break;
 					}
 
+				case REORDER_BUFFER_CHANGE_ZINSERT:
+				case REORDER_BUFFER_CHANGE_ZDELETE:
+					{
+						Assert(snapshot_now);
+
+						reloid = RelidByRelfilenode(change->data.ztp.relnode.spcNode,
+													change->data.ztp.relnode.relNode);
+
+						if (reloid == InvalidOid &&
+							change->data.ztp.newtuple == NULL &&
+							change->data.ztp.oldtuple == NULL)
+							goto change_done;
+						else if (reloid == InvalidOid)
+							elog(ERROR, "could not map filenode \"%s\" to relation OID",
+								relpathperm(change->data.ztp.relnode,
+									MAIN_FORKNUM));
+
+						relation = RelationIdGetRelation(reloid);
+
+						if (relation == NULL)
+							elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+								 reloid,
+								 relpathperm(change->data.ztp.relnode,
+											 MAIN_FORKNUM));
+
+						if (!RelationIsLogicallyLogged(relation))
+							goto zchange_done;
+
+						/*
+						 * Ignore temporary heaps created during DDL unless the
+						 * plugin has asked for them.
+						 */
+						if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+							goto zchange_done;
+
+						/*
+						 * For now ignore sequence changes entirely. Most of the
+						 * time they don't log changes using records we
+						 * understand, so it doesn't make sense to handle the few
+						 * cases we do.
+						 */
+						if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+							goto zchange_done;
+
+						/* user-triggered change */
+						if (!IsToastRelation(relation))
+						{
+							rb->apply_change(rb, txn, relation, change);
+						}
+						else if (change->action == REORDER_BUFFER_CHANGE_ZINSERT)
+						{
+							/* toast table implementation for zheap is not done yet. */
+							elog(ERROR,"decoding for toast tables not supported in zheap");
+						}
+
+					zchange_done:
+
+						if (relation != NULL)
+						{
+							RelationClose(relation);
+							relation = NULL;
+						}
+					}
+
+					break;
+
 				case REORDER_BUFFER_CHANGE_MESSAGE:
 					rb->message(rb, txn, change->lsn, true,
 								change->data.msg.prefix,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 86e0951a70..303a3b8303 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -365,6 +365,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			OutputPluginPrepareWrite(ctx, true);
+			logicalrep_write_zinsert(ctx->out, relation,
+									 &change->data.ztp.newtuple->tuple);
+			OutputPluginWrite(ctx, true);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 004aea495e..a023b9a7bc 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef struct xl_zheap_insert
 #define XLZ_DELETE_CONTAINS_TPD_SLOT			(1<<2)
 #define XLZ_DELETE_CONTAINS_SUBXACT				(1<<3)
 #define XLZ_DELETE_IS_PARTITION_MOVE			(1<<4)
+#define XLZ_DELETE_CONTAINS_OLD_TUPLE			(1<<5)
+#define XLZ_DELETE_CONTAINS_OLD_KEY				(1<<6)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_DELETE_CONTAINS_OLD						\
+	(XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY)
 
 /* This is what we need to know about delete */
 typedef struct xl_zheap_delete
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 8192f79ce3..587472ee06 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -106,4 +106,7 @@ extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, Oid typoid);
 extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
 
+extern void logicalrep_write_zinsert(StringInfo out, Relation rel,
+						ZHeapTuple newtuple);
+
 #endif							/* LOGICALREP_PROTO_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7787edf7b6..837aa70d75 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/zhtup.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -36,6 +37,25 @@ typedef struct ReorderBufferTupleBuf
 #define ReorderBufferTupleBufData(p) \
 	((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
+	/* position in preallocated list */
+	slist_node	node;
+
+	/* tuple header, the interesting bit for users of logical decoding */
+	ZHeapTupleData tuple;
+
+	/* pre-allocated size of tuple buffer, different from tuple size */
+	Size		alloc_tuple_size;
+
+	/* actual tuple data follows */
+} ReorderBufferZTupleBuf;
+
+/* MAXALIGNed pointer to the tuple data stored right after a ZTupleBuf */
+#define ReorderBufferZTupleBufData(p) \
+	((ZHeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferZTupleBuf)))
+
 /*
  * Types of the change passed to a 'change' callback.
  *
@@ -60,7 +80,12 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_ZINSERT,
+	REORDER_BUFFER_CHANGE_ZUPDATE,
+	REORDER_BUFFER_CHANGE_ZDELETE,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZCONFIRM
 };
 
 /*
@@ -100,6 +125,20 @@ typedef struct ReorderBufferChange
 			ReorderBufferTupleBuf *newtuple;
 		}			tp;
 
+		struct
+		{
+			/* relation that has been changed */
+			RelFileNode relnode;
+
+			/* no previously reassembled toast chunks are necessary anymore */
+			bool		clear_toast_afterwards;
+
+			/* valid for DELETE || UPDATE */
+			ReorderBufferZTupleBuf *oldtuple;
+			/* valid for INSERT || UPDATE */
+			ReorderBufferZTupleBuf *newtuple;
+		}			ztp;
+
 		/*
 		 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
 		 * set of relations to be truncated.
@@ -399,6 +438,8 @@ void		ReorderBufferFree(ReorderBuffer *);
 
 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
+ReorderBufferZTupleBuf *ReorderBufferGetZTupleBuf(ReorderBuffer *rb, Size tuple_len);
+void		ReorderBufferReturnZTupleBuf(ReorderBuffer *, ReorderBufferZTupleBuf *tuple);
 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
#3Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#1)
Re: Logical decoding for operations on zheap tables

Hi,

On 2018-12-31 09:56:48 +0530, Amit Kapila wrote:

To support logical decoding for zheap operations, we need a way to
ensure zheap tuples can be registered as change streams. One idea
could be that we make ReorderBufferChange aware of another kind of
tuples as well, something like this:

@@ -100,6 +123,20 @@ typedef struct ReorderBufferChange
ReorderBufferTupleBuf *newtuple;
} tp;
+ struct
+ {
+ /* relation that has been changed */
+ RelFileNode relnode;
+
+ /* no previously reassembled toast chunks are necessary anymore */
+ bool clear_toast_afterwards;
+
+ /* valid for DELETE || UPDATE */
+ ReorderBufferZTupleBuf *oldtuple;
+ /* valid for INSERT || UPDATE */
+ ReorderBufferZTupleBuf *newtuple;
+ } ztp;
+
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
..
+ /* tuple header, the interesting bit for users of logical decoding */
+ ZHeapTupleData tuple;
..
+} ReorderBufferZTupleBuf;

Apart from this, we need to define different decode functions for
zheap operations as the WAL data is different for heap and zheap, so
same functions can't be used to decode.

I'm very strongly opposed to that. We shouldn't have to expose every
possible storage method to output plugins, that'll make extensibility
a farce. I think we'll either have to re-form a HeapTuple or decide
to bite the bullet and start exposing tuples via slots.

This email is primarily to discuss about how the logical decoding for
basic DML operations (Insert/Update/Delete) will work in zheap. We
might need some special mechanism to deal with sub-transactions as
zheap doesn't generate a transaction id for sub-transactions, but we
can discuss that separately.

Subtransactions seems to be the hardest part besides the tuple format
issue, so I think we should discuss that very soon.

+/*
+ * Write zheap's INSERT to the output stream.
+ */
+void
+logicalrep_write_zinsert(StringInfo out, Relation rel, ZHeapTuple newtuple)
+{
+	pq_sendbyte(out, 'I');		/* action INSERT */
+
+	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+	/* use Oid as relation identifier */
+	pq_sendint32(out, RelationGetRelid(rel));
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	//logicalrep_write_tuple(out, rel, newtuple);
+}

Obviously we need to do better - I don't think we should have
tuple-specific replication messages.

/*
* Write relation description to the output stream.
*/
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466bade2..70fb5e2934 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -393,6 +393,19 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
change->data.tp.oldtuple = NULL;
}
break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:

This really needs to be undistinguishable from normal CHANGE_INSERT...

Greetings,

Andres Freund

#4Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Andres Freund (#3)
Re: Logical decoding for operations on zheap tables

On 2019-Jan-03, Andres Freund wrote:

Apart from this, we need to define different decode functions for
zheap operations as the WAL data is different for heap and zheap, so
same functions can't be used to decode.

I'm very strongly opposed to that. We shouldn't have expose every
possible storage method to output plugins, that'll make extensibility
a farce. I think we'll either have to re-form a HeapTuple or decide
to bite the bullet and start exposing tuples via slots.

Hmm, without looking at the patches, I agree that the tuples should be
given as slots to the logical decoding interface. I wonder if we need a
further function in the TTS interface to help decoding, or is the
"getattr" stuff sufficient.

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#5Andres Freund
andres@anarazel.de
In reply to: Alvaro Herrera (#4)
Re: Logical decoding for operations on zheap tables

Hi,

On 2019-01-03 15:13:42 -0300, Alvaro Herrera wrote:

On 2019-Jan-03, Andres Freund wrote:

Apart from this, we need to define different decode functions for
zheap operations as the WAL data is different for heap and zheap, so
same functions can't be used to decode.

I'm very strongly opposed to that. We shouldn't have expose every
possible storage method to output plugins, that'll make extensibility
a farce. I think we'll either have to re-form a HeapTuple or decide
to bite the bullet and start exposing tuples via slots.

Hmm, without looking at the patches, I agree that the tuples should be
given as slots to the logical decoding interface. I wonder if we need a
further function in the TTS interface to help decoding, or is the
"getattr" stuff sufficient.

What precisely do you mean with "getattr stuff"? I'd assume that you'd
normally do a slot_getallattrs() and then access tts_values/nulls
directly. I don't think there's anything missing in the slot interface
itself, but using slots probably would require some careful
considerations around memory management, possibly a decoding specific
slot implementation even.

Greetings,

Andres Freund

#6Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Andres Freund (#5)
Re: Logical decoding for operations on zheap tables

On 2019-Jan-03, Andres Freund wrote:

Hi,

On 2019-01-03 15:13:42 -0300, Alvaro Herrera wrote:

Hmm, without looking at the patches, I agree that the tuples should be
given as slots to the logical decoding interface. I wonder if we need a
further function in the TTS interface to help decoding, or is the
"getattr" stuff sufficient.

What precisely do you mean with "getattr stuff"? I'd assume that you'd
normally do a slot_getallattrs() and then access tts_values/nulls
directly.

Ah, yeah, you deform the tuple first and then access the arrays
directly, right. I was just agreeing with your point that forming a
heaptuple only to have logical decoding grab individual attrs from there
didn't sound terribly optimal.

I don't think there's anything missing in the slot interface itself,
but using slots probably would require some careful considerations
around memory management, possibly a decoding specific slot
implementation even.

A specific slot implementation sounds like more work than I was
envisioning. Can't we just "pin" a slot to a memory context or
something like that, to keep it alive until decoding is done with it?
It seems useful to avoid creating another copy of the tuple in memory
(which we would need if, if I understand you correctly, we need to form
the tuple under a different slot implementation from whatever the origin
is).

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#7Amit Kapila
amit.kapila16@gmail.com
In reply to: Andres Freund (#3)
Re: Logical decoding for operations on zheap tables

On Thu, Jan 3, 2019 at 11:30 PM Andres Freund <andres@anarazel.de> wrote:

Hi,

On 2018-12-31 09:56:48 +0530, Amit Kapila wrote:

To support logical decoding for zheap operations, we need a way to
ensure zheap tuples can be registered as change streams. One idea
could be that we make ReorderBufferChange aware of another kind of
tuples as well, something like this:

@@ -100,6 +123,20 @@ typedef struct ReorderBufferChange
ReorderBufferTupleBuf *newtuple;
} tp;
+ struct
+ {
+ /* relation that has been changed */
+ RelFileNode relnode;
+
+ /* no previously reassembled toast chunks are necessary anymore */
+ bool clear_toast_afterwards;
+
+ /* valid for DELETE || UPDATE */
+ ReorderBufferZTupleBuf *oldtuple;
+ /* valid for INSERT || UPDATE */
+ ReorderBufferZTupleBuf *newtuple;
+ } ztp;
+
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
..
+ /* tuple header, the interesting bit for users of logical decoding */
+ ZHeapTupleData tuple;
..
+} ReorderBufferZTupleBuf;

Apart from this, we need to define different decode functions for
zheap operations as the WAL data is different for heap and zheap, so
same functions can't be used to decode.

I'm very strongly opposed to that. We shouldn't have expose every
possible storage method to output plugins, that'll make extensibility
a farce. I think we'll either have to re-form a HeapTuple or decide
to bite the bullet and start exposing tuples via slots.

To be clear, you are against exposing different format of tuples to
plugins, not having different decoding routines for other storage
engines, because the latter part is unavoidable due to WAL format. Now,
about tuple format, I guess it would be a lot better if we expose via
slots, but won't that make existing plugins to change the way they
decode the tuple, maybe that is okay? OTOH, re-forming the heap tuple
has a cost which might be okay for the time being or first version,
but eventually, we want to avoid that. The other reason why I
refrained from tuple conversion was that I was not sure if we anywhere
rely on the transaction information in the tuple during decode
process, because that will be tricky to mimic, but I guess we don't
check that.

The only reason for exposing a different tuple format to plugins was
performance, which I think can be addressed if we expose tuples via slots. I
don't want to take up exposing slots instead of tuples for plugins as
part of this project and I think if we want to go with that, it is
better done as part of pluggable API?

This email is primarily to discuss about how the logical decoding for
basic DML operations (Insert/Update/Delete) will work in zheap. We
might need some special mechanism to deal with sub-transactions as
zheap doesn't generate a transaction id for sub-transactions, but we
can discuss that separately.

Subtransactions seems to be the hardest part besides the tuple format
issue, so I think we should discuss that very soon.

Agreed, I am going to look at that part next.

/*
* Write relation description to the output stream.
*/
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466bade2..70fb5e2934 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -393,6 +393,19 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
change->data.tp.oldtuple = NULL;
}
break;
+             case REORDER_BUFFER_CHANGE_ZINSERT:

This really needs to be undistinguishable from normal CHANGE_INSERT...

Sure, it will be if we decide to either re-form heap tuple or expose via slots.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#8Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#7)
Re: Logical decoding for operations on zheap tables

Hi,

On 2019-01-04 08:54:34 +0530, Amit Kapila wrote:

On Thu, Jan 3, 2019 at 11:30 PM Andres Freund <andres@anarazel.de> wrote:

On 2018-12-31 09:56:48 +0530, Amit Kapila wrote:

To support logical decoding for zheap operations, we need a way to
ensure zheap tuples can be registered as change streams. One idea
could be that we make ReorderBufferChange aware of another kind of
tuples as well, something like this:

@@ -100,6 +123,20 @@ typedef struct ReorderBufferChange
ReorderBufferTupleBuf *newtuple;
} tp;
+ struct
+ {
+ /* relation that has been changed */
+ RelFileNode relnode;
+
+ /* no previously reassembled toast chunks are necessary anymore */
+ bool clear_toast_afterwards;
+
+ /* valid for DELETE || UPDATE */
+ ReorderBufferZTupleBuf *oldtuple;
+ /* valid for INSERT || UPDATE */
+ ReorderBufferZTupleBuf *newtuple;
+ } ztp;
+
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
..
+ /* tuple header, the interesting bit for users of logical decoding */
+ ZHeapTupleData tuple;
..
+} ReorderBufferZTupleBuf;

Apart from this, we need to define different decode functions for
zheap operations as the WAL data is different for heap and zheap, so
same functions can't be used to decode.

I'm very strongly opposed to that. We shouldn't have expose every
possible storage method to output plugins, that'll make extensibility
a farce. I think we'll either have to re-form a HeapTuple or decide
to bite the bullet and start exposing tuples via slots.

To be clear, you are against exposing different format of tuples to
plugins, not having different decoding routines for other storage
engines, because later part is unavoidable due to WAL format.

Correct.

Now,
about tuple format, I guess it would be a lot better if we expose via
slots, but won't that make existing plugins to change the way they
decode the tuple, maybe that is okay?

I think one-off API changes are ok. What I'm strictly against is
primarily that output plugins will have to deal with more and more
different tuple formats.

OTOH, re-forming the heap tuple
has a cost which might be okay for the time being or first version,
but eventually, we want to avoid that.

Right.

The other reason why I refrained from tuple conversion was that I
was not sure if we anywhere rely on the transaction information in
the tuple during decode process, because that will be tricky to
mimic, but I guess we don't check that.

Shouldn't be necessary - in fact, most of that information isn't in
the heap wal records in the first place.

The only point for exposing a different tuple format via plugin was a
performance which I think can be addressed if we expose via slots. I
don't want to take up exposing slots instead of tuples for plugins as
part of this project and I think if we want to go with that, it is
better done as part of pluggable API?

No, I don't think it makes sense to address this as part of
pluggable storage. That patchset is already way too invasive and
large.

Greetings,

Andres Freund

#9Amit Kapila
amit.kapila16@gmail.com
In reply to: Andres Freund (#8)
1 attachment(s)
Re: Logical decoding for operations on zheap tables

On Fri, Jan 4, 2019 at 9:01 AM Andres Freund <andres@anarazel.de> wrote:

On 2019-01-04 08:54:34 +0530, Amit Kapila wrote:

The only point of exposing a different tuple format to plugins was
performance, which I think can be addressed if we expose tuples via slots. I
don't want to take up exposing slots instead of tuples for plugins as
part of this project, and I think if we want to go with that, it is
better done as part of the pluggable storage API?

No, I don't think it makes sense to address this as part of
pluggable storage. That patchset is already way too invasive and
large.

Fair enough. I think that for now (and maybe for the first version
that can be committed) we might want to use heap tuple format. There
will be some overhead but I think code-wise, things will be simpler.
I have prototyped it for Insert and Delete operations of zheap and the
only thing that is required are new decode functions, see the attached
patch. I have done very minimal testing of this patch as this is just
to show you and others the direction we are taking (w.r.t tuple
format) to support logical decoding in zheap.

Thanks for the feedback, further thoughts are welcome!

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Attachments:

decode_zops_3.patchapplication/octet-stream; name=decode_zops_3.patchDownload
diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c
index ee0b7f0c43..bf261c37d5 100644
--- a/src/backend/access/zheap/zheapam.c
+++ b/src/backend/access/zheap/zheapam.c
@@ -116,6 +116,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup,
 					  TransactionId single_locker_xid, int single_locker_trans_slot,
 					  UndoRecPtr prev_urecptr, CommandId cid,
 					  bool any_multi_locker_member_alive);
+static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp,
+						bool key_changed, bool *copy);
 static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf,
 						 TransactionId tup_xid, int tup_trans_slot,
 						 uint16 old_infomask, TransactionId add_to_xid,
@@ -1265,6 +1267,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	CommandId		tup_cid;
 	ItemId		lp;
 	ZHeapTupleData zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UnpackedUndoRecord	undorecord;
 	Page		page;
 	BlockNumber blkno;
@@ -1286,6 +1289,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	bool		lock_reacquired;
 	bool		hasSubXactLock = false;
 	bool		hasPayload = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta undometa;
 	uint8		vm_status;
 
@@ -1958,6 +1962,13 @@ zheap_tuple_updated:
 	vm_status = visibilitymap_get_status(relation,
 								BufferGetBlockNumber(buffer), &vmbuffer);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true,
+											&old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -2011,6 +2022,7 @@ zheap_tuple_updated:
 		XLogRecPtr	RedoRecPtr;
 		uint32		totalundotuplen = 0;
 		Size		dataoff;
+		int			bufflags = 0;
 		bool		doPageWrites;
 
 		/*
@@ -2031,6 +2043,15 @@ zheap_tuple_updated:
 			xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE;
 		if (hasSubXactLock)
 			xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT;
+		if (old_key_tuple != NULL)
+		{
+			bufflags |= REGBUF_KEEP_DATA;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY;
+		}
 
 		/*
 		 * If full_page_writes is enabled, and the buffer image is not
@@ -2076,7 +2097,27 @@ prepare_xlog:
 							totalundotuplen - SizeofZHeapTupleHeader);
 		}
 
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_zheap_header xlzhdr;
+
+			xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader);
+			XLogRegisterBufData(0,
+								(char *) old_key_tuple->t_data +
+								SizeofZHeapTupleHeader,
+								old_key_tuple->t_len -
+								SizeofZHeapTupleHeader);
+		}
+
 		if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS)
 			(void) RegisterTPDBuffer(page, 1);
 		RegisterUndoLogBuffers(2);
@@ -2135,6 +2176,9 @@ prepare_xlog:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		zheap_freetuple(old_key_tuple);
+
 	pgstat_count_heap_delete(relation);
 
 	return HeapTupleMayBeUpdated;
@@ -5886,6 +5930,102 @@ prepare_xlog:
 	UnlockReleaseTPDBuffers();
 }
 
+/*
+ * Build a zheap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.  *copy is set to true only when the
+ * result is a newly formed tuple; for REPLICA IDENTITY FULL 'tp' itself is
+ * returned and *copy stays false.  Returns NULL if there's no need to log an
+ * identity or if there's no suitable key in the Relation relation.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. Inlining them is not implemented yet, so error out.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+		{
+			elog(ERROR, "toast tables are not supported with replica identity");
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, we cannot log it: flattening external values is not
+	 * implemented here, so raise an error instead. This is somewhat unlikely
+	 * since there's limits on the size of indexed columns.
+	 * The index relation has already been closed by the time we get here.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+	{
+		elog(ERROR, "toast tables are not supported with replica identity");
+	}
+
+	return key_tuple;
+}
+
 /*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index bafbbed50e..54800defc4 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,10 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zheaputils.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -45,6 +49,8 @@
 #include "replication/snapbuild.h"
 
 #include "storage/standby.h"
+#include "utils/rel.h"
+#include "utils/relfilenodemap.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -57,6 +63,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +81,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static ZHeapTuple DecodeXLogZTuple(char *data, Size len);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +173,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
 			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +522,48 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle RM_ZHEAP_ID records: queue INSERT/DELETE, skip LOCK, ERROR otherwise.
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZDelete(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1122,212 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not ZMULTI_INSERT!) records into tuplebufs.
+ *
+ * Here we retrieve zheap tuple, convert it to heap tuple format so
+ * reorder buffer stream can understand the tuple format.  The relation is
+ * looked up from the record's relfilenode to obtain the tuple descriptor;
+ * an ERROR is raised if it cannot be opened.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+	Relation	relation = NULL;
+	Oid			reloid;
+	ZHeapTuple	zhtup;
+	HeapTuple	htup;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+	/*
+	 * Get the zheap tuple from WAL, convert it to heap tuple and store the
+	 * same as change stream.
+	 */
+	zhtup = DecodeXLogZTuple(tupledata, datalen);
+	reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+								change->data.tp.relnode.relNode);
+	relation = RelationIdGetRelation(reloid);
+	if (relation == NULL)
+		elog(ERROR, "could not open relation with OID %u", reloid);
+	htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder,
+								 htup->t_len - SizeofHeapTupleHeader);
+	change->data.tp.newtuple->tuple.t_len = htup->t_len;
+	change->data.tp.newtuple->tuple.t_self = htup->t_self;
+	change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+	memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+		   (char *) htup->t_data,
+		   htup->t_len);
+
+	/* be tidy */
+	pfree(zhtup);
+	pfree(htup);
+
+	RelationClose(relation);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key (or the whole old tuple
+ * for REPLICA IDENTITY FULL); if so it is converted to heap tuple format.
+ * An ERROR is raised if the target relation cannot be opened.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old primary key stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		Relation	relation = NULL;
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	*tupledata;
+		Oid		reloid;
+		Size	datalen;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+		/*
+		 * Get the zheap tuple from WAL, convert it to heap tuple and store the
+		 * same as change stream.
+		 */
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+		reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+									change->data.tp.relnode.relNode);
+		relation = RelationIdGetRelation(reloid);
+		if (relation == NULL)
+			elog(ERROR, "could not open relation with OID %u", reloid);
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder,
+				htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.oldtuple->tuple.t_len = htup->t_len;
+		change->data.tp.oldtuple->tuple.t_self = htup->t_self;
+		change->data.tp.oldtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.oldtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+
+		RelationClose(relation);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into a palloc'd in-memory tuple.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ *
+ * The caller is responsible to free the memory for tuple allocated by
+ * this function.
+ */
+static ZHeapTuple
+DecodeXLogZTuple(char *data, Size len)
+{
+	ZHeapTuple	zhtup;
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	int			tuplelen = datalen + SizeofZHeapTupleHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	zhtup = palloc(tuplelen + ZHEAPTUPLESIZE);
+	header = zhtup->t_data = (ZHeapTupleHeader)((char *) zhtup + ZHEAPTUPLESIZE);
+
+	zhtup->t_len = tuplelen;
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&zhtup->t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	zhtup->t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+
+	return zhtup;
+}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 6d031dcaa4..1654718263 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef struct xl_zheap_insert
 #define XLZ_DELETE_CONTAINS_TPD_SLOT			(1<<2)
 #define XLZ_DELETE_CONTAINS_SUBXACT				(1<<3)
 #define XLZ_DELETE_IS_PARTITION_MOVE			(1<<4)
+#define XLZ_DELETE_CONTAINS_OLD_TUPLE			(1<<5)
+#define XLZ_DELETE_CONTAINS_OLD_KEY				(1<<6)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_DELETE_CONTAINS_OLD						\
+	(XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY)
 
 /* This is what we need to know about delete */
 typedef struct xl_zheap_delete
#10Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#9)
1 attachment(s)
Re: Logical decoding for operations on zheap tables

On Sat, Jan 12, 2019 at 5:02 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Fair enough. I think that for now (and maybe for the first version
that can be committed) we might want to use heap tuple format. There
will be some overhead but I think code-wise, things will be simpler.
I have prototyped it for Insert and Delete operations of zheap and the
only thing that is required are new decode functions, see the attached
patch. I have done very minimal testing of this patch as this is just
to show you and others the direction we are taking (w.r.t tuple
format) to support logical decoding in zheap.

+ */
+ zhtup = DecodeXLogZTuple(tupledata, datalen);
+ reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+ change->data.tp.relnode.relNode);
+ relation = RelationIdGetRelation(reloid);

We need to start a transaction for fetching the relation if it's a
walsender process. I have fixed this issue in the patch and also
implemented decode functions for zheap update and multi-insert.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

decode_zops_v4.patchapplication/octet-stream; name=decode_zops_v4.patchDownload
diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c
index 14bcb34..c5870d8 100644
--- a/src/backend/access/zheap/zheapam.c
+++ b/src/backend/access/zheap/zheapam.c
@@ -56,6 +56,7 @@
 #include "access/zheapam_xlog.h"
 #include "access/zheap.h"
 #include "access/zheapscan.h"
+#include "access/zheaputils.h"
 #include "access/zmultilocker.h"
 #include "catalog/catalog.h"
 #include "executor/tuptable.h"
@@ -88,10 +89,10 @@ static void log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 					UnpackedUndoRecord newundorecord, UndoRecPtr urecptr,
 					UndoRecPtr newurecptr, Buffer oldbuf, Buffer newbuf,
 					ZHeapTuple oldtup, ZHeapTuple newtup,
-					int old_tup_trans_slot_id, int trans_slot_id,
-					int new_trans_slot_id, bool inplace_update,
-					bool all_visible_cleared, bool new_all_visible_cleared,
-					xl_undolog_meta *undometa);
+					ZHeapTuple old_key_tuple, int old_tup_trans_slot_id,
+					int trans_slot_id, int new_trans_slot_id,
+					bool inplace_update, bool all_visible_cleared,
+					bool new_all_visible_cleared, xl_undolog_meta *undometa);
 static HTSU_Result
 zheap_lock_updated_tuple(Relation rel, ZHeapTuple tuple, ItemPointer ctid,
 						 TransactionId xid, LockTupleMode mode, LockOper lockopr,
@@ -103,6 +104,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup,
 					  TransactionId single_locker_xid, int single_locker_trans_slot,
 					  UndoRecPtr prev_urecptr, CommandId cid,
 					  bool any_multi_locker_member_alive);
+static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp,
+						bool key_changed, bool *copy);
 static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf,
 						 TransactionId tup_xid, int tup_trans_slot,
 						 uint16 old_infomask, TransactionId add_to_xid,
@@ -1253,6 +1256,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	CommandId		tup_cid;
 	ItemId		lp;
 	ZHeapTupleData zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UnpackedUndoRecord	undorecord;
 	Page		page;
 	BlockNumber blkno;
@@ -1274,6 +1278,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	bool		lock_reacquired;
 	bool		hasSubXactLock = false;
 	bool		hasPayload = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta undometa;
 	uint8		vm_status;
 
@@ -1956,6 +1961,13 @@ zheap_tuple_updated:
 	vm_status = visibilitymap_get_status(relation,
 								BufferGetBlockNumber(buffer), &vmbuffer);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true,
+											&old_key_copied);
+
 	START_CRIT_SECTION();
 
 	if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) ||
@@ -2000,6 +2012,7 @@ zheap_tuple_updated:
 		XLogRecPtr	RedoRecPtr;
 		uint32		totalundotuplen = 0;
 		Size		dataoff;
+		int			bufflags = 0;
 		bool		doPageWrites;
 
 		/*
@@ -2020,6 +2033,15 @@ zheap_tuple_updated:
 			xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE;
 		if (hasSubXactLock)
 			xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT;
+		if (old_key_tuple != NULL)
+		{
+			bufflags |= REGBUF_KEEP_DATA;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY;
+		}
 
 		/*
 		 * If full_page_writes is enabled, and the buffer image is not
@@ -2065,7 +2087,27 @@ prepare_xlog:
 							totalundotuplen - SizeofZHeapTupleHeader);
 		}
 
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_zheap_header xlzhdr;
+
+			xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader);
+			XLogRegisterBufData(0,
+								(char *) old_key_tuple->t_data +
+								SizeofZHeapTupleHeader,
+								old_key_tuple->t_len -
+								SizeofZHeapTupleHeader);
+		}
+
 		if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS)
 			(void) RegisterTPDBuffer(page, 1);
 		RegisterUndoLogBuffers(2);
@@ -2124,6 +2166,9 @@ prepare_xlog:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		zheap_freetuple(old_key_tuple);
+
 	pgstat_count_heap_delete(relation);
 
 	return HeapTupleMayBeUpdated;
@@ -2162,6 +2207,7 @@ zheap_update(Relation relation, ItemPointer otid, ZHeapTuple newtup,
 	ItemId		lp;
 	ZHeapTupleData oldtup;
 	ZHeapTuple	zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UndoRecPtr	urecptr, prev_urecptr, new_prev_urecptr;
 	UndoRecPtr	new_urecptr = InvalidUndoRecPtr;
 	UnpackedUndoRecord	undorecord, new_undorecord;
@@ -2199,6 +2245,7 @@ zheap_update(Relation relation, ItemPointer otid, ZHeapTuple newtup,
 	bool		lock_reacquired;
 	bool		need_toast;
 	bool		hasSubXactLock = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta	undometa;
 	uint8		vm_status;
 	uint8		vm_status_new = 0;
@@ -3650,6 +3697,15 @@ reacquire_buffer:
 	 */
 	XLogEnsureRecordSpace(8, 0);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &oldtup, true,
+											&old_key_copied);
+	if (old_key_tuple != NULL && !old_key_copied)
+		old_key_tuple = zheap_copytuple(old_key_tuple);
+
 	START_CRIT_SECTION();
 
 	if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) ||
@@ -3806,15 +3862,18 @@ reacquire_buffer:
 
 		log_zheap_update(relation, undorecord, new_undorecord,
 						 urecptr, new_urecptr, buffer, newbuf,
-						 &oldtup, zheaptup, tup_trans_slot_id,
-						 trans_slot_id, new_trans_slot_id,
-						 use_inplace_update, all_visible_cleared,
-						 new_all_visible_cleared, &undometa);
+						 &oldtup, zheaptup, old_key_tuple,
+						 tup_trans_slot_id, trans_slot_id,
+						 new_trans_slot_id, use_inplace_update,
+						 all_visible_cleared, new_all_visible_cleared,
+						 &undometa);
 	}
 
 	END_CRIT_SECTION();
 
 	/* be tidy */
+	if (old_key_tuple != NULL)
+		zheap_freetuple(old_key_tuple);
 	pfree(undorecord.uur_tuple.data);
 	if (undorecord.uur_payload.len > 0)
 		pfree(undorecord.uur_payload.data);
@@ -3897,10 +3956,10 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 				 UnpackedUndoRecord newundorecord, UndoRecPtr urecptr,
 				 UndoRecPtr newurecptr, Buffer oldbuf, Buffer newbuf,
 				 ZHeapTuple oldtup, ZHeapTuple newtup,
-				 int old_tup_trans_slot_id, int trans_slot_id,
-				 int new_trans_slot_id, bool inplace_update,
-				 bool all_visible_cleared, bool new_all_visible_cleared,
-				 xl_undolog_meta *undometa)
+				 ZHeapTuple old_key_tuple, int old_tup_trans_slot_id,
+				 int trans_slot_id, int new_trans_slot_id,
+				 bool inplace_update, bool all_visible_cleared,
+				 bool new_all_visible_cleared, xl_undolog_meta *undometa)
 {
 	xl_undo_header	xlundohdr,
 					xlnewundohdr;
@@ -3915,6 +3974,7 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 	XLogRecPtr	recptr;
 	XLogRecPtr	RedoRecPtr;
 	bool		doPageWrites;
+	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
 	char	*oldp = NULL;
 	char	*newp = NULL;
 	int		oldlen, newlen;
@@ -3958,7 +4018,8 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 	 * See log_heap_update to know under what some circumstances we can use
 	 * prefix-suffix compression.
 	 */
-	if (oldbuf == newbuf && !XLogCheckBufferNeedsBackup(newbuf))
+	if (oldbuf == newbuf && !need_tuple_data &&
+		!XLogCheckBufferNeedsBackup(newbuf))
 	{
 		Assert(oldp != NULL && newp != NULL);
 
@@ -4010,6 +4071,17 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 		xlrec.flags |= XLZ_UPDATE_SUFFIX_FROM_OLD;
 	if (undorecord.uur_info & UREC_INFO_PAYLOAD_CONTAINS_SUBXACT)
 		xlrec.flags |= XLZ_UPDATE_CONTAINS_SUBXACT;
+	if (need_tuple_data)
+	{
+		xlrec.flags |= XLZ_UPDATE_CONTAINS_NEW_TUPLE;
+		if (old_key_tuple)
+		{
+			if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_UPDATE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_UPDATE_CONTAINS_OLD_KEY;
+		}
+	}
 
 	if (!inplace_update)
 	{
@@ -4079,6 +4151,9 @@ prepare_xlog:
 						 totalundotuplen - SizeofZHeapTupleHeader);
 	}
 
+	if (need_tuple_data)
+		bufflags |= REGBUF_KEEP_DATA;
+
 	XLogRegisterBuffer(0, newbuf, bufflags);
 	if (oldbuf != newbuf)
 	{
@@ -4165,6 +4240,23 @@ prepare_xlog:
 	/* filtering by origin on a row level is much more efficient */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
+	/* We need to log a tuple identity */
+	if (need_tuple_data && old_key_tuple &&
+		!(xlrec.flags & XLZ_HAS_UPDATE_UNDOTUPLE))
+	{
+		xl_zheap_header xlzhdr;
+
+		xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+		xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+		xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+		XLogRegisterData((char *) &xlzhdr, SizeOfZHeapHeader);
+		XLogRegisterData((char *) old_key_tuple->t_data +
+						SizeofZHeapTupleHeader,
+						old_key_tuple->t_len -
+						SizeofZHeapTupleHeader);
+	}
+
 	recptr = XLogInsertExtended(RM_ZHEAP_ID, info, RedoRecPtr, doPageWrites);
 	if (recptr == InvalidXLogRecPtr)
 	{
@@ -5941,6 +6033,102 @@ prepare_xlog:
 }
 
 /*
+ * Build a zheap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.  *copy is set to true only when the
+ * result is a newly formed tuple; for REPLICA IDENTITY FULL 'tp' itself is
+ * returned and *copy stays false.  Returns NULL if there's no need to log an
+ * identity or if there's no suitable key in the Relation relation.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. Inlining them is not implemented yet, so error out.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+		{
+			elog(ERROR, "toast tables are not supported with replica identity");
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, we cannot log it: flattening external values is not
+	 * implemented here, so raise an error instead. This is somewhat unlikely
+	 * since there's limits on the size of indexed columns.
+	 * The index relation has already been closed by the time we get here.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+	{
+		elog(ERROR, "toast tables are not supported with replica identity");
+	}
+
+	return key_tuple;
+}
+
+/*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
  * infomask and transaction slot.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index bafbbed..fc01f2b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,10 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zheaputils.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -45,6 +49,8 @@
 #include "replication/snapbuild.h"
 
 #include "storage/standby.h"
+#include "utils/rel.h"
+#include "utils/relfilenodemap.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -57,6 +63,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +81,13 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static ZHeapTuple DecodeXLogZTuple(char *data, Size len);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +175,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
 			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +524,73 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Route one RM_ZHEAP_ID WAL record to the matching zheap decode routine.
+ *
+ * Invoked from LogicalDecodingProcessRecord().  Data-modifying records
+ * (insert, delete, update, multi-insert) are handed to their decoders once
+ * the snapshot builder agrees that the change has to be processed; all
+ * other known record types are ignored.
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		opcode = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId record_xid = XLogRecGetXid(buf->record);
+	SnapBuild  *snapbuild = ctx->snapshot_builder;
+	bool		own_xact = false;
+
+	ReorderBufferProcessXid(ctx->reorder, record_xid, buf->origptr);
+
+	/*
+	 * Decoding data changes is pointless while no full snapshot has been
+	 * built yet, and equally so when we are merely fast-forwarding.
+	 */
+	if (ctx->fast_forward ||
+		SnapBuildCurrentState(snapbuild) < SNAPBUILD_FULL_SNAPSHOT)
+		return;
+
+	/*
+	 * We can be reached both inside and outside of a transaction; start one
+	 * only if none is in progress, and remember that it is ours to commit.
+	 */
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		own_xact = true;
+	}
+
+	switch (opcode)
+	{
+		case XLOG_ZHEAP_INSERT:
+		case XLOG_ZHEAP_DELETE:
+		case XLOG_ZHEAP_UPDATE:
+		case XLOG_ZHEAP_MULTI_INSERT:
+			/* data changes: decode only if the snapshot builder says so */
+			if (!SnapBuildProcessChange(snapbuild, record_xid, buf->origptr))
+				break;
+
+			if (opcode == XLOG_ZHEAP_INSERT)
+				DecodeZInsert(ctx, buf);
+			else if (opcode == XLOG_ZHEAP_DELETE)
+				DecodeZDelete(ctx, buf);
+			else if (opcode == XLOG_ZHEAP_UPDATE)
+				DecodeZUpdate(ctx, buf);
+			else
+				DecodeZMultiInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		case XLOG_ZHEAP_FREEZE_XACT_SLOT:
+		case XLOG_ZHEAP_INVALID_XACT_SLOT:
+			/* low level physical stuff we're not interested in */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", opcode);
+			break;
+	}
+
+	/* Commit the transaction, but only if this function started it. */
+	if (own_xact)
+		CommitTransactionCommand();
+}
+
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1149,465 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not XLOG_ZHEAP_MULTI_INSERT!) records into
+ * tuplebufs.
+ *
+ * The zheap tuple is reassembled from the WAL data and converted to heap
+ * tuple format, so that the reorder buffer stream can understand it.
+ *
+ * Raises an ERROR if the record's relfilenode cannot be mapped to a
+ * relation that can be opened, because the relation's tuple descriptor is
+ * required for the zheap-to-heap conversion.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+	Relation	relation;
+	Oid			reloid;
+	ZHeapTuple	zhtup;
+	HeapTuple	htup;
+
+	/* the insert record sits SizeOfUndoHeader bytes into the main data */
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+	/*
+	 * Get the zheap tuple from WAL, convert it to heap tuple and store the
+	 * same as change stream.
+	 */
+	zhtup = DecodeXLogZTuple(tupledata, datalen);
+
+	/*
+	 * The conversion needs the relation's tuple descriptor.  Both lookup
+	 * steps can fail, so verify each result before it is dereferenced.
+	 */
+	reloid = RelidByRelfilenode(target_node.spcNode, target_node.relNode);
+	if (!OidIsValid(reloid))
+		elog(ERROR, "could not map filenode %u/%u/%u to relation OID",
+			 target_node.spcNode, target_node.dbNode, target_node.relNode);
+
+	relation = RelationIdGetRelation(reloid);
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u", reloid);
+
+	htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+	/* the tuplebuf is sized by the tuple's length excluding the heap header */
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder,
+								 htup->t_len - SizeofHeapTupleHeader);
+	change->data.tp.newtuple->tuple.t_len = htup->t_len;
+	change->data.tp.newtuple->tuple.t_self = htup->t_self;
+	change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+	memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+		   (char *) htup->t_data,
+		   htup->t_len);
+
+	/* be tidy */
+	pfree(zhtup);
+	pfree(htup);
+	RelationClose(relation);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+
+/*
+ * Parse XLOG_ZHEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.  When they do, the old
+ * zheap tuple is reassembled from the WAL data and converted to heap tuple
+ * format before being attached to the change.
+ *
+ * Raises an ERROR if an old tuple is present but the record's relfilenode
+ * cannot be mapped to a relation that can be opened.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	/* the delete record sits SizeOfUndoHeader bytes into the main data */
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old primary key stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		Relation	relation;
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	   *tupledata;
+		Oid			reloid;
+		Size		datalen;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+		/*
+		 * Get the zheap tuple from WAL, convert it to heap tuple and store
+		 * the same as change stream.
+		 */
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+
+		/*
+		 * The conversion needs the relation's tuple descriptor.  Both
+		 * lookup steps can fail, so verify each result before it is
+		 * dereferenced.
+		 */
+		reloid = RelidByRelfilenode(target_node.spcNode, target_node.relNode);
+		if (!OidIsValid(reloid))
+			elog(ERROR, "could not map filenode %u/%u/%u to relation OID",
+				 target_node.spcNode, target_node.dbNode, target_node.relNode);
+
+		relation = RelationIdGetRelation(reloid);
+		if (!RelationIsValid(relation))
+			elog(ERROR, "could not open relation with OID %u", reloid);
+
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		/* the tuplebuf is sized by the tuple's length excluding the heap header */
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder,
+									 htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.oldtuple->tuple.t_len = htup->t_len;
+		change->data.tp.oldtuple->tuple.t_self = htup->t_self;
+		change->data.tp.oldtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.oldtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+		RelationClose(relation);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+
+/*
+ * Parse XLOG_ZHEAP_UPDATE from wal into proper tuplebufs.
+ *
+ * Updates can possibly contain a new tuple and the old primary key.  Each
+ * tuple that is present is reassembled from the WAL data as a zheap tuple
+ * and converted to heap tuple format before being attached to the change.
+ *
+ * Raises an ERROR if a tuple is present but the record's relfilenode cannot
+ * be mapped to a relation that can be opened.
+ */
+static void
+DecodeZUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_update *xlrec;
+	ReorderBufferChange *change;
+	char	   *data;
+	RelFileNode target_node;
+	Relation	relation = NULL;
+
+	/* the update record sits SizeOfUndoHeader bytes into the main data */
+	data = XLogRecGetData(r);
+	xlrec = (xl_zheap_update *) (data + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = XLogRecGetOrigin(r);
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/*
+	 * Converting either tuple needs the relation's tuple descriptor, so
+	 * open the relation once for both.  Both lookup steps can fail, so
+	 * verify each result before it is dereferenced.
+	 */
+	if (xlrec->flags & (XLZ_UPDATE_CONTAINS_NEW_TUPLE | XLZ_UPDATE_CONTAINS_OLD))
+	{
+		Oid			reloid;
+
+		reloid = RelidByRelfilenode(target_node.spcNode, target_node.relNode);
+		if (!OidIsValid(reloid))
+			elog(ERROR, "could not map filenode %u/%u/%u to relation OID",
+				 target_node.spcNode, target_node.dbNode, target_node.relNode);
+
+		relation = RelationIdGetRelation(reloid);
+		if (!RelationIsValid(relation))
+			elog(ERROR, "could not open relation with OID %u", reloid);
+	}
+
+	if (xlrec->flags & XLZ_UPDATE_CONTAINS_NEW_TUPLE)
+	{
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	   *tupledata;
+		Size		datalen;
+
+		/* the new tuple is taken from the data registered with block 0 */
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		change->data.tp.newtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder,
+									 htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.newtuple->tuple.t_len = htup->t_len;
+		change->data.tp.newtuple->tuple.t_self = htup->t_self;
+		change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+	}
+
+	if (xlrec->flags & XLZ_UPDATE_CONTAINS_OLD)
+	{
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	   *tupledata;
+		Size		datalen;
+		int32		offset;
+
+		/*
+		 * The old tuple is taken from the record's main data.  It follows
+		 * the undo header and the update record, plus whichever optional
+		 * pieces the flags say are present: a TPD slot for the old tuple
+		 * and, for non-inplace updates, a second undo header and possibly
+		 * a TPD slot for the new tuple.
+		 */
+		offset = SizeOfZHeapUpdate + SizeOfUndoHeader;
+
+		if (xlrec->flags & XLZ_UPDATE_OLD_CONTAINS_TPD_SLOT)
+			offset += sizeof(int32);
+		if (xlrec->flags & XLZ_NON_INPLACE_UPDATE)
+		{
+			offset += SizeOfUndoHeader;
+
+			if (xlrec->flags & XLZ_UPDATE_NEW_CONTAINS_TPD_SLOT)
+				offset += sizeof(int32);
+		}
+
+		tupledata = data + offset;
+		datalen = XLogRecGetDataLen(r) - offset;
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder,
+									 htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.oldtuple->tuple.t_len = htup->t_len;
+		change->data.tp.oldtuple->tuple.t_self = htup->t_self;
+		change->data.tp.oldtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.oldtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+	}
+
+	if (RelationIsValid(relation))
+		RelationClose(relation);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+
+/*
+ * Decode XLOG_ZHEAP_MULTI_INSERT record into multiple tuplebufs.
+ *
+ * Currently MULTI_INSERT will always contain the full tuples.  Each zheap
+ * tuple is reassembled from the block data, converted to heap tuple format
+ * and queued as a separate INSERT change.
+ *
+ * Raises an ERROR if the record's relfilenode cannot be mapped to a
+ * relation that can be opened.
+ */
+static void
+DecodeZMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_multi_insert *xlrec;
+	Relation	relation = NULL;
+	int			i;
+	char	   *data;
+	char	   *tupledata;
+	Size		blockdatalen;	/* length of ALL tuples' data in block 0 */
+	RelFileNode rnode;
+
+	/* the multi-insert record sits SizeOfUndoHeader bytes into the main data */
+	xlrec = (xl_zheap_multi_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
+	if (rnode.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	tupledata = XLogRecGetBlockData(r, 0, &blockdatalen);
+
+	data = tupledata;
+	for (i = 0; i < xlrec->ntuples; i++)
+	{
+		ReorderBufferChange *change;
+
+		change = ReorderBufferGetChange(ctx->reorder);
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+		change->origin_id = XLogRecGetOrigin(r);
+
+		memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
+
+		/*
+		 * CONTAINS_NEW_TUPLE will always be set currently as multi_insert
+		 * isn't used for catalogs, but better be future proof.
+		 *
+		 * We decode the tuple in pretty much the same way as
+		 * DecodeXLogZTuple, but since the layout is slightly different, we
+		 * can't use it here.
+		 */
+		if (xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE)
+		{
+			xl_multi_insert_ztuple *xlhdr;
+			ZHeapTupleHeader header;
+			ZHeapTuple	zhtup;
+			HeapTuple	htup;
+			Size		datalen;
+			Size		ztuplen;
+
+			xlhdr = (xl_multi_insert_ztuple *) SHORTALIGN(data);
+			data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+			datalen = xlhdr->datalen;
+
+			/*
+			 * Size this tuple by its own data length plus the tuple header,
+			 * not by the length of the whole block payload, which covers
+			 * every tuple in the record.
+			 */
+			ztuplen = datalen + SizeofZHeapTupleHeader;
+
+			zhtup = palloc(ztuplen + ZHEAPTUPLESIZE);
+			header = zhtup->t_data = (ZHeapTupleHeader) ((char *) zhtup + ZHEAPTUPLESIZE);
+
+			zhtup->t_len = ztuplen;
+			/* not a disk based tuple */
+			ItemPointerSetInvalid(&zhtup->t_self);
+
+			/* we can only figure this out after reassembling the transactions */
+			zhtup->t_tableOid = InvalidOid;
+
+			/* start from a zeroed header, as DecodeXLogZTuple does */
+			memset(header, 0, SizeofZHeapTupleHeader);
+
+			memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+				   (char *) data,
+				   datalen);
+
+			header->t_infomask = xlhdr->t_infomask;
+			header->t_infomask2 = xlhdr->t_infomask2;
+			header->t_hoff = xlhdr->t_hoff;
+
+			/*
+			 * Open the relation on first use; it is shared by all tuples of
+			 * this record.  Both lookup steps can fail, so verify each
+			 * result before it is dereferenced.
+			 */
+			if (!RelationIsValid(relation))
+			{
+				Oid			reloid;
+
+				reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+				if (!OidIsValid(reloid))
+					elog(ERROR, "could not map filenode %u/%u/%u to relation OID",
+						 rnode.spcNode, rnode.dbNode, rnode.relNode);
+
+				relation = RelationIdGetRelation(reloid);
+				if (!RelationIsValid(relation))
+					elog(ERROR, "could not open relation with OID %u", reloid);
+			}
+
+			htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+			change->data.tp.newtuple =
+				ReorderBufferGetTupleBuf(ctx->reorder,
+										 htup->t_len - SizeofHeapTupleHeader);
+			change->data.tp.newtuple->tuple.t_len = htup->t_len;
+			change->data.tp.newtuple->tuple.t_self = htup->t_self;
+			change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+			memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+				   (char *) htup->t_data,
+				   htup->t_len);
+
+			/* advance to the next tuple's header */
+			data += datalen;
+
+			/* be tidy */
+			pfree(zhtup);
+			pfree(htup);
+		}
+
+		/*
+		 * Reset toast reassembly state only after the last row in the last
+		 * xl_multi_insert_ztuple record emitted by one multi-insert call.
+		 */
+		if (xlrec->flags & XLZ_INSERT_LAST_IN_MULTI &&
+			(i + 1) == xlrec->ntuples)
+			change->data.tp.clear_toast_afterwards = true;
+		else
+			change->data.tp.clear_toast_afterwards = false;
+
+		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+								 buf->origptr, change);
+	}
+	Assert(data == tupledata + blockdatalen);
+
+	if (RelationIsValid(relation))
+	{
+		RelationClose(relation);
+		relation = NULL;
+	}
+}
+
+
+/*
+ * Build an in-memory ZHeapTuple from tuple data as WAL logged by
+ * zheap_insert, zheap_update and zheap_delete (but not by
+ * zheap_multi_insert).
+ *
+ * 'data' points at an xl_zheap_header immediately followed by the tuple's
+ * data; 'len' is the combined length of both.  Since both depend on the
+ * record type, the caller has to work them out.
+ *
+ * The result is a single palloc'd chunk, which the caller must pfree.
+ */
+static ZHeapTuple
+DecodeXLogZTuple(char *data, Size len)
+{
+	xl_zheap_header walhdr;
+	int			payload_len = len - SizeOfZHeapHeader;
+	int			ztup_len = payload_len + SizeofZHeapTupleHeader;
+	ZHeapTuple	result;
+	ZHeapTupleHeader tuphdr;
+
+	Assert(payload_len >= 0);
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &walhdr, data, SizeOfZHeapHeader);
+
+	/* the tuple header lives directly behind the ZHeapTupleData struct */
+	result = palloc(ztup_len + ZHEAPTUPLESIZE);
+	tuphdr = (ZHeapTupleHeader) ((char *) result + ZHEAPTUPLESIZE);
+	result->t_data = tuphdr;
+	result->t_len = ztup_len;
+
+	/* we can only figure this out after reassembling the transactions */
+	result->t_tableOid = InvalidOid;
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&result->t_self);
+
+	/* zero the header, then place the tuple data right after it */
+	memset(tuphdr, 0, SizeofZHeapTupleHeader);
+	memcpy(((char *) tuphdr) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   payload_len);
+
+	/* finally fill in the header fields carried in the WAL record */
+	tuphdr->t_infomask = walhdr.t_infomask;
+	tuphdr->t_infomask2 = walhdr.t_infomask2;
+	tuphdr->t_hoff = walhdr.t_hoff;
+
+	return result;
+}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 6d031dc..99a360d 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef struct xl_zheap_insert
 #define XLZ_DELETE_CONTAINS_TPD_SLOT			(1<<2)
 #define XLZ_DELETE_CONTAINS_SUBXACT				(1<<3)
 #define XLZ_DELETE_IS_PARTITION_MOVE			(1<<4)
+/*
+ * Flags consulted by logical decoding: when either one is set, the delete
+ * record's block 0 data carries an old tuple image that is decoded.
+ * NOTE(review): presumably _OLD_TUPLE means the full old tuple and _OLD_KEY
+ * only its replica identity key columns -- confirm against the code that
+ * writes the delete record.
+ */
+#define XLZ_DELETE_CONTAINS_OLD_TUPLE			(1<<5)
+#define XLZ_DELETE_CONTAINS_OLD_KEY				(1<<6)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_DELETE_CONTAINS_OLD						\
+	(XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY)
 
 /* This is what we need to know about delete */
 typedef struct xl_zheap_delete
@@ -155,6 +161,13 @@ typedef struct xl_zheap_delete
 #define	XLZ_UPDATE_OLD_CONTAINS_TPD_SLOT		(1<<6)
 #define	XLZ_UPDATE_NEW_CONTAINS_TPD_SLOT		(1<<7)
 #define XLZ_UPDATE_CONTAINS_SUBXACT				(1<<8)
+/*
+ * Flags consulted by logical decoding.  With _CONTAINS_NEW_TUPLE set, the
+ * new tuple is decoded from the update record's block 0 data; with either
+ * _CONTAINS_OLD_* flag set, an old tuple image is decoded from the tail of
+ * the record's main data.
+ * NOTE(review): presumably _OLD_TUPLE means the full old tuple and _OLD_KEY
+ * only its replica identity key columns -- confirm against the code that
+ * writes the update record.
+ */
+#define XLZ_UPDATE_CONTAINS_OLD_TUPLE			(1<<9)
+#define XLZ_UPDATE_CONTAINS_OLD_KEY				(1<<10)
+#define XLZ_UPDATE_CONTAINS_NEW_TUPLE			(1<<11)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_UPDATE_CONTAINS_OLD						\
+	(XLZ_UPDATE_CONTAINS_OLD_TUPLE | XLZ_UPDATE_CONTAINS_OLD_KEY)
 
 /*
  * This is what we need to know about update|inplace_update