diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c
index ee0b7f0c43..bf261c37d5 100644
--- a/src/backend/access/zheap/zheapam.c
+++ b/src/backend/access/zheap/zheapam.c
@@ -116,6 +116,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup,
 					  TransactionId single_locker_xid, int single_locker_trans_slot,
 					  UndoRecPtr prev_urecptr, CommandId cid,
 					  bool any_multi_locker_member_alive);
+static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp,
+						bool key_changed, bool *copy);
 static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf,
 						 TransactionId tup_xid, int tup_trans_slot,
 						 uint16 old_infomask, TransactionId add_to_xid,
@@ -1265,6 +1267,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	CommandId	tup_cid;
 	ItemId		lp;
 	ZHeapTupleData zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UnpackedUndoRecord undorecord;
 	Page		page;
 	BlockNumber blkno;
@@ -1286,6 +1289,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	bool		lock_reacquired;
 	bool		hasSubXactLock = false;
 	bool		hasPayload = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta undometa;
 	uint8		vm_status;
 
@@ -1958,6 +1962,13 @@ zheap_tuple_updated:
 	vm_status = visibilitymap_get_status(relation,
 										 BufferGetBlockNumber(buffer),
 										 &vmbuffer);
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true,
+											&old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -2011,6 +2022,7 @@ zheap_tuple_updated:
 		XLogRecPtr	RedoRecPtr;
 		uint32		totalundotuplen = 0;
 		Size		dataoff;
