>From 44ac15e12ac2af4df613087098f9be573517e257 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Tue, 31 Dec 2013 00:43:38 -0300
Subject: [PATCH 2/3] handle wraparound during trunc for multixact/members

---
 src/backend/access/transam/multixact.c |  124 +++++++++++++++++++++++++++++---
 src/backend/access/transam/slru.c      |    5 ++
 2 files changed, 119 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index 2f87a1e..ba730d6 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -63,6 +63,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/multixact.h"
 #include "access/slru.h"
 #include "access/transam.h"
@@ -577,8 +579,13 @@ MultiXactIdSetOldestMember(void)
 		 * another someone else could compute an OldestVisibleMXactId that
 		 * would be after the value we are going to store when we get control
 		 * back.  Which would be wrong.
+		 *
+		 * Note that a shared lock is sufficient: it is enough to stop anyone
+		 * from advancing nextMXact, and nobody else can be writing to our
+		 * OldestMember entry, only reading it (we assume that storing it is
+		 * atomic).
 		 */
-		LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
+		LWLockAcquire(MultiXactGenLock, LW_SHARED);
 
 		/*
 		 * We have to beware of the possibility that nextMXact is in the
@@ -1546,7 +1553,7 @@ AtEOXact_MultiXact(void)
 
 /*
  * AtPrepare_MultiXact
- *		Save multixact state at 2PC tranasction prepare
+ *		Save multixact state at 2PC transaction prepare
  *
  * In this phase, we only store our OldestMemberMXactId value in the two-phase
  * state file.
@@ -2241,7 +2248,6 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers)
 	{
 		int			flagsoff;
 		int			flagsbit;
-		int			difference;
 
 		/*
 		 * Only zero when at first entry of a page.
@@ -2262,10 +2268,25 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers)
 			LWLockRelease(MultiXactMemberControlLock);
 		}
 
-		/* Advance to next page (OK if nmembers goes negative) */
-		difference = MULTIXACT_MEMBERS_PER_PAGE - offset % MULTIXACT_MEMBERS_PER_PAGE;
-		offset += difference;
-		nmembers -= difference;
+		/*
+		 * Advance to next page, taking care to properly handle the wraparound
+		 * case.
+		 */
+		if ((unsigned int) (offset + nmembers) < offset)
+		{
+			uint32		difference = offset + MULTIXACT_MEMBERS_PER_PAGE;
+
+			nmembers -= (unsigned int) (MULTIXACT_MEMBERS_PER_PAGE - difference);
+			offset = 0;
+		}
+		else
+		{
+			int			difference;
+
+			difference = MULTIXACT_MEMBERS_PER_PAGE - offset % MULTIXACT_MEMBERS_PER_PAGE;
+			nmembers -= difference;
+			offset += difference;
+		}
 	}
 }
 
@@ -2322,6 +2343,72 @@ GetOldestMultiXactId(void)
 	return oldestMXact;
 }
 
