diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index ea83655..007317a 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -40,6 +40,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/proc.h"
 
 /*
  * Defines for CLOG page sizes.  A page is the same BLCKSZ as is used
@@ -91,6 +92,10 @@ static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status,
 						  XLogRecPtr lsn, int slotno);
 static void set_status_by_pages(int nsubxids, TransactionId *subxids,
 					XidStatus status, XLogRecPtr lsn);
+static void TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
+								XLogRecPtr lsn, int pageno);
+static void TransactionIdSetPageStatusInternal(TransactionId xid, XidStatus status,
+								   XLogRecPtr lsn, int pageno);
 
 
 /*
@@ -248,6 +253,14 @@ set_status_by_pages(int nsubxids, TransactionId *subxids,
  * Record the final state of transaction entries in the commit log for
  * all entries on a single page.  Atomic only on this page.
  *
+ * Group the status update for transactions that don't have
+ * subtransactions.  This improves the efficiency of the transaction
+ * status update by reducing the number of lock acquisitions required
+ * for it.  To achieve the group transaction status update, we need to
+ * populate the transaction status related information in shared memory
+ * and doing it for sub-transactions would need a big chunk of shared
+ * memory, so we are not doing this optimization for such cases.
+ *
  * Otherwise API is same as TransactionIdSetTreeStatus()
  */
 static void
