diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index f6e77fbda1..e194550643 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -388,6 +388,80 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 	}
 }
 
+/* print the tuple 'tuple' into the StringInfo s */
+static void
+ztuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, ZHeapTuple tuple, bool skip_nulls)
+{
+	int			natt;
+
+	/* print all columns individually */
+	for (natt = 0; natt < tupdesc->natts; natt++)
+	{
+		Form_pg_attribute attr; /* the attribute itself */
+		Oid			typid;		/* type of current attribute */
+		Oid			typoutput;	/* output function */
+		bool		typisvarlena;
+		Datum		origval;	/* possibly toasted Datum */
+		bool		isnull;		/* column is null? */
+
+		attr = TupleDescAttr(tupdesc, natt);
+
+		/*
+		 * don't print dropped columns, we can't be sure everything is
+		 * available for them
+		 */
+		if (attr->attisdropped)
+			continue;
+
+		/*
+		 * Don't print system columns; this function does not print an oid
+		 * for zheap tuples at all.
+		 */
+		if (attr->attnum < 0)
+			continue;
+
+		typid = attr->atttypid;
+
+		/* get Datum from tuple */
+		origval = zheap_getattr(tuple, natt + 1, tupdesc, &isnull);
+
+		if (isnull && skip_nulls)
+			continue;
+
+		/* print attribute name */
+		appendStringInfoChar(s, ' ');
+		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
+
+		/* print attribute type */
+		appendStringInfoChar(s, '[');
+		appendStringInfoString(s, format_type_be(typid));
+		appendStringInfoChar(s, ']');
+
+		/* query output function */
+		getTypeOutputInfo(typid,
+						  &typoutput, &typisvarlena);
+
+		/* print separator */
+		appendStringInfoChar(s, ':');
+
+		/* print data */
+		if (isnull)
+			appendStringInfoString(s, "null");
+		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
+			appendStringInfoString(s, "unchanged-toast-datum");
+		else if (!typisvarlena)
+			print_literal(s, typid,
+						  OidOutputFunctionCall(typoutput, origval));
+		else
+		{
+			Datum		val;	/* definitely detoasted Datum */
+
+			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
+		}
+	}
+}
+
 /*
  * callback for individual changed tuples
  */
