/*-------------------------------------------------------------------------
 *
 * multixact.c
 *		PostgreSQL multi-transaction-log manager
 *
 * The pg_multixact manager is a pg_clog-like manager that stores an array
 * of TransactionIds for each MultiXactId.  It is a fundamental part of the
 * shared-row lock implementation.  A share-locked tuple stores a
 * MultiXactId in its Xmax, and a transaction that needs to wait for the
 * tuple to be unlocked can sleep on the potentially-several TransactionIds
 * that compose the MultiXactId.
 *
 * We use two SLRU areas, one for storing the offsets on which the data
 * starts for each MultiXactId in the other one.  This trick allows us to
 * store variable length arrays of TransactionIds.
 *
 * This code is based on clog.c, but the robustness requirements are
 * completely different from pg_clog, because we only need to remember
 * pg_multixact information for currently-open transactions.  Thus, there is
 * no need to preserve data over a crash and restart.
 *
 * The only XLOG interaction we need to take care of is that sequential
 * MultiXactId generation must not overlap across a system crash.  Thus we
 * log groups of MultiXactIds acquisition in the same fashion we do for Oids
 * (see XLogPutNextMultiXactId.)
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/slru.h"
#include "access/multixact.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "utils/memutils.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/sinval.h"


/*
 * Defines for MultiXactOffset page sizes.  A page is the same BLCKSZ as is
 * used everywhere else in Postgres.
 *
 * Note: because both uint32 and TransactionIds are 32 bits and wrap around at
 * 0xFFFFFFFF, MultiXact page numbering also wraps around at
 * 0xFFFFFFFF/MULTIXACT_*_PER_PAGE, and segment numbering at
 * 0xFFFFFFFF/MULTIXACT_*_PER_PAGE/SLRU_SEGMENTS_PER_PAGE.  We need take no
 * explicit notice of that fact in this module, except when comparing segment
 * and page numbers in TruncateMultiXactOffset and TruncateMultiXactMember
 * (see MultiXact{Offset,Member}PagePrecedes).
 */

/* We need four bytes per offset and another four bytes per member */
#define MULTIXACT_OFFSETS_PER_PAGE (BLCKSZ / sizeof(uint32))
#define MULTIXACT_MEMBERS_PER_PAGE (BLCKSZ / sizeof(TransactionId))

#define MultiXactIdToOffsetPage(xid) \
	((xid) / (uint32) MULTIXACT_OFFSETS_PER_PAGE)
#define MultiXactIdToOffsetEntry(xid) \
	((xid) % (uint32) MULTIXACT_OFFSETS_PER_PAGE)

#define MultiXactIdToMemberPage(xid) \
	((xid) / (TransactionId) MULTIXACT_MEMBERS_PER_PAGE)
#define MultiXactIdToMemberEntry(xid) \
	((xid) % (TransactionId) MULTIXACT_MEMBERS_PER_PAGE)

/* Arbitrary number of MultiXactId to allocate at each XLog call */
#define MXACT_PREFETCH	8192

/*
 * Links to shared-memory data structures for MultiXact control
 */
static SlruCtlData MultiXactOffsetCtlData;
static SlruCtlData MultiXactMemberCtlData;

#define MultiXactOffsetCtl  (&MultiXactOffsetCtlData)
#define MultiXactMemberCtl  (&MultiXactMemberCtlData)

/*
 * Definitions for the backend-local MultiXactId cache.
 *
 * We use this cache to store known MultiXacts, so we don't need to go to
 * SLRU areas everytime.  We also use this when we need to create MultiXacts
 * for a TransactionId set that was already used.
 *
 * The cache lasts for the duration of a single transaction, the rationale
 * for this being that most entries will contain our own TransactionId and
 * so they will be invalid by the time our next transaction starts.
 *
 * We allocate the cache entries on a memory context that is reset at
 * transaction end, so we don't need retail freeing of entries.
 */
typedef struct mXactCacheEnt
{
	MultiXactId		multi;
	int				nxids;
	TransactionId  *xids;
	struct mXactCacheEnt *next;
} mXactCacheEnt;

static mXactCacheEnt   *MXactCache = NULL;
static MemoryContext	MXactContext = NULL;


