Index: src/backend/access/transam/slru.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/transam/slru.c,v
retrieving revision 1.44
diff -c -w -p -r1.44 slru.c
*** src/backend/access/transam/slru.c	1 Jan 2008 19:45:48 -0000	1.44
--- src/backend/access/transam/slru.c	16 Oct 2008 12:44:29 -0000
*************** SlruPhysicalReadPage(SlruCtl ctl, int pa
*** 619,624 ****
--- 619,639 ----
  
  	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
  	{
+ 		if (InRecovery)
+ 		{
+ 			/*
+ 			 * In recovery, don't fail: log the problem and return an
+ 			 * all-zeroes page.  The file was opened successfully, so say
+ 			 * what actually went wrong, and close it as the error path
+ 			 * below does, so that the descriptor is not leaked.
+ 			 */
+ 			close(fd);
+ 			ereport(LOG,
+ 					(errmsg("could not seek in file \"%s\", reading page as zeroes",
+ 							path)));
+ 			MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
+ 			return true;
+ 		}
  		slru_errcause = SLRU_SEEK_FAILED;
  		slru_errno = errno;
  		close(fd);
*************** SlruPhysicalReadPage(SlruCtl ctl, int pa
*** 628,633 ****
--- 636,655 ----
  	errno = 0;
  	if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
  	{
+ 		if (InRecovery)
+ 		{
+ 			/*
+ 			 * In recovery, don't fail on a failed or short read: log it and
+ 			 * return an all-zeroes page.  Close the file as the error path
+ 			 * below does, so that the descriptor is not leaked.
+ 			 */
+ 			close(fd);
+ 			ereport(LOG,
+ 					(errmsg("could not read from file \"%s\", reading page as zeroes",
+ 							path)));
+ 			MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
+ 			return true;
+ 		}
  		slru_errcause = SLRU_READ_FAILED;
  		slru_errno = errno;
  		close(fd);
Index: src/backend/access/transam/twophase.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/transam/twophase.c,v
retrieving revision 1.45
diff -c -w -p -r1.45 twophase.c
*** src/backend/access/transam/twophase.c	11 Aug 2008 11:05:10 -0000	1.45
--- src/backend/access/transam/twophase.c	16 Oct 2008 12:44:29 -0000
*************** RecordTransactionCommitPrepared(Transact
*** 1710,1715 ****
--- 1710,1716 ----
  	xlrec.crec.xact_time = GetCurrentTimestamp();
  	xlrec.crec.nrels = nrels;
  	xlrec.crec.nsubxacts = nchildren;
+ 	xlrec.crec.slotId = MyProc->slotId;	/* this backend's slot id, read back during redo */
  	rdata[0].data = (char *) (&xlrec);
  	rdata[0].len = MinSizeOfXactCommitPrepared;
  	rdata[0].buffer = InvalidBuffer;
*************** RecordTransactionAbortPrepared(Transacti
*** 1790,1795 ****
--- 1791,1797 ----
  	xlrec.arec.xact_time = GetCurrentTimestamp();
  	xlrec.arec.nrels = nrels;
  	xlrec.arec.nsubxacts = nchildren;
+ 	xlrec.arec.slotId = MyProc->slotId;	/* this backend's slot id, read back during redo */
  	rdata[0].data = (char *) (&xlrec);
  	rdata[0].len = MinSizeOfXactAbortPrepared;
  	rdata[0].buffer = InvalidBuffer;
Index: src/backend/access/transam/xact.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/transam/xact.c,v
retrieving revision 1.265
diff -c -w -p -r1.265 xact.c
*** src/backend/access/transam/xact.c	11 Aug 2008 11:05:10 -0000	1.265
--- src/backend/access/transam/xact.c	16 Oct 2008 12:44:29 -0000
*************** int			CommitSiblings = 5; /* # concurren
*** 72,77 ****
--- 72,81 ----
   */
  bool		MyXactAccessedTempRel = false;
  
+ /*
+  * Recovery: latest xid observed in WAL; Invalid until set from a running-xacts record.
+  */
+ static TransactionId	LatestObservedXid = InvalidTransactionId;
  
  /*
   *	transaction states - transaction state from server perspective
*************** typedef struct TransactionStateData
*** 139,144 ****
--- 143,149 ----
  	Oid			prevUser;		/* previous CurrentUserId setting */
  	bool		prevSecDefCxt;	/* previous SecurityDefinerContext setting */
  	bool		prevXactReadOnly;		/* entry-time xact r/o state */
+ 	bool		hasIssuedWAL;	/* xid already announced in WAL for standby? */
  	struct TransactionStateData *parent;		/* back link to parent */
  } TransactionStateData;
  
*************** static TransactionStateData TopTransacti
*** 167,172 ****
--- 172,178 ----
  	InvalidOid,					/* previous CurrentUserId setting */
  	false,						/* previous SecurityDefinerContext setting */
  	false,						/* entry-time xact r/o state */
+ 	false,						/* initial state for hasIssuedWAL */
  	NULL						/* link to parent state block */
  };
  
*************** static SubXactCallbackItem *SubXact_call
*** 235,241 ****
  
  
  /* local function prototypes */
! static void AssignTransactionId(TransactionState s);
  static void AbortTransaction(void);
  static void AtAbort_Memory(void);
  static void AtCleanup_Memory(void);
--- 248,254 ----
  
  
  /* local function prototypes */
! static void AssignTransactionId(TransactionState s, int recursion_level);
  static void AbortTransaction(void);
  static void AtAbort_Memory(void);
  static void AtCleanup_Memory(void);
*************** TransactionId
*** 330,336 ****
  GetTopTransactionId(void)
  {
  	if (!TransactionIdIsValid(TopTransactionStateData.transactionId))
! 		AssignTransactionId(&TopTransactionStateData);
  	return TopTransactionStateData.transactionId;
  }
  
--- 343,349 ----
  GetTopTransactionId(void)
  {
  	if (!TransactionIdIsValid(TopTransactionStateData.transactionId))
! 		AssignTransactionId(&TopTransactionStateData, 0);	/* not a recursive call */
  	return TopTransactionStateData.transactionId;
  }
  
*************** GetCurrentTransactionId(void)
*** 360,366 ****
  	TransactionState s = CurrentTransactionState;
  
  	if (!TransactionIdIsValid(s->transactionId))
! 		AssignTransactionId(s);
  	return s->transactionId;
  }
  
--- 373,379 ----
  	TransactionState s = CurrentTransactionState;
  
  	if (!TransactionIdIsValid(s->transactionId))
! 		AssignTransactionId(s, 0);	/* not a recursive call */
  	return s->transactionId;
  }
  
*************** GetCurrentTransactionIdIfAny(void)
*** 377,382 ****
--- 390,485 ----
  	return CurrentTransactionState->transactionId;
  }
  
+ /*
+  * Fill in additional transaction information for an XLogRecord.
+  * We do this here so we can inspect various transaction state data,
+  * plus no need to further clutter XLogInsert().
+  *
+  * On return *info2 holds this backend's slot id (or XLR2_INVALID_SLOT_ID
+  * if it does not fit in the record header) plus XLR2_FIRST_XID_RECORD /
+  * XLR2_FIRST_SUBXID_RECORD flags marking the first WAL record of a
+  * transaction / subtransaction (always set on XLOG_XACT_ASSIGNMENT).
+  * *xid2 is set to the parent xid on a first-subtransaction record, set
+  * Invalid for RM_HEAP2_ID, and otherwise left as the caller set it.
+  * Also sets hasIssuedWAL on the transaction state(s) that were flagged.
+  */
+ void
+ GetStandbyInfoForTransaction(RmgrId rmid, uint8 info, XLogRecData *rdata,
+ 								TransactionId *xid2, uint16 *info2)
+ {
+ 	int level;
+ 	int	slotId;
+ 
+ 	if (!MyProc)
+ 		slotId = 0;
+ 	else
+ 		slotId = MyProc->slotId;
+ 
+ 	/*
+ 	 * A slot id too large for the header is only flagged here; commit,
+ 	 * abort and assignment records also carry the full slot id in their
+ 	 * body, which is what recovery uses.
+ 	 */
+ 	if (slotId >= XLOG_MAX_SLOT_ID)
+ 		*info2 = XLR2_INVALID_SLOT_ID;
+ 	else
+ 		*info2 = ((uint16) slotId) & XLR2_INFO2_MASK;
+ 
+ 	if (rmid == RM_XACT_ID && info == XLOG_XACT_ASSIGNMENT)
+ 	{
+ 		xl_xact_assignment *xlrec = (xl_xact_assignment *) rdata->data;
+ 
+ 		/*
+ 		 * We set the flag for records written by AssignTransactionId
+ 		 * to allow that record type to be handled by
+ 		 * RecordKnownAssignedTransactionIds(). This looks a little
+ 		 * strange, but it avoids the need to alter the API of XLogInsert.
+ 		 */
+ 		if (xlrec->isSubXact)
+ 			*info2 |= XLR2_FIRST_SUBXID_RECORD;
+ 		else
+ 			*info2 |= XLR2_FIRST_XID_RECORD;
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * If we haven't assigned an xid yet, don't flag the record.
+ 		 * We currently assign xids when we make database entries, so
+ 		 * things like storage creation and oid assignment do not
+ 		 * have xids assigned to them. So no need to mark xid2 either.
+ 		 */
+ 		if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+ 			return;
+ 
+ 		level = GetCurrentTransactionNestLevel();
+ 
+ 		if (level >= 1 && !CurrentTransactionState->hasIssuedWAL)
+ 		{
+ 			if (level == 1)
+ 				*info2 |= XLR2_FIRST_XID_RECORD;
+ 			else
+ 			{
+ 				*info2 |= XLR2_FIRST_SUBXID_RECORD;
+ 
+ 				if (level == 2 &&
+ 					!CurrentTransactionState->parent->hasIssuedWAL)
+ 				{
+ 					*info2 |= XLR2_FIRST_XID_RECORD;
+ 					CurrentTransactionState->parent->hasIssuedWAL = true;
+ 				}
+ 			}
+ 			CurrentTransactionState->hasIssuedWAL = true;
+ 		}
+ 
+ 		/*
+ 		 * Set the secondary TransactionId for this record
+ 		 */
+ 		if (*info2 & XLR2_FIRST_SUBXID_RECORD)
+ 			*xid2 = CurrentTransactionState->parent->transactionId;
+ 		else if (rmid == RM_HEAP2_ID)
+ 			*xid2 = InvalidTransactionId;	/* XXX: should be GetLatestRemovedXidIfAny() */
+ 	}
+ }
  
  /*
   * AssignTransactionId
*************** GetCurrentTransactionIdIfAny(void)
*** 388,394 ****
   * following its parent's.
   */
  static void
! AssignTransactionId(TransactionState s)
  {
  	bool		isSubXact = (s->parent != NULL);
  	ResourceOwner currentOwner;
--- 480,486 ----
   * following its parent's.
   */
  static void
! AssignTransactionId(TransactionState s, int recursion_level)
  {
  	bool		isSubXact = (s->parent != NULL);
  	ResourceOwner currentOwner;
*************** AssignTransactionId(TransactionState s)
*** 402,408 ****
  	 * than its parent.
  	 */
  	if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
! 		AssignTransactionId(s->parent);
  
  	/*
  	 * Generate a new Xid and record it in PG_PROC and pg_subtrans.
--- 494,500 ----
  	 * than its parent.
  	 */
  	if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
! 		AssignTransactionId(s->parent, recursion_level + 1);
  
  	/*
  	 * Generate a new Xid and record it in PG_PROC and pg_subtrans.
*************** AssignTransactionId(TransactionState s)
*** 436,441 ****
--- 528,599 ----
  	}
  	PG_END_TRY();
  	CurrentResourceOwner = currentOwner;
+ 
+ #ifdef DEBUG_ASSIGN_XID
+ 	elog(LOG, "AssignXactId xid %u nest %d recursion %d hasIssuedWAL %s hasParent %s",
+ 				s->transactionId,
+ 				GetCurrentTransactionNestLevel(),
+ 				recursion_level,
+ 				s->hasIssuedWAL ? "t" : "f",
+ 				s->parent ? "t" : "f");
+ #endif
+ 	/*
+ 	 * WAL log this assignment, if required.
+ 	 *
+ 	 * The first WAL record of the current (sub)transaction announces its
+ 	 * own xid and, for a level-2 subtransaction, its top-level parent's
+ 	 * xid too (see GetStandbyInfoForTransaction).  Any other ancestor that
+ 	 * this recursion assigns an xid to cannot be announced that way, so it
+ 	 * is logged explicitly here.  We also log when our slot id is too large
+ 	 * for the record header, since recovery then takes the slot id from the
+ 	 * body of this record instead.
+ 	 */
+ 	if (recursion_level > 1 ||
+ 		(recursion_level == 1 && s->parent) ||
+ 		(MyProc && MyProc->slotId >= XLOG_MAX_SLOT_ID))
+ 	{
+ 		XLogRecData rdata;
+ 		xl_xact_assignment	xlrec;
+ 
+ 		xlrec.xassign = s->transactionId;
+ 		xlrec.isSubXact = (s->parent != NULL);
+ 		/* same fallback as GetStandbyInfoForTransaction when MyProc is NULL */
+ 		xlrec.slotId = MyProc ? MyProc->slotId : 0;
+ 
+ 		if (xlrec.isSubXact)
+ 			xlrec.xparent = s->parent->transactionId;
+ 		else
+ 			xlrec.xparent = InvalidTransactionId;
+ 
+ 		START_CRIT_SECTION();
+ 
+ 		rdata.data = (char *) (&xlrec);
+ 		rdata.len = sizeof(xl_xact_assignment);
+ 		rdata.buffer = InvalidBuffer;
+ 		rdata.next = NULL;
+ 
+ 		/*
+ 		 * These WAL records look like no other. We are assigning a
+ 		 * TransactionId to upper levels of the transaction stack. The
+ 		 * transaction level we are looking at is *not* the *current*
+ 		 * transaction. We have not yet assigned the xid for the current
+ 		 * transaction, so the xid of this WAL record will be
+ 		 * InvalidTransactionId, even though we are in a transaction.
+ 		 * Got that?
+ 		 *
+ 		 * So we stuff the newly assigned xid into the WAL record and
+ 		 * let WAL replay sort it out later.
+ 		 */
+ 		(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, &rdata);
+ 
+ 		END_CRIT_SECTION();
+ 
+ 		/*
+ 		 * Mark this transaction level, so we can avoid issuing WAL records
+ 		 * for later subtransactions also.
+ 		 */
+ 		s->hasIssuedWAL = true;
+ 	}
  }
  
  
*************** RecordTransactionCommit(void)
*** 892,897 ****
--- 1043,1049 ----
  		xlrec.xact_time = xactStopTimestamp;
  		xlrec.nrels = nrels;
  		xlrec.nsubxacts = nchildren;
+ 		xlrec.slotId = MyProc->slotId;	/* redo uses this to find the recovery proc */
  		rdata[0].data = (char *) (&xlrec);
  		rdata[0].len = MinSizeOfXactCommit;
  		rdata[0].buffer = InvalidBuffer;
*************** RecordTransactionAbort(bool isSubXact)
*** 1256,1261 ****
--- 1408,1414 ----
  	}
  	xlrec.nrels = nrels;
  	xlrec.nsubxacts = nchildren;
+ 	xlrec.slotId = MyProc->slotId;	/* redo uses this to find the recovery proc */
  	rdata[0].data = (char *) (&xlrec);
  	rdata[0].len = MinSizeOfXactAbort;
  	rdata[0].buffer = InvalidBuffer;
*************** StartTransaction(void)
*** 1568,1573 ****
--- 1721,1727 ----
  	s->childXids = NULL;
  	s->nChildXids = 0;
  	s->maxChildXids = 0;
+ 	s->hasIssuedWAL = false;	/* nothing announced in WAL for this xact yet */
  	GetUserIdAndContext(&s->prevUser, &s->prevSecDefCxt);
  	/* SecurityDefinerContext should never be set outside a transaction */
  	Assert(!s->prevSecDefCxt);
*************** xactGetCommittedChildren(TransactionId *
*** 4248,4260 ****
  	return s->nChildXids;
  }
  
  /*
!  *	XLOG support routines
   */
  
  static void
! xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
  {
  	TransactionId *sub_xids;
  	TransactionId max_xid;
  	int			i;
--- 4402,4676 ----
  	return s->nChildXids;
  }
  
+ /*
+  * LogCurrentRunningXacts
+  *
+  * Write an XLOG_XACT_RUNNING_XACTS record describing the transactions
+  * reported by GetRunningTransactionData(): a fixed-size header, then
+  * xcnt RunningXact entries, then subxcnt subtransaction xids.  In
+  * recovery, the first such record with a valid xlatest initialises
+  * LatestObservedXid (see xact_redo).
+  */
+ void
+ LogCurrentRunningXacts(void)
+ {
+ 	RunningTransactions		CurrRunningXacts = GetRunningTransactionData();
+ 	xl_xact_running_xacts	xlrec;
+ 	XLogRecData 			rdata[3];
+ 	int						lastrdata = 0;
+ 	XLogRecPtr				recptr;
+ 
+ 	xlrec.xcnt = CurrRunningXacts->xcnt;
+ 	xlrec.subxcnt = CurrRunningXacts->subxcnt;
+ 	xlrec.xlatest = CurrRunningXacts->xlatest;
+ 
+ 	/* Header */
+ 	rdata[0].data = (char *) (&xlrec);
+ 	rdata[0].len = MinSizeOfXactRunningXacts;
+ 	rdata[0].buffer = InvalidBuffer;
+ 
+ 	/* array of RunningXact */
+ 	if (xlrec.xcnt > 0)
+ 	{
+ 		rdata[0].next = &(rdata[1]);
+ 		rdata[1].data = (char *) CurrRunningXacts->xrun;
+ 		rdata[1].len = xlrec.xcnt * sizeof(RunningXact);
+ 		rdata[1].buffer = InvalidBuffer;
+ 		lastrdata = 1;
+ 	}
+ 
+ 	/* array of subtransaction xids */
+ 	if (xlrec.subxcnt > 0)
+ 	{
+ 		rdata[lastrdata].next = &(rdata[2]);
+ 		rdata[2].data = (char *) CurrRunningXacts->subxip;
+ 		rdata[2].len = xlrec.subxcnt * sizeof(TransactionId);
+ 		rdata[2].buffer = InvalidBuffer;
+ 		lastrdata = 2;
+ 	}
+ 
+ 	rdata[lastrdata].next = NULL;
+ 
+ 	START_CRIT_SECTION();
+ 
+ 	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_RUNNING_XACTS, rdata);
+ 
+ 	END_CRIT_SECTION();
+ 
+ 	elog(DEBUG1, "captured snapshot of running xacts %X/%X", recptr.xlogid, recptr.xrecoff);
+ }
+ 
  /*
!  * During recovery we maintain ProcArray with incoming xids
!  * when we first observe them in use. Uses local variables, so
!  * should only be called by Startup process.
!  *
!  * We record all xids that we know have been assigned. That includes
!  * all the xids on the WAL record, plus all unobserved xids that
!  * we can deduce have been assigned. We can deduce the existence of
!  * unobserved xids because we know xids are in sequence, with no gaps.
!  *
!  * XXX Be careful of what happens when we use pg_resetxlog.
!  */
! void
! RecordKnownAssignedTransactionIds(XLogRecPtr lsn, XLogRecord *record)
! {
! 	uint8			info = record->xl_info & ~XLR_INFO_MASK;
! 	TransactionId	xid,
! 					parent_xid;
! 	int				slotId;
! 	PGPROC 			*proc;
! 	TransactionId	next_expected_xid = LatestObservedXid;
! 	TransactionId	next_plus_one_xid;
! 
! 	/*
! 	 * Have we seen the first RunningXacts yet? If not, no need to
! 	 * maintain state.
! 	 */
! 	if (!TransactionIdIsValid(LatestObservedXid))
! 		return;
! 
! 	/*
! 	 * If it's an assignment record, we need to extract data from
! 	 * the body of the record, rather than take header values. This
! 	 * is because an assignment record can be issued when
! 	 * GetCurrentTransactionIdIfAny() returns InvalidTransactionId.
! 	 * We also use the supplied slotId rather than the header value,
! 	 * so we can cope with backends above XLOG_MAX_SLOT_ID.
! 	 */
! 	if (record->xl_rmid == RM_XACT_ID && info == XLOG_XACT_ASSIGNMENT)
! 	{
! 		xl_xact_assignment	*xlrec = (xl_xact_assignment *) XLogRecGetData(record);
! 
! 		xid = xlrec->xassign;
! 		parent_xid = xlrec->xparent;
! 		slotId = xlrec->slotId;
! 	}
! 	else
! 	{
! 		xid = record->xl_xid;
! 		parent_xid = record->xl_xid2;
! 		slotId = XLogRecGetSlotId(record);
! 	}
! 
! 	elog(DEBUG3, "RecordKnown xid %u parent %u slot %d firstXid %s firstSubXid %s ",
! 					xid, parent_xid, slotId,
! 					XLogRecIsFirstXidRecord(record) ? "t" : "f",
! 					XLogRecIsFirstSubXidRecord(record) ? "t" : "f");
! 
! 	if (XLogRecIsFirstSubXidRecord(record))
! 		Assert(TransactionIdIsValid(parent_xid) && TransactionIdPrecedes(parent_xid, xid));
! 	else
! 		Assert(!TransactionIdIsValid(parent_xid));
! 
! 	/*
! 	 * Identify the recovery proc that holds replay info for this xid
! 	 */
! 	proc = SlotIdGetRecoveryProc(slotId);
! 
! 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
! 
! 	/*
! 	 * Record the newly observed xid onto the correct proc.
! 	 */
! 	if (XLogRecIsFirstXidRecord(record))
! 	{
! 		if (XLogRecIsFirstSubXidRecord(record))
! 		{
! 			/*
! 			 * If both flags are set, then we are seeing both the
! 			 * subtransaction xid and its top-level parent xid
! 			 * for the first time. So start the top-level transaction
! 			 * first, then add the subtransaction.
! 			 *
! 			 * Note that we don't need locks in all cases here
! 			 * because it is normal to start each of these atomically,
! 			 * in sequence.
! 			 */
! #define XACT_IS_TOP_XACT	false
! #define XACT_IS_SUBXACT		true
! 			ProcArrayStartRecoveryTransaction(proc, parent_xid, lsn, XACT_IS_TOP_XACT);
! 			ProcArrayStartRecoveryTransaction(proc, xid, lsn, XACT_IS_SUBXACT);
! 		}
! 		else
! 		{
! 			/*
! 			 * First observation of top-level xid only.
! 			 */
! 			ProcArrayStartRecoveryTransaction(proc, xid, lsn, XACT_IS_TOP_XACT);
! 		}
! 	}
! 	else if (XLogRecIsFirstSubXidRecord(record))
! 	{
! 		/*
! 		 * First observation of subtransaction xid.
! 		 */
! 		ProcArrayStartRecoveryTransaction(proc, xid, lsn, XACT_IS_SUBXACT);
! 	}
! 
! 	/*
! 	 * When a newly observed xid arrives, it is frequently the case
! 	 * that it is *not* the next xid in sequence. When this occurs, we
! 	 * must treat the intervening xids as running also. So we maintain
! 	 * a special list of these UnobservedXids, so that snapshots can
! 	 * see what's happening.
! 	 *
! 	 * We maintain both recovery Procs *and* UnobservedXids because we
! 	 * need them both. Recovery procs allow us to store top-level xids
! 	 * and subtransactions separately, otherwise we wouldn't know
! 	 * when to overflow the subxid cache. UnobservedXids allow us to
! 	 * make sense of the out-of-order arrival of xids.
! 	 *
! 	 * Some examples:
! 	 * 1)	LatestObservedXid = 647
! 	 *		next xid observed in WAL = 651 (a top-level transaction)
! 	 *		so we add 648, 649, 650 to UnobservedXids
! 	 *
! 	 * 2)	LatestObservedXid = 769
! 	 *		next xid observed in WAL = 771 (a subtransaction)
! 	 *		so we add 770 to UnobservedXids
! 	 *
! 	 * 3)	LatestObservedXid = 769
! 	 *		next xid observed in WAL = 810 (a subtransaction)
! 	 *		810's parent had not yet recorded WAL = 807
! 	 *		so we add 770 thru 809 inclusive to UnobservedXids
! 	 *		then remove 807
   */
+ 	TransactionIdAdvance(next_expected_xid);
+ 	next_plus_one_xid = next_expected_xid;
+ 	TransactionIdAdvance(next_plus_one_xid);
  
+ 	if (next_expected_xid == xid ||
+ 		(next_expected_xid == parent_xid && next_plus_one_xid == xid))
+ 	{
+ 		elog(DEBUG3, "Do Nothing: Unobserved processing %u %u %u - %u",
+ 				LatestObservedXid, next_expected_xid, next_plus_one_xid, xid);
+ 		Assert(!XidInUnobservedTransactions(xid));
+ 		Assert(!XidInUnobservedTransactions(parent_xid));
+ 
+ 		/*
+ 		 * Do Nothing, this is what we hope for. Just move counter forwards.
+ 		 */
+ 		LatestObservedXid = xid;
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * We have work to do on the UnobservedXids array.
+ 		 */
+ 		if (TransactionIdPrecedes(next_expected_xid, xid))
+ 		{
+ 			/*
+ 			 * This is a new transaction that leaves gaps in the xid sequence.
+ 			 * We must add the intervening xids onto UnobservedXids, from
+ 			 * next_expected_xid up to, but not including, the newly observed xid.
+ 			 */
+ 			elog(DEBUG3, "Add: Unobserved processing %u %u %u", LatestObservedXid, next_expected_xid, xid);
+ 			UnobservedTransactionsAddXids(next_expected_xid, xid);
+ 			LatestObservedXid = xid;
+ 		}
+ 		else
+ 		{
+ 			/*
+ 			 * We're a transaction that was added to UnobservedXids earlier
+ 			 * by an out of order xid. Just remove ourselves from array.
+ 			 */
+ 			elog(DEBUG3, "Remove: Unobserved processing %u %u %u", LatestObservedXid, next_expected_xid, xid);
+ 			UnobservedTransactionsRemoveXid(xid);
+ 		}
+ 
+ 		/*
+ 		 * If both xids are newly observed, parent_xid is definitely
+ 		 * on UnobservedXids now. That might be because we just put
+ 		 * it there or maybe it was already there. Doesn't matter,
+ 		 * just remove it.
+ 		 */
+ 		if (XLogRecIsFirstSubXidRecord(record) &&
+ 			XLogRecIsFirstXidRecord(record))
+ 			UnobservedTransactionsRemoveXid(parent_xid);
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	/*
+ 	 * Now we've updated the proc we can update subtrans, if appropriate.
+ 	 * We must do this step last to avoid race conditions.  See comments
+ 	 * and code for AssignTransactionId().
+ 	 */
+ 	if (XLogRecIsFirstSubXidRecord(record))
+ 	{
+ 		elog(DEBUG3, "subtrans setting parent %u for xid %u", parent_xid, xid);
+ 		SubTransSetParent(xid, parent_xid);
+ 	}
+ }
+ 
+ /*
+  *	XLOG support routines
+  */
  static void
! xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, bool preparedXact)
  {
+ 	PGPROC		*proc;
  	TransactionId *sub_xids;
  	TransactionId max_xid;
  	int			i;
*************** xact_redo_commit(xl_xact_commit *xlrec, 
*** 4267,4277 ****
--- 4674,4755 ----
  
  	/* Make sure nextXid is beyond any XID mentioned in the record */
  	max_xid = xid;
+ 
+ 	/*
+ 	 * Find the highest xid.  XXX: since subtransaction xids are assigned
+ 	 * in sequence, this could perhaps just be the last array entry:
+ 	 *
+ 	 *		if (xlrec->nsubxacts > 0)
+ 	 *			max_xid = sub_xids[xlrec->nsubxacts - 1];
+ 	 *
+ 	 * but that relies on the array being in xid order, which is not
+ 	 * documented, so scan the whole array for now.
+ 	 */
  	for (i = 0; i < xlrec->nsubxacts; i++)
  	{
  		if (TransactionIdPrecedes(max_xid, sub_xids[i]))
  			max_xid = sub_xids[i];
  	}
+ 
+ 	/*
+ 	 * Although the record header also carries a slot id, use the one in
+ 	 * the record body: it remains valid when MaxBackends is larger than
+ 	 * the header field can represent.
+ 	 */
+ 	proc = SlotIdGetRecoveryProc(xlrec->slotId);
+ 
+ #ifdef USE_ASSERT_CHECKING
+ 	if (!preparedXact)
+ 	{
+ 		/*
+ 		 * Double check everything to make sure there are no mistakes
+ 		 * before we update the proc array.
+ 		 */
+ 		if (xid != proc->xid)
+ 		{
+ 			if (XidInRecoveryProcs(xid))
+ 			{
+ 				elog(LOG, "xid %u slot %d proc->xid %u prep %s",
+ 						xid, xlrec->slotId, proc->xid,
+ 						(preparedXact ? "t" : "f"));
+ 
+ 				ProcArrayDisplay();
+ 				elog(FATAL, "accessed the wrong slot");
+ 			}
+ 		}
+ 
+ 		if (XidInUnobservedTransactions(xid))
+ 		{
+ 			ProcArrayDisplay();
+ 			UnobservedTransactionsDisplay();
+ 			elog(FATAL, "xid %u still in UnobservedXids", xid);
+ 		}
+ 
+ 		if (TransactionIdIsValid(LatestObservedXid) &&
+ 			TransactionIdPrecedes(LatestObservedXid, max_xid))
+ 		{
+ 			ProcArrayDisplay();
+ 			UnobservedTransactionsDisplay();
+ 			elog(FATAL, "LatestObservedXid %u not moved forwards to %u", LatestObservedXid, max_xid);
+ 		}
+ 	}
+ #endif
+ 
+ 	/*
+ 	 * We must mark clog before we update the ProcArray. Only update
+ 	 * if we have already initialised the state and we have previously
+ 	 * added an xid to the proc. We need no lock to check xid since it
+ 	 * is controlled by Startup process. It's possible for xids to
+ 	 * appear that haven't been seen before. We don't need to check
+ 	 * UnobservedXids because in the normal case this will already have
+ 	 * happened, but there are cases where they might sneak through.
+ 	 * Leave these for the periodic cleanup by XACT_RUNNING_XACT records.
+ 	 */
+ 	if (TransactionIdIsValid(LatestObservedXid) &&
+ 		TransactionIdIsValid(proc->xid) && !preparedXact)
+ 		ProcArrayEndTransaction(proc, max_xid);
+ 
+ 	/* Make sure nextXid is beyond any XID mentioned in the record */
  	if (TransactionIdFollowsOrEquals(max_xid,
  									 ShmemVariableCache->nextXid))
  	{
*************** xact_redo_commit(xl_xact_commit *xlrec, 
*** 4293,4300 ****
  }
  
  static void
! xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
  {
  	TransactionId *sub_xids;
  	TransactionId max_xid;
  	int			i;
--- 4771,4779 ----
  }
  
  static void
! xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid, bool preparedXact)
  {
+ 	PGPROC		*proc;
  	TransactionId *sub_xids;
  	TransactionId max_xid;
  	int			i;
*************** xact_redo_abort(xl_xact_abort *xlrec, Tr
*** 4307,4317 ****
--- 4786,4879 ----
  
  	/* Make sure nextXid is beyond any XID mentioned in the record */
  	max_xid = xid;
+ 
+ 	/*
+ 	 * Find the highest xid.  XXX: since subtransaction xids are assigned
+ 	 * in sequence, this could perhaps just be the last array entry:
+ 	 *
+ 	 *		if (xlrec->nsubxacts > 0)
+ 	 *			max_xid = sub_xids[xlrec->nsubxacts - 1];
+ 	 *
+ 	 * but that relies on the array being in xid order, which is not
+ 	 * documented, so scan the whole array for now.
+ 	 */
  	for (i = 0; i < xlrec->nsubxacts; i++)
  	{
  		if (TransactionIdPrecedes(max_xid, sub_xids[i]))
  			max_xid = sub_xids[i];
  	}
+ 
+ 	/*
+ 	 * Although the record header also carries a slot id, use the one in
+ 	 * the record body: it remains valid when MaxBackends is larger than
+ 	 * the header field can represent.
+ 	 */
+ 	proc = SlotIdGetRecoveryProc(xlrec->slotId);
+ 
+ 	/*
+ 	 * It's possible that we wrote an abort record without having written
+ 	 * anything else. If that happens we need to handle subtransactions.
+ 	 * If there is more than one subtransaction it should already have been
+ 	 * handled via an assignment record. So if the counter is behind then
+ 	 * it's an error except when we have exactly one subtransaction.
+ 	 */
+ 	if (TransactionIdIsValid(LatestObservedXid) &&
+ 		TransactionIdPrecedes(LatestObservedXid, max_xid))
+ 	{
+ 		if (xlrec->nsubxacts == 1)
+ 			LatestObservedXid = max_xid;
+ 		else
+ 		{
+ 			ProcArrayDisplay();
+ 			UnobservedTransactionsDisplay();
+ 			elog(FATAL, "LatestObservedXid %u not moved forwards to %u", LatestObservedXid, max_xid);
+ 		}
+ 	}
+ 
+ #ifdef USE_ASSERT_CHECKING
+ 	if (!preparedXact)
+ 	{
+ 		/*
+ 		 * Double check everything to make sure there are no mistakes
+ 		 * before we update the proc array.
+ 		 */
+ 		if (xid != proc->xid)
+ 		{
+ 			if (XidInRecoveryProcs(xid))
+ 			{
+ 				elog(LOG, "xid %u slot %d proc->xid %u prep %s",
+ 						xid, xlrec->slotId, proc->xid,
+ 						(preparedXact ? "t" : "f"));
+ 
+ 				ProcArrayDisplay();
+ 				elog(FATAL, "accessed the wrong slot");
+ 			}
+ 		}
+ 
+ 		if (XidInUnobservedTransactions(xid))
+ 		{
+ 			ProcArrayDisplay();
+ 			UnobservedTransactionsDisplay();
+ 			elog(FATAL, "xid %u still in UnobservedXids", xid);
+ 		}
+ 	}
+ #endif
+ 
+ 	/*
+ 	 * We must mark clog before we update the ProcArray. Only update
+ 	 * if we have already initialised the state and we have previously
+ 	 * added an xid to the proc. We need no lock to check xid since it
+ 	 * is controlled by Startup process. It's possible for xids to
+ 	 * appear that haven't been seen before. We don't need to check
+ 	 * UnobservedXids because in the normal case this will already have
+ 	 * happened, but there are cases where they might sneak through.
+ 	 * Leave these for the periodic cleanup by XACT_RUNNING_XACT records.
+ 	 */
+ 	if (TransactionIdIsValid(LatestObservedXid) &&
+ 		TransactionIdIsValid(proc->xid) && !preparedXact)
+ 		ProcArrayEndTransaction(proc, max_xid);
+ 
+ 	/* Make sure nextXid is beyond any XID mentioned in the record */
  	if (TransactionIdFollowsOrEquals(max_xid,
  									 ShmemVariableCache->nextXid))
  	{
*************** xact_redo(XLogRecPtr lsn, XLogRecord *re
*** 4341,4353 ****
  	{
  		xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
  
! 		xact_redo_commit(xlrec, record->xl_xid);
  	}
  	else if (info == XLOG_XACT_ABORT)
  	{
  		xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
  
! 		xact_redo_abort(xlrec, record->xl_xid);
  	}
  	else if (info == XLOG_XACT_PREPARE)
  	{
--- 4903,4915 ----
  	{
  		xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
  
! 		xact_redo_commit(xlrec, record->xl_xid, false);
  	}
  	else if (info == XLOG_XACT_ABORT)
  	{
  		xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
  
! 		xact_redo_abort(xlrec, record->xl_xid, false);
  	}
  	else if (info == XLOG_XACT_PREPARE)
  	{
*************** xact_redo(XLogRecPtr lsn, XLogRecord *re
*** 4359,4374 ****
  	{
  		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
  
! 		xact_redo_commit(&xlrec->crec, xlrec->xid);
  		RemoveTwoPhaseFile(xlrec->xid, false);
  	}
  	else if (info == XLOG_XACT_ABORT_PREPARED)
  	{
  		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
  
! 		xact_redo_abort(&xlrec->arec, xlrec->xid);
  		RemoveTwoPhaseFile(xlrec->xid, false);
  	}
  	else
  		elog(PANIC, "xact_redo: unknown op code %u", info);
  }
--- 4921,4961 ----
  	{
  		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
  
! 		xact_redo_commit(&xlrec->crec, xlrec->xid, true);
  		RemoveTwoPhaseFile(xlrec->xid, false);
  	}
  	else if (info == XLOG_XACT_ABORT_PREPARED)
  	{
  		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
  
! 		xact_redo_abort(&xlrec->arec, xlrec->xid, true);
  		RemoveTwoPhaseFile(xlrec->xid, false);
  	}
+ 	else if (info == XLOG_XACT_ASSIGNMENT)
+ 	{
+ 		/*
+ 		 * This is a no-op since RecordKnownAssignedTransactionIds()
+ 		 * already did all the work on this record for us.
+ 		 */
+ 		return;
+ 	}
+ 	else if (info == XLOG_XACT_RUNNING_XACTS)
+ 	{
+ 		xl_xact_running_xacts *xlrec = (xl_xact_running_xacts *) XLogRecGetData(record);
+ 
+ 		/*
+ 		 * Initialise if we have a valid snapshot to work with
+ 		 */
+ 		if (!TransactionIdIsValid(LatestObservedXid) &&
+ 			TransactionIdIsValid(xlrec->xlatest))
+ 		{
+ 			LatestObservedXid = xlrec->xlatest;
+ 			elog(LOG, "Initial snapshot created; LatestObservedXid = %u",
+ 						LatestObservedXid);
+ 		}
+ 
+ 		ProcArrayUpdateRecoveryTransactions(lsn, xlrec);
+ 	}
  	else
  		elog(PANIC, "xact_redo: unknown op code %u", info);
  }
*************** xact_desc_abort(StringInfo buf, xl_xact_
*** 4433,4438 ****
--- 5020,5069 ----
  	}
  }
  
+ /*
+  * Append a description of an XLOG_XACT_RUNNING_XACTS record to buf: the
+  * header counts, then each RunningXact entry with its subtransaction xids,
+  * which are stored in the record right after the RunningXact array.
+  */
+ static void
+ xact_desc_running_xacts(StringInfo buf, xl_xact_running_xacts *xlrec)
+ {
+ 	int				xid_index,
+ 					subxid_index;
+ 	RunningXact		*rxact = (RunningXact *) xlrec->xrun;
+ 	TransactionId 	*subxip = (TransactionId *) &(xlrec->xrun[xlrec->xcnt]);
+ 
+ 	appendStringInfo(buf, "nxids %u nsubxids %u xlatest %u",
+ 								xlrec->xcnt,
+ 								xlrec->subxcnt,
+ 								xlrec->xlatest);
+ 
+ 	for (xid_index = 0; xid_index < xlrec->xcnt; xid_index++)
+ 	{
+ 		RunningXact		*rx = &rxact[xid_index];
+ 
+ 		appendStringInfo(buf, "; xid %u pid %d backend %d db %u role %u "
+ 							  "vacflag %u nsubxids %u offset %d overflowed %s",
+ 								rx->xid,
+ 								rx->pid,
+ 								rx->slotId,
+ 								rx->databaseId,
+ 								rx->roleId,
+ 								rx->vacuumFlags,
+ 								rx->nsubxids,
+ 								rx->subx_offset,
+ 								(rx->overflowed ? "t" : "f"));
+ 
+ 		if (rx->nsubxids > 0)
+ 		{
+ 			appendStringInfo(buf, "; subxacts:");
+ 			for (subxid_index = 0; subxid_index < rx->nsubxids; subxid_index++)
+ 				appendStringInfo(buf, " %u",
+ 						subxip[subxid_index + rx->subx_offset]);
+ 		}
+ 	}
+ }
+ 
  void
  xact_desc(StringInfo buf, uint8 xl_info, char *rec)
  {
*************** xact_desc(StringInfo buf, uint8 xl_info,
*** 4470,4475 ****
--- 5095,5115 ----
  		appendStringInfo(buf, "abort %u: ", xlrec->xid);
  		xact_desc_abort(buf, &xlrec->arec);
  	}
+ 	else if (info == XLOG_XACT_ASSIGNMENT)
+ 	{
+ 		xl_xact_assignment *xlrec = (xl_xact_assignment *) rec;
+ 
+ 		/* report xassign, not the record's xl_xid, which may be Invalid */
+ 		appendStringInfo(buf, "assignment: xid %u slotid %d", 
+ 							xlrec->xassign, xlrec->slotId);
+ 	}
+ 	else if (info == XLOG_XACT_RUNNING_XACTS)
+ 	{
+ 		xl_xact_running_xacts *xlrec = (xl_xact_running_xacts *) rec;
+ 
+ 		appendStringInfo(buf, "running xacts: ");
+ 		xact_desc_running_xacts(buf, xlrec);
+ 	}
  	else
  		appendStringInfo(buf, "UNKNOWN");
  }
Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.319
diff -c -w -p -r1.319 xlog.c
*** src/backend/access/transam/xlog.c	23 Sep 2008 09:20:35 -0000	1.319
--- src/backend/access/transam/xlog.c	16 Oct 2008 12:50:49 -0000
***************
*** 49,54 ****
--- 49,55 ----
  #include "utils/guc.h"
  #include "utils/ps_status.h"
  
+ #define WAL_DEBUG	/* NOTE(review): forces the WAL_DEBUG code paths on for this file; presumably a testing aid - confirm before commit */
  
  /* File path names (all relative to $PGDATA) */
  #define BACKUP_LABEL_FILE		"backup_label"
*************** bool		log_checkpoints = false;
*** 68,74 ****
  int 		sync_method = DEFAULT_SYNC_METHOD;
  
  #ifdef WAL_DEBUG
! bool		XLOG_DEBUG = false;
  #endif
  
  /*
--- 69,77 ----
  int 		sync_method = DEFAULT_SYNC_METHOD;
  
  #ifdef WAL_DEBUG
! bool		XLOG_DEBUG_FLUSH = false;	/* log XLogFlush() requests */
! bool		XLOG_DEBUG_BGFLUSH = false;	/* log XLogBackgroundFlush() requests */
! bool		XLOG_DEBUG_REDO = true;		/* presumably gates redo-time record logging; note it defaults to on */
  #endif
  
  /*
*************** XLogInsert(RmgrId rmid, uint8 info, XLog
*** 473,478 ****
--- 476,483 ----
  	XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
  	XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
  	XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
+ 	TransactionId	xl_xid2 = InvalidTransactionId;	/* filled in by GetStandbyInfoForTransaction() */
+ 	uint16			xl_info2 = 0;					/* filled in by GetStandbyInfoForTransaction() */
  	pg_crc32	rdata_crc;
  	uint32		len,
  				write_len;
*************** begin:;
*** 628,633 ****
--- 633,643 ----
  	if (len == 0 && !isLogSwitch)
  		elog(PANIC, "invalid xlog record length %u", len);
  
+ 	/* 
+ 	 * Get standby info (xl_xid2, xl_info2) before insert lock/crit section.
+ 	 */
+ 	GetStandbyInfoForTransaction(rmid, info, rdata, &xl_xid2, &xl_info2);
+ 
  	START_CRIT_SECTION();
  
  	/* Now wait to get insert lock */
*************** begin:;
*** 816,821 ****
--- 826,833 ----
  	record->xl_len = len;		/* doesn't include backup blocks */
  	record->xl_info = info;
  	record->xl_rmid = rmid;
+ 	record->xl_xid2 = xl_xid2;
+ 	record->xl_info2 = xl_info2;
  
  	/* Now we can finish computing the record's CRC */
  	COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
*************** begin:;
*** 823,847 ****
  	FIN_CRC32(rdata_crc);
  	record->xl_crc = rdata_crc;
  
- #ifdef WAL_DEBUG
- 	if (XLOG_DEBUG)
- 	{
- 		StringInfoData buf;
- 
- 		initStringInfo(&buf);
- 		appendStringInfo(&buf, "INSERT @ %X/%X: ",
- 						 RecPtr.xlogid, RecPtr.xrecoff);
- 		xlog_outrec(&buf, record);
- 		if (rdata->data != NULL)
- 		{
- 			appendStringInfo(&buf, " - ");
- 			RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
- 		}
- 		elog(LOG, "%s", buf.data);
- 		pfree(buf.data);
- 	}
- #endif
- 
  	/* Record begin of record in appropriate places */
  	ProcLastRecPtr = RecPtr;
  	Insert->PrevRecord = RecPtr;
--- 835,840 ----
*************** XLogFlush(XLogRecPtr record)
*** 1729,1735 ****
  		return;
  
  #ifdef WAL_DEBUG
! 	if (XLOG_DEBUG)
  		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
  			 record.xlogid, record.xrecoff,
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
--- 1722,1728 ----
  		return;
  
  #ifdef WAL_DEBUG
! 	if (XLOG_DEBUG_FLUSH)
  		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
  			 record.xlogid, record.xrecoff,
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
*************** XLogBackgroundFlush(void)
*** 1879,1885 ****
  		return;
  
  #ifdef WAL_DEBUG
! 	if (XLOG_DEBUG)
  		elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
  			 WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
--- 1872,1878 ----
  		return;
  
  #ifdef WAL_DEBUG
! 	if (XLOG_DEBUG_BGFLUSH)
  		elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
  			 WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
*************** BootStrapXLOG(void)
*** 4311,4316 ****
--- 4304,4311 ----
  	record->xl_prev.xlogid = 0;
  	record->xl_prev.xrecoff = 0;
  	record->xl_xid = InvalidTransactionId;
+ 	record->xl_xid2 = InvalidTransactionId;
+ 	record->xl_info2 = 0;		/* same initial value as in XLogInsert() */
  	record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
  	record->xl_len = sizeof(checkPoint);
  	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
*************** StartupXLOG(void)
*** 5097,5103 ****
  			do
  			{
  #ifdef WAL_DEBUG
! 				if (XLOG_DEBUG)
  				{
  					StringInfoData buf;
  
--- 5091,5100 ----
  			do
  			{
  #ifdef WAL_DEBUG
! 				/* Log XLogRecIsFirstUseOfXid() records and all RM_XACT_ID records */
! 				if (XLOG_DEBUG_REDO &&
! 					(XLogRecIsFirstUseOfXid(record) ||
! 					 record->xl_rmid == RM_XACT_ID))
  				{
  					StringInfoData buf;
  
*************** StartupXLOG(void)
*** 5143,5148 ****
--- 5137,5145 ----
  				if (record->xl_info & XLR_BKP_BLOCK_MASK)
  					RestoreBkpBlocks(record, EndRecPtr);
  
+ 				if (XLogRecIsFirstUseOfXid(record))
+ 					RecordKnownAssignedTransactionIds(EndRecPtr, record);	/* before rm_redo() */
+ 
  				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
  
  				/* Pop the error context stack */
*************** StartupXLOG(void)
*** 5349,5354 ****
--- 5346,5353 ----
  	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
  	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
  
+ 	ProcArrayClearRecoveryTransactions();	/* reset Recovery Procs and UnobservedXids */
+ 
  	/* Start up the commit log and related stuff, too */
  	StartupCLOG();
  	StartupSUBTRANS(oldestActiveXID);
*************** CreateCheckPoint(int flags)
*** 6007,6012 ****
--- 6006,6014 ----
  		LogCheckpointEnd();
  
  	LWLockRelease(CheckpointLock);
+ 
+ 	if (!shutdown)
+ 		LogCurrentRunningXacts();	/* done after CheckpointLock is released */
  }
  
  /*
*************** xlog_redo(XLogRecPtr lsn, XLogRecord *re
*** 6190,6195 ****
--- 6192,6200 ----
  		MultiXactSetNextMXact(checkPoint.nextMulti,
  							  checkPoint.nextMultiOffset);
  
+ 		/* We know nothing was running on the master at this point */
+ 		ProcArrayClearRecoveryTransactions();
+ 
  		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
  		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
  		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
*************** xlog_outrec(StringInfo buf, XLogRecord *
*** 6300,6305 ****
--- 6305,6317 ----
  					 record->xl_prev.xlogid, record->xl_prev.xrecoff,
  					 record->xl_xid);
  
+ 	appendStringInfo(buf, "; pxid %u %s %s len %u slot %d",	/* t/f: first-xid, first-subxid record */
+ 					 record->xl_xid2, 
+ 					 (XLogRecIsFirstXidRecord(record) ? "t" : "f"),
+ 					 (XLogRecIsFirstSubXidRecord(record) ? "t" : "f"),	
+ 					  record->xl_len,
+ 					 XLogRecGetSlotId(record));
+ 
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
  		if (record->xl_info & XLR_SET_BKP_BLOCK(i))
Index: src/backend/storage/ipc/procarray.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/ipc/procarray.c,v
retrieving revision 1.46
diff -c -w -p -r1.46 procarray.c
*** src/backend/storage/ipc/procarray.c	4 Aug 2008 18:03:46 -0000	1.46
--- src/backend/storage/ipc/procarray.c	16 Oct 2008 12:44:29 -0000
***************
*** 17,22 ****
--- 17,29 ----
   * as are the myProcLocks lists.  They can be distinguished from regular
   * backend PGPROCs at need by checking for pid == 0.
   *
+  * The process array now also includes PGPROC structures representing
+  * transactions being recovered (Recovery Procs). Recovery sets their xid,
+  * subxids, lsn, databaseId, roleId and vacuumFlags fields.  They too can
+  * be distinguished from regular backend PGPROCs by checking for pid == 0.
+  * The proc array also has an additional array of UnobservedXids
+  * representing transactions known to be running on the master but for
+  * which we do not yet know the slotId, so cannot be assigned to a proc.
   *
   * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
***************
*** 33,56 ****
  
  #include "access/subtrans.h"
  #include "access/transam.h"
! #include "access/xact.h"
  #include "access/twophase.h"
  #include "miscadmin.h"
  #include "storage/procarray.h"
  #include "utils/snapmgr.h"
  
  
  /* Our shared memory area */
  typedef struct ProcArrayStruct
  {
  	int			numProcs;		/* number of valid procs entries */
! 	int			maxProcs;		/* allocated size of procs array */
  
  	/*
  	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
  	 * actually it is maxProcs entries long.
  	 */
  	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
  } ProcArrayStruct;
  
  static ProcArrayStruct *procArray;
--- 40,79 ----
  
  #include "access/subtrans.h"
  #include "access/transam.h"
! #include "access/xlog.h"
  #include "access/twophase.h"
  #include "miscadmin.h"
+ #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "utils/snapmgr.h"
  
+ static RunningXactsData	CurrentRunningXactsData;	/* result buffer reused by GetRunningTransactionData() */
+ 
+ /* Handy constant for an invalid xlog recptr */
+ static const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+ 
+ void ProcArrayDisplay(void);	/* debugging aid: logs Recovery Procs */
+ 
  
  /* Our shared memory area */
  typedef struct ProcArrayStruct
  {
  	int			numProcs;		/* number of valid procs entries */
! 	int			maxProcs;			/* allocated size of total procs array */
! 
! 	int			maxRecoveryProcs;	/* number of allocated recovery procs */
! 
! 	int			numUnobservedXids;	/* number of valid unobserved xids */
! 	int			maxUnobservedXids;	/* allocated size of unobserved array */
! 	bool		overflowUnobservedXids; 	/* array has overflowed */
  
  	/*
  	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
  	 * actually it is maxProcs entries long.
  	 */
  	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
+ 
+ 	/* ARRAY OF maxUnobservedXids TRANSACTION IDs FOLLOWS procs[maxProcs] */
  } ProcArrayStruct;
  
  static ProcArrayStruct *procArray;
*************** ProcArrayShmemSize(void)
*** 100,107 ****
  	Size		size;
  
  	size = offsetof(ProcArrayStruct, procs);
! 	size = add_size(size, mul_size(sizeof(PGPROC *),
! 								 add_size(MaxBackends, max_prepared_xacts)));
  
  	return size;
  }
--- 123,141 ----
  	Size		size;
  
  	size = offsetof(ProcArrayStruct, procs);
! 
! 	/* Normal processing */
! 	/* MyProc slots */
! 	size = add_size(size, mul_size(sizeof(PGPROC *), MaxBackends));
! 	size = add_size(size, mul_size(sizeof(PGPROC *), max_prepared_xacts));
! 
! 	/* Recovery processing */
! 
! 	/* Recovery Procs: maxRecoveryProcs == MaxBackends pointer slots */
! 	size = add_size(size, mul_size(sizeof(PGPROC *), MaxBackends));
! 	/* UnobservedXids: added twice, giving 2 * MaxBackends (maxUnobservedXids) */
! 	size = add_size(size, mul_size(sizeof(TransactionId), MaxBackends));
! 	size = add_size(size, mul_size(sizeof(TransactionId), MaxBackends));
  
  	return size;
  }
*************** CreateSharedProcArray(void)
*** 123,130 ****
--- 157,197 ----
  		/*
  		 * We're the first - initialize.
  		 */
+ 		/* Normal processing */
  		procArray->numProcs = 0;
  		procArray->maxProcs = MaxBackends + max_prepared_xacts;
+ 
+ 		/* Recovery processing */
+ 		procArray->maxRecoveryProcs = MaxBackends;
+ 		procArray->maxProcs += procArray->maxRecoveryProcs;
+ 
+ 		procArray->maxUnobservedXids = 2 * MaxBackends;
+ 		procArray->numUnobservedXids = 0;
+ 		procArray->overflowUnobservedXids = false;
+ 
+ 		if (!IsUnderPostmaster)
+ 		{
+ 			int			i;
+ 
+ 			/*
+ 			 * Create and add the Procs for recovery emulation.
+ 			 *
+ 			 * We do this now, so that we can identify which Recovery Proc
+ 			 * goes with each normal backend. Normal procs were allocated
+ 			 * first so we can use the slotId of the *proc* to look up
+ 			 * the Recovery Proc in the *procarray*. Recovery Procs never
+ 			 * move around in the procarray, whereas normal procs do.
+ 			 * e.g. Proc with slotId=7 is always associated with procarray[7]
+ 			 * for recovery processing.
+ 			 */
+ 			for (i = 0; i < procArray->maxRecoveryProcs; i++)
+ 			{
+ 				PGPROC	*RecoveryProc = InitRecoveryProcess();
+ 
+ 				ProcArrayAdd(RecoveryProc);
+ 			}
+ 			elog(DEBUG3, "Added %d Recovery Procs", i);
+ 		}
  	}
  }
  
*************** ProcArrayRemove(PGPROC *proc, Transactio
*** 213,218 ****
--- 280,329 ----
  	elog(LOG, "failed to find proc %p in ProcArray", proc);
  }
  
+ /*
+  * ProcArrayStartRecoveryTransaction
+  * 
+  * Update a Recovery Proc to show that xid is now running: either as its
+  * top-level xid or, if isSubXact, appended to its cached subxids (the
+  * cache is marked overflowed once PGPROC_MAX_CACHED_SUBXIDS is reached).
+  * There is no locking here. It is either handled by caller, or
+  * potentially ignored (see comments for GetNewTransactionId()).
+  *
+  * In recovery we supply an LSN also, to ensure we can tell which of
+  * several inputs is the latest information on the state of the proc.
+  * 
+  * There is no ProcArrayStartNormalTransaction, that is handled by
+  * GetNewTransactionId in varsup.c
+  */
+ void
+ ProcArrayStartRecoveryTransaction(PGPROC *proc, TransactionId xid, XLogRecPtr lsn, bool isSubXact)
+ {
+ 	/*
+ 	 * Use volatile pointer to prevent code rearrangement; other backends
+ 	 * could be examining this proc's subxids info concurrently, and we don't
+ 	 * want them to see an invalid intermediate state, such as incrementing
+ 	 * nxids before filling the array entry.  Note we are assuming that
+ 	 * TransactionId and int fetch/store are atomic.
+ 	 */
+ 	volatile PGPROC *myproc = proc;
+ 
+ 	myproc->lsn = lsn;
+ 
+ 	if (!isSubXact)
+ 		myproc->xid = xid;
+ 	else
+ 	{
+ 		int			nxids = myproc->subxids.nxids;
+ 
+ 		if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
+ 		{
+ 			myproc->subxids.xids[nxids] = xid;
+ 			myproc->subxids.nxids = nxids + 1;
+ 		}
+ 		else
+ 			myproc->subxids.overflowed = true;
+ 	}
+ }
  
  /*
   * ProcArrayEndTransaction -- mark a transaction as no longer running
*************** ProcArrayRemove(PGPROC *proc, Transactio
*** 220,226 ****
   * This is used interchangeably for commit and abort cases.  The transaction
   * commit/abort must already be reported to WAL and pg_clog.
   *
!  * proc is currently always MyProc, but we pass it explicitly for flexibility.
   * latestXid is the latest Xid among the transaction's main XID and
   * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
   * the caller to pass latestXid, instead of computing it from the PGPROC's
--- 338,346 ----
   * This is used interchangeably for commit and abort cases.  The transaction
   * commit/abort must already be reported to WAL and pg_clog.
   *
!  * In normal running, proc is currently always MyProc, but in recovery we pass
!  * one of the recovery procs.
!  *
   * latestXid is the latest Xid among the transaction's main XID and
   * subtransactions, or InvalidTransactionId if it has no XID.  (We must ask
   * the caller to pass latestXid, instead of computing it from the PGPROC's
*************** ProcArrayClearTransaction(PGPROC *proc)
*** 301,306 ****
--- 421,427 ----
  	proc->xid = InvalidTransactionId;
  	proc->lxid = InvalidLocalTransactionId;
  	proc->xmin = InvalidTransactionId;
+ 	proc->lsn = InvalidXLogRecPtr;	/* forget the recovery-state LSN too */
  
  	/* redundant, but just in case */
  	proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
*************** ProcArrayClearTransaction(PGPROC *proc)
*** 311,316 ****
--- 432,625 ----
  	proc->subxids.overflowed = false;
  }
  
+ /*
+  * ProcArrayClearRecoveryTransactions
+  *
+  * Forget all recovery transaction state: reset every Recovery Proc (the
+  * first maxRecoveryProcs entries of procs[]) and empty UnobservedXids.
+  *
+  * Called during recovery when we see a Shutdown checkpoint or EndRecovery
+  * record, or at the end of recovery processing.
+  */
+ void
+ ProcArrayClearRecoveryTransactions(void)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	int			index;
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * Reset Recovery Procs
+ 	 */
+ 	for (index = 0; index < arrayP->maxRecoveryProcs; index++)
+ 	{
+ 		PGPROC	*RecoveryProc = arrayP->procs[index];
+ 
+ 		ProcArrayClearTransaction(RecoveryProc);
+ 	}
+ 
+ 	/*
+ 	 * Clear the UnobservedXids also
+ 	 */
+ 	UnobservedTransactionsClearXids();
+ 
+ 	LWLockRelease(ProcArrayLock);
+ }
+ 
+ /*
+  * XidInRecoveryProcs
+  *
+  * Returns true if xid is the top-level xid of any Recovery Proc; subxids
+  * and UnobservedXids are not examined.  Takes no lock itself.
+  * NOTE(review): presumably the caller holds ProcArrayLock; confirm.
+  */
+ bool
+ XidInRecoveryProcs(TransactionId xid)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	int				index;
+ 
+ 	for (index = 0; index < arrayP->maxRecoveryProcs; index++)
+ 	{
+ 		PGPROC	*RecoveryProc = arrayP->procs[index];
+ 
+ 		if (RecoveryProc->xid == xid)
+ 			return true;
+ 	}
+ 	return false;
+ }
+ 
+ /*
+  * ProcArrayDisplay
+  *
+  * Debugging aid: LOG each Recovery Proc that has a valid xid, then the
+  * UnobservedXids via UnobservedTransactionsDisplay().
+  */
+ void
+ ProcArrayDisplay(void)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	int			index;
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 
+ 	for (index = 0; index < arrayP->maxRecoveryProcs; index++)
+ 	{
+ 		PGPROC	*RecoveryProc = arrayP->procs[index];
+ 
+ 		if (TransactionIdIsValid(RecoveryProc->xid))
+ 			elog(LOG, "proc %d proc->xid %u proc->lsn %X/%X", index, RecoveryProc->xid, 
+ 								RecoveryProc->lsn.xlogid, RecoveryProc->lsn.xrecoff);
+ 	}
+ 
+ 	UnobservedTransactionsDisplay();
+ 
+ 	LWLockRelease(ProcArrayLock);
+ }
+ 
+ /*
+  * Use the data about running transactions on master to either create the
+  * initial state of the Recovery Procs, or maintain correctness of their
+  * state. This is almost the opposite of GetSnapshotData().
+  *
+  * Each RunningXact's subxids live at subx_offset..subx_offset+nsubxids-1
+  * of the subxid array that follows the xcnt RunningXact entries.  A proc
+  * is only overwritten if lsn is later than the lsn already stored in it.
+  *
+  * Only used during recovery.
+  */
+ void
+ ProcArrayUpdateRecoveryTransactions(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
+ {
+ 	PGPROC 			*proc;
+ 	RunningXact		*rxact = (RunningXact *) xlrec->xrun;
+ 	int				xid_index;
+ 	TransactionId 	*subxip = (TransactionId *) &(xlrec->xrun[xlrec->xcnt]);
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 
+ 	for (xid_index = 0; xid_index < xlrec->xcnt; xid_index++)
+ 	{
+ 		proc = SlotIdGetRecoveryProc(rxact[xid_index].slotId);
+ 
+ 		elog(DEBUG2, "running xact proc->lsn %X/%X lsn %X/%X proc->xid %u xid %u",
+ 					proc->lsn.xlogid, proc->lsn.xrecoff,
+ 					lsn.xlogid, lsn.xrecoff, proc->xid, rxact[xid_index].xid);
+ 		/*
+ 		 * If our state information is later for this proc, then 
+ 		 * overwrite it. It's possible for a commit and possibly
+ 		 * a new transaction record to have arrived in WAL in between
+ 		 * us doing GetRunningTransactionData() and grabbing the
+ 		 * WALInsertLock, so we mustn't assume we know best always.
+ 		 */
+ 		if (XLByteLT(proc->lsn, lsn))
+ 		{
+ 			int			nsubxids = rxact[xid_index].nsubxids;
+ 			bool		overflowed = rxact[xid_index].overflowed;
+ 
+ 			/*
+ 			 * The PGPROC subxid cache holds at most PGPROC_MAX_CACHED_SUBXIDS
+ 			 * entries: never copy more than that, and mark the cache as
+ 			 * overflowed if the record carried more.
+ 			 */
+ 			if (nsubxids > PGPROC_MAX_CACHED_SUBXIDS)
+ 			{
+ 				nsubxids = PGPROC_MAX_CACHED_SUBXIDS;
+ 				overflowed = true;
+ 			}
+ 
+ 			proc->lsn = lsn;
+ 			proc->xid = rxact[xid_index].xid;
+ 			/* proc-> pid stays 0 for Recovery Procs */
+ 			/* proc->slotId should never be touched */
+ 			proc->databaseId = rxact[xid_index].databaseId;
+ 			proc->roleId = rxact[xid_index].roleId;
+ 			proc->vacuumFlags = rxact[xid_index].vacuumFlags;
+ 
+ 			proc->subxids.nxids = nsubxids;
+ 			proc->subxids.overflowed = overflowed;
+ 
+ 			/* this xact's subxids begin at its own subx_offset, not at 0 */
+ 			if (nsubxids > 0)
+ 				memcpy(proc->subxids.xids,
+ 					   subxip + rxact[xid_index].subx_offset,
+ 					   nsubxids * sizeof(TransactionId));
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Are there any Recovery Procs that we have missed? If so,
+ 	 * we have to go and update them too. This can happen when a 
+ 	 * backend terminates with a FATAL error, not writing an abort
+ 	 * record and then does not reconnect and reuse the slotId.
+ 	 * But we don't have a count of currently active Recovery Procs.
+ 	 * Should we? Would we trust it?
+ 	 */
+ 	/* XXX: grovel through proc array looking for backends that were
+ 	 * not mentioned in the incoming set, and then zero them
+ 	 */
+ 
+ 	/*
+ 	 * If we know we have all running transactions, clear UnobservedXids,
+ 	 * otherwise go through and check each running xid is not present.
+ 	 */
+ 	if (TransactionIdIsValid(xlrec->xlatest))
+ 		UnobservedTransactionsClearXids();
+ 	else
+ 	{
+ 		/* XXX:
+ 		for ()
+ 		{
+ 			break if list empty;
+ 		}
+ 		*/		
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	ProcArrayDisplay();
+ }
  
  /*
   * TransactionIdIsInProgress -- is given transaction running in some backend
*************** GetOldestXmin(bool allDbs, bool ignoreVa
*** 655,661 ****
   * but since PGPROC has only a limited cache area for subxact XIDs, full
   * information may not be available.  If we find any overflowed subxid arrays,
   * we have to mark the snapshot's subxid data as overflowed, and extra work
!  * will need to be done to determine what's running (see XidInMVCCSnapshot()
   * in tqual.c).
   *
   * We also update the following backend-global variables:
--- 929,935 ----
   * but since PGPROC has only a limited cache area for subxact XIDs, full
   * information may not be available.  If we find any overflowed subxid arrays,
   * we have to mark the snapshot's subxid data as overflowed, and extra work
!  * may need to be done to determine what's running (see XidInMVCCSnapshot()
   * in tqual.c).
   *
   * We also update the following backend-global variables:
*************** GetSnapshotData(Snapshot snapshot)
*** 680,685 ****
--- 954,960 ----
  	int			index;
  	int			count = 0;
  	int			subcount = 0;
+ 	bool		suboverflowed = false;	/* set if any copied subxid cache had overflowed */
  
  	Assert(snapshot != NULL);
  
*************** GetSnapshotData(Snapshot snapshot)
*** 706,725 ****
  					(errcode(ERRCODE_OUT_OF_MEMORY),
  					 errmsg("out of memory")));
  		Assert(snapshot->subxip == NULL);
  		snapshot->subxip = (TransactionId *)
! 			malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
  		if (snapshot->subxip == NULL)
  			ereport(ERROR,
  					(errcode(ERRCODE_OUT_OF_MEMORY),
  					 errmsg("out of memory")));
  	}
  
  	/*
  	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
  	 * going to set MyProc->xmin.
  	 */
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
  	/* xmax is always latestCompletedXid + 1 */
  	xmax = ShmemVariableCache->latestCompletedXid;
  	Assert(TransactionIdIsNormal(xmax));
--- 981,1022 ----
  					(errcode(ERRCODE_OUT_OF_MEMORY),
  					 errmsg("out of memory")));
  		Assert(snapshot->subxip == NULL);
+ #define maxNumSubXids	(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS)	/* expands to use arrayP: must be in scope */
  		snapshot->subxip = (TransactionId *)
! 			malloc(maxNumSubXids * sizeof(TransactionId));
  		if (snapshot->subxip == NULL)
  			ereport(ERROR,
  					(errcode(ERRCODE_OUT_OF_MEMORY),
  					 errmsg("out of memory")));
  	}
  
+ /* XXX we expect to be able to undef this after testing */
+ #define UNOBSERVED_XIDS_CAN_OVERFLOW
+ 
+ #ifdef UNOBSERVED_XIDS_CAN_OVERFLOW
+ retry:
+ #endif
+ 
  	/*
  	 * It is sufficient to get shared lock on ProcArrayLock, even if we are
  	 * going to set MyProc->xmin.
  	 */
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
+ #ifdef UNOBSERVED_XIDS_CAN_OVERFLOW
+ 	/*
+ 	 * If UnobservedXids has overflowed then we cannot make a valid snapshot.
+ 	 * This will only ever happen in recovery processing; we wait and retry.
+ 	 */
+ 	if (arrayP->overflowUnobservedXids)
+ 	{
+ 		LWLockRelease(ProcArrayLock);
+ 		elog(WARNING, "unable to obtain valid snapshot: unobserved xids overflow");
+ 		pg_usleep(10000L);
+ 		goto retry;
+ 	}
+ #endif
+ 
  	/* xmax is always latestCompletedXid + 1 */
  	xmax = ShmemVariableCache->latestCompletedXid;
  	Assert(TransactionIdIsNormal(xmax));
*************** GetSnapshotData(Snapshot snapshot)
*** 771,779 ****
  		}
  
  		/*
! 		 * Save subtransaction XIDs if possible (if we've already overflowed,
! 		 * there's no point).  Note that the subxact XIDs must be later than
! 		 * their parent, so no need to check them against xmin.  We could
  		 * filter against xmax, but it seems better not to do that much work
  		 * while holding the ProcArrayLock.
  		 *
--- 1068,1075 ----
  		}
  
  		/*
! 		 * Save subtransaction XIDs. Note that the subxact XIDs must be later
! 		 * than their parent, so no need to check them against xmin.  We could
  		 * filter against xmax, but it seems better not to do that much work
  		 * while holding the ProcArrayLock.
  		 *
*************** GetSnapshotData(Snapshot snapshot)
*** 784,806 ****
  		 *
  		 * Again, our own XIDs are not included in the snapshot.
  		 */