+		int			bufflags = 0;
 		bool		doPageWrites;
 
 		/*
@@ -2031,6 +2043,15 @@ zheap_tuple_updated:
 			xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE;
 		if (hasSubXactLock)
 			xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT;
+		if (old_key_tuple != NULL)
+		{
+			bufflags |= REGBUF_KEEP_DATA;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY;
+		}
 
 		/*
 		 * If full_page_writes is enabled, and the buffer image is not
@@ -2076,7 +2097,27 @@ prepare_xlog:
 								totalundotuplen - SizeofZHeapTupleHeader);
 		}
 
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_zheap_header xlzhdr;
+
+			xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader);
+			XLogRegisterBufData(0,
+								(char *) old_key_tuple->t_data +
+								SizeofZHeapTupleHeader,
+								old_key_tuple->t_len -
+								SizeofZHeapTupleHeader);
+		}
+
 		if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS)
 			(void) RegisterTPDBuffer(page, 1);
 		RegisterUndoLogBuffers(2);
@@ -2135,6 +2176,9 @@ prepare_xlog:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		zheap_freetuple(old_key_tuple);
+
 	pgstat_count_heap_delete(relation);
 
 	return HeapTupleMayBeUpdated;
@@ -5886,6 +5930,109 @@ prepare_xlog:
 	UnlockReleaseTPDBuffers();
 }
 
+/*
+ * Build a zheap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in an UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log an identity or if there's no suitable
+ * key in the Relation relation.
+ *
+ * *copy is set to true if the returned tuple was freshly allocated and should
+ * be freed by the caller; otherwise the result (if any) is 'tp' itself.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns.  Heap inlines them with toast_flatten_tuple(), but
+		 * there is no zheap equivalent yet, so refuse to log such a tuple.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("toast tables are not supported with replica identity")));
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, they would have to be inlined.  This is somewhat
+	 * unlikely since there's limits on the size of indexed columns, but as
+	 * zheap has no toast_flatten_tuple() equivalent yet, we error out.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("toast tables are not supported with replica identity")));
+	}
+
+	return key_tuple;
+}
+
 /*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index bafbbed50e..54800defc4 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,10 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zheaputils.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -45,6 +49,8 @@
 #include "replication/snapbuild.h"
 
 #include "storage/standby.h"
+#include "utils/rel.h"
+#include "utils/relfilenodemap.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -57,6 +63,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +81,13 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static ZHeapTuple DecodeXLogZTuple(char *data, Size len);
+static ReorderBufferTupleBuf *DecodeZTupleToTupleBuf(LogicalDecodingContext *ctx,
+					   RelFileNode *relnode, char *data, Size len);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +175,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
-			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +523,48 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZDelete(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1123,212 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not ZMULTI_INSERT!) records into tuplebufs.
+ *
+ * Here we retrieve the zheap tuple and convert it to heap tuple format so
+ * that the reorder buffer and output plugins can understand it.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/*
+	 * Get the zheap tuple from WAL, convert it to heap tuple and store the
+	 * same as change stream.
+	 */
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	change->data.tp.newtuple =
+		DecodeZTupleToTupleBuf(ctx, &target_node, tupledata, datalen);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key, or the whole old tuple
+ * with REPLICA IDENTITY FULL.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old primary key (or whole old tuple) stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		Size		datalen;
+		char	   *tupledata;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+		change->data.tp.oldtuple =
+			DecodeZTupleToTupleBuf(ctx, &target_node, tupledata, datalen);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into an in-memory tuple.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ *
+ * The tuple header and data are allocated in a single palloc'd chunk, so
+ * the caller can release the result with a single pfree().
+ */
+static ZHeapTuple
+DecodeXLogZTuple(char *data, Size len)
+{
+	ZHeapTuple	zhtup;
+	xl_zheap_header xlhdr;
+	Size		datalen;
+	Size		tuplelen;
+	ZHeapTupleHeader header;
+
+	/* guard against a truncated or corrupt record before doing arithmetic */
+	if (len < SizeOfZHeapHeader)
+		elog(ERROR, "zheap tuple data in WAL record is too short: %zu bytes",
+			 len);
+
+	datalen = len - SizeOfZHeapHeader;
+	tuplelen = datalen + SizeofZHeapTupleHeader;
+
+	zhtup = palloc(tuplelen + ZHEAPTUPLESIZE);
+	header = zhtup->t_data = (ZHeapTupleHeader) ((char *) zhtup + ZHEAPTUPLESIZE);
+
+	zhtup->t_len = tuplelen;
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&zhtup->t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	zhtup->t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+
+	return zhtup;
+}
+
+/*
+ * Convert a zheap tuple logged as block data of a WAL record into heap
+ * format and copy it into a freshly allocated ReorderBufferTupleBuf.
+ *
+ * 'relnode' identifies the relation the tuple belongs to; its tuple
+ * descriptor is needed for the conversion because the reorder buffer and
+ * output plugins only understand heap tuples.
+ *
+ * NOTE(review): the relation is looked up here, at decode time, with the
+ * current catalog state rather than under the historic snapshot that
+ * ReorderBufferCommit() uses, so it may not see the tuple descriptor that
+ * was in effect when the change was made.  Confirm this is acceptable.
+ */
+static ReorderBufferTupleBuf *
+DecodeZTupleToTupleBuf(LogicalDecodingContext *ctx, RelFileNode *relnode,
+					   char *data, Size len)
+{
+	ReorderBufferTupleBuf *tuplebuf;
+	ZHeapTuple	zhtup;
+	HeapTuple	htup;
+	Relation	relation;
+	Oid			reloid;
+
+	reloid = RelidByRelfilenode(relnode->spcNode, relnode->relNode);
+	if (!OidIsValid(reloid))
+		elog(ERROR, "could not map filenode \"%s\" to relation OID",
+			 relpathperm(*relnode, MAIN_FORKNUM));
+
+	/* RelationGetDescr() below must never see a NULL relation */
+	relation = RelationIdGetRelation(reloid);
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+			 reloid, relpathperm(*relnode, MAIN_FORKNUM));
+
+	zhtup = DecodeXLogZTuple(data, len);
+	htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder,
+										htup->t_len - SizeofHeapTupleHeader);
+	tuplebuf->tuple.t_len = htup->t_len;
+	tuplebuf->tuple.t_self = htup->t_self;
+	tuplebuf->tuple.t_tableOid = htup->t_tableOid;
+	memcpy((char *) tuplebuf->tuple.t_data, (char *) htup->t_data,
+		   htup->t_len);
+
+	/* be tidy */
+	pfree(zhtup);
+	heap_freetuple(htup);
+	RelationClose(relation);
+
+	return tuplebuf;
+}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 6d031dcaa4..1654718263 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef struct xl_zheap_insert
 #define XLZ_DELETE_CONTAINS_TPD_SLOT			(1<<2)
 #define XLZ_DELETE_CONTAINS_SUBXACT				(1<<3)
 #define XLZ_DELETE_IS_PARTITION_MOVE			(1<<4)
+#define XLZ_DELETE_CONTAINS_OLD_TUPLE			(1<<5)
+#define XLZ_DELETE_CONTAINS_OLD_KEY				(1<<6)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_DELETE_CONTAINS_OLD						\
+	(XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY)
 
 /* This is what we need to know about delete */
 typedef struct xl_zheap_delete