@@ -262,7 +275,92 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 		   status == TRANSACTION_STATUS_ABORTED ||
 		   (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
 
-	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
+	if (nsubxids > 0)
+	{
+		LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
+
+		/*
+		 * If we're doing an async commit (ie, lsn is valid), then we must
+		 * wait for any active write on the page slot to complete.  Otherwise
+		 * our update could reach disk in that write, which will not do since
+		 * we mustn't let it reach disk until we've done the appropriate WAL
+		 * flush.  But when lsn is invalid, it's OK to scribble on a page while
+		 * it is write-busy, since we don't care if the update reaches disk
+		 * sooner than we think.
+		 */
+		slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
+
+		/*
+		 * Set the main transaction id, if any.
+		 *
+		 * If we update more than one xid on this page while it is being
+		 * written out, we might find that some of the bits go to disk and
+		 * others don't.  If we are updating commits on the page with the
+		 * top-level xid that could break atomicity, so we subcommit the
+		 * subxids first before we mark the top-level commit.
+		 */
+		if (TransactionIdIsValid(xid))
+		{
+			/* Subtransactions first, if needed ... */
+			if (status == TRANSACTION_STATUS_COMMITTED)
+			{
+				for (i = 0; i < nsubxids; i++)
+				{
+					Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+					TransactionIdSetStatusBit(subxids[i],
+											  TRANSACTION_STATUS_SUB_COMMITTED,
+											  lsn, slotno);
+				}
+			}
+
+			/* ... then the main transaction */
+			TransactionIdSetStatusBit(xid, status, lsn, slotno);
+		}
+
+		/* Set the subtransactions */
+		for (i = 0; i < nsubxids; i++)
+		{
+			Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+			TransactionIdSetStatusBit(subxids[i], status, lsn, slotno);
+		}
+
+		ClogCtl->shared->page_dirty[slotno] = true;
+
+		LWLockRelease(CLogControlLock);
+	}
+	else
+	{
+		/*
+		 * If we can immediately acquire CLogControlLock, we update the status
+		 * of our own XID and release the lock.  If not, use group XID status
+		 * update to improve efficiency.
+		 */
+		if (LWLockConditionalAcquire(CLogControlLock, LW_EXCLUSIVE))
+		{
+			TransactionIdSetPageStatusInternal(xid, status, lsn, pageno);
+			LWLockRelease(CLogControlLock);
+		}
+		else
+			TransactionGroupUpdateXidStatus(xid, status, lsn, pageno);
+	}
+}
+
+/*
+ * Record the final state of transaction entry in the commit log
+ *
+ * We don't do any locking here; caller must handle that.
+ */
+static void
+TransactionIdSetPageStatusInternal(TransactionId xid, XidStatus status,
+								   XLogRecPtr lsn, int pageno)
+{
+	int			slotno;
+
+	/* We should definitely have an XID whose status needs to be updated. */
+	Assert(TransactionIdIsValid(xid));
+
+	Assert(status == TRANSACTION_STATUS_COMMITTED ||
+		   status == TRANSACTION_STATUS_ABORTED);
 
 	/*
 	 * If we're doing an async commit (ie, lsn is valid), then we must wait
@@ -276,42 +374,141 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 	slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
 
 	/*
-	 * Set the main transaction id, if any.
-	 *
-	 * If we update more than one xid on this page while it is being written
-	 * out, we might find that some of the bits go to disk and others don't.
-	 * If we are updating commits on the page with the top-level xid that
-	 * could break atomicity, so we subcommit the subxids first before we mark
-	 * the top-level commit.
+	 * Update the status of transaction in clog.
 	 */
-	if (TransactionIdIsValid(xid))
+	TransactionIdSetStatusBit(xid, status, lsn, slotno);
+
+	ClogCtl->shared->page_dirty[slotno] = true;
+}
+
+/*
+ * When we cannot immediately acquire CLogControlLock in exclusive mode at
+ * commit time, add ourselves to a list of processes that need their XIDs
+ * status updated.  The first process to add itself to the list will acquire
+ * CLogControlLock in exclusive mode and perform TransactionIdSetPageStatusInternal
+ * on behalf of all group members.  This avoids a great deal of contention
+ * around CLogControlLock when many processes are trying to commit at once,
+ * since the lock need not be repeatedly handed off from one committing
+ * process to the next.
+ */
+static void
+TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
+								XLogRecPtr lsn, int pageno)
+{
+	volatile PROC_HDR *procglobal = ProcGlobal;
+	PGPROC	   *proc = MyProc;
+	uint32		nextidx;
+	uint32		wakeidx;
+	int			extraWaits = 0;
+
+	/* We should definitely have an XID whose status needs to be updated. */
+	Assert(TransactionIdIsValid(xid));
+
+	/*
+	 * Add ourselves to the list of processes needing a group XID status
+	 * update.
+	 */
+	proc->updateXidStatus = true;
+	proc->memberXid = xid;
+	proc->memberXidstatus = status;
+	proc->clogPage = pageno;
+	proc->asyncCommitLsn = lsn;
+	while (true)
 	{
-		/* Subtransactions first, if needed ... */
-		if (status == TRANSACTION_STATUS_COMMITTED)
+		nextidx = pg_atomic_read_u32(&procglobal->firstupdateXidStatusElem);
+		pg_atomic_write_u32(&proc->nextupdateXidStatusElem, nextidx);
+
+		if (pg_atomic_compare_exchange_u32(&procglobal->firstupdateXidStatusElem,
+										   &nextidx,
+										   (uint32) proc->pgprocno))
+			break;
+	}
+
+	/*
+	 * If the list was not empty, the leader will update the status of our
+	 * XID.  It is impossible to have followers without a leader because the
+	 * first process that has added itself to the list will always have
+	 * nextidx as INVALID_PGPROCNO.
+	 */
+	if (nextidx != INVALID_PGPROCNO)
+	{
+		/* Sleep until the leader updates our XID status. */
+		for (;;)
 		{
-			for (i = 0; i < nsubxids; i++)
-			{
-				Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
-				TransactionIdSetStatusBit(subxids[i],
-										  TRANSACTION_STATUS_SUB_COMMITTED,
-										  lsn, slotno);
-			}
+			/* acts as a read barrier */
+			PGSemaphoreLock(&proc->sem);
+			if (!proc->updateXidStatus)
+				break;
+			extraWaits++;
 		}
 
-		/* ... then the main transaction */
-		TransactionIdSetStatusBit(xid, status, lsn, slotno);
+		Assert(pg_atomic_read_u32(&proc->nextupdateXidStatusElem) == INVALID_PGPROCNO);
+
+		/* Fix semaphore count for any absorbed wakeups */
+		while (extraWaits-- > 0)
+			PGSemaphoreUnlock(&proc->sem);
+		return;
 	}
 
-	/* Set the subtransactions */
-	for (i = 0; i < nsubxids; i++)
+	/* We are the leader.  Acquire the lock on behalf of everyone. */
+	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
+
+	/*
+	 * Now that we've got the lock, clear the list of processes waiting for
+	 * group XID status update, saving a pointer to the head of the list.
+	 * Trying to pop elements one at a time could lead to an ABA problem.
+	 */
+	while (true)
 	{
-		Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
-		TransactionIdSetStatusBit(subxids[i], status, lsn, slotno);
+		nextidx = pg_atomic_read_u32(&procglobal->firstupdateXidStatusElem);
+		if (pg_atomic_compare_exchange_u32(&procglobal->firstupdateXidStatusElem,
+										   &nextidx,
+										   INVALID_PGPROCNO))
+			break;
 	}
 
-	ClogCtl->shared->page_dirty[slotno] = true;
+	/* Remember head of list so we can perform wakeups after dropping lock. */
+	wakeidx = nextidx;
+
+	/* Walk the list and update the status of all XIDs. */
+	while (nextidx != INVALID_PGPROCNO)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[nextidx];
+
+		TransactionIdSetPageStatusInternal(proc->memberXid,
+										   proc->memberXidstatus,
+										   proc->asyncCommitLsn,
+										   proc->clogPage);
 
+		/* Move to next proc in list. */
+		nextidx = pg_atomic_read_u32(&proc->nextupdateXidStatusElem);
+	}
+
+	/* We're done with the lock now. */
 	LWLockRelease(CLogControlLock);
