From ef19e9c6b41d43983c9844abc8efde001eb937ef Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Wed, 14 Sep 2022 14:29:55 +0200 Subject: [PATCH v2 2/2] Add 'remote' WAL level The 'remote' WAL level allows replicas that are caught up to the primary to re-use the snapshot of that primary while the primary also has that snapshot registered. This is specifically useful for offloading read-only queries to read-only standby replicas that are not far behind on WAL, and to recover full snapshot consistency on a page from WAL only. To get this remote snapshot consistency, heap tuple updates that set the CID will log records which include the command id of the inserting statement. --- doc/src/sgml/config.sgml | 9 +- src/backend/access/heap/heapam.c | 177 ++++++++++++++++++++--- src/backend/access/rmgrdesc/xlogdesc.c | 1 + src/backend/replication/logical/decode.c | 44 ++++-- src/bin/pg_controldata/pg_controldata.c | 2 + src/include/access/heapam_xlog.h | 11 ++ src/include/access/xlog.h | 1 + 7 files changed, 218 insertions(+), 27 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 700914684d..eaec08a075 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2796,9 +2796,16 @@ include_dir 'conf.d' minimal makes previous base backups unusable for point-in-time recovery and standby servers. + + In remote level, the same information is logged as + with replica level, plus some information that is + needed to use a snapshot transferred from the primary on a replica + node. Using a level of remote will increase the WAL + volume slightly, but far less than logical. + In logical level, the same information is logged as - with replica, plus information needed to + with remote, plus information needed to extract logical change sets from the WAL.
Using a level of logical will increase the WAL volume, particularly if many tables are configured for REPLICA IDENTITY FULL and diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e65100d00f..5c634880cb 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2150,6 +2150,14 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, } XLogBeginInsert(); + + /* include CommandId, if necessary */ + if (wal_level >= WAL_LEVEL_REMOTE) + { + rminfo |= XLOG_HEAP_WITH_CID; + XLogRegisterData((char *) &cid, sizeof(CommandId)); + } + XLogRegisterData((char *) &xlrec, SizeOfHeapInsert); xlhdr.t_infomask2 = heaptup->t_data->t_infomask2; @@ -2511,6 +2519,17 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, bufflags |= REGBUF_KEEP_DATA; XLogBeginInsert(); + + /* + * If we're doing physical replication for remote snapshot + * transfers, include the CommandId at the front of the main data. + */ + if (wal_level >= WAL_LEVEL_REMOTE) + { + XLogRegisterData((char *) &cid, sizeof(CommandId)); + rminfo |= XLOG_HEAP_WITH_CID; + } + XLogRegisterData((char *) xlrec, tupledata - scratch.data); XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags); @@ -2960,6 +2979,7 @@ l1: xl_heap_delete xlrec; xl_heap_header xlhdr; XLogRecPtr recptr; + uint8 rminfo = XLOG_HEAP_DELETE; /* * For logical decode we need combo CIDs to properly decode the @@ -2987,6 +3007,14 @@ l1: } XLogBeginInsert(); + + /* include CommandId, if necessary */ + if (wal_level >= WAL_LEVEL_REMOTE) + { + rminfo |= XLOG_HEAP_WITH_CID; + XLogRegisterData((char *) &cid, sizeof(CommandId)); + } + XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); @@ -3010,7 +3038,7 @@ l1: /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); + recptr = XLogInsert(RM_HEAP_ID, rminfo); PageSetLSN(page, recptr); } @@ -3695,8 +3723,17 @@ l2: {
xl_heap_lock xlrec; XLogRecPtr recptr; + uint8 rminfo = XLOG_HEAP_LOCK; XLogBeginInsert(); + + /* include CommandId, if necessary */ + if (wal_level >= WAL_LEVEL_REMOTE) + { + rminfo |= XLOG_HEAP_WITH_CID; + XLogRegisterData((char *) &cid, sizeof(CommandId)); + } + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); xlrec.offnum = ItemPointerGetOffsetNumber(&oldtup.t_self); @@ -3706,7 +3743,7 @@ l2: xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0; XLogRegisterData((char *) &xlrec, SizeOfHeapLock); - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK); + recptr = XLogInsert(RM_HEAP_ID, rminfo); PageSetLSN(page, recptr); } @@ -4883,8 +4920,17 @@ failed: { xl_heap_lock xlrec; XLogRecPtr recptr; + uint8 rminfo = XLOG_HEAP_LOCK; XLogBeginInsert(); + + /* include CommandId, if necessary */ + if (wal_level >= WAL_LEVEL_REMOTE) + { + rminfo |= XLOG_HEAP_WITH_CID; + XLogRegisterData((char *) &cid, sizeof(CommandId)); + } + XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD); xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); @@ -4896,7 +4942,7 @@ failed: /* we don't decode row locks atm, so no need to log the origin */ - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK); + recptr = XLogInsert(RM_HEAP_ID, rminfo); PageSetLSN(page, recptr); } @@ -5935,6 +5981,8 @@ heap_abort_speculative(Relation relation, ItemPointer tid) { xl_heap_delete xlrec; XLogRecPtr recptr; + uint8 rminfo = XLOG_HEAP_DELETE; + CommandId cid; xlrec.flags = XLH_DELETE_IS_SUPER; xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, @@ -5943,12 +5991,21 @@ heap_abort_speculative(Relation relation, ItemPointer tid) xlrec.xmax = xid; XLogBeginInsert(); + + /* include CommandId, if necessary */ + if (wal_level >= WAL_LEVEL_REMOTE) + { + rminfo |= XLOG_HEAP_WITH_CID; + cid = HeapTupleHeaderGetCmax(tp.t_data); + XLogRegisterData((char *) &cid, sizeof(CommandId)); + } + XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); /* No replica identity 
& replication origin logged */ - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); + recptr = XLogInsert(RM_HEAP_ID, rminfo); PageSetLSN(page, recptr); } @@ -8249,6 +8306,14 @@ log_heap_update(Relation reln, Buffer oldbuf, else rminfo = XLOG_HEAP_UPDATE; + /* include CommandId, if necessary */ + if (wal_level >= WAL_LEVEL_REMOTE) + { + rminfo |= XLOG_HEAP_WITH_CID; + XLogRegisterData((char *) &HeapTupleHeaderGetRawCommandId(newtup->t_data), + sizeof(CommandId)); + } + /* * If the old and new tuple are on the same page, we only need to log the * parts of the new tuple that were changed. That saves on the amount of @@ -9001,7 +9066,7 @@ static void heap_xlog_delete(XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; - xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record); + xl_heap_delete *xlrec; Buffer buffer; Page page; ItemId lp = NULL; @@ -9009,6 +9074,22 @@ heap_xlog_delete(XLogReaderState *record) BlockNumber blkno; RelFileLocator target_locator; ItemPointerData target_tid; + CommandId cid; + + /* + * If CommandId was included, use that to set CIDs. 
+ * Otherwise, default to FirstCommandId + */ + if (XLogRecGetRmInfo(record) & XLOG_HEAP_WITH_CID) + { + cid = *((CommandId *) XLogRecGetData(record)); + xlrec = (xl_heap_delete *) (XLogRecGetData(record) + sizeof(CommandId)); + } + else + { + cid = FirstCommandId; + xlrec = (xl_heap_delete *) XLogRecGetData(record); + } XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno); ItemPointerSetBlockNumber(&target_tid, blkno); @@ -9050,7 +9131,7 @@ heap_xlog_delete(XLogReaderState *record) HeapTupleHeaderSetXmax(htup, xlrec->xmax); else HeapTupleHeaderSetXmin(htup, InvalidTransactionId); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + HeapTupleHeaderSetCmax(htup, cid, false); /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); @@ -9074,7 +9155,7 @@ static void heap_xlog_insert(XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; - xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record); + xl_heap_insert *xlrec; Buffer buffer; Page page; union @@ -9090,6 +9171,22 @@ heap_xlog_insert(XLogReaderState *record) BlockNumber blkno; ItemPointerData target_tid; XLogRedoAction action; + CommandId cid; + + /* + * If CommandId was included, use that to set CIDs. 
+ * Otherwise, default to FirstCommandId + */ + if (XLogRecGetRmInfo(record) & XLOG_HEAP_WITH_CID) + { + cid = *((CommandId *) XLogRecGetData(record)); + xlrec = (xl_heap_insert *) (XLogRecGetData(record) + sizeof(CommandId)); + } + else + { + cid = FirstCommandId; + xlrec = (xl_heap_insert *) XLogRecGetData(record); + } XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno); ItemPointerSetBlockNumber(&target_tid, blkno); @@ -9151,7 +9248,7 @@ heap_xlog_insert(XLogReaderState *record) htup->t_infomask = xlhdr.t_infomask; htup->t_hoff = xlhdr.t_hoff; HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetCmin(htup, cid); htup->t_ctid = target_tid; if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum, @@ -9210,12 +9307,26 @@ heap_xlog_multi_insert(XLogReaderState *record) int i; bool isinit = (XLogRecGetRmInfo(record) & XLOG_HEAP_INIT_PAGE) != 0; XLogRedoAction action; + CommandId cid; /* - * Insertion doesn't overwrite MVCC data, so no conflict processing is - * required. + * If CommandId was included, use that to set CIDs. + * Otherwise, default to FirstCommandId */ - xlrec = (xl_heap_multi_insert *) XLogRecGetData(record); + if (XLogRecGetRmInfo(record) & XLOG_HEAP_WITH_CID) + { + cid = *((CommandId *) XLogRecGetData(record)); + xlrec = (xl_heap_multi_insert *) (XLogRecGetData(record) + sizeof(CommandId)); + } + else + { + cid = FirstCommandId; + /* + * Insertion doesn't overwrite MVCC data, so no conflict processing is + * required. 
+ */ + xlrec = (xl_heap_multi_insert *) XLogRecGetData(record); + } XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno); @@ -9294,7 +9405,7 @@ heap_xlog_multi_insert(XLogReaderState *record) htup->t_infomask = xlhdr->t_infomask; htup->t_hoff = xlhdr->t_hoff; HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetCmin(htup, cid); ItemPointerSetBlockNumber(&htup->t_ctid, blkno); ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); @@ -9341,7 +9452,7 @@ static void heap_xlog_update(XLogReaderState *record, bool hot_update) { XLogRecPtr lsn = record->EndRecPtr; - xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); + xl_heap_update *xlrec; RelFileLocator rlocator; BlockNumber oldblk; BlockNumber newblk; @@ -9366,6 +9477,22 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) Size freespace = 0; XLogRedoAction oldaction; XLogRedoAction newaction; + CommandId cid; + + /* + * If CommandId was included, use that to set CIDs. 
+ * Otherwise, default to FirstCommandId + */ + if (XLogRecGetRmInfo(record) & XLOG_HEAP_WITH_CID) + { + cid = *((CommandId *) XLogRecGetData(record)); + xlrec = (xl_heap_update *) (XLogRecGetData(record) + sizeof(CommandId)); + } + else + { + cid = FirstCommandId; + xlrec = (xl_heap_update *) XLogRecGetData(record); + } /* initialize to keep the compiler quiet */ oldtup.t_data = NULL; @@ -9434,7 +9561,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask, &htup->t_infomask2); HeapTupleHeaderSetXmax(htup, xlrec->old_xmax); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + HeapTupleHeaderSetCmax(htup, cid, false); /* Set forward chain link in t_ctid */ htup->t_ctid = newtid; @@ -9567,7 +9694,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) htup->t_hoff = xlhdr.t_hoff; HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetCmin(htup, cid); HeapTupleHeaderSetXmax(htup, xlrec->new_xmax); /* Make sure there is no forward chain link in t_ctid */ htup->t_ctid = newtid; @@ -9649,12 +9776,28 @@ static void heap_xlog_lock(XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; - xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record); + xl_heap_lock *xlrec; Buffer buffer; Page page; OffsetNumber offnum; ItemId lp = NULL; HeapTupleHeader htup; + CommandId cid; + + /* + * If CommandId was included, use that to set CIDs. 
+ * Otherwise, default to FirstCommandId + */ + if (XLogRecGetRmInfo(record) & XLOG_HEAP_WITH_CID) + { + cid = *((CommandId *) XLogRecGetData(record)); + xlrec = (xl_heap_lock *) (XLogRecGetData(record) + sizeof(CommandId)); + } + else + { + cid = FirstCommandId; + xlrec = (xl_heap_lock *) XLogRecGetData(record); + } /* * The visibility map may need to be fixed even if the heap page is @@ -9708,7 +9851,7 @@ heap_xlog_lock(XLogReaderState *record) offnum); } HeapTupleHeaderSetXmax(htup, xlrec->locking_xid); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + HeapTupleHeaderSetCmax(htup, cid, false); PageSetLSN(page, lsn); MarkBufferDirty(buffer); } diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 65ac642908..0c7168a52c 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -29,6 +29,7 @@ const struct config_enum_entry wal_level_options[] = { {"replica", WAL_LEVEL_REPLICA, false}, {"archive", WAL_LEVEL_REPLICA, true}, /* deprecated */ {"hot_standby", WAL_LEVEL_REPLICA, true}, /* deprecated */ + {"remote", WAL_LEVEL_REMOTE, false}, {"logical", WAL_LEVEL_LOGICAL, false}, {NULL, 0, false} }; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index add4d7c7dd..cf834c1329 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -847,8 +847,14 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_insert *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + Size cidsize; - xlrec = (xl_heap_insert *) XLogRecGetData(r); + if (XLogRecGetRmInfo(r) & XLOG_HEAP_WITH_CID) + cidsize = sizeof(CommandId); + else + cidsize = 0; + + xlrec = (xl_heap_insert *) (XLogRecGetData(r) + cidsize); /* * Ignore insert records without new tuples (this does happen when @@ -904,8 +910,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) 
ReorderBufferChange *change; char *data; RelFileLocator target_locator; + Size cidsize; - xlrec = (xl_heap_update *) XLogRecGetData(r); + if (XLogRecGetRmInfo(r) & XLOG_HEAP_WITH_CID) + cidsize = sizeof(CommandId); + else + cidsize = 0; + + xlrec = (xl_heap_update *) (XLogRecGetData(r) + cidsize); /* only interested in our database */ XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); @@ -942,8 +954,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size tuplelen; /* caution, remaining data in record is not aligned */ - data = XLogRecGetData(r) + SizeOfHeapUpdate; - datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; + data = XLogRecGetData(r) + SizeOfHeapUpdate + cidsize; + datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate - cidsize; tuplelen = datalen - SizeOfHeapHeader; change->data.tp.oldtuple = @@ -970,8 +982,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_delete *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + Size cidsize; + + if (XLogRecGetRmInfo(r) & XLOG_HEAP_WITH_CID) + cidsize = sizeof(CommandId); + else + cidsize = 0; + + xlrec = (xl_heap_delete *) (XLogRecGetData(r) + cidsize); - xlrec = (xl_heap_delete *) XLogRecGetData(r); /* only interested in our database */ XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); @@ -996,15 +1015,16 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { - Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete - cidsize; Size tuplelen = datalen - SizeOfHeapHeader; - Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); + Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader + + cidsize)); change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); - DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, + DecodeXLogTuple(XLogRecGetData(r) + SizeOfHeapDelete + 
cidsize, datalen, change->data.tp.oldtuple); } @@ -1065,8 +1085,14 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) char *tupledata; Size tuplelen; RelFileLocator rlocator; + Size cidsize; + + if (XLogRecGetRmInfo(r) & XLOG_HEAP_WITH_CID) + cidsize = sizeof(CommandId); + else + cidsize = 0; - xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); + xlrec = (xl_heap_multi_insert *) (XLogRecGetData(r) + cidsize); /* * Ignore insert records without new tuples. This happens when a diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index c390ec51ce..13de8eb393 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -78,6 +78,8 @@ wal_level_str(WalLevel wal_level) return "minimal"; case WAL_LEVEL_REPLICA: return "replica"; + case WAL_LEVEL_REMOTE: + return "remote"; case WAL_LEVEL_LOGICAL: return "logical"; } diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 306cc568eb..a8e9778e59 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -44,6 +44,17 @@ * or MULTI_INSERT, we can (and we do) restore entire page in redo */ #define XLOG_HEAP_INIT_PAGE 0x80 +/* + * When we INSERT/DELETE/UPDATE/LOCK tuples, we might need the CID of that + * operation. The value is included at the front of the main xlog data + * if this bit is set. + * Valid for HEAP_INSERT, HEAP_DELETE, HEAP_UPDATE, HEAP_HOT_UPDATE, + * HEAP_LOCK, and HEAP2_MULTI_INSERT + * + * Usually only emitted when wal_level >= WAL_LEVEL_REMOTE + */ +#define XLOG_HEAP_WITH_CID 0x01 + /* * We ran out of opcodes, so heapam.c now has a second RmgrId.
These opcodes * are associated with RM_HEAP2_ID, but are not logically different from diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index ba6672dd35..1b141cfd47 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -68,6 +68,7 @@ typedef enum WalLevel { WAL_LEVEL_MINIMAL = 0, WAL_LEVEL_REPLICA, + WAL_LEVEL_REMOTE, WAL_LEVEL_LOGICAL } WalLevel; -- 2.30.2