Re: Review: Row-level Locks & SERIALIZABLE transactions, postgres vs. Oracle

Started by Kevin Grittner over 15 years ago · 2 messages
#1 Kevin Grittner
grimkg@gmail.com
1 attachment(s)

This patch suffered a lot of bitrot in the last month, mostly due to the
noop patch for SSI. It also used the term SERIALIZABLE in many places to
indicate any transaction-snapshot mode transaction, so I applied changes
consistent with the noop patch. I also found a few whitespace and brace
usage issues which seemed to conflict with conventional usage and cleaned
those up. A minor spelling typo was also corrected.

A new patch reflecting all this is attached.

This now compiles and passes regression tests. I still need to re-run all
the other tests which Florian and I previously used to test the patch. I
don't have any reason to expect that they will now fail, but one needs to be
thorough. Once that is confirmed, I think this will be ready for committer
unless someone can think of something else to throw at it first.
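For anyone wanting something concrete to throw at it, the user-visible change
boils down to a two-session scenario like this (table and column names are
just for illustration):

    -- Session 1
    BEGIN ISOLATION LEVEL SERIALIZABLE;
    SELECT * FROM parent WHERE id = 1 FOR SHARE;

    -- Session 2
    BEGIN ISOLATION LEVEL SERIALIZABLE;
    UPDATE parent SET n = n + 1 WHERE id = 1;  -- blocks on session 1's lock

    -- Session 1
    COMMIT;

    -- Session 2 should now abort, even though session 1 only locked the row:
    -- ERROR:  could not serialize access due to concurrent update

Without the patch, session 2 would simply proceed once the FOR SHARE lock was
released, since the row itself was never updated.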

-Kevin

Attachments:

serializable_row_locks-v2.patch (text/x-diff; charset=US-ASCII)
*** a/doc/src/sgml/mvcc.sgml
--- b/doc/src/sgml/mvcc.sgml
***************
*** 386,407 **** COMMIT;
      behave the same as <command>SELECT</command>
      in terms of searching for target rows: they will only find target rows
      that were committed as of the transaction start time.  However, such a
!     target
!     row might have already been updated (or deleted or locked) by
!     another concurrent transaction by the time it is found.  In this case, the
!     serializable transaction will wait for the first updating transaction to commit or
!     roll back (if it is still in progress).  If the first updater rolls back,
!     then its effects are negated and the serializable transaction can proceed
!     with updating the originally found row.  But if the first updater commits
!     (and actually updated or deleted the row, not just locked it)
!     then the serializable transaction will be rolled back with the message
  
  <screen>
  ERROR:  could not serialize access due to concurrent update
  </screen>
  
!     because a serializable transaction cannot modify or lock rows changed by
!     other transactions after the serializable transaction began.
     </para>
  
     <para>
--- 386,407 ----
      behave the same as <command>SELECT</command>
      in terms of searching for target rows: they will only find target rows
      that were committed as of the transaction start time.  However, such a
!     target row might have already been subject to a concurrent
!     <command>UPDATE</command>, <command>DELETE</command>, <command>SELECT
!     FOR UPDATE</command>, or <command>SELECT FOR SHARE</command>. In this case,
!     the serializable transaction will wait for the other transaction to commit
!     or roll back (if it is still in progress). If it rolls back then its effects
!     are negated and the serializable transaction can proceed with modifying
!     or locking the originally found row. If it commits, and the two commands
!     conflict according to <xref linkend="mvcc-serialization-conflicts">,
!     the serializable transaction is rolled back with the message
  
  <screen>
  ERROR:  could not serialize access due to concurrent update
  </screen>
  
!     since serializable transactions cannot simply proceed with the newer row
!     version like read committed ones do.
     </para>
  
     <para>
***************
*** 418,423 **** ERROR:  could not serialize access due to concurrent update
--- 418,463 ----
      transactions will never have serialization conflicts.
     </para>
  
+    <table tocentry="1" id="mvcc-serialization-conflicts">
+     <title>Serialization Conflicts</title>
+     <tgroup cols="4">
+      <colspec colnum="2" colname="lockst">
+      <colspec colnum="4" colname="lockend">
+      <spanspec namest="lockst" nameend="lockend" spanname="lockreq">
+      <thead>
+       <row>
+        <entry morerows="1">Serializable Transaction</entry>
+        <entry spanname="lockreq">Concurrent Transaction</entry>
+       </row>
+       <row>
+        <entry><command>UPDATE</command>, <command>DELETE</command></entry>
+        <entry><command>SELECT FOR UPDATE</command></entry>
+        <entry><command>SELECT FOR SHARE</command></entry>
+       </row>
+      </thead>
+      <tbody>
+       <row>
+        <entry><command>UPDATE</command>, <command>DELETE</command></entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+       </row>
+       <row>
+        <entry><command>SELECT FOR UPDATE</command></entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+       </row>
+       <row>
+        <entry><command>SELECT FOR SHARE</command></entry>
+        <entry align="center">X</entry>
+        <entry align="center"></entry>
+        <entry align="center"></entry>
+       </row>
+      </tbody>
+     </tgroup>
+    </table>
+ 
     <para>
      The Serializable mode provides a rigorous guarantee that each
      transaction sees a wholly consistent view of the database.  However,
***************
*** 921,926 **** SELECT SUM(value) FROM mytab WHERE class = 2;
--- 961,974 ----
      </para>
  
      <para>
+       Serializable transactions are affected by concurrent
+       <command>SELECT FOR SHARE</command> and <command>SELECT FOR UPDATE</command>
+       locks for longer than those locks are actually held, and may be aborted
+       when trying to obtain a conflicting lock. For details,
+       see <xref linkend="xact-serializable">.
+     </para>
+ 
+     <para>
       <productname>PostgreSQL</productname> doesn't remember any
       information about modified rows in memory, so there is no limit on
       the number of rows locked at one time.  However, locking a row
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 83,88 **** static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
--- 83,89 ----
  				bool all_visible_cleared, bool new_all_visible_cleared);
  static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
  					   HeapTuple oldtup, HeapTuple newtup);
+ static bool HeapSatisfiesLockersVisible(HeapTupleHeader tuple, Snapshot snapshot);
  
  
  /* ----------------------------------------------------------------
***************
*** 2033,2040 **** simple_heap_insert(Relation relation, HeapTuple tup)
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - delete command ID (used for visibility test, and stored into
   *		cmax if successful)
-  *	crosscheck - if not InvalidSnapshot, also check tuple against this
   *	wait - true if should wait for any conflicting update to commit/abort
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we did delete it.  Failure return codes are
--- 2034,2042 ----
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - delete command ID (used for visibility test, and stored into
   *		cmax if successful)
   *	wait - true if should wait for any conflicting update to commit/abort
+  *	lockcheck_snapshot - if not NULL, report the tuple as updated if it
+  *		was locked by a transaction not visible under this snapshot
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we did delete it.  Failure return codes are
***************
*** 2049,2055 **** simple_heap_insert(Relation relation, HeapTuple tup)
  HTSU_Result
  heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
--- 2051,2057 ----
  HTSU_Result
  heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
***************
*** 2171,2189 **** l1:
  			result = HeapTupleUpdated;
  	}
  
! 	if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
! 	{
! 		/* Perform additional check for transaction-snapshot mode RI updates */
! 		if (!HeapTupleSatisfiesVisibility(&tp, crosscheck, buffer))
! 			result = HeapTupleUpdated;
! 	}
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
  		*ctid = tp.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tp.t_data);
  		UnlockReleaseBuffer(buffer);
--- 2173,2190 ----
  			result = HeapTupleUpdated;
  	}
  
! 	/* Verify visibility of locking transactions. */
! 	if ((result == HeapTupleMayBeUpdated) &&
! 		!HeapSatisfiesLockersVisible(tp.t_data, lockcheck_snapshot))
! 		result = HeapTupleUpdated;
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
! 			   (tp.t_data->t_infomask & HEAP_IS_LOCKED));
  		*ctid = tp.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tp.t_data);
  		UnlockReleaseBuffer(buffer);
***************
*** 2313,2320 **** simple_heap_delete(Relation relation, ItemPointer tid)
  
  	result = heap_delete(relation, tid,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true), InvalidSnapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 2314,2322 ----
  
  	result = heap_delete(relation, tid,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true),
! 						 true /* wait for commit */ ,
! 						 InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
***************
*** 2349,2355 **** simple_heap_delete(Relation relation, ItemPointer tid)
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - update command ID (used for visibility test, and stored into
   *		cmax/cmin if successful)
!  *	crosscheck - if not InvalidSnapshot, also check old tuple against this
   *	wait - true if should wait for any conflicting update to commit/abort
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
--- 2351,2358 ----
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - update command ID (used for visibility test, and stored into
   *		cmax/cmin if successful)
!  *	lockcheck_snapshot - if not NULL, report the tuple as updated if it
!  *		was locked by a transaction not visible under this snapshot
   *	wait - true if should wait for any conflicting update to commit/abort
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
***************
*** 2371,2377 **** simple_heap_delete(Relation relation, ItemPointer tid)
  HTSU_Result
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
--- 2374,2380 ----
  HTSU_Result
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
***************
*** 2523,2541 **** l2:
  			result = HeapTupleUpdated;
  	}
  
! 	if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
! 	{
! 		/* Perform additional check for transaction-snapshot mode RI updates */
! 		if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
! 			result = HeapTupleUpdated;
! 	}
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
  		*ctid = oldtup.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
  		UnlockReleaseBuffer(buffer);
--- 2526,2543 ----
  			result = HeapTupleUpdated;
  	}
  
! 	/* Verify visibility of locking transactions. */
! 	if ((result == HeapTupleMayBeUpdated) &&
! 		!HeapSatisfiesLockersVisible(oldtup.t_data, lockcheck_snapshot))
! 		result = HeapTupleUpdated;
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID) ||
! 			   (oldtup.t_data->t_infomask & HEAP_IS_LOCKED));
  		*ctid = oldtup.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
  		UnlockReleaseBuffer(buffer);
***************
*** 2961,2966 **** HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
--- 2963,3063 ----
  	return true;
  }
  
+ 
+ /*
+  * Returns false if any of the tuple's lockers committed but is not visible
+  * according to lockcheck_snapshot, excluding subtransactions of the current
+  * transaction.  Assumes that all locking transactions have either committed
+  * or aborted, and that none are still in progress.
+  */
+ static bool
+ HeapSatisfiesLockersVisible(HeapTupleHeader tuple,
+ 							Snapshot lockcheck_snapshot)
+ {
+ 	if (lockcheck_snapshot == InvalidSnapshot)
+ 		return true;
+ 
+ 	if (tuple->t_infomask & HEAP_IS_LOCKED)
+ 	{
+ 		/*
+ 		 * If the tuple was locked, we now check whether the locking
+ 		 * transaction(s) are visible under lockcheck_snapshot. If
+ 		 * they aren't, we pretend that the tuple was updated.
+ 		 */
+ 
+ 		if (tuple->t_infomask & HEAP_XMAX_IS_MULTI)
+ 		{
+ 			TransactionId *xids;
+ 			int xids_l = GetMultiXactIdMembers(HeapTupleHeaderGetXmax(tuple),
+ 											   &xids);
+ 
+ 			if (xids_l < 1)
+ 			{
+ 				/*
+ 				 * The multi xact either is too old to be inspected or
+ 				 * doesn't contain members.  The second case is probably
+ 				 * impossible, but even if not it doesn't pose any problem.
+ 				 *
+ 				 * In the first case, we have to trust that all xids that
+ 				 * were contained in the xact are in fact visible under
+ 				 * lockcheck_snapshot. Currently this is always the case,
+ 				 * since lockcheck_snapshot is always the transaction's
+ 				 * snapshot, and we call MultiXactIdSetOldestVisible() before
+ 				 * acquiring that snapshot.
+ 				 */
+ 				return true;
+ 			}
+ 			else
+ 			{
+ 				int i;
+ 				for (i = 0; i < xids_l; ++i)
+ 				{
+ 					/* Ignore our own subtransactions */
+ 					if (TransactionIdIsCurrentTransactionId(xids[i]))
+ 						continue;
+ 
+ 					/*
+ 					 * We expect to be called after the locking transactions'
+ 					 * fates have been decided
+ 					 */
+ 					Assert(!TransactionIdIsInProgress(xids[i]));
+ 
+ 					if (!TransactionIdDidAbort(xids[i]) &&
+ 					    XidInMVCCSnapshot(xids[i], lockcheck_snapshot))
+ 					{
+ 						/* Non-aborted, invisible locker */
+ 						return false;
+ 					}
+ 				}
+ 				return true;
+ 			}
+ 		}
+ 		else
+ 		{
+ 			TransactionId xid = HeapTupleHeaderGetXmax(tuple);
+ 
+ 			/* Ignore our own subtransactions */
+ 			if (TransactionIdIsCurrentTransactionId(xid))
+ 				return true;
+ 
+ 			/*
+ 			 * We expect to be called after the locking transactions' fates
+ 			 * have been decided
+ 			 */
+ 			Assert(!TransactionIdIsInProgress(xid));
+ 
+ 			/* Locker must either be visible or have aborted */
+ 			return TransactionIdDidAbort(xid) ||
+ 				   !XidInMVCCSnapshot(xid, lockcheck_snapshot);
+ 		}
+ 	}
+ 	else
+ 	{
+ 		/* Tuple wasn't locked */
+ 		return true;
+ 	}
+ }
+ 
  /*
   *	simple_heap_update - replace a tuple
   *
***************
*** 2978,2985 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
  
  	result = heap_update(relation, otid, tup,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true), InvalidSnapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 3075,3082 ----
  
  	result = heap_update(relation, otid, tup,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true),
! 						 true /* wait for commit */, InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
***************
*** 3013,3018 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
--- 3110,3118 ----
   *		tuple's cmax if lock is successful)
   *	mode: indicates if shared or exclusive tuple lock is desired
   *	nowait: if true, ereport rather than blocking if lock not available
+  *	lockcheck_snapshot: if not NULL, report the tuple as updated if it
+  *						was locked by a transaction not visible under
+  *						this snapshot
   *
   * Output parameters:
   *	*tuple: all fields filled in
***************
*** 3066,3072 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
  HTSU_Result
  heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
  				ItemPointer ctid, TransactionId *update_xmax,
! 				CommandId cid, LockTupleMode mode, bool nowait)
  {
  	HTSU_Result result;
  	ItemPointer tid = &(tuple->t_self);
--- 3166,3173 ----
  HTSU_Result
  heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
  				ItemPointer ctid, TransactionId *update_xmax,
! 				CommandId cid, LockTupleMode mode, bool nowait,
! 				Snapshot lockcheck_snapshot)
  {
  	HTSU_Result result;
  	ItemPointer tid = &(tuple->t_self);
***************
*** 3247,3256 **** l3:
  			result = HeapTupleUpdated;
  	}
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
! 		Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
  		*ctid = tuple->t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tuple->t_data);
  		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
--- 3348,3363 ----
  			result = HeapTupleUpdated;
  	}
  
+ 	/* Verify visibility of locking transactions */
+ 	if ((result == HeapTupleMayBeUpdated) &&
+ 		!HeapSatisfiesLockersVisible(tuple->t_data, lockcheck_snapshot))
+ 		result = HeapTupleUpdated;
+ 
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
! 		Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
! 			   (tuple->t_data->t_infomask & HEAP_IS_LOCKED));
  		*ctid = tuple->t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tuple->t_data);
  		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
*** a/src/backend/access/transam/multixact.c
--- b/src/backend/access/transam/multixact.c
***************
*** 211,217 **** static MemoryContext MXactContext = NULL;
  #endif
  
  /* internal MultiXactId management */
- static void MultiXactIdSetOldestVisible(void);
  static MultiXactId CreateMultiXactId(int nxids, TransactionId *xids);
  static void RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
  				   int nxids, TransactionId *xids);
--- 211,216 ----
***************
*** 531,537 **** MultiXactIdSetOldestMember(void)
   * there is no live transaction, now or later, that can be a member of any
   * MultiXactId older than the OldestVisibleMXactId we compute here.
   */
! static void
  MultiXactIdSetOldestVisible(void)
  {
  	if (!MultiXactIdIsValid(OldestVisibleMXactId[MyBackendId]))
--- 530,536 ----
   * there is no live transaction, now or later, that can be a member of any
   * MultiXactId older than the OldestVisibleMXactId we compute here.
   */
! void
  MultiXactIdSetOldestVisible(void)
  {
  	if (!MultiXactIdIsValid(OldestVisibleMXactId[MyBackendId]))
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 1088,1094 **** DoCopy(const CopyStmt *stmt, const char *queryString)
  		/* Create a QueryDesc requesting no output */
  		cstate->queryDesc = CreateQueryDesc(plan, queryString,
  											GetActiveSnapshot(),
- 											InvalidSnapshot,
  											dest, NULL, 0);
  
  		/*
--- 1088,1093 ----
*** a/src/backend/commands/explain.c
--- b/src/backend/commands/explain.c
***************
*** 372,378 **** ExplainOnePlan(PlannedStmt *plannedstmt, ExplainState *es,
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt, queryString,
! 								GetActiveSnapshot(), InvalidSnapshot,
  								None_Receiver, params, instrument_option);
  
  	INSTR_TIME_SET_CURRENT(starttime);
--- 372,378 ----
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt, queryString,
! 								GetActiveSnapshot(),
  								None_Receiver, params, instrument_option);
  
  	INSTR_TIME_SET_CURRENT(starttime);
*** a/src/backend/commands/trigger.c
--- b/src/backend/commands/trigger.c
***************
*** 2367,2380 **** GetTupleForTrigger(EState *estate,
  		Assert(epqstate != NULL);
  
  		/*
! 		 * lock tuple for update
  		 */
  ltrmark:;
  		tuple.t_self = *tid;
  		test = heap_lock_tuple(relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   LockTupleExclusive, false);
  		switch (test)
  		{
  			case HeapTupleSelfUpdated:
--- 2367,2386 ----
  		Assert(epqstate != NULL);
  
  		/*
! 		 * Lock tuple for update.
! 		 *
! 		 * Transaction-snapshot mode transactions pass their snapshot as the
! 		 * lockcheck_snapshot.  This lets heap_lock_tuple report concurrently
! 		 * FOR SHARE or FOR UPDATE locked tuples as HeapTupleUpdated.
  		 */
  ltrmark:;
  		tuple.t_self = *tid;
  		test = heap_lock_tuple(relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   LockTupleExclusive, false,
! 							   IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 															 InvalidSnapshot);
  		switch (test)
  		{
  			case HeapTupleSelfUpdated:
*** a/src/backend/executor/README
--- b/src/backend/executor/README
***************
*** 139,157 **** be hidden inside function calls).  This case has a flow of control like
  		(a separate FreeExprContext call is not necessary)
  
  
! EvalPlanQual (READ COMMITTED Update Checking)
! ---------------------------------------------
  
  For simple SELECTs, the executor need only pay attention to tuples that are
  valid according to the snapshot seen by the current transaction (ie, they
  were inserted by a previously committed transaction, and not deleted by any
  previously committed transaction).  However, for UPDATE and DELETE it is not
  cool to modify or delete a tuple that's been modified by an open or
! concurrently-committed transaction.  If we are running in SERIALIZABLE
! isolation level then we just raise an error when this condition is seen to
! occur.  In READ COMMITTED isolation level, we must work a lot harder.
  
! The basic idea in READ COMMITTED mode is to take the modified tuple
  committed by the concurrent transaction (after waiting for it to commit,
  if need be) and re-evaluate the query qualifications to see if it would
  still meet the quals.  If so, we regenerate the updated tuple (if we are
--- 139,158 ----
  		(a separate FreeExprContext call is not necessary)
  
  
! EvalPlanQual (Statement-Snapshot Update Checking)
! -------------------------------------------------
  
  For simple SELECTs, the executor need only pay attention to tuples that are
  valid according to the snapshot seen by the current transaction (ie, they
  were inserted by a previously committed transaction, and not deleted by any
  previously committed transaction).  However, for UPDATE and DELETE it is not
  cool to modify or delete a tuple that's been modified by an open or
! concurrently-committed transaction.  If we are running in a transaction-
! snapshot isolation level then we just raise an error when this condition is
! seen to occur.  In statement-snapshot isolation levels, we must work a lot
! harder.
  
! The basic idea in statement-snapshot modes is to take the modified tuple
  committed by the concurrent transaction (after waiting for it to commit,
  if need be) and re-evaluate the query qualifications to see if it would
  still meet the quals.  If so, we regenerate the updated tuple (if we are
***************
*** 195,197 **** is no explicit prohibition on SRFs in UPDATE, but the net effect will be
--- 196,227 ----
  that only the first result row of an SRF counts, because all subsequent
  rows will result in attempts to re-update an already updated target row.
  This is historical behavior and seems not worth changing.)
+ 
+ Row Locks and Transaction-Snapshot Mode Transactions
+ ----------------------------------------------------
+ 
+ In a statement-snapshot mode, a transaction which encounters a locked row
+ during an UPDATE, DELETE, SELECT FOR UPDATE or SELECT FOR SHARE simply blocks
+ until the locking transaction commits or rolls back, and in the former case
+ then re-executes the statement using the new row version, as described above.
+ 
+ For transaction-snapshot mode transactions this is not satisfactory. The RI
+ triggers, for example, take a FOR SHARE lock on a parent row before allowing
+ a child row to be inserted and verify that deleting a parent row leaves no
+ orphaned children behind before allowing the delete to occur. From within
+ statement-snapshot mode transactions, blocking upon a delete of a parent row
+ until all lockers have finished is sufficient to guarantee that this check
+ finds any potential orphan, since the check will be executed with an up-to-
+ date snapshot to which the locking transaction's changes are visible. This,
+ however, is not true for transaction-snapshot mode transactions since these
+ will continue to use their old snapshot and hence miss newly inserted rows.
+ 
+ Transaction-snapshot mode transactions therefore treat a FOR SHARE or FOR
+ UPDATE lock on a tuple the same as an actual update during UPDATE, DELETE
+ and SELECT FOR UPDATE. They are thus aborted when trying to UPDATE, DELETE
+ or FOR UPDATE lock a row that was FOR SHARE or FOR UPDATE locked by a
+ concurrent transaction.
+ 
+ This is implemented by the lockcheck_snapshot parameter of heap_update,
+ heap_delete and heap_lock_tuple. If such a snapshot is provided to one of
+ these functions, they return HeapTupleUpdated if the tuple locked (but not
+ necessarily updated) by any transaction invisible to the snapshot.
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 183,189 **** standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  	 * Copy other important information into the EState
  	 */
  	estate->es_snapshot = RegisterSnapshot(queryDesc->snapshot);
- 	estate->es_crosscheck_snapshot = RegisterSnapshot(queryDesc->crosscheck_snapshot);
  	estate->es_instrument = queryDesc->instrument_options;
  
  	/*
--- 183,188 ----
***************
*** 348,354 **** standard_ExecutorEnd(QueryDesc *queryDesc)
  
  	/* do away with our snapshots */
  	UnregisterSnapshot(estate->es_snapshot);
- 	UnregisterSnapshot(estate->es_crosscheck_snapshot);
  
  	/*
  	 * Must switch out of context before destroying it
--- 347,352 ----
***************
*** 1533,1543 **** EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
  
  			/*
  			 * This is a live tuple, so now try to lock it.
  			 */
  			test = heap_lock_tuple(relation, &tuple, &buffer,
  								   &update_ctid, &update_xmax,
  								   estate->es_output_cid,
! 								   lockmode, false);
  			/* We now have two pins on the buffer, get rid of one */
  			ReleaseBuffer(buffer);
  
--- 1531,1549 ----
  
  			/*
  			 * This is a live tuple, so now try to lock it.
+ 			 *
+ 			 * Transaction-snapshot mode transactions pass their snapshot as
+ 			 * the lockcheck_snapshot.  This lets heap_lock_tuple report
+ 			 * concurrently FOR SHARE or FOR UPDATE locked tuples as
+ 			 * HeapTupleUpdated.
  			 */
+ 			Assert(!IsolationUsesXactSnapshot() || (estate->es_snapshot != InvalidSnapshot));
  			test = heap_lock_tuple(relation, &tuple, &buffer,
  								   &update_ctid, &update_xmax,
  								   estate->es_output_cid,
! 								   lockmode, false,
! 								   IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 																 InvalidSnapshot);
  			/* We now have two pins on the buffer, get rid of one */
  			ReleaseBuffer(buffer);
  
***************
*** 1906,1912 **** EvalPlanQualStart(EPQState *epqstate, EState *parentestate, Plan *planTree)
  	 */
  	estate->es_direction = ForwardScanDirection;
  	estate->es_snapshot = parentestate->es_snapshot;
- 	estate->es_crosscheck_snapshot = parentestate->es_crosscheck_snapshot;
  	estate->es_range_table = parentestate->es_range_table;
  	estate->es_plannedstmt = parentestate->es_plannedstmt;
  	estate->es_junkFilter = parentestate->es_junkFilter;
--- 1912,1917 ----
*** a/src/backend/executor/execUtils.c
--- b/src/backend/executor/execUtils.c
***************
*** 109,115 **** CreateExecutorState(void)
  	 */
  	estate->es_direction = ForwardScanDirection;
  	estate->es_snapshot = SnapshotNow;
- 	estate->es_crosscheck_snapshot = InvalidSnapshot;	/* no crosscheck */
  	estate->es_range_table = NIL;
  	estate->es_plannedstmt = NULL;
  
--- 109,114 ----
*** a/src/backend/executor/functions.c
--- b/src/backend/executor/functions.c
***************
*** 415,421 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
  	if (IsA(es->stmt, PlannedStmt))
  		es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
  								 fcache->src,
! 								 snapshot, InvalidSnapshot,
  								 dest,
  								 fcache->paramLI, 0);
  	else
--- 415,421 ----
  	if (IsA(es->stmt, PlannedStmt))
  		es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
  								 fcache->src,
! 								 snapshot,
  								 dest,
  								 fcache->paramLI, 0);
  	else
*** a/src/backend/executor/nodeLockRows.c
--- b/src/backend/executor/nodeLockRows.c
***************
*** 71,76 **** lnext:
--- 71,77 ----
  		ItemPointerData update_ctid;
  		TransactionId update_xmax;
  		LockTupleMode lockmode;
+ 		Snapshot lockcheck_snapshot = InvalidSnapshot;
  		HTSU_Result test;
  		HeapTuple	copyTuple;
  
***************
*** 110,123 **** lnext:
  
  		/* okay, try to lock the tuple */
  		if (erm->markType == ROW_MARK_EXCLUSIVE)
  			lockmode = LockTupleExclusive;
  		else
  			lockmode = LockTupleShared;
  
  		test = heap_lock_tuple(erm->relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   lockmode, erm->noWait);
  		ReleaseBuffer(buffer);
  		switch (test)
  		{
--- 111,136 ----
  
  		/* okay, try to lock the tuple */
  		if (erm->markType == ROW_MARK_EXCLUSIVE)
+ 		{
  			lockmode = LockTupleExclusive;
+ 
+ 			/*
+ 			 * Transaction-snapshot mode transactions pass their snapshot as
+ 			 * the lockcheck_snapshot.  This lets heap_lock_tuple report
+ 			 * concurrently FOR SHARE or FOR UPDATE locked tuples as
+ 			 * HeapTupleUpdated.
+ 			 */
+ 			if (IsolationUsesXactSnapshot())
+ 				lockcheck_snapshot = estate->es_snapshot;
+ 		}
  		else
  			lockmode = LockTupleShared;
  
  		test = heap_lock_tuple(erm->relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   lockmode, erm->noWait,
! 							   lockcheck_snapshot);
  		ReleaseBuffer(buffer);
  		switch (test)
  		{
*** a/src/backend/executor/nodeModifyTable.c
--- b/src/backend/executor/nodeModifyTable.c
***************
*** 307,323 **** ExecDelete(ItemPointer tupleid,
  	/*
  	 * delete the tuple
  	 *
! 	 * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check that
! 	 * the row to be deleted is visible to that snapshot, and throw a can't-
! 	 * serialize error if not.	This is a special-case behavior needed for
! 	 * referential integrity updates in transaction-snapshot mode transactions.
  	 */
  ldelete:;
  	result = heap_delete(resultRelationDesc, tupleid,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 estate->es_crosscheck_snapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 307,323 ----
  	/*
  	 * delete the tuple
  	 *
! 	 * Transaction-snapshot mode transactions pass their snapshot as the
! 	 * lockcheck_snapshot.  This lets heap_delete report tuples concurrently
! 	 * locked FOR SHARE or FOR UPDATE as HeapTupleUpdated.
  	 */
  ldelete:;
  	result = heap_delete(resultRelationDesc, tupleid,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 true, /* wait for commit */
! 						 IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 													   InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
***************
*** 496,511 **** lreplace:;
  	/*
  	 * replace the heap tuple
  	 *
! 	 * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check that
! 	 * the row to be updated is visible to that snapshot, and throw a can't-
! 	 * serialize error if not.	This is a special-case behavior needed for
! 	 * referential integrity updates in transaction-snapshot mode transactions.
  	 */
  	result = heap_update(resultRelationDesc, tupleid, tuple,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 estate->es_crosscheck_snapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 496,511 ----
  	/*
  	 * replace the heap tuple
  	 *
! 	 * Transaction-snapshot mode transactions pass their snapshot as the
! 	 * lockcheck_snapshot.  This lets heap_update report tuples concurrently
! 	 * locked FOR SHARE or FOR UPDATE as HeapTupleUpdated.
  	 */
  	result = heap_update(resultRelationDesc, tupleid, tuple,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 true, /* wait for commit */
! 						 IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 													   InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
*** a/src/backend/executor/spi.c
--- b/src/backend/executor/spi.c
***************
*** 51,57 **** static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan,
  				  ParamListInfo boundParams);
  
  static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
  				  bool read_only, bool fire_triggers, long tcount);
  
  static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
--- 51,57 ----
  				  ParamListInfo boundParams);
  
  static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot,
  				  bool read_only, bool fire_triggers, long tcount);
  
  static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
***************
*** 357,363 **** SPI_execute(const char *src, bool read_only, long tcount)
  	_SPI_prepare_plan(src, &plan, NULL);
  
  	res = _SPI_execute_plan(&plan, NULL,
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 357,363 ----
  	_SPI_prepare_plan(src, &plan, NULL);
  
  	res = _SPI_execute_plan(&plan, NULL,
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 392,398 **** SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 392,398 ----
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 421,427 **** SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
  		return res;
  
  	res = _SPI_execute_plan(plan, params,
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 421,427 ----
  		return res;
  
  	res = _SPI_execute_plan(plan, params,
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 444,450 **** SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
  int
  SPI_execute_snapshot(SPIPlanPtr plan,
  					 Datum *Values, const char *Nulls,
! 					 Snapshot snapshot, Snapshot crosscheck_snapshot,
  					 bool read_only, bool fire_triggers, long tcount)
  {
  	int			res;
--- 444,450 ----
  int
  SPI_execute_snapshot(SPIPlanPtr plan,
  					 Datum *Values, const char *Nulls,
! 					 Snapshot snapshot,
  					 bool read_only, bool fire_triggers, long tcount)
  {
  	int			res;
***************
*** 463,469 **** SPI_execute_snapshot(SPIPlanPtr plan,
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							snapshot, crosscheck_snapshot,
  							read_only, fire_triggers, tcount);
  
  	_SPI_end_call(true);
--- 463,469 ----
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							snapshot,
  							read_only, fire_triggers, tcount);
  
  	_SPI_end_call(true);
***************
*** 516,522 **** SPI_execute_with_args(const char *src,
  	/* We don't need to copy the plan since it will be thrown away anyway */
  
  	res = _SPI_execute_plan(&plan, paramLI,
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 516,522 ----
  	/* We don't need to copy the plan since it will be thrown away anyway */
  
  	res = _SPI_execute_plan(&plan, paramLI,
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 1752,1758 **** _SPI_prepare_plan(const char *src, SPIPlanPtr plan, ParamListInfo boundParams)
   *
   * snapshot: query snapshot to use, or InvalidSnapshot for the normal
   *		behavior of taking a new snapshot for each query.
-  * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
   * read_only: TRUE for read-only execution (no CommandCounterIncrement)
   * fire_triggers: TRUE to fire AFTER triggers at end of query (normal case);
   *		FALSE means any AFTER triggers are postponed to end of outer query
--- 1752,1757 ----
***************
*** 1760,1766 **** _SPI_prepare_plan(const char *src, SPIPlanPtr plan, ParamListInfo boundParams)
   */
  static int
  _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
  				  bool read_only, bool fire_triggers, long tcount)
  {
  	int			my_res = 0;
--- 1759,1765 ----
   */
  static int
  _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot,
  				  bool read_only, bool fire_triggers, long tcount)
  {
  	int			my_res = 0;
***************
*** 1903,1909 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  
  				qdesc = CreateQueryDesc((PlannedStmt *) stmt,
  										plansource->query_string,
! 										snap, crosscheck_snapshot,
  										dest,
  										paramLI, 0);
  				res = _SPI_pquery(qdesc, fire_triggers,
--- 1902,1908 ----
  
  				qdesc = CreateQueryDesc((PlannedStmt *) stmt,
  										plansource->query_string,
! 										snap,
  										dest,
  										paramLI, 0);
  				res = _SPI_pquery(qdesc, fire_triggers,
*** a/src/backend/tcop/pquery.c
--- b/src/backend/tcop/pquery.c
***************
*** 64,70 **** QueryDesc *
  CreateQueryDesc(PlannedStmt *plannedstmt,
  				const char *sourceText,
  				Snapshot snapshot,
- 				Snapshot crosscheck_snapshot,
  				DestReceiver *dest,
  				ParamListInfo params,
  				int instrument_options)
--- 64,69 ----
***************
*** 76,83 **** CreateQueryDesc(PlannedStmt *plannedstmt,
  	qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
  	qd->sourceText = sourceText;	/* query text */
  	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
- 	/* RI check snapshot */
- 	qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->instrument_options = instrument_options;		/* instrumentation
--- 75,80 ----
***************
*** 109,115 **** CreateUtilityQueryDesc(Node *utilitystmt,
  	qd->utilitystmt = utilitystmt;		/* utility command */
  	qd->sourceText = sourceText;	/* query text */
  	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
- 	qd->crosscheck_snapshot = InvalidSnapshot;	/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->instrument_options = false;		/* uninteresting for utilities */
--- 106,111 ----
***************
*** 134,140 **** FreeQueryDesc(QueryDesc *qdesc)
  
  	/* forget our snapshots */
  	UnregisterSnapshot(qdesc->snapshot);
- 	UnregisterSnapshot(qdesc->crosscheck_snapshot);
  
  	/* Only the QueryDesc itself need be freed */
  	pfree(qdesc);
--- 130,135 ----
***************
*** 178,184 **** ProcessQuery(PlannedStmt *plan,
  	 * Create the QueryDesc object
  	 */
  	queryDesc = CreateQueryDesc(plan, sourceText,
! 								GetActiveSnapshot(), InvalidSnapshot,
  								dest, params, 0);
  
  	/*
--- 173,179 ----
  	 * Create the QueryDesc object
  	 */
  	queryDesc = CreateQueryDesc(plan, sourceText,
! 								GetActiveSnapshot(),
  								dest, params, 0);
  
  	/*
***************
*** 514,520 **** PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot)
  				queryDesc = CreateQueryDesc((PlannedStmt *) linitial(portal->stmts),
  											portal->sourceText,
  											GetActiveSnapshot(),
- 											InvalidSnapshot,
  											None_Receiver,
  											params,
  											0);
--- 509,514 ----
*** a/src/backend/utils/adt/ri_triggers.c
--- b/src/backend/utils/adt/ri_triggers.c
***************
*** 230,236 **** static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
  static bool ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  				Relation fk_rel, Relation pk_rel,
  				HeapTuple old_tuple, HeapTuple new_tuple,
- 				bool detectNewRows,
  				int expect_OK, const char *constrname);
  static void ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
  				 Relation rel, HeapTuple tuple,
--- 230,235 ----
***************
*** 357,363 **** RI_FKey_check(PG_FUNCTION_ARGS)
  		ri_PerformCheck(&qkey, qplan,
  						fk_rel, pk_rel,
  						NULL, NULL,
- 						false,
  						SPI_OK_SELECT,
  						NameStr(riinfo.conname));
  
--- 356,361 ----
***************
*** 500,506 **** RI_FKey_check(PG_FUNCTION_ARGS)
  	ri_PerformCheck(&qkey, qplan,
  					fk_rel, pk_rel,
  					NULL, new_row,
- 					false,
  					SPI_OK_SELECT,
  					NameStr(riinfo.conname));
  
--- 498,503 ----
***************
*** 661,667 **** ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
  	result = ri_PerformCheck(&qkey, qplan,
  							 fk_rel, pk_rel,
  							 old_row, NULL,
- 							 true,		/* treat like update */
  							 SPI_OK_SELECT, NULL);
  
  	if (SPI_finish() != SPI_OK_FINISH)
--- 658,663 ----
***************
*** 818,824 **** RI_FKey_noaction_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 814,819 ----
***************
*** 1006,1012 **** RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 1001,1006 ----
***************
*** 1168,1174 **** RI_FKey_cascade_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_DELETE,
  							NameStr(riinfo.conname));
  
--- 1162,1167 ----
***************
*** 1356,1362 **** RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, new_row,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 1349,1354 ----
***************
*** 1527,1533 **** RI_FKey_restrict_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 1519,1524 ----
***************
*** 1710,1716 **** RI_FKey_restrict_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 1701,1706 ----
***************
*** 1881,1887 **** RI_FKey_setnull_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 1871,1876 ----
***************
*** 2097,2103 **** RI_FKey_setnull_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 2086,2091 ----
***************
*** 2269,2275 **** RI_FKey_setdefault_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 2257,2262 ----
***************
*** 2472,2478 **** RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 2459,2464 ----
***************
*** 2792,2798 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
  	spi_result = SPI_execute_snapshot(qplan,
  									  NULL, NULL,
  									  GetLatestSnapshot(),
- 									  InvalidSnapshot,
  									  true, false, 1);
  
  	/* Check result */
--- 2778,2783 ----
***************
*** 3271,3284 **** static bool
  ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  				Relation fk_rel, Relation pk_rel,
  				HeapTuple old_tuple, HeapTuple new_tuple,
- 				bool detectNewRows,
  				int expect_OK, const char *constrname)
  {
  	Relation	query_rel,
  				source_rel;
  	int			key_idx;
  	Snapshot	test_snapshot;
- 	Snapshot	crosscheck_snapshot;
  	int			limit;
  	int			spi_result;
  	Oid			save_userid;
--- 3256,3267 ----
***************
*** 3330,3359 **** ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  	}
  
  	/*
- 	 * In READ COMMITTED mode, we just need to use an up-to-date regular
- 	 * snapshot, and we will see all rows that could be interesting. But in
- 	 * transaction-snapshot mode, we can't change the transaction snapshot.
- 	 * If the caller passes detectNewRows == false then it's okay to do the query
- 	 * with the transaction snapshot; otherwise we use a current snapshot, and
- 	 * tell the executor to error out if it finds any rows under the current
- 	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
- 	 * that SPI_execute_snapshot will register the snapshots, so we don't need
- 	 * to bother here.
- 	 */
- 	if (IsolationUsesXactSnapshot() && detectNewRows)
- 	{
- 		CommandCounterIncrement();		/* be sure all my own work is visible */
- 		test_snapshot = GetLatestSnapshot();
- 		crosscheck_snapshot = GetTransactionSnapshot();
- 	}
- 	else
- 	{
- 		/* the default SPI behavior is okay */
- 		test_snapshot = InvalidSnapshot;
- 		crosscheck_snapshot = InvalidSnapshot;
- 	}
- 
- 	/*
  	 * If this is a select query (e.g., for a 'no action' or 'restrict'
  	 * trigger), we only need to see if there is a single row in the table,
  	 * matching the key.  Otherwise, limit = 0 - because we want the query to
--- 3313,3318 ----
***************
*** 3369,3375 **** ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  	/* Finally we can run the query. */
  	spi_result = SPI_execute_snapshot(qplan,
  									  vals, nulls,
! 									  test_snapshot, crosscheck_snapshot,
  									  false, false, limit);
  
  	/* Restore UID and security context */
--- 3328,3334 ----
  	/* Finally we can run the query. */
  	spi_result = SPI_execute_snapshot(qplan,
  									  vals, nulls,
! 									  InvalidSnapshot,
  									  false, false, limit);
  
  	/* Restore UID and security context */
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
***************
*** 27,32 ****
--- 27,33 ----
  
+ #include "access/multixact.h"
  #include "access/transam.h"
  #include "access/xact.h"
  #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "utils/memutils.h"
***************
*** 126,131 **** GetTransactionSnapshot(void)
--- 127,142 ----
  	{
  		Assert(RegisteredSnapshots == 0);
  
+ 		/*
+ 		 * We must record the oldest visible multixact *before* taking the
+ 		 * transaction snapshot.  Otherwise HeapSatisfiesLockersVisible in
+ 		 * heapam.c will be in trouble, since it depends on being able to
+ 		 * inspect all multixact ids that might contain xids invisible to
+ 		 * the transaction snapshot.
+ 		 */
+ 		if (IsolationUsesXactSnapshot())
+ 			MultiXactIdSetOldestVisible();
+ 
  		CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
  		FirstSnapshotSet = true;
  
*** a/src/backend/utils/time/tqual.c
--- b/src/backend/utils/time/tqual.c
***************
*** 72,80 **** SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
  SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
  SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
  
- /* local functions */
- static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
- 
  
  /*
   * SetHintBits()
--- 72,77 ----
***************
*** 1253,1259 **** HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin,
   * by this function.  This is OK for current uses, because we actually only
   * apply this for known-committed XIDs.
   */
! static bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
  	uint32		i;
--- 1250,1256 ----
   * by this function.  This is OK for current uses, because we actually only
   * apply this for known-committed XIDs.
   */
! bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
  	uint32		i;
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 98,112 **** extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
  			int options, BulkInsertState bistate);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait);
  extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
  			HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait);
  extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
  				Buffer *buffer, ItemPointer ctid,
  				TransactionId *update_xmax, CommandId cid,
! 				LockTupleMode mode, bool nowait);
  extern void heap_inplace_update(Relation relation, HeapTuple tuple);
  extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
  				  Buffer buf);
--- 98,113 ----
  			int options, BulkInsertState bistate);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot);
  extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
  			HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot);
  extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
  				Buffer *buffer, ItemPointer ctid,
  				TransactionId *update_xmax, CommandId cid,
! 				LockTupleMode mode, bool nowait,
! 				Snapshot lockcheck_snapshot);
  extern void heap_inplace_update(Relation relation, HeapTuple tuple);
  extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
  				  Buffer buf);
*** a/src/include/access/multixact.h
--- b/src/include/access/multixact.h
***************
*** 49,54 **** extern bool MultiXactIdIsCurrent(MultiXactId multi);
--- 49,55 ----
  extern void MultiXactIdWait(MultiXactId multi);
  extern bool ConditionalMultiXactIdWait(MultiXactId multi);
  extern void MultiXactIdSetOldestMember(void);
+ extern void MultiXactIdSetOldestVisible(void);
  extern int	GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids);
  
  extern void AtEOXact_MultiXact(void);
*** a/src/include/executor/execdesc.h
--- b/src/include/executor/execdesc.h
***************
*** 39,45 **** typedef struct QueryDesc
  	Node	   *utilitystmt;	/* utility statement, or null */
  	const char *sourceText;		/* source text of the query */
  	Snapshot	snapshot;		/* snapshot to use for query */
- 	Snapshot	crosscheck_snapshot;	/* crosscheck for RI update/delete */
  	DestReceiver *dest;			/* the destination for tuple output */
  	ParamListInfo params;		/* param values being passed in */
  	int			instrument_options;		/* OR of InstrumentOption flags */
--- 39,44 ----
***************
*** 57,63 **** typedef struct QueryDesc
  extern QueryDesc *CreateQueryDesc(PlannedStmt *plannedstmt,
  				const char *sourceText,
  				Snapshot snapshot,
- 				Snapshot crosscheck_snapshot,
  				DestReceiver *dest,
  				ParamListInfo params,
  				int instrument_options);
--- 56,61 ----
*** a/src/include/executor/spi.h
--- b/src/include/executor/spi.h
***************
*** 82,88 **** extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
  extern int SPI_execute_snapshot(SPIPlanPtr plan,
  					 Datum *Values, const char *Nulls,
  					 Snapshot snapshot,
- 					 Snapshot crosscheck_snapshot,
  					 bool read_only, bool fire_triggers, long tcount);
  extern int SPI_execute_with_args(const char *src,
  					  int nargs, Oid *argtypes,
--- 82,87 ----
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 337,343 **** typedef struct EState
  	/* Basic state for all query types: */
  	ScanDirection es_direction; /* current scan direction */
  	Snapshot	es_snapshot;	/* time qual to use */
- 	Snapshot	es_crosscheck_snapshot; /* crosscheck time qual for RI */
  	List	   *es_range_table; /* List of RangeTblEntry */
  	PlannedStmt *es_plannedstmt;	/* link to top of plan tree */
  
--- 337,342 ----
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
***************
*** 41,46 **** extern PGDLLIMPORT SnapshotData SnapshotToastData;
--- 41,48 ----
  #define IsMVCCSnapshot(snapshot)  \
  	((snapshot)->satisfies == HeapTupleSatisfiesMVCC)
  
+ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
+ 
  /*
   * HeapTupleSatisfiesVisibility
   *		True iff heap tuple satisfies a time qual.
#2Kevin Grittner
Kevin.Grittner@wicourts.gov
In reply to: Kevin Grittner (#1)
Re: Review: Row-level Locks & SERIALIZABLE transactions, postgres vs. Oracle

Kevin Grittner <grimkg@gmail.com> wrote:

This now compiles and passes regression tests. I still need to
re-run all the other tests which Florian and I previously used to
test the patch. I don't have any reason to expect that they will
now fail, but one need to be thorough. Once that is confirmed, I
think this will be ready for committer unless someone can think of
something else to throw at it first.

I reran the tests at http://github.com/fgp/fk_concurrency and,
unsurprisingly, they all still pass.

This patch addresses concerns I heard expressed by a couple of guys
from an Oracle shop who wanted to convert to PostgreSQL but were
much put out by the behavior of SELECT FOR UPDATE under snapshot
isolation. This patch should do much to ease the migration of some
Oracle shops to PostgreSQL.

A complete review was done in the last CF, but I held off marking it
as Ready for Committer then because there were some more tests
coming; those tests have since been run, and all looked good.

http://archives.postgresql.org/message-id/4C41932002000025000337FB@gw.wicourts.gov

I am marking this patch "Ready for Committer" now.

-Kevin