! 		if (subcount >= 0 && proc != MyProc)
! 		{
! 			if (proc->subxids.overflowed)
! 				subcount = -1;	/* overflowed */
! 			else
  			{
  				int			nxids = proc->subxids.nxids;
  
  				if (nxids > 0)
  				{
  					memcpy(snapshot->subxip + subcount,
  						   (void *) proc->subxids.xids,
  						   nxids * sizeof(TransactionId));
  					subcount += nxids;
  				}
  			}
  		}
  	}
  
  	if (!TransactionIdIsValid(MyProc->xmin))
--- 1080,1172 ----
  		 *
  		 * Again, our own XIDs are not included in the snapshot.
  		 */
! 		if (proc != MyProc)
  			{
  				int			nxids = proc->subxids.nxids;
  
  				if (nxids > 0)
  				{
+ 				if (proc->subxids.overflowed)
+ 					suboverflowed = true;
+ 
  					memcpy(snapshot->subxip + subcount,
  						   (void *) proc->subxids.xids,
  						   nxids * sizeof(TransactionId));
  					subcount += nxids;
  				}
+ 
+ 			}
+ 		}
+ 
+ 	/*
+ 	 * Also check for unobserved xids. There is no need for us to specify
+ 	 * only if IsRecoveryProcessingMode(), since the list will always be
+ 	 * empty when normal processing begins and the test will be optimised
+ 	 * to nearly nothing very quickly.
+ 	 */
+ 	for (index = 0; index < arrayP->numUnobservedXids; index++)
+ 	{
+ 		volatile TransactionId	*UnobservedXids;
+ 		TransactionId 	xid;
+ 
+ 		UnobservedXids = (TransactionId *) &(arrayP->procs[arrayP->maxProcs]);
+ 
+ 		/* Fetch xid just once - see GetNewTransactionId */
+ 		xid = UnobservedXids[index];
+ 
+ 		/*
+ 		 * Stop at the first invalid xid or the first xid >= xmax. This works
+ 		 * because UnobservedXids is maintained in strict ascending order.
+ 		 */
+ 		if (!TransactionIdIsNormal(xid) || TransactionIdFollowsOrEquals(xid, xmax))
+ 			break;
+ 
+ 		/*
+ 		 * Typically, there will be space in the snapshot. We know that the
+ 		 * unobserved xids are being run by one of the procs marked with
+ 		 * an xid of InvalidTransactionId, so we will have ignored that above,
+ 		 * and the xidcache for that proc will have been empty also.
+ 		 *
+ 		 * We put the unobserved xid anywhere in the snapshot. The xid might
+ 		 * be a top-level or it might be a subtransaction, but it won't
+ 		 * change the answer to XidInMVCCSnapshot() whichever it is. That's
+ 		 * just as well, since we don't know which it is, by definition.
+ 		 */
+ 		if (count < arrayP->maxProcs)
+ 			snapshot->xip[count++] = xid;
+ 		else
+ 		{
+ 			/*
+ 			 * If there is no space left in subxid cache then we will be forced
+ 			 * to look in Subtrans to check for subtransactions when we
+ 			 * run XidInMVCCSnapshot(). If we still have unobserved 
+ 			 * transactions we know they won't be found in subtrans,
+ 			 * so we have to abort our attempt to make a snapshot. 
+ 			 */
+ #ifdef UNOBSERVED_XIDS_CAN_OVERFLOW
+ 			if (subcount >= maxNumSubXids)
+ 			{
+ 				LWLockRelease(ProcArrayLock);
+ 				elog(WARNING, "unable to obtain valid snapshot: subxid overflow");
+ 				pg_usleep(10000L);
+ 				goto retry;
  			}
+ #endif
+ 
+ 			/*
+ 			 * Store unobserved xids in the subxid cache instead.
+ 			 */
+ 			snapshot->subxip[subcount++] = xid;
  		}
+ 
+ 		/*
+ 		 * We don't really need xmin during recovery, but lets derive
+ 		 * it anyway for consistency. It is possible that an unobserved
+ 		 * xid could be xmin if there is contention between long-lived 
+ 		 * transactions.
+ 		 */
+ 		if (TransactionIdPrecedes(xid, xmin))
+ 			xmin = xid;
  	}
  
  	if (!TransactionIdIsValid(MyProc->xmin))