@@ -468,6 +542,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									&change->data.tp.oldtuple->tuple,
 									true);
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			appendStringInfoString(ctx->out, " INSERT:");
+			if (change->data.ztp.newtuple == NULL)
+				appendStringInfoString(ctx->out, " (no-tuple-data)");
+			else
+				ztuple_to_stringinfo(ctx->out, tupdesc,
+									 &change->data.ztp.newtuple->tuple,
+									 false);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 95153f4e29..01f32e8fcf 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,9 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -57,6 +60,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +78,10 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +169,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
-			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			/* Only some zheap records are decoded, see DecodeZHeapOp(). */
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +518,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for DecodeRecordIntoReorderBuffer().
+ *
+ * Only XLOG_ZHEAP_INSERT is decoded so far; XLOG_ZHEAP_LOCK is ignored and
+ * any other zheap record type raises an error.
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1116,98 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not ZMULTI_INSERT!) records into tuplebufs.
+ *
+ * The new tuple, when logged, is read from the data of block reference 0.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	Size		tuplelen;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_ZINSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.ztp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	tuplelen = datalen - SizeOfZHeapHeader;
+
+	change->data.ztp.newtuple =
+		ReorderBufferGetZTupleBuf(ctx->reorder, tuplelen);
+
+	DecodeXLogZTuple(tupledata, datalen, change->data.ztp.newtuple);
+
+	change->data.ztp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into a tuplebuf.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ */
+static void
+DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple)
+{
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofZHeapTupleHeader;
+	header = tuple->tuple.t_data;
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 19451714da..525dc2b19d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -347,6 +347,92 @@ logicalrep_read_truncate(StringInfo in,
 	return relids;
 }
 
+/*
+ * Write the columns of a zheap tuple to the output stream.
+ *
+ * This is intended to produce the same format that logicalrep_write_tuple()
+ * produces for heap tuples: the number of live (non-dropped) columns, then
+ * for each such column 'n' (NULL), 'u' (unchanged on-disk TOAST value) or
+ * 't' followed by the counted text output of the type's output function.
+ */
+static void
+logicalrep_write_ztuple(StringInfo out, Relation rel, ZHeapTuple tuple)
+{
+	TupleDesc	desc = RelationGetDescr(rel);
+	uint16		nliveatts = 0;
+	int			i;
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		if (TupleDescAttr(desc, i)->attisdropped)
+			continue;
+		nliveatts++;
+	}
+	pq_sendint16(out, nliveatts);
+
+	/* try to allocate enough memory from the get-go */
+	enlargeStringInfo(out, tuple->t_len + nliveatts * (1 + 4));
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+		HeapTuple	typtup;
+		Form_pg_type typclass;
+		Datum		value;
+		bool		isnull;
+		char	   *outputstr;
+
+		/* skip dropped columns */
+		if (att->attisdropped)
+			continue;
+
+		value = zheap_getattr(tuple, i + 1, desc, &isnull);
+
+		if (isnull)
+		{
+			pq_sendbyte(out, 'n');	/* null column */
+			continue;
+		}
+		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(value))
+		{
+			pq_sendbyte(out, 'u');	/* unchanged toast column */
+			continue;
+		}
+
+		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
+		if (!HeapTupleIsValid(typtup))
+			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
+		typclass = (Form_pg_type) GETSTRUCT(typtup);
+
+		pq_sendbyte(out, 't');	/* 'text' data follows */
+
+		outputstr = OidOutputFunctionCall(typclass->typoutput, value);
+		pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
+		pfree(outputstr);
+
+		ReleaseSysCache(typtup);
+	}
+}
+
+/*
+ * Write zheap's INSERT to the output stream.
+ */
+void
+logicalrep_write_zinsert(StringInfo out, Relation rel, ZHeapTuple newtuple)
+{
+	pq_sendbyte(out, 'I');		/* action INSERT */
+
+	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+	/* use Oid as relation identifier */
+	pq_sendint32(out, RelationGetRelid(rel));
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	logicalrep_write_ztuple(out, rel, newtuple);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466bade2..70fb5e2934 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -393,6 +393,20 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 				change->data.tp.oldtuple = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT:
+			if (change->data.ztp.newtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.newtuple);
+				change->data.ztp.newtuple = NULL;
+			}
+
+			if (change->data.ztp.oldtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.oldtuple);
+				change->data.ztp.oldtuple = NULL;
+			}
+			break;
 		case REORDER_BUFFER_CHANGE_MESSAGE:
 			if (change->data.msg.prefix != NULL)
 				pfree(change->data.msg.prefix);
@@ -456,6 +470,41 @@ ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 	pfree(tuple);
 }
 