/* internal MultiXactId management */
static MultiXactId CreateMultiXactId(int nxids, TransactionId *xids);
static int GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids);
static MultiXactId GetNewMultiXactId(int nxids, uint32 *offset);
static int xidComparator(const void *arg1, const void *arg2);
#ifdef MULTIXACT_DEBUG
static char *mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids);
#endif

/* MultiXact cache management */
static MultiXactId mXactCacheGetBySet(int nxids, TransactionId *xids);
static int mXactCacheGetByMulti(MultiXactId multi, TransactionId **xids);
static void mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids);
static void mXactCacheClear(void);

/* management of SLRU infrastructure */
static int	ZeroMultiXactOffsetPage(int pageno);
static int	ZeroMultiXactMemberPage(int pageno);
static bool MultiXactOffsetPagePrecedes(int page1, int page2);
static bool MultiXactMemberPagePrecedes(int page1, int page2);
static bool MultiXactOffsetPrecedes(uint32 offset1, uint32 offset2);
static void ExtendMultiXactOffset(MultiXactId multi);
static void ExtendMultiXactMember(uint32 offset);
static void TruncateMultiXactMember(uint32 offset);


/*
 * MultiXactIdExpand
 * 		Add a TransactionId to a possibly-already-existing MultiXactId.
 *
 * Get the members of the given MultiXactId, add the passed TransactionId to
 * them, and create a new MultiXactId.  If the passed MultiXactId is Invalid,
 * create a singleton MultiXactId.
 *
 * If the TransactionId is already a member of the passed MultiXactId, return
 * it.
 */
MultiXactId
MultiXactIdExpand(MultiXactId multi, TransactionId xid)
{
	MultiXactId		newMulti;
	TransactionId  *members;
	int				nmembers;
	int				i;

	/* singleton case */
	if (!MultiXactIdIsValid(multi))
		return CreateMultiXactId(1, &xid);

	nmembers = GetMultiXactIdMembers(multi, &members);

	for (i = 0; i < nmembers; i++)
	{
		if (TransactionIdEquals(members[i], xid))
		{
			pfree(members);
			return multi;
		}
	}

	/*
	 * FIXME -- be smarter here.
	 *
	 * For one, it would be good to check whether the members are still
	 * running.  Maybe change the API of GetMembers so that "members" has
	 * room for the new Xid, saving us a repalloc.
	 */
	members = repalloc(members, sizeof(TransactionId) * (nmembers + 1));
	members[nmembers] = xid;
	
	newMulti = CreateMultiXactId(nmembers + 1, members);

	pfree(members);

	return newMulti;
}

/*
 * MultiXactIdIsRunning
 * 		Returns whether a MultiXactId is "running".
 *
 * We return true if at least one member of the given MultiXactId is still
 * running.
 */
bool
MultiXactIdIsRunning(MultiXactId multi)
{
	TransactionId *members;
	TransactionId  myXid;
	int		nmembers;
	int		i;

	nmembers = GetMultiXactIdMembers(multi, &members);
	
	if (nmembers == 0)
		return false;

	/* checking for myself is cheap */
	myXid = GetTopTransactionId();

	for (i = 0; i < nmembers; i++)
	{
		if (TransactionIdEquals(members[i], myXid))
		{
			pfree(members);
			return true;
		}
	}

	/*
	 * FIXME --- maybe this could be made better by having a special entry
	 * point in sinval.c, walking the PGPROC array only once for the whole
	 * array, and maybe considering only top transaction Ids and not
	 * subtransactions.
	 *
	 * I think it would be much better to add a new API to sinval.c so that
	 * it could check which of a set of TransactionIds are still running,
	 * with a single loop around PGPROCs, limited to only top TransactionId
	 * (no subtransactions).
	 */
	for (i = 0; i < nmembers; i++)
	{
		if (TransactionIdIsInProgress(members[i]))
		{
			pfree(members);
			return true;
		}
	}

	pfree(members);
	return false;
}

/*
 * MultiXactIdWait
 *		Sleep on a MultiXactId.
 *
 * We do this by sleeping on each member using XactLockTableWait.  Note that
 * by the time we finish sleeping, someone else may have changed the Xmax of
 * the containing tuple, so the caller needs to iterate on us.
 */
void
MultiXactIdWait(MultiXactId multi)
{
	TransactionId *members;
	int		nmembers;

	nmembers = GetMultiXactIdMembers(multi, &members);

	if (nmembers > 0)
	{
		int		i;

		for (i = 0; i < nmembers; i++)
			XactLockTableWait(members[i]);

		pfree(members);
	}

	return;
}