*************** GetSnapshotData(Snapshot snapshot)
*** 824,829 ****
--- 1190,1196 ----
  	snapshot->xmax = xmax;
  	snapshot->xcnt = count;
  	snapshot->subxcnt = subcount;
+ 	snapshot->suboverflowed = suboverflowed;	/* some copied subxid cache had overflowed */
  
  	snapshot->curcid = GetCurrentCommandId(false);
  
*************** GetSnapshotData(Snapshot snapshot)
*** 839,844 ****
--- 1206,1427 ----
  }
  
  /*
+  * GetRunningTransactionData -- returns information about running transactions.
+  *
+  * Similar to GetSnapshotData but returning more information. We include
+  * all PGPROCs with an assigned TransactionId, even VACUUM processes. We
+  * include slotId and databaseId for each PGPROC. We also keep track
+  * of which subtransactions go with each PGPROC, information which is lost
+  * when we GetSnapshotData.
+  *
+  * This is never executed when IsRecoveryMode() so there is no need to look
+  * at UnobservedXids.
+  *
+  * We don't worry about updating other counters, we want to keep this as
+  * simple as possible and leave GetSnapshotData() as the primary code for
+  * that bookkeeping.
+  */
+ RunningTransactions
+ GetRunningTransactionData(void)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData;
+ 	RunningXact	*rxact;
+ 	TransactionId *subxip;
+ 	TransactionId xlatest = InvalidTransactionId;
+ 	TransactionId prev_xlatest = InvalidTransactionId;
+ 	int			numAttempts = 0;
+ 	int			index;
+ 	int			count = 0;
+ 	int			subcount = 0;
+ 	bool		suboverflowed = false;
+ 
+ 	/*
+ 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
+ 	 * be sufficient.  But it seems better to do the malloc while not holding
+ 	 * the lock, so we can't look at numProcs.  Likewise, we allocate much
+ 	 * more subxip storage than is probably needed.
+ 	 *
+ 	 * Should only be allocated for bgwriter, since only ever executed
+ 	 * during checkpoints.
+ 	 */
+ 	if (CurrentRunningXacts->xrun == NULL)
+ 	{
+ 		/*
+ 		 * First call
+ 		 */
+ 		CurrentRunningXacts->xrun = (RunningXact *)
+ 			malloc(arrayP->maxProcs * sizeof(RunningXact));
+ 		if (CurrentRunningXacts->xrun == NULL)
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_OUT_OF_MEMORY),
+ 					 errmsg("out of memory")));
+ 		Assert(CurrentRunningXacts->subxip == NULL);
+ 		CurrentRunningXacts->subxip = (TransactionId *)
+ 			malloc(maxNumSubXids * sizeof(TransactionId));
+ 		if (CurrentRunningXacts->subxip == NULL)
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_OUT_OF_MEMORY),
+ 					 errmsg("out of memory")));
+ 	}
+ 
+ 	rxact = CurrentRunningXacts->xrun;
+ 	subxip = CurrentRunningXacts->subxip;
+ 
+ 	/* 
+ 	 * Loop until we get a valid snapshot. See exit conditions below.
+ 	 */
+ 	for (;;)
+ 	{
+ 		count = 0;
+ 		subcount = 0;
+ 		suboverflowed = false;
+ 
+ 		LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 		/*
+ 		 * Spin over procArray checking xid, and subxids. Shared lock is enough
+ 		 * because new transactions don't use locks at all, so LW_EXCLUSIVE
+ 		 * wouldn't be enough to prevent them, so don't bother.
+ 		 */
+ 		for (index = 0; index < arrayP->numProcs; index++)
+ 		{
+ 			volatile PGPROC *proc = arrayP->procs[index];
+ 			TransactionId xid;
+ 			int			nxids;
+ 
+ 			/* Fetch xid just once - see GetNewTransactionId */
+ 			xid = proc->xid;
+ 
+ 			/*
+ 			 * We store all xids, even XIDs >= xmax and our own XID, if any.
+ 			 * But we don't store transactions that don't have a TransactionId
+ 			 * yet because they will not show as running on a standby server.
+ 			 */
+ 			if (!TransactionIdIsValid(xid))
+ 				continue;
+ 
+ 			rxact[count].xid = xid;
+ 			rxact[count].slotId = proc->slotId;
+ 			rxact[count].databaseId = proc->databaseId;
+ 			rxact[count].roleId = proc->roleId;
+ 			rxact[count].vacuumFlags = proc->vacuumFlags;
+ 
+ 			if (TransactionIdPrecedes(xlatest, xid))
+ 				xlatest = xid;
+ 
+ 			/*
+ 			 * Save subtransaction XIDs. 
+ 			 *
+ 			 * The other backend can add more subxids concurrently, but cannot
+ 			 * remove any.	Hence it's important to fetch nxids just once. Should
+ 			 * be safe to use memcpy, though.  (We needn't worry about missing any
+ 			 * xids added concurrently, because they must postdate xmax.)
+ 			 *
+ 			 * Again, our own XIDs *are* included in the snapshot.
+ 			 */
+ 			nxids = proc->subxids.nxids;
+ 
+ 			if (nxids > 0)
+ 			{
+ 				rxact[count].subx_offset = subcount;
+ 
+ 				memcpy(subxip + subcount,
+ 					   (void *) proc->subxids.xids,
+ 					   nxids * sizeof(TransactionId));
+ 				subcount += nxids;
+ 
+ 				if (proc->subxids.overflowed)
+ 				{
+ 					rxact[count].overflowed = true;
+ 					suboverflowed = true;
+ 				}
+ 			}
+ 
+ 			rxact[count].nsubxids = nxids;
+ 
+ 			count++;
+ 		}
+ 
+ 		LWLockRelease(ProcArrayLock);
+ 
+ 		/*
+ 		 * If there's no procs with TransactionIds allocated we need to
+ 		 * find what the last xid assigned was. This takes and releases
+ 		 * XidGenLock, but that shouldn't cause contention in this case.
+ 		 * We could do this as well if the snapshot overflowed, but in
+ 		 * that case we think that XidGenLock might be high, so we punt.
+ 		 *
+ 		 * By the time we do this, another proc may have incremented the
+ 		 * nextxid, so we must rescan the procarray to check whether
+ 		 * there are either new running transactions or the counter is	
+ 		 * the same as before. If transactions appear and disappear
+ 		 * faster than we can do this, we're in trouble. So spin for at
+ 		 * a few 3 attempts before giving up.
+ 		 *
+ 		 * We do it this way to avoid needing to grab XidGenLock in all
+ 		 * cases, which is hardly ever actually required.
+ 		 */
+ 		if (count > 0)
+ 			break;
+ 		else
+ 		{
+ #define MAX_SNAPSHOT_ATTEMPTS 3
+ 			if (numAttempts >= MAX_SNAPSHOT_ATTEMPTS)
+ 			{
+ 				xlatest = InvalidTransactionId;
+ 				break;
+ 			}
+ 
+ 			xlatest = ReadNewTransactionId();
+ 			TransactionIdRetreat(xlatest);
+ 
+ 			if (prev_xlatest == xlatest)
+ 				break;
+ 
+ 			prev_xlatest = xlatest;
+ 			numAttempts++;
+ 		}
+ 	}
+ 
+ 	CurrentRunningXacts->xcnt = count;
+ 	CurrentRunningXacts->subxcnt = subcount;
+ 	if (!suboverflowed)
+ 		CurrentRunningXacts->xlatest = xlatest;
+ 	else
+ 		CurrentRunningXacts->xlatest = InvalidTransactionId;
+ 
+ #define RUNNING_XACT_DEBUG
+ #ifdef RUNNING_XACT_DEBUG
+ 	elog(LOG, "running xacts xcnt %d subxcnt %d xlatest %d",
+ 					CurrentRunningXacts->xcnt,
+ 					CurrentRunningXacts->subxcnt,
+ 					CurrentRunningXacts->xlatest);
+ 
+ 	for (index = 0; index < CurrentRunningXacts->xcnt; index++)
+ 	{
+ 		int j;
+ 		elog(LOG, "xid %d pid %d backend %d db %d role %d nsubxids %d offset %d vf %u, overflow %s",
+ 								CurrentRunningXacts->xrun[index].xid,
+ 								CurrentRunningXacts->xrun[index].pid,
+ 								CurrentRunningXacts->xrun[index].slotId,
+ 								CurrentRunningXacts->xrun[index].databaseId,
+ 								CurrentRunningXacts->xrun[index].roleId,
+ 								CurrentRunningXacts->xrun[index].nsubxids,
+ 								CurrentRunningXacts->xrun[index].subx_offset,
+ 								CurrentRunningXacts->xrun[index].vacuumFlags,
+ 								CurrentRunningXacts->xrun[index].overflowed ? "t" : "f");
+ 		for (j = 0; j < CurrentRunningXacts->xrun[index].nsubxids; j++)
+ 			elog(LOG, "subxid offset %d j %d xid %d", 
+ 								CurrentRunningXacts->xrun[index].subx_offset, j,
+ 								CurrentRunningXacts->subxip[j + CurrentRunningXacts->xrun[index].subx_offset]);
+ 	}
+ #endif
+ 
+ 	return CurrentRunningXacts;
+ }
+ 
+ /*
   * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
   *
   * Constructs an array of XIDs of transactions that are currently in commit
*************** BackendPidGetProc(int pid)
*** 968,973 ****
--- 1551,1580 ----
  }
  
  /*
+  * SlotIdGetRecoveryProc -- get a PGPROC for a given SlotId
+  *
+  * Run during recovery to identify which PGPROC to access.
+  * ERRORs unless 0 <= slotId < MaxBackends; Asserts the slot is filled.
+  *
+  * see comments in CreateSharedProcArray()
+  */
+ PGPROC *
+ SlotIdGetRecoveryProc(int slotId)
+ {
+ 	if (slotId < 0 || slotId >= MaxBackends)	/* one recovery proc per slot */
+ 		elog(ERROR, "invalid slotId %d", slotId);
+ 
+ 	Assert(procArray->procs[slotId] != NULL);
+ 
+ 	/*
+ 	 * No need to acquire ProcArrayLock to identify proc, we just
+ 	 * use the slotId as an array offset directly, since we assigned
+ 	 * these at start. Valid slotIds are 0 .. MaxBackends - 1.
+ 	 */
+ 	return procArray->procs[slotId];
+ }
+ 
+ /*
   * BackendXidGetPid -- get a backend's pid given its XID
   *
   * Returns 0 if not found or it's a prepared transaction.  Note that
*************** DisplayXidCache(void)
*** 1367,1369 ****
--- 1974,2120 ----
  }
  
  #endif   /* XIDCACHE_DEBUG */