+/*
+ * SlruScanDirectory callback.
+ *		This callback deletes segments that are outside the range determined by
+ *		the given page numbers.
+ *
+ * Both range endpoints are excluded from deletion (that is, the segments
+ * containing either of those pages are kept).
+ */
+typedef struct SlruScanDirPageRange
+{
+	int		rangeStart;		/* first page of the live range */
+	int		rangeEnd;		/* last page of the live range */
+} SlruScanDirPageRange;
+
+static bool
+SlruScanDirCbRemoveMembers(SlruCtl ctl, char *filename, int segpage,
+						   void *data)
+{
+	SlruScanDirPageRange *range = (SlruScanDirPageRange *) data;
+	MultiXactOffset	nextOffset;
+
+	if (range->rangeStart == range->rangeEnd)
+		return false;		/* neither deletion test below can hold */
+
+	/*
+	 * To ensure that no segment is spuriously removed, we must account for
+	 * segments added since the start of the directory scan; to do this, we
+	 * refresh our end-of-range point from shared memory as we run.
+	 *
+	 * As an optimization, we skip the shared memory lookup when the endpoints
+	 * we already have say that the current segment must be kept.  This is
+	 * safe because nextOffset never decreases (NOTE(review): presumably
+	 * modulo wraparound), and rangeStart is not modified during a run.
+	 */
+	if (!((range->rangeStart > range->rangeEnd &&
+		   segpage > range->rangeEnd && segpage < range->rangeStart) ||
+		  (range->rangeStart < range->rangeEnd &&
+		   (segpage < range->rangeStart || segpage > range->rangeEnd))))
+		return false;
+
+	/*
+	 * Re-read nextOffset under MultiXactGenLock to refresh the range end.
+	 */
+	LWLockAcquire(MultiXactGenLock, LW_SHARED);
+	nextOffset = MultiXactState->nextOffset;
+	LWLockRelease(MultiXactGenLock);
+	range->rangeEnd = MXOffsetToMemberPage(nextOffset);
+
+	/* Recheck the deletion condition; if it still holds, remove the file. */
+	if ((range->rangeStart > range->rangeEnd &&
+		 segpage > range->rangeEnd && segpage < range->rangeStart) ||
+		(range->rangeStart < range->rangeEnd &&
+		 (segpage < range->rangeStart || segpage > range->rangeEnd)))
+	{
+		char		path[MAXPGPATH];
+
+		snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
+		ereport(DEBUG2,
+				(errmsg("removing file \"%s\"", path)));
+		unlink(path);		/* NOTE(review): result is not checked */
+	}
+
+	return false;				/* never stop the scan early */
+}
+
+
 typedef struct mxtruncinfo
 {
 	int			earliestExistingPage;
@@ -2363,8 +2450,10 @@ void
 TruncateMultiXact(MultiXactId oldestMXact)
 {
 	MultiXactOffset oldestOffset;
+	MultiXactOffset	nextOffset;
 	mxtruncinfo trunc;
 	MultiXactId earliest;
+	SlruScanDirPageRange	pageRange;
 
 	/*
 	 * Note we can't just plow ahead with the truncation; it's possible that
@@ -2411,9 +2500,24 @@ TruncateMultiXact(MultiXactId oldestMXact)
 	SimpleLruTruncate(MultiXactOffsetCtl,
 					  MultiXactIdToOffsetPage(oldestMXact));
 
-	/* truncate MultiXactMembers and we're done */
-	SimpleLruTruncate(MultiXactMemberCtl,
-					  MXOffsetToMemberPage(oldestOffset));
+	/*
+	 * To truncate MultiXactMembers, we need to figure out the active page
+	 * range and delete all files outside that range.  The start point is the
+	 * start of the segment containing the oldest offset; for the end point,
+	 * the page containing the next offset to use is enough.  The scan callback
+	 * refreshes the end point, as MultiXactMembers may be extended meanwhile.
+	 */
+	pageRange.rangeStart = MXOffsetToMemberPage(oldestOffset);
+	pageRange.rangeStart -= pageRange.rangeStart % SLRU_PAGES_PER_SEGMENT;
+
+	LWLockAcquire(MultiXactGenLock, LW_SHARED);
+	nextOffset = MultiXactState->nextOffset;
+	LWLockRelease(MultiXactGenLock);
+
+	pageRange.rangeEnd = MXOffsetToMemberPage(nextOffset);
+
+	SlruScanDirectory(MultiXactMemberCtl, SlruScanDirCbRemoveMembers,
+					  &pageRange);
 }
 
 /*
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 5e53593..a8480b0 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -1272,6 +1272,11 @@ SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
  * If the callback returns true, the scan is stopped.  The last return value
  * from the callback is returned.
  *
+ * The callback receives the following arguments: 1. the SlruCtl struct for the
+ * slru being scanned; 2. the name of the file being considered; 3. the page
+ * number of the first page of that file; 4. a pointer to the opaque data given
+ * to us by the caller.
+ *
  * Note that the ordering in which the directory is scanned is not guaranteed.
  *
  * Note that no locking is applied.
-- 
1.7.10.4