/*
 * CreateMultiXactId
 * 		Make a new MultiXactId
 *
 * Make SLRU and cache entries for a new MultiXactId, recording the given
 * TransactionIds as members.  Returns the newly created MultiXactId.
 */
static MultiXactId
CreateMultiXactId(int nxids, TransactionId *xids)
{
	MultiXactId	multi;
	int			pageno;
	int			entryno;
	int			slotno;
	uint32	   *offptr;
	uint32		offset;
	int			i;

	/* See if the Xid set is already on the cache */
	if (MultiXactIdIsValid(multi = mXactCacheGetBySet(nxids, xids)))
		return multi;

	multi = GetNewMultiXactId(nxids, &offset);

	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	ExtendMultiXactOffset(multi);

	pageno = MultiXactIdToOffsetPage(multi);
	entryno = MultiXactIdToOffsetEntry(multi);

	/*
	 * FIXME -- we could enhance error reporting on SimpleLruReadPage;
	 * clearly InvalidTransactionId is inappropiate here.  Maybe pass
	 * a function pointer?
	 */
	slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, InvalidTransactionId);
	offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
	offptr += entryno;
	*offptr = offset;

	MultiXactOffsetCtl->shared->page_status[slotno] = SLRU_PAGE_DIRTY;

	/* Exchange our lock */
	LWLockRelease(MultiXactOffsetControlLock);
	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	for (i = 0; i < nxids; i++, offset++)
	{
		TransactionId *memberptr;

		ExtendMultiXactMember(offset);

		pageno = MultiXactIdToMemberPage(offset);
		entryno = MultiXactIdToMemberEntry(offset);

		slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno,
								   InvalidTransactionId);

		memberptr = (TransactionId *)
			MultiXactMemberCtl->shared->page_buffer[slotno];
		memberptr += entryno;

		*memberptr = xids[i];
		MultiXactMemberCtl->shared->page_status[slotno] = SLRU_PAGE_DIRTY;
	}

	LWLockRelease(MultiXactMemberControlLock);

	/* Store the new MultiXactId in the cache */
	mXactCachePut(multi, nxids, xids);

	return multi;
}

/*
 * GetNewMultiXactId
 *		Get the next MultiXactId.
 *
 * Get the next MultiXactId, XLogging if needed.  Also, reserve space
 * in the "members" area right away so no concurrent transaction can take it
 * away from us.  The offset of the reserved space is returned in the offset
 * parameter.
 */
static MultiXactId
GetNewMultiXactId(int nxids, uint32 *offset)
{
	MultiXactId		result;

	LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);

	if (ShmemVariableCache->nextMXact < FirstMultiXactId)
	{
		ShmemVariableCache->nextMXact = FirstMultiXactId;
		ShmemVariableCache->mXactCount = 0;
	}

	/* If we run out of logged for use multixacts then we must log more */
	if (ShmemVariableCache->mXactCount == 0)
	{
		XLogPutNextMultiXactId(ShmemVariableCache->nextMXact + MXACT_PREFETCH);
		ShmemVariableCache->mXactCount = MXACT_PREFETCH;
	}

	result = ShmemVariableCache->nextMXact;
	*offset = ShmemVariableCache->nextMXoffset;

	/*
	 * We don't care about MultiXactId wraparound; it will be handled by
	 * the next iteration.
	 */
	(ShmemVariableCache->nextMXact)++;
	(ShmemVariableCache->mXactCount)--;
	ShmemVariableCache->nextMXoffset += nxids;

	/* Set our "smallest MultiXactId assigned" if not done already */
	if (!MultiXactIdIsValid(MyProc->mxmin))
		MyProc->mxmin = result;

	/* Set the shared "smallest MultiXactId assigned" if not done already */
	if (!MultiXactIdIsValid(ShmemVariableCache->minMultiXact))
		ShmemVariableCache->minMultiXact = result;

	LWLockRelease(MultiXactGenLock);

	return result;
}

/*
 * GetMultiXactIdMembers
 * 		Returns the set of TransactionIds that make up a MultiXactId
 */