+ 
+ /*
+  * Append xids firstXid .. lastXid-1 to UnobservedXids; hold ProcArrayLock.
+  */
+ void
+ UnobservedTransactionsAddXids(TransactionId firstXid, TransactionId lastXid)
+ {
+ 	TransactionId 	ixid = firstXid;
+ 	int 			index = procArray->numUnobservedXids;
+ 	TransactionId *UnobservedXids;
+ 
+ 	UnobservedXids = (TransactionId *) &(procArray->procs[procArray->maxProcs]);
+ 
+ 	Assert(TransactionIdPrecedes(firstXid, lastXid));
+ 
+ 	/*
+ 	 * UnobservedXids is maintained as an ascending list of xids, with no gaps.
+ 	 * Incoming xids are always higher than previous entries, so we just add
+ 	 * them directly to the end of the array.
+ 	 */
+ 	while (ixid != lastXid)
+ 	{
+ 		/*
+ 		 * check to see if we have space to store more UnobservedXids
+ 		 */
+ 		if (index >= procArray->maxUnobservedXids)
+ 		{
+ 			UnobservedTransactionsDisplay();	/* dump contents before dying */
+ 			elog(FATAL, "No more entries in UnobservedXids array");
+ 			/* not reached: elog(FATAL) does not return */
+ 			/* XXX could set procArray->overflowUnobservedXids instead */
+ 			break;
+ 		}
+ 
+ 		/*
+ 		 * append ixid to UnobservedXids
+ 		 */
+ 		Assert(!TransactionIdIsValid(UnobservedXids[index]));
+ 		Assert(index == 0 || TransactionIdPrecedes(UnobservedXids[index - 1], ixid));
+ 
+ 		elog(LOG, "Adding UnobservedXid %u", ixid);
+ 		UnobservedXids[index++] = ixid;
+ 
+ 		TransactionIdAdvance(ixid);
+ 	}
+ 
+ 	procArray->numUnobservedXids = index;
+ }
+ 
+ /*
+  * Remove xid from UnobservedXids (ERROR if absent); hold ProcArrayLock.
+  */
+ void
+ UnobservedTransactionsRemoveXid(TransactionId xid)
+ {
+ 	int 			index;
+ 	bool			found = false;
+ 	TransactionId	*UnobservedXids;
+ 
+ 	UnobservedXids = (TransactionId *) &(procArray->procs[procArray->maxProcs]);
+ 
+ 	elog(LOG, "Remove UnobservedXid = %u", xid);
+ 	/* calling UnobservedTransactionsDisplay() here helps when debugging */
+ 
+ 	/* 
+ 	 * XXX we could use bsearch, if this has significant overhead.
+ 	 */
+ 	for (index = 0; index < procArray->numUnobservedXids; index++)
+ 	{
+ 		if (!found)
+ 		{
+ 			if (UnobservedXids[index] == xid)
+ 				found = true;
+ 		}
+ 		else
+ 		{
+ 			UnobservedXids[index - 1] = UnobservedXids[index];	/* shift down */
+ 		}
+ 	}
+ 
+ 	if (!found)
+ 	{
+ 		UnobservedTransactionsDisplay();
+ 		elog(ERROR, "could not remove unobserved xid = %u", xid);
+ 	}
+ 
+ 	/* entries above xid moved down one slot; clear the vacated top slot */
+ 	UnobservedXids[--procArray->numUnobservedXids] = InvalidTransactionId;
+ }
+ 
+ void
+ UnobservedTransactionsClearXids(void)
+ {
+ 	TransactionId *xids;
+ 	int			n;
+ 
+ 	/* the UnobservedXids array starts at procs[maxProcs] */
+ 	xids = (TransactionId *) &(procArray->procs[procArray->maxProcs]);
+ 	n = procArray->numUnobservedXids;
+ 
+ 	elog(DEBUG3, "clear UnobservedXids");
+ 	UnobservedTransactionsDisplay();	/* log the entries being discarded */
+ 
+ 	/* reset every used slot, working down from the top */
+ 	while (n > 0)
+ 		xids[--n] = InvalidTransactionId;
+ 
+ 	procArray->numUnobservedXids = 0;
+ 	procArray->overflowUnobservedXids = false;
+ 	UnobservedTransactionsDisplay();	/* count is zero now: logs nothing */
+ }
+ 
+ void
+ UnobservedTransactionsDisplay(void)
+ {
+ #define UNOBSV_XACTS_DEBUG		/* XXX debugging aid, currently forced on */
+ #ifdef UNOBSV_XACTS_DEBUG
+ 	int				index;
+ 	TransactionId	*UnobservedXids;
+ 
+ 	/* the UnobservedXids array starts at procs[maxProcs] */
+ 	UnobservedXids = (TransactionId *) &(procArray->procs[procArray->maxProcs]);
+ 	for (index = 0; index < procArray->numUnobservedXids; index++)
+ 	{
+ 		elog(LOG, "%d unobserved[%d] = %u ", procArray->numUnobservedXids, index, UnobservedXids[index]);
+ 	}
+ #endif
+ }
+ 
+ bool
+ XidInUnobservedTransactions(TransactionId xid)
+ {
+ 	/* the UnobservedXids array starts at procs[maxProcs] */
+ 	TransactionId *xids = (TransactionId *) &(procArray->procs[procArray->maxProcs]);
+ 	int			i;
+ 
+ 	/* linear scan, newest (highest) entry first */
+ 	for (i = procArray->numUnobservedXids - 1; i >= 0; i--)
+ 	{
+ 		if (xids[i] == xid)
+ 			return true;
+ 	}
+ 	return false;
+ }
Index: src/backend/storage/lmgr/proc.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/lmgr/proc.c,v
retrieving revision 1.201
diff -c -w -p -r1.201 proc.c
*** src/backend/storage/lmgr/proc.c	9 Jun 2008 18:23:05 -0000	1.201
--- src/backend/storage/lmgr/proc.c	16 Oct 2008 12:44:29 -0000
*************** ProcGlobalShmemSize(void)
*** 103,108 ****
--- 103,110 ----
  	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
  	/* MyProcs, including autovacuum */
  	size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
