From 3438f926d607738f4a8a941fa594e214b3b9b764 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 18 Apr 2019 16:09:25 +0530 Subject: [PATCH 3/3] Provide interfaces to store and fetch undo records. Add the capability to form undo records and store them in undo logs. We also provide the capability to fetch the undo records. This layer will use undo-log-storage to reserve the space for the undo records and buffer management routines to write and read the undo records. Undo records are stored in sequential order in the undo log. Each undo record consists of a variable length header, tuple data, and payload information. The undo records are stored without any sort of alignment padding and a undo record can span across multiple pages. The undo records for a transaction can span across multiple undo logs. Author: Dilip Kumar with contributions from Robert Haas, Amit Kapila, Thomas Munro and Rafia Sabih Reviewed-by: Earlier version of this patch is reviewed by Amit Kapila Tested-by: Neha Sharma Discussion: https://www.postgresql.org/message-id/CAFiTN-uVxxopn0UZ64%3DF-sydbETBbGjWapnBikNo1%3DXv78UeFw%40mail.gmail.com --- src/backend/access/transam/xact.c | 4 +- src/backend/access/undo/Makefile | 2 +- src/backend/access/undo/undoinsert.c | 1523 ++++++++++++++++++++++++++++++++++ src/backend/access/undo/undorecord.c | 677 +++++++++++++++ src/include/access/transam.h | 1 + src/include/access/undoinsert.h | 64 ++ src/include/access/undorecord.h | 276 ++++++ src/include/access/xact.h | 1 + 8 files changed, 2546 insertions(+), 2 deletions(-) create mode 100644 src/backend/access/undo/undoinsert.c create mode 100644 src/backend/access/undo/undorecord.c create mode 100644 src/include/access/undoinsert.h create mode 100644 src/include/access/undorecord.h diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index bd5024e..47a8f0d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -30,6 +30,7 @@ #include 
"access/xlog.h" #include "access/xloginsert.h" #include "access/xlogutils.h" +#include "access/undoinsert.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" #include "catalog/storage.h" @@ -68,7 +69,6 @@ #include "utils/timestamp.h" #include "pg_trace.h" - /* * User-tweakable parameters */ @@ -2736,6 +2736,7 @@ AbortTransaction(void) AtEOXact_HashTables(false); AtEOXact_PgStat(false); AtEOXact_ApplyLauncher(false); + AtAbort_ResetUndoBuffers(); pgstat_report_xact_timestamp(0); } @@ -4993,6 +4994,7 @@ AbortSubTransaction(void) AtEOSubXact_PgStat(false, s->nestingLevel); AtSubAbort_Snapshot(s->nestingLevel); AtEOSubXact_ApplyLauncher(false, s->nestingLevel); + AtAbort_ResetUndoBuffers(); } /* diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile index 219c696..f41e8f7 100644 --- a/src/backend/access/undo/Makefile +++ b/src/backend/access/undo/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/access/undo top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = undolog.o +OBJS = undoinsert.o undolog.o undorecord.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c new file mode 100644 index 0000000..5161ca2 --- /dev/null +++ b/src/backend/access/undo/undoinsert.c @@ -0,0 +1,1523 @@ +/*------------------------------------------------------------------------- + * + * undoinsert.c + * entry points for inserting undo records + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undoinsert.c + * + * NOTES: + * Undo record layout: + * + * Undo records are stored in sequential order in the undo log. Each undo + * record consists of a variable length header, tuple data, and payload + * information. 
The first undo record of each transaction contains a
+ * transaction header that points to the next transaction's start header.
+ * This allows us to discard the entire transaction's log at one-shot rather
+ * than record-by-record. The callers are not aware of the transaction header;
+ * it is entirely maintained and used by the undo record layer. See
+ * undorecord.h for detailed information about undo record header.
+ *
+ * Multiple logs:
+ *
+ * It is possible that the undo records for a transaction span multiple undo
+ * logs. We need some special handling while inserting them to ensure that
+ * discard and rollbacks can work sanely.
+ *
+ * When the undo record for a transaction gets inserted in the next log then we
+ * add a transaction header for the first record of the transaction in the new
+ * log and connect this undo record to the first record of the transaction in
+ * the previous log by updating the "uur_next" field.
+ *
+ * We will also keep a previous undo record pointer to the last undo record of
+ * the transaction in the previous log, so that we can find the previous undo
+ * record pointer during rollback.
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/undorecord.h"
+#include "access/undoinsert.h"
+#include "access/undolog_xlog.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/tablecmds.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "miscadmin.h"
+
+/*
+ * XXX Do we want to support undo tuple sizes larger than BLCKSZ?  If not,
+ * then an undo record can spread across at most 2 buffers.
+ */
+#define MAX_BUFFER_PER_UNDO	2
+
+/*
+ * This defines the number of undo records that can be prepared before
+ * calling insert by default. If you need to prepare more than
+ * MAX_PREPARED_UNDO undo records, then you must call UndoSetPrepareSize
+ * first.
+ */
+#define MAX_PREPARED_UNDO 2
+
+/*
+ * This defines the max number of previous xact infos we need to update.
+ * Usually it's 1 for updating next link of previous transaction's header
+ * if we are starting a new transaction. But, in some cases where the same
+ * transaction is spilled to the next log, we update our own transaction's
+ * header in previous undo log as well as the header of the previous
+ * transaction in the new log.
+ */
+#define MAX_XACT_UNDO_INFO 2
+
+/*
+ * Buffers for the prepared records plus those needed to update previous
+ * transactions' start headers; parenthesized so it expands safely anywhere.
+ */
+#define MAX_UNDO_BUFFERS ((MAX_PREPARED_UNDO + MAX_XACT_UNDO_INFO) * MAX_BUFFER_PER_UNDO)
+
+/* Undo block number to buffer mapping. */
+typedef struct UndoBuffers
+{
+	UndoLogNumber logno;		/* Undo log number */
+	BlockNumber blk;			/* block number */
+	Buffer		buf;			/* buffer allocated for the block */
+	bool		zero;			/* new block full of zeroes */
+} UndoBuffers;
+
+/*
+ * This array holds the undo buffers required for preparing undo records.
+ * At prepare time these buffers are pinned and locked; insert then writes
+ * the actual undo record inside the critical section.
+ */
+static UndoBuffers def_buffers[MAX_UNDO_BUFFERS];
+
+/*
+ * Structure to hold the prepared undo information.
+ */
+typedef struct PreparedUndoSpace
+{
+	UndoRecPtr	urp;			/* undo record pointer */
+	UnpackedUndoRecord *urec;	/* undo record */
+	uint16		size;			/* undo record size */
+	int			undo_buffer_idx[MAX_BUFFER_PER_UNDO];	/* undo_buffer array
+														 * index */
+} PreparedUndoSpace;
+
+static PreparedUndoSpace def_prepared[MAX_PREPARED_UNDO];
+static int	max_prepared_undo = MAX_PREPARED_UNDO;
+
+/*
+ * By default prepared_undo and undo_buffer point to the static memory.
+ * In case caller wants to support more than default max_prepared undo records
+ * then the limit can be increased by calling UndoSetPrepareSize function.
+ * Therein, dynamic memory will be allocated and prepared_undo and undo_buffer
+ * will start pointing to newly allocated memory, which will be released by
+ * UnlockReleaseUndoBuffers and these variables will again be set back to
+ * their default values.
+ */
+static PreparedUndoSpace *prepared_undo = def_prepared;
+static UndoBuffers *undo_buffer = def_buffers;
+
+/* Index into undo_buffer. */
+static int	buffer_idx;
+
+/* Index into prepared_undo. */
+static int	prepare_idx;
+
+/*
+ * Structure to hold the previous transaction's undo update information. This
+ * is populated while the current transaction is updating its undo record
+ * pointer in the previous transaction's first undo record.
+ */
+typedef struct XactUndoRecordInfo
+{
+	UndoRecPtr	urecptr;		/* txn's start urecptr */
+	int			idx_undo_buffers[MAX_BUFFER_PER_UNDO];
+	UnpackedUndoRecord uur;		/* undo record header */
+} XactUndoRecordInfo;
+
+static XactUndoRecordInfo xact_urec_info[MAX_XACT_UNDO_INFO];
+static int	xact_urec_info_idx;
+
+/* Prototypes for static functions. */
+static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
+				 UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence,
+				 Buffer *prevbuf);
+static void UndoRecordPrepareTransInfo(UndoRecPtr urecptr,
+						   UndoRecPtr xact_urp,
+						   XLogReaderState *xlog_record);
+static void UndoRecordUpdateTransInfo(int idx);
+static int UndoGetBufferSlot(RelFileNode rnode, BlockNumber blk,
+				  ReadBufferMode rbm,
+				  UndoPersistence persistence, XLogReaderState *xlog_record);
+static bool UndoRecordIsValid(UndoLogControl *log,
+				  UndoRecPtr urp);
+static uint16 UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer,
+					 UndoPersistence upersistence);
+
+/*
+ * Check whether the undo record is discarded or not. If it's already discarded
+ * return false otherwise return true.
+ *
+ * Caller must hold log->discard_lock in shared mode.
+ */
+static bool
+UndoRecordIsValid(UndoLogControl *log, UndoRecPtr urp)
+{
+	Assert(LWLockHeldByMeInMode(&log->discard_lock, LW_SHARED));
+
+	/*
+	 * oldest_data is only initialized when the DiscardWorker first time
+	 * attempts to discard undo logs so we can not rely on this value to
+	 * identify whether the undo record pointer is already discarded or not so
+	 * we can check it by calling undo log routine.
+	 */
+	if (log->oldest_data == InvalidUndoRecPtr)
+	{
+		if (UndoLogIsDiscarded(urp))
+			return false;
+	}
+	else if (urp < log->oldest_data)
+		return false;
+
+	return true;
+}
+
+/*
+ * Prepare to update the previous transaction's next undo pointer to maintain
+ * the transaction chain in the undo. This will read the header of the first
+ * undo record of the previous transaction and lock the necessary buffers.
+ * The actual update will be done by UndoRecordUpdateTransInfo under the
+ * critical section.
+ */
+static void
+UndoRecordPrepareTransInfo(UndoRecPtr urecptr, UndoRecPtr xact_urp,
+						   XLogReaderState *xlog_record)
+{
+	Buffer		buffer = InvalidBuffer;
+	BlockNumber cur_blk;
+	RelFileNode rnode;
+	UndoLogControl *log;
+	UnpackUndoContext ucontext = {{0}};
+	Page		page;
+	int			starting_byte;
+	int			bufidx;
+	int			index = 0;
+
+	/*
+	 * The absence of previous transaction's undo indicate that this backend
+	 * is preparing its first undo in which case we have nothing to update.
+	 */
+	if (!UndoRecPtrIsValid(xact_urp))
+		return;
+
+	log = UndoLogGet(UndoRecPtrGetLogNo(xact_urp), false);
+
+	/*
+	 * Temporary undo logs are discarded on transaction commit so we don't
+	 * need to do anything.
+	 */
+	if (log->meta.persistence == UNDO_TEMP)
+		return;
+
+	/*
+	 * Acquire the discard lock before accessing the undo record so that
+	 * discard worker doesn't remove the record while we are in process of
+	 * reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	/*
+	 * If the previous transaction's first undo record is already discarded
+	 * there is nothing to update, so drop the discard lock and return.
+	 */
+	if (!UndoRecordIsValid(log, xact_urp))
+	{
+		LWLockRelease(&log->discard_lock);
+		return;
+	}
+
+	UndoRecPtrAssignRelFileNode(rnode, xact_urp);
+	cur_blk = UndoRecPtrGetBlockNum(xact_urp);
+	starting_byte = UndoRecPtrGetPageOffset(xact_urp);
+
+	/* Initiate reading the undo record; a free info slot must exist. */
+	Assert(xact_urec_info_idx < MAX_XACT_UNDO_INFO);
+	BeginUnpackUndo(&ucontext);
+	while (true)
+	{
+		bufidx = UndoGetBufferSlot(rnode, cur_blk,
+								   RBM_NORMAL,
+								   log->meta.persistence, xlog_record);
+		Assert(index < MAX_BUFFER_PER_UNDO);
+		xact_urec_info[xact_urec_info_idx].idx_undo_buffers[index++] = bufidx;
+		buffer = undo_buffer[bufidx].buf;
+		page = BufferGetPage(buffer);
+
+		/* Do actual decoding. */
+		UnpackUndoData(&ucontext, page, starting_byte);
+
+		/* We only need the transaction header, so stop once it is decoded. */
+		if (ucontext.stage > UNDO_DECODE_STAGE_TRANSACTION)
+			break;
+
+		/* Could not fetch the complete header so go to the next block. */
+		starting_byte = UndoLogBlockHeaderSize;
+		cur_blk++;
+	}
+
+	FinishUnpackUndo(&ucontext, &xact_urec_info[xact_urec_info_idx].uur);
+
+	xact_urec_info[xact_urec_info_idx].uur.uur_next = urecptr;
+	xact_urec_info[xact_urec_info_idx].urecptr = xact_urp;
+	xact_urec_info_idx++;
+
+	LWLockRelease(&log->discard_lock);
+}
+
+/*
+ * Overwrite the first undo record of the previous transaction to update its
+ * next pointer. This will just insert the already prepared record by
+ * UndoRecordPrepareTransInfo. This must be called under the critical section.
+ * This will just overwrite the undo header not the data.
+ */
+static void
+UndoRecordUpdateTransInfo(int idx)
+{
+	Page		page = NULL;
+	int			starting_byte;
+	int			i = 0;
+	UndoRecPtr	urec_ptr = InvalidUndoRecPtr;
+	InsertUndoContext ucontext = {{0}};
+
+	/* Start of the header staged by UndoRecordPrepareTransInfo in slot idx. */
+	urec_ptr = xact_urec_info[idx].urecptr;
+
+	/* The rewrite begins at that record's offset within its first page. */
+	starting_byte = UndoRecPtrGetPageOffset(urec_ptr);
+
+	/* Initiate inserting the undo record. */
+	BeginInsertUndo(&ucontext, &xact_urec_info[idx].uur);
+
+	/* Main loop for writing the undo record. */
+	do
+	{
+		Buffer		buffer;
+		int			buf_idx;
+
+		buf_idx = xact_urec_info[idx].idx_undo_buffers[i];
+		buffer = undo_buffer[buf_idx].buf;
+
+		/*
+		 * During recovery, there might be some blocks which are already
+		 * removed by discard process, so we can just skip inserting into
+		 * those blocks.
+		 */
+		if (!BufferIsValid(buffer))
+		{
+			Assert(InRecovery);
+
+			/*
+			 * Skip actual writing just update the context so that we have
+			 * write offset for inserting into next blocks.
+			 */
+			SkipInsertingUndoData(&ucontext, starting_byte);
+			if (ucontext.stage > UNDO_INSERT_STAGE_TRANSACTION)
+				break;
+		}
+		else
+		{
+			page = BufferGetPage(buffer);
+
+			/* Overwrite the previously written undo record. */
+			InsertUndoData(&ucontext, page, starting_byte);
+			if (ucontext.stage > UNDO_INSERT_STAGE_TRANSACTION)
+			{
+				MarkBufferDirty(buffer);
+				break;
+			}
+			MarkBufferDirty(buffer);
+		}
+
+		starting_byte = UndoLogBlockHeaderSize;
+		i++;
+
+		/* A record header can't use more than MAX_BUFFER_PER_UNDO buffers. */
+		Assert(i < MAX_BUFFER_PER_UNDO);
+	} while (true);
+}
+
+/*
+ * Find the block number in undo buffer array, if it's present then just return
+ * its index otherwise search the buffer and insert an entry and lock the buffer
+ * in exclusive mode.
+ *
+ * Undo log insertions are append-only. If the caller is writing new data
+ * that begins exactly at the beginning of a page, then there cannot be any
+ * useful data after that point. In that case RBM_ZERO can be passed in as
+ * rbm so that we can skip a useless read of a disk block.
In all other
+ * cases, RBM_NORMAL should be passed in, to read the page in if it doesn't
+ * happen to be already in the buffer pool.
+ */
+static int
+UndoGetBufferSlot(RelFileNode rnode,
+				  BlockNumber blk,
+				  ReadBufferMode rbm,
+				  UndoPersistence persistence,
+				  XLogReaderState *xlog_record)
+{
+	int			i;
+	Buffer		buffer;
+	XLogRedoAction action = BLK_NEEDS_REDO;
+
+	/* Don't do anything, if we already have a buffer pinned for the block. */
+	for (i = 0; i < buffer_idx; i++)
+	{
+		/*
+		 * It's not enough to just compare the block number because the
+		 * undo_buffer might hold undo from different undo logs (e.g. when
+		 * previous transaction start header is in previous undo log) so
+		 * compare (logno + blkno).
+		 */
+		if ((blk == undo_buffer[i].blk) &&
+			(undo_buffer[i].logno == rnode.relNode))
+		{
+			/* caller must hold exclusive lock on buffer */
+			Assert(BufferIsLocal(undo_buffer[i].buf) ||
+				   LWLockHeldByMeInMode(BufferDescriptorGetContentLock(
+										GetBufferDescriptor(undo_buffer[i].buf - 1)),
+										LW_EXCLUSIVE));
+			break;
+		}
+	}
+
+	/*
+	 * We did not find the block so allocate the buffer and insert into the
+	 * undo buffer array
+	 */
+	if (i == buffer_idx)
+	{
+		/*
+		 * Fetch the buffer in which we want to insert the undo record.
+		 */
+		if (InRecovery)
+			action = XLogReadBufferForRedoBlock(xlog_record,
+												SMGR_UNDO,
+												rnode,
+												UndoLogForkNum,
+												blk,
+												rbm,
+												false,
+												&buffer);
+		else
+		{
+			buffer = ReadBufferWithoutRelcache(SMGR_UNDO,
+											   rnode,
+											   UndoLogForkNum,
+											   blk,
+											   rbm,
+											   NULL,
+											   RelPersistenceForUndoPersistence(persistence));
+
+			/* Lock the buffer */
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		}
+
+		if (action == BLK_NOTFOUND)
+		{
+			undo_buffer[buffer_idx].buf = InvalidBuffer;
+			undo_buffer[buffer_idx].blk = InvalidBlockNumber;
+		}
+		else
+		{
+			undo_buffer[buffer_idx].buf = buffer;
+			undo_buffer[buffer_idx].blk = blk;
+			undo_buffer[buffer_idx].logno = rnode.relNode;
+			undo_buffer[buffer_idx].zero = rbm == RBM_ZERO;
+		}
+		buffer_idx++;
+	}
+
+	return i;
+}
+
+/*
+ * Call UndoSetPrepareSize to set the value of how many undo records can be
+ * prepared before we can insert them. If the size is greater than
+ * MAX_PREPARED_UNDO then it will allocate extra memory to hold the extra
+ * prepared undo.
+ *
+ * This is normally used when more than one undo record needs to be prepared.
+ */
+void
+UndoSetPrepareSize(int nrecords)
+{
+	if (nrecords <= MAX_PREPARED_UNDO)
+		return;
+
+	prepared_undo = palloc0(nrecords * sizeof(PreparedUndoSpace));
+
+	/*
+	 * Also reserve buffers for updating previous transactions' start headers;
+	 * up to MAX_XACT_UNDO_INFO headers can be staged, as for def_buffers.
+	 */
+	undo_buffer = palloc0((nrecords + MAX_XACT_UNDO_INFO) * MAX_BUFFER_PER_UNDO *
+						  sizeof(UndoBuffers));
+	max_prepared_undo = nrecords;
+}
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record you
+ * intended to insert. Upon return, the necessary undo buffers are pinned and
+ * locked.
+ *
+ * This should be done before any critical section is established, since it
+ * can fail.
+ *
+ * In recovery, 'fxid' refers to the full transaction id stored in WAL,
+ * otherwise, it refers to the top full transaction id.
+ */
+UndoRecPtr
+PrepareUndoInsert(UnpackedUndoRecord *urec, FullTransactionId fxid,
+				  UndoPersistence upersistence,
+				  XLogReaderState *xlog_record)
+{
+	UndoRecordSize size;
+	UndoRecPtr	urecptr;
+	RelFileNode rnode;
+	UndoRecordSize cur_size = 0;
+	BlockNumber cur_blk;
+	FullTransactionId txid;
+	int			starting_byte;
+	int			index = 0;
+	int			bufidx;
+	ReadBufferMode rbm;
+	bool		need_xact_header;
+	UndoRecPtr	try_location;
+	UndoRecPtr	last_xact_start;
+	UndoRecPtr	prevlog_xact_start = InvalidUndoRecPtr;
+	UndoRecPtr	prevlog_insert_urp = InvalidUndoRecPtr;
+	UndoRecPtr	prevlogurp = InvalidUndoRecPtr;
+
+	/* Refuse to prepare more records than we have slots for. */
+	if (prepare_idx == max_prepared_undo)
+		elog(ERROR, "already reached the maximum prepared limit");
+
+	if (!FullTransactionIdIsValid(fxid))
+	{
+		/* During recovery, we must have a valid transaction id. */
+		Assert(!InRecovery);
+		txid = GetTopFullTransactionId();
+	}
+	else
+	{
+		/*
+		 * Assign the top transaction id because undo log only stores mapping
+		 * for the top most transactions.
+		 */
+		Assert(InRecovery ||
+			   FullTransactionIdEquals(fxid, GetTopFullTransactionId()));
+		txid = fxid;
+	}
+
+	/*
+	 * We don't yet know if this record needs a transaction header (i.e. is the
+	 * first undo record for a given transaction in a given undo log), because
+	 * you can only find out by allocating. We'll resolve this circularity by
+	 * allocating enough space for a transaction header. We'll only advance
+	 * by as many bytes as we turn out to need.
+	 */
+	urec->uur_next = InvalidUndoRecPtr;
+	UndoRecordSetInfo(urec);
+	urec->uur_info |= UREC_INFO_TRANSACTION;
+	size = UndoRecordExpectedSize(urec);
+
+	/*
+	 * Since we don't actually advance the insert pointer until later in
+	 * InsertPreparedUndo(), but we may need to allocate space for several
+	 * undo records, we need to keep track of the insert pointer as we go.
+	 */
+	if (prepare_idx == 0)
+	{
+		/* Nothing allocated already; just ask for some space anywhere. */
+		try_location = InvalidUndoRecPtr;
+	}
+	else
+	{
+		/*
+		 * Ask to extend the space immediately after the last record, if
+		 * possible. A new undo log will be chosen otherwise.
+		 */
+		PreparedUndoSpace *space = &prepared_undo[prepare_idx - 1];
+
+		try_location = UndoLogOffsetPlusUsableBytes(space->urp, space->size);
+	}
+
+	/* Allocate space for the record. */
+	if (InRecovery)
+	{
+		/*
+		 * We'll figure out where the space needs to be allocated by
+		 * inspecting the xlog_record.
+		 */
+		Assert(upersistence == UNDO_PERMANENT);
+		urecptr = UndoLogAllocateInRecovery(XidFromFullTransactionId(txid),
+											size, try_location,
+											&need_xact_header,
+											&last_xact_start,
+											&prevlog_xact_start,
+											&prevlogurp,
+											xlog_record);
+	}
+	else
+	{
+		urecptr = UndoLogAllocate(size, try_location, upersistence,
+								  &need_xact_header, &last_xact_start,
+								  &prevlog_xact_start, &prevlog_insert_urp);
+		if (UndoRecPtrIsValid(prevlog_xact_start))
+		{
+			uint16		prevlen;
+
+			Assert(UndoRecPtrIsValid(prevlog_insert_urp));
+			/* Fetch length of the last undo record of the previous log. */
+			prevlen = UndoGetPrevRecordLen(prevlog_insert_urp, InvalidBuffer,
+										   upersistence);
+			/* Step back by that length to get the last record's pointer. */
+			prevlogurp =
+				MakeUndoRecPtr(UndoRecPtrGetLogNo(prevlog_insert_urp),
+							   (UndoRecPtrGetOffset(prevlog_insert_urp) - prevlen));
+
+			/*
+			 * Undo log switched so set prevlog info in current undo log.
+			 *
+			 * XXX can we do this directly in UndoLogAllocate?  For that,
+			 * UndoLogAllocate might need to read the length of the last
+			 * undo record from the previous undo log; perhaps it could use
+			 * a callback?
+			 */
+			UndoLogSwitchSetPrevLogInfo(UndoRecPtrGetLogNo(urecptr),
+										prevlog_xact_start, prevlogurp);
+		}
+	}
+
+	urec->uur_prevurp = prevlogurp;
+
+	/* Initialize transaction related members. */
+	urec->uur_progress = 0;
+	if (need_xact_header)
+	{
+		/*
+		 * TODO: Should we set urec->uur_dbid automatically? How can you do
+		 * that, in recovery -- can we extract it from xlog_record? For now
+		 * assume that the caller set it explicitly.
+		 */
+	}
+	else
+	{
+		urec->uur_dbid = 0;
+
+		/* We don't need a transaction header after all. */
+		urec->uur_info &= ~UREC_INFO_TRANSACTION;
+		size = UndoRecordExpectedSize(urec);
+	}
+
+	/*
+	 * If there is a physically preceding transaction in this undo log, and we
+	 * are writing the first record for this transaction that is in this undo
+	 * log (not necessarily the first ever for the transaction, because we
+	 * could have switched logs), then prepare to set uur_next in the
+	 * preceding transaction's header to point at this record.
+	 */
+	if (need_xact_header &&
+		UndoRecPtrGetOffset(urecptr) > UndoLogBlockHeaderSize)
+		UndoRecordPrepareTransInfo(urecptr, last_xact_start, xlog_record);
+
+	/*
+	 * If prevlog_xact_start is valid that means the transaction's undo is
+	 * split across the undo logs. So we need to update our own transaction
+	 * header in the previous log as well.
+	 */
+	if (UndoRecPtrIsValid(prevlog_xact_start))
+	{
+		Assert(UndoRecPtrIsValid(prevlogurp));
+		UndoRecordPrepareTransInfo(urecptr, prevlog_xact_start, xlog_record);
+	}
+
+	cur_blk = UndoRecPtrGetBlockNum(urecptr);
+	UndoRecPtrAssignRelFileNode(rnode, urecptr);
+	starting_byte = UndoRecPtrGetPageOffset(urecptr);
+
+	/*
+	 * If we happen to be writing the very first byte into this page, then
+	 * there is no need to read from disk.
+	 */
+	if (starting_byte == UndoLogBlockHeaderSize)
+		rbm = RBM_ZERO;
+	else
+		rbm = RBM_NORMAL;
+
+	do
+	{
+		bufidx = UndoGetBufferSlot(rnode, cur_blk, rbm, upersistence,
+								   xlog_record);
+		if (cur_size == 0)
+			cur_size = BLCKSZ - starting_byte;
+		else
+			cur_size += BLCKSZ - UndoLogBlockHeaderSize;
+
+		/* undo record can't use buffers more than MAX_BUFFER_PER_UNDO. */
+		Assert(index < MAX_BUFFER_PER_UNDO);
+
+		/* Keep track of the buffers we have pinned and locked. */
+		prepared_undo[prepare_idx].undo_buffer_idx[index++] = bufidx;
+
+		/*
+		 * If we need more pages they'll be all new so we can definitely skip
+		 * reading from disk.
+		 */
+		rbm = RBM_ZERO;
+		cur_blk++;
+	} while (cur_size < size);
+
+	/*
+	 * Save what InsertPreparedUndo needs to write this record later.
+	 * NOTE(review): size lands in a uint16 field; confirm it always fits.
+	 */
+	prepared_undo[prepare_idx].urec = urec;
+	prepared_undo[prepare_idx].urp = urecptr;
+	prepared_undo[prepare_idx].size = size;
+	prepare_idx++;
+
+	return urecptr;
+}
+
+/*
+ * Insert previously-prepared undo records. This will write the actual undo
+ * records into the buffers already pinned and locked in PrepareUndoInsert,
+ * and mark them dirty. This step should be performed after entering a
+ * critical section; it should never fail.
+ */
+void
+InsertPreparedUndo(void)
+{
+	Page		page = NULL;
+	int			starting_byte;
+	int			bufidx = 0;
+	int			idx;
+	UndoRecPtr	urp;
+	UnpackedUndoRecord *uur;
+	InsertUndoContext ucontext = {{0}};
+	uint16		size;
+
+	/* There must be at least one prepared undo record. */
+	Assert(prepare_idx > 0);
+
+	/*
+	 * This must be called under a critical section or we must be in recovery.
+	 */
+	Assert(InRecovery || CritSectionCount > 0);
+
+	for (idx = 0; idx < prepare_idx; idx++)
+	{
+		uur = prepared_undo[idx].urec;
+		urp = prepared_undo[idx].urp;
+		size = prepared_undo[idx].size;
+
+		Assert(size == UndoRecordExpectedSize(uur));
+
+		bufidx = 0;
+
+		/*
+		 * Compute starting offset of the page where to start inserting undo
+		 * record.
+		 */
+		starting_byte = UndoRecPtrGetPageOffset(urp);
+
+		/* Initiate inserting the undo record. */
+		BeginInsertUndo(&ucontext, uur);
+
+		/* Main loop for writing the undo record. */
+		do
+		{
+			PreparedUndoSpace undospace = prepared_undo[idx];
+			Buffer		buffer;
+
+			buffer = undo_buffer[undospace.undo_buffer_idx[bufidx]].buf;
+
+			/*
+			 * During recovery, there might be some blocks which are already
+			 * deleted due to some discard command so we can just skip
+			 * inserting into those blocks.
+			 */
+			if (!BufferIsValid(buffer))
+			{
+				Assert(InRecovery);
+
+				/*
+				 * Skip actual writing just update the context so that we have
+				 * write offset for inserting into next blocks.
+				 */
+				SkipInsertingUndoData(&ucontext, starting_byte);
+				if (ucontext.stage == UNDO_INSERT_STAGE_DONE)
+					break;
+			}
+			else
+			{
+				page = BufferGetPage(buffer);
+
+				/*
+				 * Initialize the page whenever we try to write the first
+				 * record in page. We start writing immediately after the
+				 * block header.
+				 */
+				if (starting_byte == UndoLogBlockHeaderSize)
+					PageInit(page, BLCKSZ, 0);
+
+				/*
+				 * Try to insert the record into the current page. If it
+				 * doesn't fit completely, continue with the next page.
+				 */
+				InsertUndoData(&ucontext, page, starting_byte);
+				if (ucontext.stage == UNDO_INSERT_STAGE_DONE)
+				{
+					MarkBufferDirty(buffer);
+					break;
+				}
+				MarkBufferDirty(buffer);
+			}
+
+			/* Insert remaining record in next block. */
+			starting_byte = UndoLogBlockHeaderSize;
+			bufidx++;
+
+			/* undo record can't use buffers more than MAX_BUFFER_PER_UNDO. */
+			Assert(bufidx < MAX_BUFFER_PER_UNDO);
+		} while (true);
+
+		/* Advance the insert pointer past this record. */
+		UndoLogAdvance(urp, size);
+	}
+
+	/* Update previously prepared transaction headers. */
+	if (xact_urec_info_idx > 0)
+	{
+		int			i = 0;
+
+		for (i = 0; i < xact_urec_info_idx; i++)
+			UndoRecordUpdateTransInfo(i);
+	}
+}
+
+/*
+ * Helper function for UndoFetchRecord. It will fetch the undo record pointed
+ * by urp and unpack the record into urec.
This function will not release the
+ * pin on the buffer if complete record is fetched from one buffer, so caller
+ * can reuse the same urec to fetch another undo record which is on the
+ * same block. Caller will be responsible to release the buffer in *curbuf
+ * and set it to invalid if it wishes to fetch the record from another block.
+ *
+ * curbuf : Remember the first buffer of the undo record in a hope that
+ * while traversing the undo chain in backward we might get previous record
+ * on the same buffer. The remembered buffer stays pinned and share-locked.
+ */
+static UnpackedUndoRecord *
+UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence, Buffer *curbuf)
+{
+	Page		page;
+	int			starting_byte = UndoRecPtrGetPageOffset(urp);
+	BlockNumber cur_blk;
+	UnpackUndoContext ucontext = {{0}};
+	Buffer		buffer = *curbuf;
+
+	cur_blk = UndoRecPtrGetBlockNum(urp);
+
+	/* Initiate unpacking one undo record. */
+	BeginUnpackUndo(&ucontext);
+
+	while (true)
+	{
+		/* If we already have a buffer then no need to allocate a new one. */
+		if (!BufferIsValid(buffer))
+		{
+			buffer = ReadBufferWithoutRelcache(SMGR_UNDO,
+											   rnode, UndoLogForkNum, cur_blk,
+											   RBM_NORMAL, NULL,
+											   RelPersistenceForUndoPersistence(persistence));
+
+			/*
+			 * Remember the first buffer where this undo started as the next
+			 * undo record that we fetch might fall on the same buffer.
+			 */
+			if (!BufferIsValid(*curbuf))
+				*curbuf = buffer;
+
+			/* Acquire shared lock on the buffer before reading undo from it. */
+			LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		}
+
+		page = BufferGetPage(buffer);
+
+		UnpackUndoData(&ucontext, page, starting_byte);
+
+		/*
+		 * We are done if we have reached to the done stage otherwise move to
+		 * next block and continue reading from there.
+		 */
+		if (ucontext.stage == UNDO_DECODE_STAGE_DONE)
+		{
+			if (buffer != *curbuf)
+				UnlockReleaseBuffer(buffer);
+			break;
+		}
+
+		/*
+		 * The record spans more than a page so we would have copied it (see
+		 * UnpackUndoData). In such cases, we can release the buffer.
+		 */
+		if (buffer != *curbuf)
+			UnlockReleaseBuffer(buffer);
+		buffer = InvalidBuffer;
+
+		/* Go to next block. */
+		cur_blk++;
+		starting_byte = UndoLogBlockHeaderSize;
+	}
+
+	/* Final step of unpacking. */
+	FinishUnpackUndo(&ucontext, urec);
+
+	return urec;
+}
+
+/*
+ * ResetUndoRecord - Helper function for UndoFetchRecord to reset the current
+ * record.
+ */
+static void
+ResetUndoRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode *rnode,
+				RelFileNode *prevrec_rnode, Buffer *buffer)
+{
+	/*
+	 * If we have a valid buffer pinned then just ensure that we want to find
+	 * the next tuple from the same block. Otherwise release the buffer and
+	 * set it invalid
+	 */
+	if (BufferIsValid(*buffer))
+	{
+		/*
+		 * Switch buffers when the next record is in a different block or log.
+		 * UndoGetOneRecord left the buffer share-locked, so unlock it as well.
+		 */
+		if ((UndoRecPtrGetBlockNum(urp) !=
+			 BufferGetBlockNumber(*buffer)) ||
+			(prevrec_rnode->relNode != rnode->relNode))
+		{
+			UnlockReleaseBuffer(*buffer);
+			*buffer = InvalidBuffer;
+		}
+	}
+
+	if (urec->uur_payload.data)
+		pfree(urec->uur_payload.data);
+	if (urec->uur_tuple.data)
+		pfree(urec->uur_tuple.data);
+
+	/* Reset the urec before fetching the tuple */
+	urec->uur_tuple.data = NULL;
+	urec->uur_tuple.len = 0;
+	urec->uur_payload.data = NULL;
+	urec->uur_payload.len = 0;
+}
+
+/*
+ * Fetch the next undo record for given blkno, offset and transaction id (if
+ * valid). The same tuple can be modified by multiple transactions, so during
+ * undo chain traversal sometimes we need to distinguish based on transaction
+ * id. Callers that don't have any such requirement can pass
+ * InvalidTransactionId.
+ *
+ * Start the search from urp. Caller needs to call UndoRecordRelease to release
+ * the resources allocated by this function.
+ *
+ * urec_ptr_out is undo record pointer of the qualified undo record if valid
+ * pointer is passed.
+ *
+ * callback function decides whether particular undo record satisfies the
+ * condition of caller.
+ *
+ * Returns the required undo record if found, otherwise, return NULL which
+ * means either the record is already discarded or there is no such record
+ * in the undo chain.
+ */
+UnpackedUndoRecord *
+UndoFetchRecord(UndoRecPtr urp, BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback)
+{
+	RelFileNode rnode,
+				prevrec_rnode = {0};
+	UnpackedUndoRecord *urec = NULL;
+	Buffer		buffer = InvalidBuffer;
+	int			logno;
+
+	if (urec_ptr_out)
+		*urec_ptr_out = InvalidUndoRecPtr;
+
+	/* Result record; freed below on failure, otherwise owned by the caller. */
+	urec = palloc0(sizeof(UnpackedUndoRecord));
+	UndoRecPtrAssignRelFileNode(rnode, urp);
+
+	/* Find the undo record pointer we are interested in. */
+	while (true)
+	{
+		UndoLogControl *log;
+
+		logno = UndoRecPtrGetLogNo(urp);
+		log = UndoLogGet(logno, true);
+		if (log == NULL)
+		{
+			if (BufferIsValid(buffer))
+				UnlockReleaseBuffer(buffer);
+			/* No record to hand back, so don't leak our allocation. */
+			pfree(urec);
+			return NULL;
+		}
+
+		/*
+		 * Prevent UndoDiscardOneLog() from discarding data while we try to
+		 * read it. Usually we would acquire log->mutex to read log->meta
+		 * members, but in this case we know that discard can't move without
+		 * also holding log->discard_lock.
+		 */
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+		if (!UndoRecordIsValid(log, urp))
+		{
+			if (BufferIsValid(buffer))
+				UnlockReleaseBuffer(buffer);
+			LWLockRelease(&log->discard_lock);
+			pfree(urec);
+			return NULL;
+		}
+
+		/* Fetch the current undo record. */
+		urec = UndoGetOneRecord(urec, urp, rnode, log->meta.persistence,
+								&buffer);
+		LWLockRelease(&log->discard_lock);
+
+		if (blkno == InvalidBlockNumber)
+			break;
+
+		/* Check whether the undo record satisfies conditions */
+		if (callback(urec, blkno, offset, xid))
+			break;
+
+		urp = urec->uur_blkprev;
+		prevrec_rnode = rnode;
+
+		/* Get rnode for the current undo record pointer. */
+		UndoRecPtrAssignRelFileNode(rnode, urp);
+
+		/* Reset the current undo record before fetching the next. */
+		ResetUndoRecord(urec, urp, &rnode, &prevrec_rnode, &buffer);
+	}
+
+	if (urec_ptr_out)
+		*urec_ptr_out = urp;
+
+	/* Release the last buffer. */
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+	return urec;
+}
+
+/*
+ * PrefetchUndoPages - Prefetch undo pages
+ *
+ * Prefetch undo pages, if prefetch_pages are behind prefetch_target
+ */
+static void
+PrefetchUndoPages(RelFileNode rnode, int prefetch_target, int *prefetch_pages,
+				  BlockNumber to_blkno, BlockNumber from_blkno,
+				  char persistence)
+{
+	int			nprefetch;
+	BlockNumber startblock;
+	BlockNumber lastprefetched;
+
+	/* Calculate last prefetched page in the previous iteration. */
+	lastprefetched = from_blkno - *prefetch_pages;
+
+	/* We have already prefetched all the pages of the transaction's undo. */
+	if (lastprefetched <= to_blkno)
+		return;
+
+	/* Calculate number of blocks to be prefetched. */
+	nprefetch =
+		Min(prefetch_target - *prefetch_pages, lastprefetched - to_blkno);
+
+	/* Where to start prefetch. */
+	startblock = lastprefetched - nprefetch;
+
+	while (nprefetch--)
+	{
+		PrefetchBufferWithoutRelcache(SMGR_UNDO, rnode, UndoLogForkNum,
+									  startblock++,
+									  RelPersistenceForUndoPersistence(persistence));
+		(*prefetch_pages)++;
+	}
+}
+
+/*
+ * UndoRecordBulkFetch - Read undo records in bulk
+ *
+ * Read undo records between from_urecptr and to_urecptr until we exhaust the
+ * memory size specified by undo_apply_size.
If we could not read all the + * records till to_urecptr then the caller should consume current set of records + * and call this function again. + * + * from_urecptr - Where to start fetching the undo records. If we can not + * read all the records because of memory limit then this + * will be set to the previous undo record pointer from where + * we need to start fetching on next call. Otherwise it will + * be set to InvalidUndoRecPtr. + * to_urecptr - Last undo record pointer to be fetched. + * undo_apply_size - Memory segment limit to collect undo records. + * nrecords - Number of undo records read. + * one_page - Caller is applying undo only for one block not for + * complete transaction. If this is set true then instead + * of following transaction undo chain using prevlen we will + * follow the block prev chain of the block so that we can + * avoid reading many unnecessary undo records of the + * transaction. + */ +UndoRecInfo * +UndoRecordBulkFetch(UndoRecPtr *from_urecptr, UndoRecPtr to_urecptr, + int undo_apply_size, int *nrecords, bool one_page) +{ + RelFileNode rnode; + UndoRecPtr urecptr, + prev_urec_ptr; + BlockNumber blkno; + BlockNumber to_blkno; + Buffer buffer = InvalidBuffer; + UnpackedUndoRecord *uur = NULL; + UndoRecInfo *urp_array; + int urp_array_size = 1024; + int urp_index = 0; + int prefetch_target = 0; + int prefetch_pages = 0; + Size total_size = 0; + TransactionId xid = InvalidTransactionId; + + /* + * In one_page mode we are fetching undo only for one page instead of + * fetching all the undo of the transaction. Basically, we are fetching + * interleaved undo records. So it does not make sense to do any prefetch + * in that case. + */ + if (!one_page) + prefetch_target = target_prefetch_pages; + + /* + * Allocate initial memory to hold the undo record info, we can expand it + * if needed. 
+ */ + urp_array = (UndoRecInfo *) palloc(sizeof(UndoRecInfo) * urp_array_size); + urecptr = *from_urecptr; + + prev_urec_ptr = InvalidUndoRecPtr; + *from_urecptr = InvalidUndoRecPtr; + + /* Read undo chain backward until we reach to the first undo record. */ + do + { + BlockNumber from_blkno; + UndoLogControl *log; + UndoPersistence persistence; + int size; + int logno; + + logno = UndoRecPtrGetLogNo(urecptr); + log = UndoLogGet(logno, true); + if (log == NULL) + { + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + return NULL; + } + persistence = log->meta.persistence; + + UndoRecPtrAssignRelFileNode(rnode, urecptr); + to_blkno = UndoRecPtrGetBlockNum(to_urecptr); + from_blkno = UndoRecPtrGetBlockNum(urecptr); + + /* Allocate memory for next undo record. */ + uur = palloc0(sizeof(UnpackedUndoRecord)); + + /* + * If next undo record pointer to be fetched is not on the same block + * then release the old buffer and reduce the prefetch_pages count by + * one as we have consumed one page. Otherwise, just set the old + * buffer into the new undo record so that UndoGetOneRecord don't read + * the buffer again. + */ + blkno = UndoRecPtrGetBlockNum(urecptr); + if (!UndoRecPtrIsValid(prev_urec_ptr) || + UndoRecPtrGetLogNo(prev_urec_ptr) != logno || + UndoRecPtrGetBlockNum(prev_urec_ptr) != blkno) + { + /* Release the previous buffer */ + if (BufferIsValid(buffer)) + { + UnlockReleaseBuffer(buffer); + buffer = InvalidBuffer; + } + + if (prefetch_pages > 0) + prefetch_pages--; + } + + /* + * If prefetch_pages are half of the prefetch_target then it's time to + * prefetch again. + */ + if (prefetch_pages < prefetch_target / 2) + PrefetchUndoPages(rnode, prefetch_target, &prefetch_pages, to_blkno, + from_blkno, persistence); + + /* + * In one_page mode it's possible that the undo of the transaction + * might have been applied by worker and undo got discarded. Prevent + * discard worker from discarding undo data while we are reading it. 
+ * See detail comment in UndoFetchRecord. In normal mode we are + * holding transaction undo action lock so it can not be discarded. + */ + if (one_page) + { + LWLockAcquire(&log->discard_lock, LW_SHARED); + + if (!UndoRecordIsValid(log, urecptr)) + { + LWLockRelease(&log->discard_lock); + + /* + * This record was never read and is not added to urp_array, so + * free it before leaving the loop. + */ + pfree(uur); + break; + } + + /* Read the undo record. */ + UndoGetOneRecord(uur, urecptr, rnode, persistence, &buffer); + LWLockRelease(&log->discard_lock); + } + else + UndoGetOneRecord(uur, urecptr, rnode, persistence, &buffer); + + /* + * As soon as the transaction id is changed we can stop fetching the + * undo record. Ideally, to_urecptr should control this but while + * reading undo only for a page we don't know what is the end undo + * record pointer for the transaction. + */ + if (one_page) + { + if (!TransactionIdIsValid(xid)) + xid = uur->uur_xid; + else if (xid != uur->uur_xid) + { + /* + * The record belongs to another transaction and is not added to + * urp_array, so release it and its data before leaving the loop. + */ + if (uur->uur_payload.data) + pfree(uur->uur_payload.data); + if (uur->uur_tuple.data) + pfree(uur->uur_tuple.data); + pfree(uur); + break; + } + } + + /* Remember the previous undo record pointer. */ + prev_urec_ptr = urecptr; + + /* + * Calculate the previous undo record pointer of the transaction. If + * we are reading undo only for a page then follow the blkprev chain + * of the page. Otherwise, calculate the previous undo record pointer + * using transaction's current undo record pointer and the prevlen. + */ + if (one_page) + urecptr = uur->uur_blkprev; + else if (prev_urec_ptr == to_urecptr || uur->uur_info & UREC_INFO_TRANSACTION) + urecptr = InvalidUndoRecPtr; + else + urecptr = UndoGetPrevUndoRecptr(prev_urec_ptr, uur->uur_prevurp, + buffer, persistence); + + /* We have consumed all elements of the urp_array so expand its size. */ + if (urp_index >= urp_array_size) + { + urp_array_size *= 2; + urp_array = + repalloc(urp_array, sizeof(UndoRecInfo) * urp_array_size); + } + + /* Add entry in the urp_array */ + urp_array[urp_index].index = urp_index; + urp_array[urp_index].urp = prev_urec_ptr; + urp_array[urp_index].uur = uur; + urp_index++; + + /* We have fetched all the undo records for the transaction. 
*/ + if (!UndoRecPtrIsValid(urecptr) || (prev_urec_ptr == to_urecptr)) + break; + + /* + * Including current record, if we have crossed the memory limit then + * stop processing more records. Remember to set the from_urecptr so + * that on next call we can resume fetching undo records where we left + * it. + */ + size = UnpackedUndoRecordSize(uur); + total_size += size; + + if (total_size >= undo_apply_size) + { + *from_urecptr = urecptr; + break; + } + } while (true); + + /* Release the last buffer. */ + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + *nrecords = urp_index; + + return urp_array; +} + +/* + * Release the resources allocated by UndoFetchRecord. + */ +void +UndoRecordRelease(UnpackedUndoRecord *urec) +{ + if (urec->uur_payload.data) + pfree(urec->uur_payload.data); + if (urec->uur_tuple.data) + pfree(urec->uur_tuple.data); + + pfree(urec); +} + +/* + * RegisterUndoLogBuffers - Register the undo buffers. + */ +void +RegisterUndoLogBuffers(uint8 first_block_id) +{ + int idx; + int flags; + + for (idx = 0; idx < buffer_idx; idx++) + { + flags = undo_buffer[idx].zero + ? REGBUF_KEEP_DATA_AFTER_CP | REGBUF_WILL_INIT + : REGBUF_KEEP_DATA_AFTER_CP; + XLogRegisterBuffer(first_block_id + idx, undo_buffer[idx].buf, flags); + UndoLogRegister(first_block_id + idx, undo_buffer[idx].logno); + } +} + +/* + * UndoLogBuffersSetLSN - Set LSN on undo page. +*/ +void +UndoLogBuffersSetLSN(XLogRecPtr recptr) +{ + int idx; + + for (idx = 0; idx < buffer_idx; idx++) + PageSetLSN(BufferGetPage(undo_buffer[idx].buf), recptr); +} + +/* + * Reset the global variables related to undo buffers. This is required at the + * transaction abort and while releasing the undo buffers. + */ +static inline void +ResetUndoBuffers(void) +{ + int i; + + /* Reset undo buffer array. 
*/ + for (i = 0; i < buffer_idx; i++) + { + undo_buffer[i].blk = InvalidBlockNumber; + undo_buffer[i].buf = InvalidBuffer; + } + + for (i = 0; i < xact_urec_info_idx; i++) + xact_urec_info[i].urecptr = InvalidUndoRecPtr; + + /* Reset the prepared index. */ + prepare_idx = 0; + buffer_idx = 0; + xact_urec_info_idx = 0; + + /* + * max_prepared_undo limit is changed so free the allocated memory and + * reset all the variable back to their default value. + */ + if (max_prepared_undo > MAX_PREPARED_UNDO) + { + pfree(undo_buffer); + pfree(prepared_undo); + undo_buffer = def_buffers; + prepared_undo = def_prepared; + max_prepared_undo = MAX_PREPARED_UNDO; + } +} + +/* + * Reset the undo buffers at abort. + */ +void +AtAbort_ResetUndoBuffers(void) +{ + ResetUndoBuffers(); +} + +/* + * Unlock and release the undo buffers. This step must be performed after + * exiting any critical section where we have perfomed undo actions. + */ +void +UnlockReleaseUndoBuffers(void) +{ + int i; + + for (i = 0; i < buffer_idx; i++) + { + if (BufferIsValid(undo_buffer[i].buf)) + UnlockReleaseBuffer(undo_buffer[i].buf); + } + ResetUndoBuffers(); +} + +/* + * UndoGetPrevRecordLen - read length of the previous undo record. + * + * This function will take an undo record pointer as an input and read the + * length of the previous undo record which is stored at the end of the previous + * undo record. If the previous undo record is split then this will add the + * undo block header size in the total length. + */ +static uint16 +UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer, + UndoPersistence upersistence) +{ + UndoLogOffset page_offset = UndoRecPtrGetPageOffset(urp); + BlockNumber cur_blk = UndoRecPtrGetBlockNum(urp); + Buffer buffer = input_buffer; + char *page; + char prevlen[2]; + RelFileNode rnode; + int byte_to_read = sizeof(uint16); + char persistence; + uint16 prev_rec_len = 0; + + /* Get relfilenode. 
*/ + UndoRecPtrAssignRelFileNode(rnode, urp); + persistence = RelPersistenceForUndoPersistence(upersistence); + + /* + * If caller has passed invalid buffer then read the buffer. + */ + if (!BufferIsValid(buffer)) + { + buffer = ReadBufferWithoutRelcache(SMGR_UNDO, rnode, UndoLogForkNum, + cur_blk, RBM_NORMAL, NULL, + persistence); + + LockBuffer(buffer, BUFFER_LOCK_SHARE); + } + + page = (char *) BufferGetPage(buffer); + + /* + * Length of the previous undo record is stored at the end of that record + * so just fetch last 2 bytes. + */ + while (byte_to_read > 0) + { + page_offset -= 1; + + /* + * Read first prevlen byte from current page if page_offset hasn't + * reached the undo block header. Otherwise move to the previous page. + */ + if (page_offset >= UndoLogBlockHeaderSize) + { + prevlen[byte_to_read - 1] = page[page_offset]; + byte_to_read -= 1; + } + else + { + /* Release the previous buffer. */ + if (input_buffer != buffer) + UnlockReleaseBuffer(buffer); + cur_blk -= 1; + persistence = RelPersistenceForUndoPersistence(upersistence); + buffer = ReadBufferWithoutRelcache(SMGR_UNDO, rnode, UndoLogForkNum, + cur_blk, RBM_NORMAL, NULL, + persistence); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page_offset = BLCKSZ; + page = (char *) BufferGetPage(buffer); + } + } + + /* + * Copy the two bytes out rather than casting: prevlen is a char array and + * need not be suitably aligned for a uint16 access. + */ + memcpy(&prev_rec_len, prevlen, sizeof(uint16)); + + /* + * If previous undo record is not completely stored in this page then add + * UndoLogBlockHeaderSize in total length so that the caller can use this + * length to compute the undo record pointer of the previous undo record. + */ + if (UndoRecPtrGetPageOffset(urp) - UndoLogBlockHeaderSize < prev_rec_len) + prev_rec_len += UndoLogBlockHeaderSize; + + /* Release the buffer if we have locally read it. */ + if (input_buffer != buffer) + UnlockReleaseBuffer(buffer); + + return prev_rec_len; +} + +/* + * Return the previous undo record pointer. 
+ * + * A valid value of prevurp indicates that the previous undo record + * pointer is in some other log and caller can directly use that. + * Otherwise this will calculate the previous undo record pointer + * by using current urp and the prevlen. + */ +UndoRecPtr +UndoGetPrevUndoRecptr(UndoRecPtr urp, UndoRecPtr prevurp, Buffer buffer, + UndoPersistence upersistence) +{ + if (UndoRecPtrIsValid(prevurp)) + return prevurp; + else + { + UndoLogNumber logno = UndoRecPtrGetLogNo(urp); + UndoLogOffset offset = UndoRecPtrGetOffset(urp); + uint16 prevlen; + + /* Read length of the previous undo record. */ + prevlen = UndoGetPrevRecordLen(urp, buffer, upersistence); + + /* calculate the previous undo record pointer */ + return MakeUndoRecPtr(logno, offset - prevlen); + } +} diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c new file mode 100644 index 0000000..fd16a88 --- /dev/null +++ b/src/backend/access/undo/undorecord.c @@ -0,0 +1,677 @@ +/*------------------------------------------------------------------------- + * + * undorecord.c + * encode and decode undo records + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undorecord.c + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/subtrans.h" +#include "access/undorecord.h" +#include "catalog/pg_tablespace.h" +#include "storage/block.h" + +/* Prototypes for static functions. */ +static bool InsertUndoBytes(char *sourceptr, int sourcelen, + char **writeptr, char *endptr, + int *total_bytes_written, int *partial_write); +static bool ReadUndoBytes(char *destptr, int readlen, + char **readptr, char *endptr, + int *total_bytes_read, int *partial_read); + +/* + * Compute and return the expected size of an undo record. 
+ */ +Size +UndoRecordExpectedSize(UnpackedUndoRecord *uur) +{ + Size size; + + size = SizeOfUndoRecordHeader + sizeof(uint16); + if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0) + size += SizeOfUndoRecordRelationDetails; + if ((uur->uur_info & UREC_INFO_BLOCK) != 0) + size += SizeOfUndoRecordBlock; + if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0) + size += SizeOfUndoRecordTransaction; + if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0) + { + size += SizeOfUndoRecordPayload; + size += uur->uur_payload.len; + size += uur->uur_tuple.len; + } + + return size; +} + +/* + * Compute size of the Unpacked undo record in memory + */ +Size +UnpackedUndoRecordSize(UnpackedUndoRecord *uur) +{ + Size size; + + size = sizeof(UnpackedUndoRecord); + + /* Add payload size if record contains payload data. */ + if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0) + { + size += uur->uur_payload.len; + size += uur->uur_tuple.len; + } + + return size; +} + +/* + * BeginInsertUndo - Initiate inserting a single undo record. + * + * Resets the insert state in 'ucontext' and copies the header fields of 'uur' + * that are flagged in uur_info, plus the expected record size. The payload + * and tuple data are referenced, not copied. + */ +void +BeginInsertUndo(InsertUndoContext *ucontext, UnpackedUndoRecord *uur) +{ + ucontext->stage = UNDO_INSERT_STAGE_HEADER; + ucontext->already_written = 0; + ucontext->partial_write = 0; + /* Copy undo record header. */ + ucontext->urec_hd.urec_rmid = uur->uur_rmid; + ucontext->urec_hd.urec_type = uur->uur_type; + ucontext->urec_hd.urec_info = uur->uur_info; + ucontext->urec_hd.urec_reloid = uur->uur_reloid; + ucontext->urec_hd.urec_xid = uur->uur_xid; + ucontext->urec_hd.urec_cid = uur->uur_cid; + + /* Copy undo record relation header if it is present. */ + if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0) + ucontext->urec_rd.urec_fork = uur->uur_fork; + + /* Copy undo record block header if it is present. 
*/ + if ((uur->uur_info & UREC_INFO_BLOCK) != 0) + { + ucontext->urec_blk.urec_blkprev = uur->uur_blkprev; + ucontext->urec_blk.urec_block = uur->uur_block; + ucontext->urec_blk.urec_offset = uur->uur_offset; + } + + /* Copy undo record transaction header if it is present. */ + if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0) + { + ucontext->urec_txn.urec_progress = uur->uur_progress; + ucontext->urec_txn.urec_xidepoch = uur->uur_xidepoch; + ucontext->urec_txn.urec_dbid = uur->uur_dbid; + ucontext->urec_txn.urec_prevurp = uur->uur_prevurp; + ucontext->urec_txn.urec_next = uur->uur_next; + } + + /* Copy undo record payload header and data if it is present. */ + if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0) + { + ucontext->urec_payload.urec_payload_len = uur->uur_payload.len; + ucontext->urec_payload.urec_tuple_len = uur->uur_tuple.len; + ucontext->urec_payloaddata = uur->uur_payload.data; + ucontext->urec_tupledata = uur->uur_tuple.data; + } + else + { + /* + * InsertUndoData and SkipInsertingUndoData read these lengths without + * checking UREC_INFO_PAYLOAD, so make sure they are zero when the + * record has no payload. + */ + ucontext->urec_payload.urec_payload_len = 0; + ucontext->urec_payload.urec_tuple_len = 0; + ucontext->urec_payloaddata = NULL; + ucontext->urec_tupledata = NULL; + } + ucontext->undo_len = UndoRecordExpectedSize(uur); +} + +/* + * InsertUndoData - Insert the undo record into the input page from the insert + * undo context. Caller can call this function multiple times until desired + * stage is reached. This will write the undo record into the page. + */ +void +InsertUndoData(InsertUndoContext *ucontext, Page page, int starting_byte) +{ + char *writeptr = (char *) page + starting_byte; + char *endptr = (char *) page + BLCKSZ; + + switch (ucontext->stage) + { + case UNDO_INSERT_STAGE_HEADER: + /* Insert undo record header. */ + if (!InsertUndoBytes((char *) &ucontext->urec_hd, + SizeOfUndoRecordHeader, &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + ucontext->stage = UNDO_INSERT_STAGE_RELATION_DETAILS; + /* fall through */ + case UNDO_INSERT_STAGE_RELATION_DETAILS: + if ((ucontext->urec_hd.urec_info & UREC_INFO_RELATION_DETAILS) != 0) + { + /* Insert undo record relation header. 
*/ + if (!InsertUndoBytes((char *) &ucontext->urec_rd, + SizeOfUndoRecordRelationDetails, + &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + } + ucontext->stage = UNDO_INSERT_STAGE_BLOCK; + /* fall through */ + case UNDO_INSERT_STAGE_BLOCK: + if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0) + { + /* Insert undo record block header. */ + if (!InsertUndoBytes((char *) &ucontext->urec_blk, + SizeOfUndoRecordBlock, + &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + } + ucontext->stage = UNDO_INSERT_STAGE_TRANSACTION; + /* fall through */ + case UNDO_INSERT_STAGE_TRANSACTION: + if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0) + { + /* Insert undo record transaction header. */ + if (!InsertUndoBytes((char *) &ucontext->urec_txn, + SizeOfUndoRecordTransaction, + &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + } + ucontext->stage = UNDO_INSERT_STAGE_PAYLOAD; + /* fall through */ + case UNDO_INSERT_STAGE_PAYLOAD: + if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0) + { + /* Insert undo record payload header. */ + if (!InsertUndoBytes((char *) &ucontext->urec_payload, + SizeOfUndoRecordPayload, + &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + } + ucontext->stage = UNDO_INSERT_STAGE_PAYLOAD_DATA; + /* fall through */ + case UNDO_INSERT_STAGE_PAYLOAD_DATA: + { + int len = ucontext->urec_payload.urec_payload_len; + + if (len > 0) + { + /* Insert payload data. */ + if (!InsertUndoBytes((char *) ucontext->urec_payloaddata, + len, &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + } + ucontext->stage = UNDO_INSERT_STAGE_TUPLE_DATA; + /* fall through */ + } + case UNDO_INSERT_STAGE_TUPLE_DATA: + { + int len = ucontext->urec_payload.urec_tuple_len; + + if (len > 0) + { + /* Insert tuple data. 
*/ + if (!InsertUndoBytes((char *) ucontext->urec_tupledata, + len, &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + } + ucontext->stage = UNDO_INSERT_STAGE_UNDO_LENGTH; + /* fall through */ + } + case UNDO_INSERT_STAGE_UNDO_LENGTH: + /* Insert undo length. */ + if (!InsertUndoBytes((char *) &ucontext->undo_len, + sizeof(uint16), &writeptr, endptr, + &ucontext->already_written, + &ucontext->partial_write)) + return; + + ucontext->stage = UNDO_INSERT_STAGE_DONE; + /* fall through */ + + case UNDO_INSERT_STAGE_DONE: + /* Nothing to be done. */ + break; + default: + Assert(0); /* Invalid stage */ + } + + return; +} + +/* + * SkipInsertingUndoData - Skip inserting undo record + * + * Don't insert the actual undo record; instead just advance the context as if + * the bytes from 'starting_byte' to the end of the block had been written, so + * that if we need to insert the remaining partial record to the next block + * then we have the right context. + */ +void +SkipInsertingUndoData(InsertUndoContext *ucontext, int starting_byte) +{ + int remaining = BLCKSZ - starting_byte; + + switch (ucontext->stage) + { + case UNDO_INSERT_STAGE_HEADER: + if (remaining < SizeOfUndoRecordHeader) + { + ucontext->partial_write = remaining; + return; + } + remaining -= SizeOfUndoRecordHeader; + ucontext->stage = UNDO_INSERT_STAGE_RELATION_DETAILS; + /* fall through */ + + case UNDO_INSERT_STAGE_RELATION_DETAILS: + if ((ucontext->urec_hd.urec_info & UREC_INFO_RELATION_DETAILS) != 0) + { + /* Each stage is compared against the size of its own structure. */ + if (remaining < SizeOfUndoRecordRelationDetails) + { + ucontext->partial_write = remaining; + return; + } + remaining -= SizeOfUndoRecordRelationDetails; + } + + ucontext->stage = UNDO_INSERT_STAGE_BLOCK; + /* fall through */ + + case UNDO_INSERT_STAGE_BLOCK: + if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0) + { + if (remaining < SizeOfUndoRecordBlock) + { + ucontext->partial_write = remaining; + return; + } + remaining -= SizeOfUndoRecordBlock; + } + ucontext->stage = UNDO_INSERT_STAGE_TRANSACTION; + /* fall through */ + + case 
UNDO_INSERT_STAGE_TRANSACTION: + if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0) + { + if (remaining < SizeOfUndoRecordTransaction) + { + ucontext->partial_write = remaining; + return; + } + remaining -= SizeOfUndoRecordTransaction; + } + + ucontext->stage = UNDO_INSERT_STAGE_PAYLOAD; + /* fall through */ + + case UNDO_INSERT_STAGE_PAYLOAD: + /* Skip payload header. */ + if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0) + { + if (remaining < SizeOfUndoRecordPayload) + { + ucontext->partial_write = remaining; + return; + } + remaining -= SizeOfUndoRecordPayload; + } + ucontext->stage = UNDO_INSERT_STAGE_PAYLOAD_DATA; + /* fall through */ + + case UNDO_INSERT_STAGE_PAYLOAD_DATA: + if (ucontext->urec_payload.urec_payload_len > 0) + { + if (remaining < ucontext->urec_payload.urec_payload_len) + { + ucontext->partial_write = remaining; + return; + } + remaining -= ucontext->urec_payload.urec_payload_len; + } + ucontext->stage = UNDO_INSERT_STAGE_TUPLE_DATA; + /* fall through */ + + case UNDO_INSERT_STAGE_TUPLE_DATA: + if (ucontext->urec_payload.urec_tuple_len > 0) + { + if (remaining < ucontext->urec_payload.urec_tuple_len) + { + ucontext->partial_write = remaining; + return; + } + remaining -= ucontext->urec_payload.urec_tuple_len; + } + ucontext->stage = UNDO_INSERT_STAGE_UNDO_LENGTH; + /* fall through */ + + case UNDO_INSERT_STAGE_UNDO_LENGTH: + ucontext->stage = UNDO_INSERT_STAGE_DONE; + return; + case UNDO_INSERT_STAGE_DONE: + /* Nothing to be done. */ + break; + default: + Assert(0); /* Invalid stage */ + } + + return; +} + +/* + * Write undo bytes from a particular source, but only to the extent that + * they weren't written previously and will fit. + * + * 'sourceptr' points to the source data, and 'sourcelen' is the length of + * that data in bytes. + * + * 'writeptr' points to the insertion point for these bytes, and is updated + * for whatever we write. 
The insertion point must not pass 'endptr', which + * represents the end of the buffer into which we are writing. + * + * 'partial_write' points to the count of bytes of this source that were + * already written by an earlier call; writing resumes from that offset. It + * is advanced when only part of the remaining data fits, and reset to 0 once + * the whole source has been written. + * + * 'total_bytes_written' points to the count of all previously-written bytes, + * and must be updated for the bytes we write. + * + * The return value is false if we ran out of space before writing all + * the bytes, and otherwise true. + */ +static bool +InsertUndoBytes(char *sourceptr, int sourcelen, char **writeptr, char *endptr, + int *total_bytes_written, int *partial_write) +{ + int can_write; + int remaining; + + /* Compute number of bytes we can write. */ + remaining = sourcelen - *partial_write; + can_write = Min(remaining, endptr - *writeptr); + + /* Bail out if no bytes can be written. */ + if (can_write == 0) + return false; + + /* Copy the bytes we can write. */ + memcpy(*writeptr, sourceptr + *partial_write, can_write); + + /* Update bookkeeping information. */ + *writeptr += can_write; + *total_bytes_written += can_write; + + /* Could not write the whole data, so remember how much was written. */ + if (can_write < remaining) + { + *partial_write += can_write; + return false; + } + + /* Return true only if we wrote the whole thing. */ + *partial_write = 0; + return true; +} + +/* + * BeginUnpackUndo - Initiate unpacking a single undo record. + */ +void +BeginUnpackUndo(UnpackUndoContext *ucontext) +{ + ucontext->stage = UNDO_DECODE_STAGE_HEADER; + ucontext->already_read = 0; + ucontext->partial_read = 0; +} + +/* + * UnpackUndoData - Read the undo record from the input page to the unpack undo + * context. Caller can call this function multiple times until desired stage + * is reached. 
This will read the undo record from the page and store the data + * into unpack undo context, which can be later copied to unpacked undo record + * by calling FinishUnpackUndo. + */ +void +UnpackUndoData(UnpackUndoContext *ucontext, Page page, int starting_byte) +{ + char *readptr = (char *) page + starting_byte; + char *endptr = (char *) page + BLCKSZ; + + switch (ucontext->stage) + { + case UNDO_DECODE_STAGE_HEADER: + if (!ReadUndoBytes((char *) &ucontext->urec_hd, + SizeOfUndoRecordHeader, &readptr, endptr, + &ucontext->already_read, + &ucontext->partial_read)) + return; + ucontext->stage = UNDO_DECODE_STAGE_RELATION_DETAILS; + /* fall through */ + case UNDO_DECODE_STAGE_RELATION_DETAILS: + if ((ucontext->urec_hd.urec_info & UREC_INFO_RELATION_DETAILS) != 0) + { + if (!ReadUndoBytes((char *) &ucontext->urec_rd, + SizeOfUndoRecordRelationDetails, + &readptr, endptr, &ucontext->already_read, + &ucontext->partial_read)) + return; + } + ucontext->stage = UNDO_DECODE_STAGE_BLOCK; + /* fall through */ + case UNDO_DECODE_STAGE_BLOCK: + if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0) + { + if (!ReadUndoBytes((char *) &ucontext->urec_blk, + SizeOfUndoRecordBlock, + &readptr, endptr, &ucontext->already_read, + &ucontext->partial_read)) + return; + } + ucontext->stage = UNDO_DECODE_STAGE_TRANSACTION; + /* fall through */ + case UNDO_DECODE_STAGE_TRANSACTION: + if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0) + { + if (!ReadUndoBytes((char *) &ucontext->urec_txn, + SizeOfUndoRecordTransaction, + &readptr, endptr, &ucontext->already_read, + &ucontext->partial_read)) + return; + } + ucontext->stage = UNDO_DECODE_STAGE_PAYLOAD; + /* fall through */ + case UNDO_DECODE_STAGE_PAYLOAD: + /* Read payload header. 
*/ + if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0) + { + if (!ReadUndoBytes((char *) &ucontext->urec_payload, + SizeOfUndoRecordPayload, + &readptr, endptr, &ucontext->already_read, + &ucontext->partial_read)) + return; + } + ucontext->stage = UNDO_DECODE_STAGE_PAYLOAD_DATA; + /* fall through */ + case UNDO_DECODE_STAGE_PAYLOAD_DATA: + { + int len = ucontext->urec_payload.urec_payload_len; + + /* Allocate memory for the payload data if not already done. */ + if (len > 0) + { + if (ucontext->urec_payloaddata == NULL) + ucontext->urec_payloaddata = (char *) palloc0(len); + + /* Read payload data. */ + if (!ReadUndoBytes((char *) ucontext->urec_payloaddata, len, + &readptr, endptr, &ucontext->already_read, + &ucontext->partial_read)) + return; + } + ucontext->stage = UNDO_DECODE_STAGE_TUPLE_DATA; + /* fall through */ + } + case UNDO_DECODE_STAGE_TUPLE_DATA: + { + int len = ucontext->urec_payload.urec_tuple_len; + + /* Allocate memory for the tuple data if not already done. */ + if (len > 0) + { + if (ucontext->urec_tupledata == NULL) + ucontext->urec_tupledata = (char *) palloc0(len); + + /* Read tuple data. */ + if (!ReadUndoBytes((char *) ucontext->urec_tupledata, len, + &readptr, endptr, &ucontext->already_read, + &ucontext->partial_read)) + return; + } + + ucontext->stage = UNDO_DECODE_STAGE_DONE; + /* fall through */ + } + case UNDO_DECODE_STAGE_DONE: + /* Nothing to be done. */ + break; + default: + Assert(0); /* Invalid stage */ + } + + return; +} + +/* + * FinishUnpackUndo - Final step of unpacking the undo record. + * + * Copy the undo record data from the unpack undo context to the input unpacked + * undo record. + */ +void +FinishUnpackUndo(UnpackUndoContext *ucontext, UnpackedUndoRecord *uur) +{ + /* Copy undo record header. 
*/ + uur->uur_rmid = ucontext->urec_hd.urec_rmid; + uur->uur_type = ucontext->urec_hd.urec_type; + uur->uur_info = ucontext->urec_hd.urec_info; + uur->uur_reloid = ucontext->urec_hd.urec_reloid; + uur->uur_xid = ucontext->urec_hd.urec_xid; + uur->uur_cid = ucontext->urec_hd.urec_cid; + + /* Copy undo record relation header if it is present. */ + if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0) + uur->uur_fork = ucontext->urec_rd.urec_fork; + + /* Copy undo record block header if it is present. */ + if ((uur->uur_info & UREC_INFO_BLOCK) != 0) + { + uur->uur_blkprev = ucontext->urec_blk.urec_blkprev; + uur->uur_block = ucontext->urec_blk.urec_block; + uur->uur_offset = ucontext->urec_blk.urec_offset; + } + + /* Copy undo record transaction header if it is present. */ + if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0) + { + uur->uur_progress = ucontext->urec_txn.urec_progress; + uur->uur_xidepoch = ucontext->urec_txn.urec_xidepoch; + uur->uur_dbid = ucontext->urec_txn.urec_dbid; + uur->uur_prevurp = ucontext->urec_txn.urec_prevurp; + uur->uur_next = ucontext->urec_txn.urec_next; + } + + /* Copy undo record payload header and data if it is present. */ + if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0) + { + uur->uur_payload.len = ucontext->urec_payload.urec_payload_len; + uur->uur_tuple.len = ucontext->urec_payload.urec_tuple_len; + /* Hand over the payload data pointer if its length is not 0. */ + if (uur->uur_payload.len != 0) + uur->uur_payload.data = ucontext->urec_payloaddata; + + /* Hand over the tuple data pointer if its length is not 0. */ + if (uur->uur_tuple.len != 0) + uur->uur_tuple.data = ucontext->urec_tupledata; + } +} + +/* + * Read undo bytes into a particular destination. + * + * 'destptr' points to the destination buffer, and 'readlen' is the length of + * the data to be read in bytes. + * + * 'readptr' points to the read point for these bytes, and is updated + * for how much we read. The read point must not pass 'endptr', which + * represents the end of the buffer from which we are reading. 
+ * + * 'partial_read' points to the count of bytes of this destination that were + * already read by an earlier call; reading resumes at that offset. It is + * advanced when only part of the remaining data is available, and reset to 0 + * once all 'readlen' bytes have been read. + * + * 'total_bytes_read' points to the count of all previously-read bytes, + * and must likewise be updated for the bytes we read. + * + * The return value is false if we ran out of space before reading all + * the bytes, and otherwise true. + */ +static bool +ReadUndoBytes(char *destptr, int readlen, char **readptr, char *endptr, + int *total_bytes_read, int *partial_read) +{ + int can_read; + int remaining; + + /* Compute number of bytes we can read. */ + remaining = readlen - *partial_read; + can_read = Min(remaining, endptr - *readptr); + + /* Bail out if no bytes can be read. */ + if (can_read == 0) + return false; + + /* Copy the bytes we can read. */ + memcpy(destptr + *partial_read, *readptr, can_read); + + /* Update bookkeeping information. */ + *readptr += can_read; + *total_bytes_read += can_read; + + /* Could not read whole data so set the partial_read. */ + if (can_read < remaining) + { + *partial_read += can_read; + return false; + } + + /* Return true only if we read the whole thing. */ + *partial_read = 0; + + return true; +} + +/* + * Set uur_info for an UnpackedUndoRecord appropriately based on which + * other fields are set. 
+ */ +void +UndoRecordSetInfo(UnpackedUndoRecord *uur) +{ + if (uur->uur_fork != MAIN_FORKNUM) + uur->uur_info |= UREC_INFO_RELATION_DETAILS; + if (uur->uur_block != InvalidBlockNumber) + uur->uur_info |= UREC_INFO_BLOCK; + if (uur->uur_next != InvalidUndoRecPtr) + uur->uur_info |= UREC_INFO_TRANSACTION; + if (uur->uur_payload.len || uur->uur_tuple.len) + uur->uur_info |= UREC_INFO_PAYLOAD; +} diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 7966a9e..592c338 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -47,6 +47,7 @@ #define EpochFromFullTransactionId(x) ((uint32) ((x).value >> 32)) #define XidFromFullTransactionId(x) ((uint32) (x).value) #define U64FromFullTransactionId(x) ((x).value) +#define FullTransactionIdEquals(a, b) ((a).value == (b).value) #define FullTransactionIdPrecedes(a, b) ((a).value < (b).value) #define FullTransactionIdIsValid(x) TransactionIdIsValid(XidFromFullTransactionId(x)) #define InvalidFullTransactionId FullTransactionIdFromEpochAndXid(0, InvalidTransactionId) diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h new file mode 100644 index 0000000..d50085e --- /dev/null +++ b/src/include/access/undoinsert.h @@ -0,0 +1,64 @@ +/*------------------------------------------------------------------------- + * + * undoinsert.h + * entry points for inserting undo records + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoinsert.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOINSERT_H +#define UNDOINSERT_H + +#include "access/undolog.h" +#include "access/undorecord.h" +#include "access/xlogdefs.h" +#include "catalog/pg_class.h" + +/* One fetched undo record and its location (see UndoRecordBulkFetch). */ +typedef struct UndoRecInfo +{ + int index; /* Index of the element. 
*/ + UndoRecPtr urp; /* undo record pointer (record location). */ + UnpackedUndoRecord *uur; /* the unpacked undo record itself. */ +} UndoRecInfo; + +/* + * Typedef for callback function for UndoFetchRecord. + * + * This checks whether an undo record satisfies the given conditions. + */ +typedef bool (*SatisfyUndoRecordCallback) (UnpackedUndoRecord *urec, + BlockNumber blkno, + OffsetNumber offset, + TransactionId xid); + +extern UndoRecPtr PrepareUndoInsert(UnpackedUndoRecord *, FullTransactionId fxid, + UndoPersistence upersistence, + XLogReaderState *xlog_record); +extern void InsertPreparedUndo(void); + +extern void RegisterUndoLogBuffers(uint8 first_block_id); +extern void UndoLogBuffersSetLSN(XLogRecPtr recptr); +extern void UnlockReleaseUndoBuffers(void); + +extern UnpackedUndoRecord *UndoFetchRecord(UndoRecPtr urp, + BlockNumber blkno, OffsetNumber offset, + TransactionId xid, UndoRecPtr *urec_ptr_out, + SatisfyUndoRecordCallback callback); +extern UndoRecInfo *UndoRecordBulkFetch(UndoRecPtr *from_urecptr, + UndoRecPtr to_urecptr, + int undo_apply_size, int *nrecords, + bool one_page); +extern void UndoRecordRelease(UnpackedUndoRecord *urec); +extern void UndoRecordSetPrevUndoLen(uint16 len); +extern void UndoSetPrepareSize(int nrecords); +extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, UndoRecPtr prevurp, + Buffer buffer, + UndoPersistence upersistence); +extern void AtAbort_ResetUndoBuffers(void); + +#endif /* UNDOINSERT_H */ diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h new file mode 100644 index 0000000..008b02d --- /dev/null +++ b/src/include/access/undorecord.h @@ -0,0 +1,276 @@ +/*------------------------------------------------------------------------- + * + * undorecord.h + * encode and decode undo records + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undorecord.h + * + 
*------------------------------------------------------------------------- + */ +#ifndef UNDORECORD_H +#define UNDORECORD_H + +#include "access/undolog.h" +#include "lib/stringinfo.h" +#include "storage/block.h" +#include "storage/bufpage.h" +#include "storage/buf.h" +#include "storage/off.h" + + +/* + * Every undo record begins with an UndoRecordHeader structure, which is + * followed by the additional structures indicated by the contents of + * urec_info. All structures are packed together without alignment padding + * bytes, and the undo record itself need not be aligned either, so care + * must be taken when reading the header. + */ +typedef struct UndoRecordHeader +{ + RmgrId urec_rmid; /* RMGR [XXX:TODO: this creates an alignment + * hole?] */ + uint8 urec_type; /* record type code */ + uint8 urec_info; /* flag bits */ + Oid urec_reloid; /* relation OID */ + + /* + * Transaction id that has modified the tuple for which this undo record + * is written. We use this to skip the undo records. See comments atop + * function UndoFetchRecord. + */ + TransactionId urec_xid; /* Transaction id */ + CommandId urec_cid; /* command id */ +} UndoRecordHeader; + +#define SizeOfUndoRecordHeader \ + (offsetof(UndoRecordHeader, urec_cid) + sizeof(CommandId)) + +/* + * If UREC_INFO_RELATION_DETAILS is set, an UndoRecordRelationDetails structure + * follows. + * + * If UREC_INFO_BLOCK is set, an UndoRecordBlock structure follows. + * + * If UREC_INFO_TRANSACTION is set, an UndoRecordTransaction structure + * follows. + * + * If UREC_INFO_PAYLOAD is set, an UndoRecordPayload structure follows. + * + * When (as will often be the case) multiple structures are present, they + * appear in the order listed above (relation details, block, transaction, + * payload), which is not the numeric order of the flag bits below. 
+ */ +#define UREC_INFO_RELATION_DETAILS 0x01 +#define UREC_INFO_BLOCK 0x02 +#define UREC_INFO_PAYLOAD 0x04 +#define UREC_INFO_TRANSACTION 0x08 + +/* + * Fork number of the relation to which this record pertains; this structure + * can (and should) be omitted for MAIN_FORKNUM. NOTE(review): the size macro + * below counts sizeof(uint8), not sizeof(ForkNumber) -- confirm intended. + */ +typedef struct UndoRecordRelationDetails +{ + ForkNumber urec_fork; /* fork number */ +} UndoRecordRelationDetails; + +#define SizeOfUndoRecordRelationDetails \ + (offsetof(UndoRecordRelationDetails, urec_fork) + sizeof(uint8)) + +/* + * Identifying information for a block to which this record pertains, and + * a pointer to the previous record for the same block. + */ +typedef struct UndoRecordBlock +{ + UndoRecPtr urec_blkprev; /* byte offset of previous undo for block */ + BlockNumber urec_block; /* block number */ + OffsetNumber urec_offset; /* offset number */ +} UndoRecordBlock; + +#define SizeOfUndoRecordBlock \ + (offsetof(UndoRecordBlock, urec_offset) + sizeof(OffsetNumber)) + +/* + * Identifying information for a transaction to which this undo belongs. This + * also stores the dbid and the progress of the undo apply during rollback. + */ +typedef struct UndoRecordTransaction +{ + /* + * This indicates undo action apply progress: 0 means not started, 1 means + * completed. In future, it can also be used to show the progress of how + * much undo has been applied so far with some formula. + */ + uint32 urec_progress; + uint32 urec_xidepoch; /* epoch of the current transaction */ + Oid urec_dbid; /* database id */ + + /* + * Transaction's previous undo record pointer when a transaction spans + * across undo logs. The first undo record in the new log stores the + * previous undo record pointer in the previous log as we can't calculate + * that directly using prevlen during rollback. 
+ */ + UndoRecPtr urec_prevurp; + UndoRecPtr urec_next; /* urec pointer of the next transaction */ +} UndoRecordTransaction; + +#define SizeOfUrecNext (sizeof(UndoRecPtr)) +#define SizeOfUndoRecordTransaction \ + (offsetof(UndoRecordTransaction, urec_next) + SizeOfUrecNext) + +/* + * Information about the amount of payload data and tuple data present + * in this record. The payload bytes immediately follow the structures + * specified by flag bits in urec_info, and the tuple bytes follow the + * payload bytes. + */ +typedef struct UndoRecordPayload +{ + uint16 urec_payload_len; /* # of payload bytes */ + uint16 urec_tuple_len; /* # of tuple bytes */ +} UndoRecordPayload; + +#define SizeOfUndoRecordPayload \ + (offsetof(UndoRecordPayload, urec_tuple_len) + sizeof(uint16)) + +/* + * Information that can be used to create an undo record or that can be + * extracted from one previously created. The raw undo record format is + * difficult to manage, so this structure provides a convenient intermediate + * form that is easier for callers to manage. + * + * When creating an undo record from an UnpackedUndoRecord, caller should + * set uur_info to 0. It will be initialized by the first call to + * UndoRecordSetInfo or InsertUndoRecord. We do set it in + * UndoRecordAllocate for transaction specific header information. + * + * When an undo record is decoded into an UnpackedUndoRecord, all fields + * will be initialized, but those for which no information is available + * will be set to invalid or default values, as appropriate. 
+ */ +typedef struct UnpackedUndoRecord +{ + RmgrId uur_rmid; /* rmgr ID */ + uint8 uur_type; /* record type code */ + uint8 uur_info; /* flag bits */ + Oid uur_reloid; /* relation OID */ + TransactionId uur_xid; /* transaction id */ + CommandId uur_cid; /* command id */ + ForkNumber uur_fork; /* fork number */ + UndoRecPtr uur_blkprev; /* byte offset of previous undo for block */ + BlockNumber uur_block; /* block number */ + OffsetNumber uur_offset; /* offset number */ + uint32 uur_xidepoch; /* epoch of the inserting transaction. */ + UndoRecPtr uur_prevurp; /* urec pointer to the previous record in a + * different undo log */ + UndoRecPtr uur_next; /* urec pointer of the next transaction */ + Oid uur_dbid; /* database id */ + + /* undo apply progress; see the detailed comment in UndoRecordTransaction */ + uint32 uur_progress; + StringInfoData uur_payload; /* payload bytes */ + StringInfoData uur_tuple; /* tuple bytes */ +} UnpackedUndoRecord; + +typedef enum UndoDecodeStage +{ + UNDO_DECODE_STAGE_HEADER, /* We have not yet decoded even the record + * header; we need to do that next. */ + UNDO_DECODE_STAGE_RELATION_DETAILS, /* The next thing to be decoded is the + * relation details, if present. */ + UNDO_DECODE_STAGE_BLOCK, /* The next thing to be decoded is the block + * details, if present. */ + UNDO_DECODE_STAGE_TRANSACTION, /* The next thing to be decoded is the + * transaction details, if present. */ + UNDO_DECODE_STAGE_PAYLOAD, /* The next thing to be decoded is the payload + * details, if present */ + UNDO_DECODE_STAGE_PAYLOAD_DATA, /* The next thing to be decoded is the + * payload data */ + UNDO_DECODE_STAGE_TUPLE_DATA, /* The next thing to be decoded is the + * tuple data */ + UNDO_DECODE_STAGE_DONE /* Decoding is complete */ +} UndoDecodeStage; + +/* + * Undo record context for reading an undo record from the buffers. This + * holds the intermediate state of the undo record read so far. 
+ */ +typedef struct UnpackUndoContext +{ + UndoRecordHeader urec_hd; /* Main header */ + UndoRecordRelationDetails urec_rd; /* Relation header */ + UndoRecordBlock urec_blk; /* Block header */ + UndoRecordTransaction urec_txn; /* Transaction header */ + UndoRecordPayload urec_payload; /* Payload header */ + char *urec_payloaddata; + char *urec_tupledata; + int already_read; /* Number of bytes read so far */ + int partial_read; /* Bytes of the current item read so far */ + UndoDecodeStage stage; /* Decoding stage */ +} UnpackUndoContext; + +typedef enum UndoInsertStage +{ + UNDO_INSERT_STAGE_HEADER, /* We have not yet inserted even the record + * header; we need to do that next. */ + UNDO_INSERT_STAGE_RELATION_DETAILS, /* The next thing to be inserted is + * the relation details, if present. */ + UNDO_INSERT_STAGE_BLOCK, /* The next thing to be inserted is the block + * details, if present. */ + UNDO_INSERT_STAGE_TRANSACTION, /* The next thing to be inserted is the + * transaction details, if present. */ + UNDO_INSERT_STAGE_PAYLOAD, /* The next thing to be inserted is the + * payload details, if present */ + UNDO_INSERT_STAGE_PAYLOAD_DATA, /* The next thing to be inserted is the + * payload data */ + UNDO_INSERT_STAGE_TUPLE_DATA, /* The next thing to be inserted is the + * tuple data */ + UNDO_INSERT_STAGE_UNDO_LENGTH, /* The next thing to be inserted is the + * undo length. */ + UNDO_INSERT_STAGE_DONE /* inserting is complete */ +} UndoInsertStage; + +/* + * Undo record context for inserting an undo record into the buffers. This + * holds the intermediate state of the undo record written so far. 
+ */ +typedef struct InsertUndoContext +{ + UndoRecordHeader urec_hd; /* Main header */ + UndoRecordRelationDetails urec_rd; /* Relation header */ + UndoRecordBlock urec_blk; /* Block header */ + UndoRecordTransaction urec_txn; /* Transaction header */ + UndoRecordPayload urec_payload; /* Payload header */ + char *urec_payloaddata; + char *urec_tupledata; + uint16 undo_len; /* Length of the undo record. */ + int already_written; /* Number of bytes written so far */ + int partial_write; /* Number of partially written bytes */ + UndoInsertStage stage; /* inserting stage */ +} InsertUndoContext; + +extern void UndoRecordSetInfo(UnpackedUndoRecord *uur); +extern Size UndoRecordExpectedSize(UnpackedUndoRecord *uur); +extern Size UnpackedUndoRecordSize(UnpackedUndoRecord *uur); +extern bool InsertUndoRecord(UnpackedUndoRecord *uur, Page page, + int starting_byte, int *already_written, + int remaining_bytes, uint16 undo_len, bool header_only); +extern void BeginUnpackUndo(UnpackUndoContext *ucontext); +extern void UnpackUndoData(UnpackUndoContext *ucontext, Page page, + int starting_byte); +extern void FinishUnpackUndo(UnpackUndoContext *ucontext, + UnpackedUndoRecord *uur); +extern void BeginInsertUndo(InsertUndoContext *ucontext, + UnpackedUndoRecord *uur); +extern void InsertUndoData(InsertUndoContext *ucontext, Page page, + int starting_byte); +extern void SkipInsertingUndoData(InsertUndoContext *ucontext, + int starting_byte); + +#endif /* UNDORECORD_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index d787f92..61df235 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -15,6 +15,7 @@ #define XACT_H #include "access/transam.h" +#include "access/undolog.h" #include "access/xlogreader.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" -- 1.8.3.1