+
+	/*
+	 * Now that we've released the lock, go back and wake everybody up.  We
+	 * don't do this under the lock so as to keep lock hold times to a
+	 * minimum.  The system calls we need to perform to wake other processes
+	 * up are probably much slower than the simple memory writes we did while
+	 * holding the lock.
+	 */
+	while (wakeidx != INVALID_PGPROCNO)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[wakeidx];
+
+		wakeidx = pg_atomic_read_u32(&proc->nextupdateXidStatusElem);
+		pg_atomic_write_u32(&proc->nextupdateXidStatusElem, INVALID_PGPROCNO);
+
+		/* ensure all previous writes are visible before follower continues. */
+		pg_write_barrier();
+
+		proc->updateXidStatus = false;
+
+		if (proc != MyProc)
+			PGSemaphoreUnlock(&proc->sem);
+	}
 }
 
 /*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index bb10c1b..e1c71a6 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -182,6 +182,7 @@ InitProcGlobal(void)
 	ProcGlobal->walwriterLatch = NULL;
 	ProcGlobal->checkpointerLatch = NULL;
 	pg_atomic_init_u32(&ProcGlobal->firstClearXidElem, INVALID_PGPROCNO);
+	pg_atomic_init_u32(&ProcGlobal->firstupdateXidStatusElem, INVALID_PGPROCNO);
 
 	/*
 	 * Create and initialize all the PGPROC structures we'll need.  There are
@@ -397,6 +398,14 @@ InitProcess(void)
 	MyProc->backendLatestXid = InvalidTransactionId;
 	pg_atomic_init_u32(&MyProc->nextClearXidElem, INVALID_PGPROCNO);
 
+	/* Initialize fields for group transaction status update. */
+	MyProc->updateXidStatus = false;
+	MyProc->memberXid = InvalidTransactionId;
+	MyProc->memberXidstatus = TRANSACTION_STATUS_IN_PROGRESS;
+	MyProc->clogPage = -1;
+	MyProc->asyncCommitLsn = InvalidXLogRecPtr;
+	pg_atomic_init_u32(&MyProc->nextupdateXidStatusElem, INVALID_PGPROCNO);
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it.  That allows us to repoint the process latch, which so far
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 3d68017..2eddfe5 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -14,6 +14,7 @@
 #ifndef _PROC_H_
 #define _PROC_H_
 
+#include "access/clog.h"
 #include "access/xlogdefs.h"
 #include "lib/ilist.h"
 #include "storage/latch.h"
@@ -146,6 +147,14 @@ struct PGPROC
 	pg_atomic_uint32	nextClearXidElem;
 	TransactionId	backendLatestXid;
 
+	/* Support for group transaction status update. */
+	bool		updateXidStatus;
+	pg_atomic_uint32 nextupdateXidStatusElem;
+	TransactionId memberXid;
+	XidStatus	memberXidstatus;
+	int			clogPage;
+	XLogRecPtr	asyncCommitLsn;
+
 	/* Per-backend LWLock.  Protects fields below. */
 	LWLock	   *backendLock;	/* protects the fields below */
 
@@ -209,6 +218,8 @@ typedef struct PROC_HDR
 	PGPROC	   *bgworkerFreeProcs;
 	/* First pgproc waiting for group XID clear */
 	pg_atomic_uint32 firstClearXidElem;
+	/* First pgproc waiting for group transaction status update */
+	pg_atomic_uint32 firstupdateXidStatusElem;
 	/* WALWriter process's latch */
 	Latch	   *walwriterLatch;
 	/* Checkpointer process's latch */