+/*
+ * Get a fresh ReorderBufferZTupleBuf fitting at least a tuple of size
+ * tuple_len (excluding header overhead).
+ */
+ReorderBufferZTupleBuf *
+ReorderBufferGetZTupleBuf(ReorderBuffer *rb, Size tuple_len)
+{
+	ReorderBufferZTupleBuf *tuple;
+	Size		alloc_len;
+
+	alloc_len = tuple_len + SizeofZHeapTupleHeader;
+
+	/*
+	 * ReorderBufferZTupleBufData() MAXALIGNs the start of the tuple data, so
+	 * reserve MAXIMUM_ALIGNOF extra bytes for the alignment padding.
+	 */
+	tuple = (ReorderBufferZTupleBuf *)
+		MemoryContextAlloc(rb->tup_context,
+						   sizeof(ReorderBufferZTupleBuf) +
+						   MAXIMUM_ALIGNOF + alloc_len);
+	tuple->alloc_tuple_size = alloc_len;
+	tuple->tuple.t_data = ReorderBufferZTupleBufData(tuple);
+
+	return tuple;
+}
+
+/*
+ * Free a ReorderBufferZTupleBuf.
+ */
+void
+ReorderBufferReturnZTupleBuf(ReorderBuffer *rb, ReorderBufferZTupleBuf *tuple)
+{
+	pfree(tuple);
+}
+
 /*
  * Get an array for relids of truncated relations.
  *
@@ -1684,6 +1733,71 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						break;
 					}
 
+				case REORDER_BUFFER_CHANGE_ZINSERT:
+					{
+						Assert(snapshot_now);
+
+						reloid = RelidByRelfilenode(change->data.ztp.relnode.spcNode,
+													change->data.ztp.relnode.relNode);
+
+						if (reloid == InvalidOid &&
+							change->data.ztp.newtuple == NULL &&
+							change->data.ztp.oldtuple == NULL)
+							goto zchange_done;
+						else if (reloid == InvalidOid)
+							elog(ERROR, "could not map filenode \"%s\" to relation OID",
+								 relpathperm(change->data.ztp.relnode,
+											 MAIN_FORKNUM));
+
+						relation = RelationIdGetRelation(reloid);
+
+						if (relation == NULL)
+							elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+								 reloid,
+								 relpathperm(change->data.ztp.relnode,
+											 MAIN_FORKNUM));
+
+						if (!RelationIsLogicallyLogged(relation))
+							goto zchange_done;
+
+						/*
+						 * Ignore temporary heaps created during DDL unless the
+						 * plugin has asked for them.
+						 */
+						if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+							goto zchange_done;
+
+						/*
+						 * For now ignore sequence changes entirely. Most of the
+						 * time they don't log changes using records we
+						 * understand, so it doesn't make sense to handle the few
+						 * cases we do.
+						 */
+						if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+							goto zchange_done;
+
+						/* user-triggered change */
+						if (!IsToastRelation(relation))
+						{
+							rb->apply_change(rb, txn, relation, change);
+						}
+						else if (change->action == REORDER_BUFFER_CHANGE_ZINSERT)
+						{
+							/* toast table implementation for zheap is not done yet. */
+							elog(ERROR,"decoding for toast tables not supported in zheap");
+						}
+
+			zchange_done:
+
+						if (relation != NULL)
+						{
+							RelationClose(relation);
+							relation = NULL;
+						}
+					}
+
+					break;
+
 				case REORDER_BUFFER_CHANGE_MESSAGE:
 					rb->message(rb, txn, change->lsn, true,
 								change->data.msg.prefix,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 86e0951a70..303a3b8303 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -365,6 +365,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			OutputPluginPrepareWrite(ctx, true);
+			logicalrep_write_zinsert(ctx->out, relation,
+									 &change->data.ztp.newtuple->tuple);
+			OutputPluginWrite(ctx, true);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 8192f79ce3..587472ee06 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -106,4 +106,7 @@ extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, Oid typoid);
 extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
 
+extern void logicalrep_write_zinsert(StringInfo out, Relation rel,
+						 ZHeapTuple newtuple);
+
 #endif							/* LOGICALREP_PROTO_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7787edf7b6..2544cc3951 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/zhtup.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -36,6 +37,25 @@ typedef struct ReorderBufferTupleBuf
 #define ReorderBufferTupleBufData(p) \
 	((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
+	/* position in preallocated list */
+	slist_node	node;
+
+	/* tuple header, the interesting bit for users of logical decoding */
+	ZHeapTupleData tuple;
+
+	/* pre-allocated size of tuple buffer, different from tuple size */
+	Size		alloc_tuple_size;
+
+	/* actual tuple data follows */
+} ReorderBufferZTupleBuf;
+
+/* pointer to the (MAXALIGNed) data stored in a ZTupleBuf */
+#define ReorderBufferZTupleBufData(p) \
+	((ZHeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferZTupleBuf)))
+
 /*
  * Types of the change passed to a 'change' callback.
  *
@@ -60,7 +80,10 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_ZINSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZCONFIRM
 };
 
 /*
@@ -100,6 +123,20 @@ typedef struct ReorderBufferChange
 			ReorderBufferTupleBuf *newtuple;
 		}			tp;
 
+		struct
+		{
+			/* relation that has been changed */
+			RelFileNode relnode;
+
+			/* no previously reassembled toast chunks are necessary anymore */
+			bool		clear_toast_afterwards;
+
+			/* valid for DELETE || UPDATE */
+			ReorderBufferZTupleBuf *oldtuple;
+			/* valid for INSERT || UPDATE */
+			ReorderBufferZTupleBuf *newtuple;
+		}			ztp;
+
 		/*
 		 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
 		 * set of relations to be truncated.
@@ -399,6 +436,8 @@ void		ReorderBufferFree(ReorderBuffer *);
 
 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
+ReorderBufferZTupleBuf *ReorderBufferGetZTupleBuf(ReorderBuffer *, Size tuple_len);
+void		ReorderBufferReturnZTupleBuf(ReorderBuffer *, ReorderBufferZTupleBuf *tuple);
 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 