static int
GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
{
	int			pageno;
	int			entryno;
	int			slotno;
	uint32	   *offptr;
	uint32		offset;
	int			length;
	int			i;
	MultiXactId	nextMXact;
	uint32		nextOffset;
	TransactionId *ptr;

	Assert(MultiXactIdIsValid(multi));

	/*
	 * We check known limits on MultiXact before resorting to the SLRU area.
	 * Rationale is:
	 *
	 * a) if ShmemVariableCache->minMultiXact is Invalid, then no
	 * transaction has used MultiXactIds yet, so we don't need to check.
	 *
	 * b) If the MultiXactId precedes ShmemVariableCache->minMultiXact, then
	 * all transactions that cared about that MultiXactId are gone, so no
	 * need to check.
	 */
	LWLockAcquire(MultiXactGenLock, LW_SHARED);

	/* Any valid MultiXactId must precede the next-to-assign MultiXactId */
	Assert(MultiXactIdPrecedes(multi, ShmemVariableCache->nextMXact));

	if (!MultiXactIdIsValid(ShmemVariableCache->minMultiXact) ||
		MultiXactIdPrecedes(multi, ShmemVariableCache->minMultiXact))
	{
		xids = NULL;
		LWLockRelease(MultiXactGenLock);
		return 0;
	}

	/*
	 * Before releasing the lock, save the shared-var-cached values, because
	 * they may contain the next MultiXactId to be given out.  We will need
	 * to know the offset if that's the case.
	 */
	nextMXact = ShmemVariableCache->nextMXact;
	nextOffset = ShmemVariableCache->nextMXoffset;

	LWLockRelease(MultiXactGenLock);

	/* See if the MultiXactId is in the local cache */
	length = mXactCacheGetByMulti(multi, xids);
	if (length > 0)
			return length;

	/* Get the offset at which we need to start reading MultiXactOffset */
	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	pageno = MultiXactIdToOffsetPage(multi);
	entryno = MultiXactIdToOffsetEntry(multi);

	slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, InvalidTransactionId);
	offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
	offptr += entryno;
	offset = *offptr;

	/*
	 * How many members do we need to read?  If we are at the edge of the
	 * assigned MultiXactIds, consult ShmemVariableCache.  Else we
	 * need to check the MultiXactId following ours.
	 *
	 * Use the same increment rule as GetNewMultiXactId(), that is, don't
	 * handle wraparound explicitly.  Don't use "multi" beyond this point -- we
	 * will F it UBAR.
	 */
	if (nextMXact == ++multi)
		length = nextOffset - offset;
	else
	{
		/* handle wraparound */
		if (multi == InvalidMultiXactId)
			multi = FirstMultiXactId;

		pageno = MultiXactIdToOffsetPage(multi);
		entryno = MultiXactIdToOffsetEntry(multi);

		slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno,
								   InvalidTransactionId);
		offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
		offptr += entryno;
		length = *offptr - offset;
	}

	/* Exchange our lock */
	LWLockRelease(MultiXactOffsetControlLock);
	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	ptr = (TransactionId *) palloc(length * sizeof(TransactionId));
	*xids = ptr;

	/* Now get the members themselves. */
	for (i = 0; i < length ; i++, offset++)
	{
		TransactionId *xactptr;

		pageno = MultiXactIdToMemberPage(offset);
		entryno = MultiXactIdToMemberEntry(offset);

		slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno,
								   InvalidTransactionId);
		xactptr = (TransactionId *)
			MultiXactMemberCtl->shared->page_buffer[slotno];
		xactptr += entryno;

		*ptr++ = *xactptr;
	}

	LWLockRelease(MultiXactMemberControlLock);

	/*
	 * FIXME -- should we enter this MultiXactId in our local cache?
	 * Can't do it verbatim though -- "multi" was modified above

	mXactCachePut(multi, length, ptr);

	 */

	return length;
}

/*
 * mXactCacheGetBySet
 * 		returns a MultiXactId from the cache based on the set of
 * 		TransactionIds that compose it, or InvalidMultiXactId if
 * 		none matches.
 */
static MultiXactId
mXactCacheGetBySet(int nxids, TransactionId *xids)
{
	mXactCacheEnt	*entry;

	for (entry = MXactCache; entry != NULL; entry = entry->next)
	{
		int i;

		if (entry->nxids != nxids)
			continue;
		/* We assume the entries are sorted */
		for (i = 0; i < nxids; i++)
		{
			if (entry->xids[i] != xids[i])
				goto next;
		}
		return entry->multi;
next:
		;
	}

	return InvalidMultiXactId;
}