+ 	/* RecoveryProcs, including recovery actions by autovacuum */
+ 	size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
  	/* ProcStructLock */
  	size = add_size(size, sizeof(slock_t));
  
*************** InitProcGlobal(void)
*** 152,157 ****
--- 154,160 ----
  	PGPROC	   *procs;
  	int			i;
  	bool		found;
+ 	int			slotId = 0;
  
  	/* Create the ProcGlobal shared structure */
  	ProcGlobal = (PROC_HDR *)
*************** InitProcGlobal(void)
*** 178,183 ****
--- 181,187 ----
  	/*
  	 * Pre-create the PGPROC structures and create a semaphore for each.
  	 */
+ 
  	procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
  	if (!procs)
  		ereport(FATAL,
*************** InitProcGlobal(void)
*** 188,193 ****
--- 192,198 ----
  	{
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = ProcGlobal->freeProcs;
+ 		procs[i].slotId = slotId++;		/* once set, never changed */
  		ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
  	}
  
*************** InitProcGlobal(void)
*** 201,209 ****
--- 206,232 ----
  	{
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = ProcGlobal->autovacFreeProcs;
+ 		procs[i].slotId = slotId++;		/* once set, never changed */
  		ProcGlobal->autovacFreeProcs = MAKE_OFFSET(&procs[i]);
  	}
  
+ 	/* 
+ 	 * Create enough Recovery Procs so there is a shadow proc for every
+ 	 * normal proc. Recovery procs don't need semaphores. 
+ 	 */
+ 	procs = (PGPROC *) ShmemAlloc((MaxBackends) * sizeof(PGPROC));
+ 	if (!procs)
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_OUT_OF_MEMORY),
+ 				 errmsg("out of shared memory")));
+ 	MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
+ 	for (i = 0; i < MaxBackends; i++)
+ 	{
+ 		procs[i].links.next = ProcGlobal->freeProcs;
+ 		procs[i].slotId = -1;
+ 		ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
+ 	}
+ 
  	MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
  	for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
  	{
*************** InitProcess(void)
*** 278,284 ****
  
  	/*
  	 * Initialize all fields of MyProc, except for the semaphore which was
! 	 * prepared for us by InitProcGlobal.
  	 */
  	SHMQueueElemInit(&(MyProc->links));
  	MyProc->waitStatus = STATUS_OK;
--- 301,307 ----
  
  	/*
  	 * Initialize all fields of MyProc, except for the semaphore which was
! 	 * prepared for us by InitProcGlobal. Never touch the slotId.
  	 */
  	SHMQueueElemInit(&(MyProc->links));
  	MyProc->waitStatus = STATUS_OK;
*************** InitProcess(void)
*** 322,327 ****
--- 345,432 ----
  }
  
  /*
+  * InitRecoveryProcess -- initialize a PGPROC taken from freeProcs, for
+  *		use when emulating a master backend's transactions in recovery
+  */
+ PGPROC *
+ InitRecoveryProcess(void)
+ {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile PROC_HDR *procglobal = ProcGlobal;
+ 	SHMEM_OFFSET myOffset;
+ 	PGPROC		*ThisProc = NULL;
+ 
+ 	/*
+ 	 * ProcGlobal should be set up already (if we are a backend, we inherit
+ 	 * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+ 	 */
+ 	if (procglobal == NULL)
+ 		elog(PANIC, "proc header uninitialized");
+ 
+ 	/*
+ 	 * Pop a proc off the shared freeProcs list; FATAL if it is empty.
+ 	 * NOTE(review): InitProcGlobal puts normal procs on this list too.
+ 	 */
+ 	SpinLockAcquire(ProcStructLock);
+ 
+ 	myOffset = procglobal->freeProcs;
+ 
+ 	if (myOffset != INVALID_OFFSET)
+ 	{
+ 		ThisProc = (PGPROC *) MAKE_PTR(myOffset);
+ 		procglobal->freeProcs = ThisProc->links.next;
+ 		SpinLockRelease(ProcStructLock);
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * Should never reach here if shared memory is allocated correctly.
+ 		 */
+ 		SpinLockRelease(ProcStructLock);
+ 		elog(FATAL, "too many procs - could not create recovery proc");
+ 	}
+ 
+ 	/*
+ 	 * xid will be set later as WAL records arrive for this recovery proc
+ 	 */
+ 	ThisProc->xid = InvalidTransactionId;
+ 
+ 	/*
+ 	 * The backendid of the recovery proc stays at InvalidBackendId. There
+ 	 * is a direct 1:1 correspondence between a master backendid and this
+ 	 * proc, but that same backendid may also be in use during recovery,
+ 	 * so if we set this field we would have duplicate backendids.
+ 	 */
+ 	ThisProc->backendId = InvalidBackendId;
+ 
+ 	/*
+ 	 * Reset the remaining fields; these are not used in recovery.
+ 	 */
+ 	ThisProc->pid = 0;
+ 
+ 	SHMQueueElemInit(&(ThisProc->links));
+ 	ThisProc->waitStatus = STATUS_OK;
+ 	ThisProc->lxid = InvalidLocalTransactionId;
+ 	ThisProc->xmin = InvalidTransactionId;
+ 	ThisProc->databaseId = InvalidOid;
+ 	ThisProc->roleId = InvalidOid;
+ 	ThisProc->inCommit = false;
+ 	ThisProc->vacuumFlags = 0;
+ 	ThisProc->lwWaiting = false;
+ 	ThisProc->lwExclusive = false;
+ 	ThisProc->lwWaitLink = NULL;
+ 	ThisProc->waitLock = NULL;
+ 	ThisProc->waitProcLock = NULL;
+ 
+ 	/*
+ 	 * Little else to do: the recovery proc never acquires buffers or LWLocks
+ 	 * and the deadlock checker is inactive in recovery. NOTE(review): lsn
+ 	 * and subxids are not reset here; confirm callers initialize them.
+ 	 */
+ 	return ThisProc;
+ }
+ 
+ /*
   * InitProcessPhase2 -- make MyProc visible in the shared ProcArray.
   *
   * This is separate from InitProcess because we can't acquire LWLocks until
Index: src/backend/utils/time/tqual.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/utils/time/tqual.c,v
retrieving revision 1.110
diff -c -w -p -r1.110 tqual.c
*** src/backend/utils/time/tqual.c	26 Mar 2008 16:20:47 -0000	1.110
--- src/backend/utils/time/tqual.c	16 Oct 2008 12:44:29 -0000
*************** XidInMVCCSnapshot(TransactionId xid, Sna
*** 1238,1263 ****
  		return true;
  
  	/*
! 	 * If the snapshot contains full subxact data, the fastest way to check
! 	 * things is just to compare the given XID against both subxact XIDs and
! 	 * top-level XIDs.	If the snapshot overflowed, we have to use pg_subtrans
! 	 * to convert a subxact XID to its parent XID, but then we need only look
! 	 * at top-level XIDs not subxacts.
  	 */