/*
 * mXactCacheGetByMulti
 * 		returns the composing TransactionId set from the cache for a
 * 		given MultiXactId, if present.
 */
static int
mXactCacheGetByMulti(MultiXactId multi, TransactionId **xids)
{
	mXactCacheEnt	*entry;

	for (entry = MXactCache; entry != NULL; entry = entry->next)
	{
		if (entry->multi == multi)
		{
			TransactionId	*ptr;
			int		size;

			size = sizeof(TransactionId) * entry->nxids;
			ptr = (TransactionId *) palloc(size);
			*xids = ptr;

			memcpy(ptr, entry->xids, size);

			return entry->nxids;
		}
	}

	return 0;
}

/*
 * mXactCachePut
 * 		Add a new MultiXactId and its composing set into the local cache.
 */
static void
mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids)
{
	mXactCacheEnt  *entry;
	MemoryContext	old_cxt;

	if (MXactContext == NULL)
		MXactContext = AllocSetContextCreate(TopTransactionContext,
											 "MultiXact Cache Context",
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);

	old_cxt = MemoryContextSwitchTo(MXactContext);

	entry = (mXactCacheEnt *) palloc(sizeof(mXactCacheEnt));

	entry->multi = multi;
	entry->nxids = nxids;
	entry->xids = (TransactionId *) palloc(sizeof(TransactionId) * nxids);
	memcpy(entry->xids, xids, sizeof(TransactionId) * nxids);

	/* mXactCacheGetBySet assumes the entries are sorted, so sort them */
	qsort(entry->xids, nxids, sizeof(TransactionId), xidComparator);
	entry->next = MXactCache;
	MXactCache = entry;

	MemoryContextSwitchTo(old_cxt);
}

/*
 * mXactCacheClear
 * 		Clear the local MultiXactId cache.
 */
static void
mXactCacheClear(void)
{
	MemoryContextReset(MXactContext);
	MXactContext = NULL;
	MXactCache = NULL;
}

#ifdef MULTIXACT_DEBUG
static char *
mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids)
{
	char *str = palloc(15 * (nxids + 1) + 4);
	int i;
	snprintf(str, 47, "%u %d[%u", multi, nxids, xids[0]);

	for (i = 1; i < nxids; i++)
		snprintf(str + strlen(str), 17, ", %u", xids[i]);

	strcat(str, "]");
	return str;
}
#endif

/* xidComparator
 * 		qsort comparison function for Xids
 *
 * FIXME -- I'm dubious about this comparator ... test it more.
 */
static int
xidComparator(const void *arg1, const void *arg2)
{
	const TransactionId *xid1 = (const TransactionId *) arg1;
	const TransactionId *xid2 = (const TransactionId *) arg2;

	if (TransactionIdEquals(*xid1, *xid2))
		return 0;

	return TransactionIdFollows(*xid1, *xid2);
}

/*
 * AtEOXact_MultiXact
 *		Handle transaction end for MultiXact
 *
 * Detect whether we are the transaction with the oldest known-valid
 * MultiXactId, and update the shared state if so.
 */
void
AtEOXact_MultiXact(void)
{
	/* return early if we didn't use MultiXacts */
	if (!MultiXactIdIsValid(MyProc->mxmin))
		return;

	LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);

	if (ShmemVariableCache->minMultiXact == MyProc->mxmin)
		ShmemVariableCache->minMultiXact = GetNewMinMultiXact();

	/*
	 * Invalidate our PGPROC's min MultiXactId before releasing
	 * the lock.
	 *
	 * FIXME -- verify that this is right.
	 */
	MyProc->mxmin = InvalidMultiXactId;

	LWLockRelease(MultiXactGenLock);

	/* Clear the local MultiXactId cache */
	mXactCacheClear();

}

/*
 * Initialization of shared memory for MultiXact.  We use two
 * SLRU areas, thus double memory.
 */
int
MultiXactShmemSize(void)
{
	return SimpleLruShmemSize() * 2;
}

void
MultiXactShmemInit(void)
{
	MultiXactOffsetCtl->PagePrecedes = MultiXactOffsetPagePrecedes;
	MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;

	SimpleLruInit(MultiXactOffsetCtl, "MultiXactOffset Ctl",
				  MultiXactOffsetControlLock, "pg_multixact/offsets");
	SimpleLruInit(MultiXactMemberCtl, "MultiXactMember Ctl",
				  MultiXactMemberControlLock, "pg_multixact/members");

	/* Override default assumption that writes should be fsync'd */
	MultiXactOffsetCtl->do_fsync = false;
	MultiXactMemberCtl->do_fsync = false;
}