! 	if (snapshot->subxcnt >= 0)
  	{
! 		/* full data, so search subxip */
! 		int32		j;
! 
! 		for (j = 0; j < snapshot->subxcnt; j++)
! 		{
! 			if (TransactionIdEquals(xid, snapshot->subxip[j]))
  				return true;
  		}
  
! 		/* not there, fall through to search xip[] */
! 	}
! 	else
  	{
  		/* overflowed, so convert xid to top-level */
  		xid = SubTransGetTopmostTransaction(xid);
--- 1238,1257 ----
  		return true;
  
  	/*
! 	 * Compare the given XID against subxact XIDs.
  	 */
! 	for (i = 0; i < snapshot->subxcnt; i++)
  	{
! 		if (TransactionIdEquals(xid, snapshot->subxip[i]))
  				return true;
  		}
  
! 	/*
! 	 * If the snapshot overflowed, we have to use pg_subtrans to convert a 
! 	 * subxact XID to its parent XID, but then we need only look at top-level
! 	 * XIDs not subxacts.
! 	 */
! 	if (snapshot->suboverflowed)
  	{
  		/* overflowed, so convert xid to top-level */
  		xid = SubTransGetTopmostTransaction(xid);
*************** XidInMVCCSnapshot(TransactionId xid, Sna
*** 1270,1275 ****
--- 1264,1272 ----
  			return false;
  	}
  
+ 	/*
+ 	 * Compare the given XID against top-level XIDs.	
+ 	 */
  	for (i = 0; i < snapshot->xcnt; i++)
  	{
  		if (TransactionIdEquals(xid, snapshot->xip[i]))
Index: src/include/access/xact.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/access/xact.h,v
retrieving revision 1.95
diff -c -w -p -r1.95 xact.h
*** src/include/access/xact.h	11 Aug 2008 11:05:11 -0000	1.95
--- src/include/access/xact.h	16 Oct 2008 12:44:29 -0000
***************
*** 17,22 ****
--- 17,23 ----
  #include "access/xlog.h"
  #include "nodes/pg_list.h"
  #include "storage/relfilenode.h"
+ #include "utils/snapshot.h"
  #include "utils/timestamp.h"
  
  
*************** typedef void (*SubXactCallback) (SubXact
*** 84,95 ****
--- 85,113 ----
  #define XLOG_XACT_ABORT				0x20
  #define XLOG_XACT_COMMIT_PREPARED	0x30
  #define XLOG_XACT_ABORT_PREPARED	0x40
+ #define XLOG_XACT_ASSIGNMENT		0x50
+ #define XLOG_XACT_RUNNING_XACTS		0x60
+ /* 0x70 can also be used, if required */
+ 
+ typedef struct xl_xact_assignment
+ {
+ 	TransactionId	xassign;	/* xid being assigned */
+ 	TransactionId	xparent;	/* parent of xassign, if any */
+ 	bool			isSubXact;	/* true if xassign is a subtransaction */
+ 	int				slotId;		/* slotId in procarray */
+ } xl_xact_assignment;
+ 
+ /* 
+  * xl_xact_running_xacts is in utils/snapshot.h so it can be passed
+  * around to the same places as snapshots. Not snapmgr.h
+  */
  
  typedef struct xl_xact_commit
  {
  	TimestampTz xact_time;		/* time of commit */
  	int			nrels;			/* number of RelFileForks */
  	int			nsubxacts;		/* number of subtransaction XIDs */
+ 	int			slotId;			/* slotId in procarray */
  	/* Array of RelFileFork(s) to drop at commit */
  	RelFileFork	xnodes[1];		/* VARIABLE LENGTH ARRAY */
  	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
*************** typedef struct xl_xact_abort
*** 102,107 ****
--- 120,126 ----
  	TimestampTz xact_time;		/* time of abort */
  	int			nrels;			/* number of RelFileForks */
  	int			nsubxacts;		/* number of subtransaction XIDs */
+ 	int			slotId;			/* slotId in procarray */
  	/* Array of RelFileFork(s) to drop at abort */
  	RelFileFork	xnodes[1];		/* VARIABLE LENGTH ARRAY */
  	/* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
*************** extern TransactionId RecordTransactionCo
*** 185,190 ****
--- 204,216 ----
  
  extern int	xactGetCommittedChildren(TransactionId **ptr);
  
+ extern void LogCurrentRunningXacts(void);
+ extern void GetStandbyInfoForTransaction(RmgrId rmid, uint8 info,
+ 							XLogRecData *rdata,
+ 							TransactionId *xid2, 
+ 							uint16 *info2);
+ extern void RecordKnownAssignedTransactionIds(XLogRecPtr lsn, XLogRecord *record);
+ 
  extern void xact_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void xact_desc(StringInfo buf, uint8 xl_info, char *rec);
  
Index: src/include/access/xlog.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/access/xlog.h,v
retrieving revision 1.88
diff -c -w -p -r1.88 xlog.h
*** src/include/access/xlog.h	12 May 2008 08:35:05 -0000	1.88
--- src/include/access/xlog.h	16 Oct 2008 12:44:29 -0000
*************** typedef struct XLogRecord
*** 46,55 ****
  	TransactionId xl_xid;		/* xact id */
  	uint32		xl_tot_len;		/* total len of entire record */
  	uint32		xl_len;			/* total len of rmgr data */
! 	uint8		xl_info;		/* flag bits, see below */
  	RmgrId		xl_rmid;		/* resource manager for this record */
  
! 	/* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
  
  	/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
  
--- 46,63 ----
  	TransactionId xl_xid;		/* xact id */
  	uint32		xl_tot_len;		/* total len of entire record */
  	uint32		xl_len;			/* total len of rmgr data */
! 	uint8		xl_info;		/* flag bits, see below (XLR_ entries) */
  	RmgrId		xl_rmid;		/* resource manager for this record */
+ 	uint16		xl_info2;		/* more flag bits, see below (XLR2_ entries) */
  
! 	/*
! 	 * Next we have an additional entry that can have multiple meanings.
! 	 * If XLR2_FIRST_SUBXID_RECORD is set we interpret this as the parent xid.
! 	 * If XLR2_ROW_REMOVAL is set we interpret this as latestRemovedXid.
! 	 */
! 	TransactionId xl_xid2;
! 
! 	/* Above structure has 8 byte alignment */
  
  	/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
  
*************** typedef struct XLogRecord
*** 85,90 ****
--- 93,130 ----
   */
  #define XLR_BKP_REMOVABLE		0x01
  
+ /*
+  * XLOG uses only the high 4 bits of xl_info2 for flags (XLR2_ below).
+  *
+  * The other 12 bits hold the slotId, so slotIds 0 .. XLOG_MAX_SLOT_ID-1
+  * can be stored in the WAL record. This doesn't prevent having more than
+  * that number of backends, it just means all backends with a slotId
+  * of XLOG_MAX_SLOT_ID or higher need to write a specific WAL record
+  * during AssignTransactionId()
+  */
+ #define XLR2_INFO2_MASK			0x0FFF
+ #define XLOG_MAX_SLOT_ID		4096
+ /*
+  * xl_info2 flag bits
+  */
+ #define XLR2_INVALID_SLOT_ID		0x8000
+ #define XLR2_FIRST_XID_RECORD		0x4000
+ #define XLR2_FIRST_SUBXID_RECORD	0x2000
+ #define XLR2_ROW_REMOVAL			0x1000
+ 
+ #define XLR2_XID_MASK				0x6000
+ 
+ #define XLogRecGetSlotId(record) \
+ ( \
+ 	((record)->xl_info2 & XLR2_INVALID_SLOT_ID) ? \
+ 		-1 : \
+ 		(int)((record)->xl_info2 & XLR2_INFO2_MASK) \
+ )
+ /* yield 0/1: a raw 0x4000 stored into a char-sized bool truncates to 0 */
+ #define XLogRecIsFirstXidRecord(record)		(((record)->xl_info2 & XLR2_FIRST_XID_RECORD) != 0)
+ #define XLogRecIsFirstSubXidRecord(record)	(((record)->xl_info2 & XLR2_FIRST_SUBXID_RECORD) != 0)
+ #define XLogRecIsFirstUseOfXid(record)		(((record)->xl_info2 & XLR2_XID_MASK) != 0)
+ 
  /* Sync methods */
  #define SYNC_METHOD_FSYNC		0
  #define SYNC_METHOD_FDATASYNC	1
Index: src/include/access/xlog_internal.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/access/xlog_internal.h,v
retrieving revision 1.24
diff -c -w -p -r1.24 xlog_internal.h
*** src/include/access/xlog_internal.h	11 Aug 2008 11:05:11 -0000	1.24
--- src/include/access/xlog_internal.h	16 Oct 2008 12:44:29 -0000
*************** typedef struct XLogContRecord
*** 71,77 ****
  /*
   * Each page of XLOG file has a header like this:
   */
! #define XLOG_PAGE_MAGIC 0xD063	/* can be used as WAL version indicator */
  
  typedef struct XLogPageHeaderData
  {
--- 71,77 ----
  /*
   * Each page of XLOG file has a header like this:
   */
! #define XLOG_PAGE_MAGIC 0x5352	/* can be used as WAL version indicator */
  
  typedef struct XLogPageHeaderData
  {
Index: src/include/catalog/pg_control.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/catalog/pg_control.h,v
retrieving revision 1.42
diff -c -w -p -r1.42 pg_control.h
*** src/include/catalog/pg_control.h	23 Sep 2008 09:20:39 -0000	1.42
--- src/include/catalog/pg_control.h	16 Oct 2008 12:44:29 -0000
***************
*** 21,27 ****
  
  
  /* Version identifier for this pg_control format */
! #define PG_CONTROL_VERSION	843
  
  /*
   * Body of CheckPoint XLOG records.  This is declared here because we keep
--- 21,28 ----
  
  
  /* Version identifier for this pg_control format */
! #define PG_CONTROL_VERSION	8477777
! /* XXX placeholder value: change me */
  
  /*
   * Body of CheckPoint XLOG records.  This is declared here because we keep
Index: src/include/storage/proc.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/proc.h,v
retrieving revision 1.106
diff -c -w -p -r1.106 proc.h
*** src/include/storage/proc.h	15 Apr 2008 20:28:47 -0000	1.106
--- src/include/storage/proc.h	16 Oct 2008 12:44:29 -0000
***************
*** 14,19 ****
--- 14,20 ----
  #ifndef _PROC_H_
  #define _PROC_H_
  
+ #include "access/xlog.h"
  #include "storage/lock.h"
  #include "storage/pg_sema.h"
  
*************** struct PGPROC
*** 92,97 ****
--- 93,100 ----
  	bool		inCommit;		/* true if within commit critical section */
  
  	uint8		vacuumFlags;	/* vacuum-related flags, see above */
+ 	XLogRecPtr	lsn;		/* Last LSN which maintained state of Recovery Proc */
+ 	int		slotId;		/* slot number in procarray, never changes once set, OK to reuse */
  
  	/* Info about LWLock the process is currently waiting for, if any. */
  	bool		lwWaiting;		/* true if waiting for an LW lock */
*************** extern int	ProcGlobalSemas(void);
*** 157,162 ****
--- 160,166 ----
  extern Size ProcGlobalShmemSize(void);
  extern void InitProcGlobal(void);
  extern void InitProcess(void);
+ extern PGPROC *InitRecoveryProcess(void);
  extern void InitProcessPhase2(void);
  extern void InitAuxiliaryProcess(void);
  extern bool HaveNFreeProcs(int n);
Index: src/include/storage/procarray.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/procarray.h,v
retrieving revision 1.23
diff -c -w -p -r1.23 procarray.h
*** src/include/storage/procarray.h	4 Aug 2008 18:03:46 -0000	1.23
--- src/include/storage/procarray.h	16 Oct 2008 12:46:16 -0000
***************
*** 14,19 ****
--- 14,20 ----
  #ifndef PROCARRAY_H
  #define PROCARRAY_H
  
+ #include "access/xact.h"
  #include "storage/lock.h"
  #include "utils/snapshot.h"
  
*************** extern void CreateSharedProcArray(void);
*** 23,32 ****
--- 24,41 ----
  extern void ProcArrayAdd(PGPROC *proc);
  extern void ProcArrayRemove(PGPROC *proc, TransactionId latestXid);
  
+ extern void ProcArrayStartRecoveryTransaction(PGPROC *proc, TransactionId xid, 
+ 												XLogRecPtr lsn, bool isSubXact);
  extern void ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid);
  extern void ProcArrayClearTransaction(PGPROC *proc);
+ extern void ProcArrayClearRecoveryTransactions(void);
+ extern bool XidInRecoveryProcs(TransactionId xid);
+ extern void ProcArrayDisplay(void);
+ extern void ProcArrayUpdateRecoveryTransactions(XLogRecPtr lsn, 
+ 												xl_xact_running_xacts *xlrec);
  
  extern Snapshot GetSnapshotData(Snapshot snapshot);
+ extern RunningTransactions GetRunningTransactionData(void);
  
  extern bool TransactionIdIsInProgress(TransactionId xid);
  extern bool TransactionIdIsActive(TransactionId xid);
*************** extern bool HaveTransactionsInCommit(Tra
*** 37,42 ****
--- 46,52 ----
  
  extern PGPROC *BackendPidGetProc(int pid);
  extern int	BackendXidGetPid(TransactionId xid);
+ extern PGPROC *SlotIdGetRecoveryProc(int slotid);
  extern bool IsBackendPid(int pid);
  
  extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
*************** extern void XidCacheRemoveRunningXids(Tr
*** 51,54 ****
--- 61,71 ----
  						  int nxids, const TransactionId *xids,
  						  TransactionId latestXid);
  
+ extern void UnobservedTransactionsAddXids(TransactionId firstXid, 
+ 											TransactionId lastXid);
+ extern void UnobservedTransactionsRemoveXid(TransactionId xid);
+ extern void UnobservedTransactionsClearXids(void);
+ extern void UnobservedTransactionsDisplay(void);
+ extern bool XidInUnobservedTransactions(TransactionId xid);
+ 
  #endif   /* PROCARRAY_H */
Index: src/include/utils/snapshot.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/utils/snapshot.h,v
retrieving revision 1.3
diff -c -w -p -r1.3 snapshot.h
*** src/include/utils/snapshot.h	12 May 2008 20:02:02 -0000	1.3
--- src/include/utils/snapshot.h	16 Oct 2008 12:44:29 -0000
*************** typedef struct SnapshotData
*** 49,55 ****
  	uint32		xcnt;			/* # of xact ids in xip[] */
  	TransactionId *xip;			/* array of xact IDs in progress */
  	/* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */
! 	int32		subxcnt;		/* # of xact ids in subxip[], -1 if overflow */
  	TransactionId *subxip;		/* array of subxact IDs in progress */
  
  	/*
--- 49,56 ----
  	uint32		xcnt;			/* # of xact ids in xip[] */
  	TransactionId *xip;			/* array of xact IDs in progress */
  	/* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */
! 	uint32		subxcnt;		/* # of xact ids in subxip[] */
! 	bool		suboverflowed;	/* true means at least one subxid cache overflowed */
  	TransactionId *subxip;		/* array of subxact IDs in progress */
  
  	/*
*************** typedef struct SnapshotData
*** 63,68 ****
--- 64,131 ----
  } SnapshotData;
  
  /*
+  * Declarations for GetRunningTransactionData(). Similar to Snapshots, but
+  * not quite: this has nothing to do with visibility on this server, so it
+  * is kept completely separate from snapmgr.c and snapmgr.h.
+  */
+ typedef struct RunningXact
+ {
+ 	/* Items copied from PGPROC (NOTE(review): pid is not copied -- confirm) */
+ 	TransactionId	xid;			/* xact ID in progress */
+ 	int          	pid;        	/* backend's process id, or 0 */
+ 	int          	slotId;  		/* backend's slotId */
+ 	Oid         	databaseId; 	/* OID of database this backend is using */
+ 	Oid          	roleId;     	/* OID of role using this backend */
+ 	uint8        	vacuumFlags; 	/* vacuum-related flags, see storage/proc.h */
+ 
+ 	/* Items copied from the PGPROC's subxids cache */
+ 	bool        	overflowed;		/* true if that subxid cache overflowed */
+ 	int	        	nsubxids;		/* # of subxact ids for this xact only */
+ 
+ 	/* Additional info */
+ 	uint32     		subx_offset;	/* index of this xact's first subxid in
+ 									 * subxip; only set when nsubxids > 0
+ 									 */
+ } RunningXact;
+ 
+ typedef struct RunningXactsData
+ {
+ 	uint32		xcnt;			/* # of xact ids in xrun[] */
+ 	uint32		subxcnt;		/* total # of xact ids in subxip[] */
+ 	TransactionId	xlatest;	/* Initial LatestObservedXid, or Invalid */
+ 
+ 	RunningXact	*xrun;			/* array of xcnt RunningXact structs */
+ 
+ 	/*
+ 	 * subxip is held as a single contiguous array, so no space is wasted,
+ 	 * plus it helps it fit into one XLogRecord.  We continue to keep track
+ 	 * of which subxids go with each top-level xid by tracking the start
+ 	 * offset, held in each RunningXact's subx_offset.
+ 	 */
+ 	TransactionId *subxip;		/* array of subxcnt subxact IDs in progress */
+ 
+ } RunningXactsData;
+ 
+ typedef RunningXactsData *RunningTransactions;
+ 
+ /*
+  * WAL form of RunningXactsData. NOTE(review): counts are int, not uint32.
+  */
+ typedef struct xl_xact_running_xacts
+ {
+ 	int		xcnt;			/* # of xact ids in xrun[] */
+ 	int		subxcnt;		/* # of subxact ids in the trailing array */
+ 	TransactionId	xlatest;	/* Initial setting of LatestObservedXid */
+ 
+ 	/* Array of xcnt RunningXact(s) */
+ 	RunningXact	xrun[1];		/* VARIABLE LENGTH ARRAY */
+ 
+ 	/* ARRAY OF subxcnt RUNNING SUBTRANSACTION XIDs FOLLOWS */
+ } xl_xact_running_xacts;
+ 
+ #define MinSizeOfXactRunningXacts offsetof(xl_xact_running_xacts, xrun)	/* header only */
+ 
+ /*
   * Result codes for HeapTupleSatisfiesUpdate.  This should really be in
   * tqual.h, but we want to avoid including that file elsewhere.
   */