/*
 * This func must be called ONCE on system install.  It creates the initials
 * MultiXact segments.  (The MultiXacts directores are assumed to have been
 * created by initdb, and MultiXactShmemInit must have been called already.)
 *
 * Note: it's not really necessary to create the initial segment now,
 * since slru.c would create it on first write anyway.	But we may as well
 * do it to be sure the directory is set up correctly.
 */
void
BootStrapMultiXact(void)
{
	int			slotno;

	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	/* Offsets first page */
	slotno = ZeroMultiXactOffsetPage(0);
	SimpleLruWritePage(MultiXactOffsetCtl, slotno, NULL);
	Assert(MultiXactOffsetCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);

	LWLockRelease(MultiXactOffsetControlLock);
	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	/* Members first page */
	slotno = ZeroMultiXactMemberPage(0);
	SimpleLruWritePage(MultiXactMemberCtl, slotno, NULL);
	Assert(MultiXactMemberCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);

	LWLockRelease(MultiXactMemberControlLock);
}

/*
 * Initialize (or reinitialize) a page of MultiXactOffset to zeroes.
 *
 * The page is not actually written, just set up in shared memory.
 * The slot number of the new page is returned.
 *
 * Control lock must be held at entry, and will be held at exit.
 */
static int
ZeroMultiXactOffsetPage(int pageno)
{
	return SimpleLruZeroPage(MultiXactOffsetCtl, pageno);
}

/*
 * Ditto, for MultiXactMember
 */
static int
ZeroMultiXactMemberPage(int pageno)
{
	return SimpleLruZeroPage(MultiXactMemberCtl, pageno);
}

/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextMXact.
 */
void
StartupMultiXact(void)
{
	int			startPage;

	/*
	 * Since we don't expect MultiXact to be valid across crashes, we
	 * initialize the currently-active page to zeroes during startup.
	 * Whenever we advance into a new page, both ExtendMultiXact routines
	 * will likewise zero the new page without regard to whatever was
	 * previously on disk.
	 */
	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	startPage = MultiXactIdToOffsetPage(ShmemVariableCache->nextMXact);
	(void) ZeroMultiXactOffsetPage(startPage);

	LWLockRelease(MultiXactOffsetControlLock);

	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	/* The members area always starts at 0 */
	(void) ZeroMultiXactMemberPage(0);

	LWLockRelease(MultiXactMemberControlLock);
}

/*
 * This must be called ONCE during postmaster or standalone-backend shutdown
 */
void
ShutdownMultiXact(void)
{
	/*
	 * Flush dirty MultiXact pages to disk
	 *
	 * This is not actually necessary from a correctness point of view. We do
	 * it merely as a debugging aid.
	 */
	SimpleLruFlush(MultiXactOffsetCtl, false);
	SimpleLruFlush(MultiXactMemberCtl, false);
}

/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 */
void
CheckPointMultiXact(void)
{
	/*
	 * Flush dirty MultiXact pages to disk
	 *
	 * This is not actually necessary from a correctness point of view. We do
	 * it merely to improve the odds that writing of dirty pages is done
	 * by the checkpoint process and not by backends.
	 */
	SimpleLruFlush(MultiXactOffsetCtl, true);
	SimpleLruFlush(MultiXactMemberCtl, true);
}

/*
 * Make sure that MultiXactOffset has room for a newly-allocated MultiXactId.
 *
 * The MultiXactOffsetControlLock should be held at entry, and will
 * be held at exit.
 */
void
ExtendMultiXactOffset(MultiXactId multi)
{
	int			pageno;

	/*
	 * No work except at first MultiXactId of a page.  But beware: just after
	 * wraparound, the first MultiXactId of page zero is FirstMultiXactId.
	 */
	if (MultiXactIdToOffsetEntry(multi) != 0 &&
		multi != FirstMultiXactId)
		return;

	pageno = MultiXactIdToOffsetPage(multi);

	/* Zero the page */
	ZeroMultiXactOffsetPage(pageno);
}

/*
 * Make sure that MultiXactMember has room for the members of a newly-
 * allocated MultiXactId.
 *
 * The MultiXactMemberControlLock should be held at entry, and will be held
 * at exit.
 */
void
ExtendMultiXactMember(uint32 offset)
{
	int		pageno;

	/*
	 * No work except at first entry of a page.
	 */
	if (MultiXactIdToMemberEntry(offset) != 0)
		return;

	pageno = MultiXactIdToMemberPage(offset);

	/* Zero the page */
	ZeroMultiXactMemberPage(pageno);
}

/*
 * Remove all MultiXactOffset segments before the one holding the passed
 * MultiXactId.
 *
 * FIXME -- this is called nowhere at present.  Figure out where it should
 * be.  Maybe the bwgriter?
 */
void
TruncateMultiXactOffset(MultiXactId multi)
{
	int			cutoffPage;
	int			pageno;
	int			slotno;
	uint32	   *offptr;
	uint32		offset;

	/*
	 * Before truncating we need to read the stored datum, in order
	 * to truncate MultiXactMember too.  Note that we truncate on the
	 * offset pointed by the first entry on the page that contains
	 * the passed MultiXactId (not the offset pointed by the passed
	 * MultiXactId).  So no "entryno" calculations here.
	 */
	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	pageno = MultiXactIdToOffsetPage(multi);

	slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, InvalidTransactionId);
	offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
	offset = *offptr;

	LWLockRelease(MultiXactOffsetControlLock);

	/*
	 * The cutoff point is the start of the segment containing multi.
	 * We pass the *page* containing oldestXact to SimpleLruTruncate.
	 */
	cutoffPage = MultiXactIdToOffsetPage(multi);

	SimpleLruTruncate(MultiXactOffsetCtl, cutoffPage);
	
	/*
	 * Now truncate MultiXactMember.
	 */
	TruncateMultiXactMember(offset);
}

/*
 * Truncate the members' SLRU area, removing segments before the one
 * holding the passed offset.
 *
 * This is called from TruncateMultiXactOffset only, because at that point
 * we know what is the earliest offset that might be needed to check in the
 * future.
 */
static void
TruncateMultiXactMember(uint32 offset)
{
	int		cutoffPage;

	cutoffPage = MultiXactIdToMemberPage(offset);

	SimpleLruTruncate(MultiXactMemberCtl, cutoffPage);
}

/*
 * Decide which of two MultiXactOffset page numbers is "older" for truncation
 * purposes.
 *
 * We need to use comparison of MultiXactId here in order to do the right
 * thing with wraparound.  However, if we are asked about page number zero, we
 * don't want to hand InvalidMultiXactId to MultiXactIdPrecedes: it'll get
 * weird.  So, offset both multis by FirstMultiXactId to avoid that.
 */
static bool
MultiXactOffsetPagePrecedes(int page1, int page2)
{
	MultiXactId multi1;
	MultiXactId multi2;

	multi1 = ((MultiXactId) page1) * MULTIXACT_OFFSETS_PER_PAGE;
	multi1 += FirstMultiXactId;
	multi2 = ((MultiXactId) page2) * MULTIXACT_OFFSETS_PER_PAGE;
	multi2 += FirstMultiXactId;

	return MultiXactIdPrecedes(multi1, multi2);
}

/*
 * Decide which of two MultiXactMember page numbers is "older" for truncation
 * purposes.  There is no "invalid offset number" so use the numbers verbatim.
 */
static bool
MultiXactMemberPagePrecedes(int page1, int page2)
{
	uint32	offset1;
	uint32	offset2;

	offset1 = ((uint32) page1) * MULTIXACT_MEMBERS_PER_PAGE;
	offset2 = ((uint32) page2) * MULTIXACT_MEMBERS_PER_PAGE;

	return MultiXactOffsetPrecedes(offset1, offset2);
}

/*
 * Decide which of two MultiXactIds is earlier.
 *
 * FIXME -- do we need to do something special to InvalidMultiXactId?
 */
bool
MultiXactIdPrecedes(MultiXactId multi1, MultiXactId multi2)
{
	int32 diff = (int32) (multi1 - multi2);
	return (diff < 0);
}

/*
 * Decide which of two offsets is earlier.
 */
static bool
MultiXactOffsetPrecedes(uint32 offset1, uint32 offset2)
{
	int32 diff = (int32) (offset1 - offset2);
	return (diff < 0);
}
