SSI memory mitigation & false positive degradation
To recap, I've had an open question on the Serializable Wiki page[1]http://wiki.postgresql.org/wiki/Serializable
since January about how we should handle long-running transactions.
The algorithm published by Cahill et al requires keeping some
transaction information in memory for all committed transactions
which overlapped a still-running transaction. Since we need to keep
this in shared memory, and the structures must have a finite
allocation, there's an obvious looming limit, even if the allocation
is relatively generous.
The two obvious solutions on resource exhaustion are to roll back the
oldest active serializable transaction or to block or cancel new
serializable transactions. Neither is particularly appealing. In
September and October Heikki was kind enough to share some insights
and ideas about alternatives. I've done what I can to pursue this,
but my availability to follow up on it has been limited until now. I
have blocked out a lot of time between now and the middle of January
to try to resolve the issue, and Dan has been able to devote time to
this lately as well.
I've been pursuing the ideas sketched out in a prior post[2]http://archives.postgresql.org/pgsql-hackers/2010-10/msg01754.php as a way
to work into the approach suggested by Heikki, which amounts to
summarizing old data (at the risk of some false positive
serialization failures) and accessing old data through the SLRU API
(and suffering some performance hit if that data spills to disk).
Pursuing mitigation through more aggressive cleanup seemed important
to me both to reduce the performance impact of the graceful
degradation approach, and to better understand which data are needed
when.
Dan and I have now implemented most of the mitigation techniques
described in [2]http://archives.postgresql.org/pgsql-hackers/2010-10/msg01754.php, and I now feel confident I have a good grasp of how
long each type of data is useful. (By useful I mean that to maintain
data integrity without them it will be necessary to roll back some
transactions which could have been allowed to commit had the data
been available.) I'll be adding this to the Wiki page, but below are
the results, as I understand them. I have yet to define exactly what
we will drop at which point, but that should be coming within a few
days.
Read only transactions only have one real need to track committed
data, but it is the most complex to state:
(1) An active read only transaction needs to be able to recognize
when it is reading a tuple which was written by an overlapping
transaction which has committed, but only if that read write
transaction has a rw-conflict out to a transaction committed before
the read only transaction acquired its snapshot.
A read only transaction which detects such a rw-conflict out must
fail with a serialization conflict. The multiple conditions required
for this, however, have an up-side -- they allow us to detect when a
read only transaction no longer has any possibility of creating such
a conflict, and therefore entirely removing it from predicate locking
and conflict detection. This is true whether the read only
transaction is committed, active, or the new DEFERRABLE type of
transaction which will wait for these conditions before it starts.
I've found four statements about what committed data are useful for
read write transactions:
(2) An active read write transaction needs to be able to recognize
when it is reading a tuple which was written by an overlapping
transaction which has committed, and to know whether that committed
transaction had any rw-conflict(s) out to previously committed
transaction(s). This is rather similar to (1).
(3) An active read write transaction needs to be able to detect when
one of its writes conflicts with a predicate lock from an overlapping
transaction which has committed. There's no need to know which one,
but by the definition of a rw-conflict, it must have overlapped.
(4) An active read write transaction needs to know that it had a
rw-conflict out to a committed transaction. There's no need to know
which one, but by the definition of a rw-conflict, it must have
overlapped.
(5) An active read write transaction needs to know that it had a
rw-conflict in from a committed transaction. There's no need to know
which one, but by the definition of a rw-conflict, it must have
overlapped.
Since I know some people prefer to look at a patch than to poke
around someone's git repo, I'm attaching a patch reflecting what's in
the repo at the moment. Consider it WIP. While I hope to post a
patch as a serious candidate for the release within a couple weeks, I
would sure welcome any feedback before then, so the candidate patch
is in the best shape possible. (To avoid confusion with Florian's
patch, I'm using ssi instead of serializable in file name and subject
lines.)
-Kevin
[1]: http://wiki.postgresql.org/wiki/Serializable
[2]: http://archives.postgresql.org/pgsql-hackers/2010-10/msg01754.php
Attachments:
ssi-7.patchapplication/octet-stream; name=ssi-7.patchDownload
*** a/GNUmakefile.in
--- b/GNUmakefile.in
***************
*** 57,63 **** distclean maintainer-clean:
check: all
! check installcheck installcheck-parallel:
$(MAKE) -C src/test $@
$(call recurse,installcheck-world,src/test src/pl src/interfaces/ecpg contrib,installcheck)
--- 57,63 ----
check: all
! check dcheck installcheck installcheck-parallel:
$(MAKE) -C src/test $@
$(call recurse,installcheck-world,src/test src/pl src/interfaces/ecpg contrib,installcheck)
*** a/src/backend/access/gist/gist.c
--- b/src/backend/access/gist/gist.c
***************
*** 20,25 ****
--- 20,26 ----
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
+ #include "storage/predicate.h"
#include "utils/memutils.h"
/* Working state for gistbuild and its callback */
***************
*** 306,311 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
--- 307,314 ----
*splitinfo = NIL;
+ CheckForSerializableConflictIn(state->r, NULL, state->stack->buffer);
+
/*
* if isupdate, remove old key: This node's key has been modified, either
* because a child split occurred or because we needed to adjust our key
*** a/src/backend/access/gist/gistget.c
--- b/src/backend/access/gist/gistget.c
***************
*** 20,25 ****
--- 20,26 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
+ #include "storage/predicate.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
*** a/src/backend/access/gist/gistvacuum.c
--- b/src/backend/access/gist/gistvacuum.c
***************
*** 23,28 ****
--- 23,29 ----
#include "storage/freespace.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
+ #include "storage/predicate.h"
#include "utils/memutils.h"
***************
*** 87,94 **** gistvacuumcleanup(PG_FUNCTION_ARGS)
if (PageIsNew(page) || GistPageIsDeleted(page))
{
! totFreePages++;
! RecordFreeIndexPage(rel, blkno);
}
else
lastFilledBlock = blkno;
--- 88,98 ----
if (PageIsNew(page) || GistPageIsDeleted(page))
{
! if (!PageIsPredicateLocked(rel, blkno))
! {
! totFreePages++;
! RecordFreeIndexPage(rel, blkno);
! }
}
else
lastFilledBlock = blkno;
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 57,62 ****
--- 57,63 ----
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
+ #include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "storage/standby.h"
***************
*** 261,280 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
{
if (ItemIdIsNormal(lpp))
{
bool valid;
if (all_visible)
valid = true;
else
- {
- HeapTupleData loctup;
-
- loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
- loctup.t_len = ItemIdGetLength(lpp);
- ItemPointerSet(&(loctup.t_self), page, lineoff);
-
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
! }
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
}
--- 262,281 ----
{
if (ItemIdIsNormal(lpp))
{
+ HeapTupleData loctup;
bool valid;
+ loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+ loctup.t_len = ItemIdGetLength(lpp);
+ ItemPointerSet(&(loctup.t_self), page, lineoff);
+
if (all_visible)
valid = true;
else
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
!
! CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer);
!
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
}
***************
*** 468,479 **** heapgettup(HeapScanDesc scan,
--- 469,483 ----
snapshot,
scan->rs_cbuf);
+ CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf);
+
if (valid && key != NULL)
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
nkeys, key, valid);
if (valid)
{
+ PredicateLockTuple(scan->rs_rd, tuple);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return;
}
***************
*** 741,752 **** heapgettup_pagemode(HeapScanDesc scan,
--- 745,758 ----
nkeys, key, valid);
if (valid)
{
+ PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
}
else
{
+ PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
***************
*** 1460,1467 **** heap_fetch(Relation relation,
--- 1466,1476 ----
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ CheckForSerializableConflictOut(valid, relation, tuple, buffer);
+
if (valid)
{
+ PredicateLockTuple(relation, tuple);
/*
* All checks passed, so return the tuple as valid. Caller is now
* responsible for releasing the buffer.
***************
*** 1505,1517 **** heap_fetch(Relation relation,
* heap_fetch, we do not report any pgstats count; caller may do so if wanted.
*/
bool
! heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
! bool *all_dead)
{
Page dp = (Page) BufferGetPage(buffer);
TransactionId prev_xmax = InvalidTransactionId;
OffsetNumber offnum;
bool at_chain_start;
if (all_dead)
*all_dead = true;
--- 1514,1528 ----
* heap_fetch, we do not report any pgstats count; caller may do so if wanted.
*/
bool
! heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
! Snapshot snapshot, bool *all_dead)
{
Page dp = (Page) BufferGetPage(buffer);
TransactionId prev_xmax = InvalidTransactionId;
OffsetNumber offnum;
bool at_chain_start;
+ bool valid;
+ bool match_found;
if (all_dead)
*all_dead = true;
***************
*** 1521,1526 **** heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
--- 1532,1538 ----
Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
offnum = ItemPointerGetOffsetNumber(tid);
at_chain_start = true;
+ match_found = false;
/* Scan through possible multiple members of HOT-chain */
for (;;)
***************
*** 1551,1556 **** heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
--- 1563,1570 ----
heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
heapTuple.t_len = ItemIdGetLength(lp);
+ heapTuple.t_tableOid = relation->rd_id;
+ heapTuple.t_self = *tid;
/*
* Shouldn't see a HEAP_ONLY tuple at chain start.
***************
*** 1568,1579 **** heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break;
/* If it's visible per the snapshot, we must return it */
! if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer))
{
ItemPointerSetOffsetNumber(tid, offnum);
if (all_dead)
*all_dead = false;
! return true;
}
/*
--- 1582,1599 ----
break;
/* If it's visible per the snapshot, we must return it */
! valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer);
! CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer);
! if (valid)
{
ItemPointerSetOffsetNumber(tid, offnum);
+ PredicateLockTuple(relation, &heapTuple);
if (all_dead)
*all_dead = false;
! if (IsolationIsSerializable())
! match_found = true;
! else
! return true;
}
/*
***************
*** 1602,1608 **** heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break; /* end of chain */
}
! return false;
}
/*
--- 1622,1628 ----
break; /* end of chain */
}
! return match_found;
}
/*
***************
*** 1621,1627 **** heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
! result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return result;
--- 1641,1647 ----
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
! result = heap_hot_search_buffer(tid, relation, buffer, snapshot, all_dead);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return result;
***************
*** 1728,1733 **** heap_get_latest_tid(Relation relation,
--- 1748,1754 ----
* result candidate.
*/
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
+ CheckForSerializableConflictOut(valid, relation, &tp, buffer);
if (valid)
*tid = ctid;
***************
*** 1892,1897 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1913,1925 ----
buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
InvalidBuffer, options, bistate);
+ /*
+ * We're about to do the actual insert -- check for conflict at the
+ * relation or buffer level first, to avoid possibly having to roll
+ * back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, NULL, buffer);
+
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
***************
*** 2192,2197 **** l1:
--- 2220,2231 ----
return result;
}
+ /*
+ * We're about to do the actual delete -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, &tp, buffer);
+
/* replace cid with a combo cid if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
***************
*** 2545,2550 **** l2:
--- 2579,2590 ----
return result;
}
+ /*
+ * We're about to do the actual update -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, &oldtup, buffer);
+
/* Fill in OID and transaction status data for newtup */
if (relation->rd_rel->relhasoids)
{
***************
*** 2690,2695 **** l2:
--- 2730,2745 ----
}
/*
+ * We're about to create the new tuple -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ *
+ * NOTE: For a tuple insert, we only need to check for table locks, since
+ * predicate locking at the index level will cover ranges for anything
+ * except a table scan. Therefore, only provide the relation.
+ */
+ CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+
+ /*
* At this point newbuf and buffer are both pinned and locked, and newbuf
* has enough space for the new tuple. If they are the same buffer, only
* one pin is held.
***************
*** 2829,2834 **** l2:
--- 2879,2890 ----
CacheInvalidateHeapTuple(relation, heaptup);
/*
+ * TODO SSI: In order to support SIREAD locks at tuple granularity, any
+ * existing SIREAD locks on the old tuple must be copied to
+ * also refer to the new tuple, somewhere around this point?
+ */
+
+ /*
* Release the lmgr tuple lock, if we had it.
*/
if (have_tuple_lock)
*** a/src/backend/access/index/indexam.c
--- b/src/backend/access/index/indexam.c
***************
*** 64,72 ****
--- 64,74 ----
#include "access/relscan.h"
#include "access/transam.h"
+ #include "access/xact.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
+ #include "storage/predicate.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
***************
*** 192,197 **** index_insert(Relation indexRelation,
--- 194,204 ----
RELATION_CHECKS;
GET_REL_PROCEDURE(aminsert);
+ if (!(indexRelation->rd_am->ampredlocks))
+ CheckForSerializableConflictIn(indexRelation,
+ (HeapTuple) NULL,
+ InvalidBuffer);
+
/*
* have the am's insert proc do all the work.
*/
***************
*** 266,271 **** index_beginscan_internal(Relation indexRelation,
--- 273,281 ----
RELATION_CHECKS;
GET_REL_PROCEDURE(ambeginscan);
+ if (!(indexRelation->rd_am->ampredlocks))
+ PredicateLockRelation(indexRelation);
+
/*
* We hold a reference count to the relcache entry throughout the scan.
*/
***************
*** 523,528 **** index_getnext(IndexScanDesc scan, ScanDirection direction)
--- 533,539 ----
{
ItemId lp;
ItemPointer ctid;
+ bool valid;
/* check for bogus TID */
if (offnum < FirstOffsetNumber ||
***************
*** 577,584 **** index_getnext(IndexScanDesc scan, ScanDirection direction)
break;
/* If it's visible per the snapshot, we must return it */
! if (HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot,
! scan->xs_cbuf))
{
/*
* If the snapshot is MVCC, we know that it could accept at
--- 588,600 ----
break;
/* If it's visible per the snapshot, we must return it */
! valid = HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot,
! scan->xs_cbuf);
!
! CheckForSerializableConflictOut(valid, scan->heapRelation,
! heapTuple, scan->xs_cbuf);
!
! if (valid)
{
/*
* If the snapshot is MVCC, we know that it could accept at
***************
*** 586,592 **** index_getnext(IndexScanDesc scan, ScanDirection direction)
* any more members. Otherwise, check for continuation of the
* HOT-chain, and set state for next time.
*/
! if (IsMVCCSnapshot(scan->xs_snapshot))
scan->xs_next_hot = InvalidOffsetNumber;
else if (HeapTupleIsHotUpdated(heapTuple))
{
--- 602,609 ----
* any more members. Otherwise, check for continuation of the
* HOT-chain, and set state for next time.
*/
! if (IsMVCCSnapshot(scan->xs_snapshot)
! && !IsolationIsSerializable())
scan->xs_next_hot = InvalidOffsetNumber;
else if (HeapTupleIsHotUpdated(heapTuple))
{
***************
*** 602,607 **** index_getnext(IndexScanDesc scan, ScanDirection direction)
--- 619,626 ----
pgstat_count_heap_fetch(scan->indexRelation);
+ PredicateLockTuple(scan->heapRelation, heapTuple);
+
return heapTuple;
}
*** a/src/backend/access/nbtree/nbtinsert.c
--- b/src/backend/access/nbtree/nbtinsert.c
***************
*** 21,26 ****
--- 21,27 ----
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
+ #include "storage/predicate.h"
#include "utils/inval.h"
#include "utils/tqual.h"
***************
*** 174,179 **** top:
--- 175,188 ----
if (checkUnique != UNIQUE_CHECK_EXISTING)
{
+ /*
+ * The only conflict predicate locking cares about for indexes is when
+ * an index tuple insert conflicts with an existing lock. Since the
+ * actual location of the insert is hard to predict because of the
+ * random search used to prevent O(N^2) performance when there are many
+ * duplicate entries, we can just use the "first valid" page.
+ */
+ CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
_bt_insertonpg(rel, buf, stack, itup, offset, false);
***************
*** 696,701 **** _bt_insertonpg(Relation rel,
--- 705,713 ----
/* split the buffer into left and right halves */
rbuf = _bt_split(rel, buf, firstright,
newitemoff, itemsz, itup, newitemonleft);
+ PredicateLockPageSplit(rel,
+ BufferGetBlockNumber(buf),
+ BufferGetBlockNumber(rbuf));
/*----------
* By here,
*** a/src/backend/access/nbtree/nbtpage.c
--- b/src/backend/access/nbtree/nbtpage.c
***************
*** 29,34 ****
--- 29,35 ----
#include "storage/freespace.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
+ #include "storage/predicate.h"
#include "utils/inval.h"
#include "utils/snapmgr.h"
***************
*** 1184,1189 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1185,1196 ----
RelationGetRelationName(rel));
/*
+ * Any insert which would have gone on the target block will now go to the
+ * right sibling block.
+ */
+ PredicateLockPageCombine(rel, target, rightsib);
+
+ /*
* Next find and write-lock the current parent of the target page. This is
* essentially the same as the corresponding step of splitting.
*/
*** a/src/backend/access/nbtree/nbtsearch.c
--- b/src/backend/access/nbtree/nbtsearch.c
***************
*** 21,26 ****
--- 21,27 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
+ #include "storage/predicate.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
***************
*** 63,69 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 64,73 ----
/* If index is empty and access = BT_READ, no root page is created. */
if (!BufferIsValid(*bufP))
+ {
+ PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return (BTStack) NULL;
+ }
/* Loop iterates once per level descended in the tree */
for (;;)
***************
*** 88,94 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 92,102 ----
page = BufferGetPage(*bufP);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISLEAF(opaque))
+ {
+ if (access == BT_READ)
+ PredicateLockPage(rel, BufferGetBlockNumber(*bufP));
break;
+ }
/*
* Find the appropriate item on the internal page, and get the child
***************
*** 1142,1147 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
--- 1150,1156 ----
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
+ PredicateLockPage(rel, blkno);
/* see if there are any matches on this page */
/* note that this will clear moreRight if we can stop */
if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
***************
*** 1189,1194 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
--- 1198,1204 ----
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
+ PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf));
/* see if there are any matches on this page */
/* note that this will clear moreLeft if we can stop */
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
***************
*** 1352,1357 **** _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
--- 1362,1368 ----
if (!BufferIsValid(buf))
{
/* empty index... */
+ PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return InvalidBuffer;
}
***************
*** 1431,1440 **** _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
--- 1442,1453 ----
if (!BufferIsValid(buf))
{
/* empty index... */
+ PredicateLockRelation(rel); /* Nothing finer to lock exists. */
so->currPos.buf = InvalidBuffer;
return false;
}
+ PredicateLockPage(rel, BufferGetBlockNumber(buf));
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(P_ISLEAF(opaque));
*** a/src/backend/access/transam/varsup.c
--- b/src/backend/access/transam/varsup.c
***************
*** 21,26 ****
--- 21,27 ----
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "storage/pmsignal.h"
+ #include "storage/predicate.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/syscache.h"
***************
*** 157,165 **** GetNewTransactionId(bool isSubXact)
--- 158,169 ----
* holds 32K or more transactions, so we don't have to do this very often.
*
* Extend pg_subtrans too.
+ * If it's top level, the predicate locking system also needs to know.
*/
ExtendCLOG(xid);
ExtendSUBTRANS(xid);
+ if (!isSubXact)
+ RegisterPredicateLockingXid(xid);
/*
* Now advance the nextXid counter. This must not happen until after we
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 40,45 ****
--- 40,46 ----
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
+ #include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
***************
*** 63,68 **** int XactIsoLevel;
--- 64,71 ----
bool DefaultXactReadOnly = false;
bool XactReadOnly;
+ bool XactDeferrable;
+
bool XactSyncCommit = true;
int CommitDelay = 0; /* precommit delay in microseconds */
***************
*** 1639,1644 **** StartTransaction(void)
--- 1642,1648 ----
s->startedInRecovery = false;
XactReadOnly = DefaultXactReadOnly;
}
+ XactDeferrable = false;
XactIsoLevel = DefaultXactIsoLevel;
forceSyncCommit = false;
MyXactAccessedTempRel = false;
***************
*** 1786,1791 **** CommitTransaction(void)
--- 1790,1802 ----
AtEOXact_LargeObject(true);
/*
+ * Mark serializable transaction as complete for predicate locking
+ * purposes. This should be done as late as we can put it and still
+ * allow errors to be raised for failure patterns found at commit.
+ */
+ PreCommit_CheckForSerializationFailure();
+
+ /*
* Insert notifications sent by NOTIFY commands into the queue. This
* should be late in the pre-commit sequence to minimize time spent
* holding the notify-insertion lock.
***************
*** 1979,1984 **** PrepareTransaction(void)
--- 1990,2002 ----
/* close large objects before lower-level cleanup */
AtEOXact_LargeObject(true);
+ /*
+ * Mark serializable transaction as complete for predicate locking
+ * purposes. This should be done as late as we can put it and still
+ * allow errors to be raised for failure patterns found at commit.
+ */
+ PreCommit_CheckForSerializationFailure();
+
/* NOTIFY will be handled below */
/*
*** a/src/backend/commands/variable.c
--- b/src/backend/commands/variable.c
***************
*** 618,623 **** show_XactIsoLevel(void)
--- 618,652 ----
}
}
+ /*
+ * SET TRANSACTION [NOT] DEFERRABLE
+ */
+
+ bool
+ assign_transaction_deferrable(bool newval, bool doit, GucSource source)
+ {
+ /* source == PGC_S_OVERRIDE means do it anyway, eg at xact abort */
+ if (source == PGC_S_OVERRIDE)
+ return true;
+
+ if (IsSubTransaction())
+ {
+ ereport(GUC_complaint_elevel(source),
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("SET TRANSACTION [NOT] DEFERRABLE cannot be called within a subtransaction")));
+ return false;
+ }
+
+ if (FirstSnapshotSet)
+ {
+ ereport(GUC_complaint_elevel(source),
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("SET TRANSACTION [NOT] DEFERRABLE must be called before any query")));
+ return false;
+ }
+
+ return true;
+ }
/*
* Random number seed
*** a/src/backend/executor/nodeBitmapHeapscan.c
--- b/src/backend/executor/nodeBitmapHeapscan.c
***************
*** 42,47 ****
--- 42,48 ----
#include "executor/nodeBitmapHeapscan.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
+ #include "storage/predicate.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
***************
*** 351,357 **** bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
ItemPointerData tid;
ItemPointerSet(&tid, page, offnum);
! if (heap_hot_search_buffer(&tid, buffer, snapshot, NULL))
scan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid);
}
}
--- 352,358 ----
ItemPointerData tid;
ItemPointerSet(&tid, page, offnum);
! if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot, NULL))
scan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid);
}
}
*** a/src/backend/executor/nodeSeqscan.c
--- b/src/backend/executor/nodeSeqscan.c
***************
*** 28,33 ****
--- 28,34 ----
#include "access/relscan.h"
#include "executor/execdebug.h"
#include "executor/nodeSeqscan.h"
+ #include "storage/predicate.h"
static void InitScanRelation(SeqScanState *node, EState *estate);
static TupleTableSlot *SeqNext(SeqScanState *node);
***************
*** 105,115 **** SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
--- 106,118 ----
* tuple.
* We call the ExecScan() routine and pass it the appropriate
* access method functions.
+ * For serializable transactions, we first lock the entire relation.
* ----------------------------------------------------------------
*/
TupleTableSlot *
ExecSeqScan(SeqScanState *node)
{
+ PredicateLockRelation(node->ss_currentRelation);
return ExecScan((ScanState *) node,
(ExecScanAccessMtd) SeqNext,
(ExecScanRecheckMtd) SeqRecheck);
*** a/src/backend/parser/gram.y
--- b/src/backend/parser/gram.y
***************
*** 6590,6595 **** transaction_mode_item:
--- 6590,6601 ----
| READ WRITE
{ $$ = makeDefElem("transaction_read_only",
makeIntConst(FALSE, @1)); }
+ | DEFERRABLE
+ { $$ = makeDefElem("transaction_deferrable",
+ makeIntConst(TRUE, @1)); }
+ | NOT DEFERRABLE
+ { $$ = makeDefElem("transaction_deferrable",
+ makeIntConst(FALSE, @1)); }
;
/* Syntax with commas is SQL-spec, without commas is Postgres historical */
*** a/src/backend/storage/freespace/indexfsm.c
--- b/src/backend/storage/freespace/indexfsm.c
***************
*** 24,29 ****
--- 24,30 ----
#include "storage/freespace.h"
#include "storage/indexfsm.h"
+ #include "storage/predicate.h"
#include "storage/smgr.h"
/*
***************
*** 52,57 **** GetFreeIndexPage(Relation rel)
--- 53,59 ----
void
RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
{
+ Assert(!PageIsPredicateLocked(rel, freeBlock));
RecordPageWithFreeSpace(rel, freeBlock, BLCKSZ - 1);
}
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
***************
*** 32,37 ****
--- 32,38 ----
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
+ #include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
***************
*** 105,110 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 106,112 ----
sizeof(ShmemIndexEnt)));
size = add_size(size, BufferShmemSize());
size = add_size(size, LockShmemSize());
+ size = add_size(size, PredicateLockShmemSize());
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, CLOGShmemSize());
***************
*** 200,205 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 202,212 ----
InitLocks();
/*
+ * Set up predicate lock manager
+ */
+ InitPredicateLocks();
+
+ /*
* Set up process table
*/
if (!IsUnderPostmaster)
*** a/src/backend/storage/ipc/shmqueue.c
--- b/src/backend/storage/ipc/shmqueue.c
***************
*** 43,56 **** SHMQueueInit(SHM_QUEUE *queue)
* SHMQueueIsDetached -- TRUE if element is not currently
* in a queue.
*/
- #ifdef NOT_USED
bool
! SHMQueueIsDetached(SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
return (queue->prev == NULL);
}
- #endif
/*
* SHMQueueElemInit -- clear an element's links
--- 43,54 ----
* SHMQueueIsDetached -- TRUE if element is not currently
* in a queue.
*/
bool
! SHMQueueIsDetached(const SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
return (queue->prev == NULL);
}
/*
* SHMQueueElemInit -- clear an element's links
***************
*** 146,152 **** SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
*--------------------
*/
Pointer
! SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
{
SHM_QUEUE *elemPtr = curElem->next;
--- 144,150 ----
*--------------------
*/
Pointer
! SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
{
SHM_QUEUE *elemPtr = curElem->next;
***************
*** 162,168 **** SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
* SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise
*/
bool
! SHMQueueEmpty(SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
--- 160,166 ----
* SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise
*/
bool
! SHMQueueEmpty(const SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
*** a/src/backend/storage/lmgr/Makefile
--- b/src/backend/storage/lmgr/Makefile
***************
*** 12,18 **** subdir = src/backend/storage/lmgr
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
! OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o
include $(top_srcdir)/src/backend/common.mk
--- 12,18 ----
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
! OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o predicate.o
include $(top_srcdir)/src/backend/common.mk
*** /dev/null
--- b/src/backend/storage/lmgr/predicate.c
***************
*** 0 ****
--- 1,3114 ----
+ /*-------------------------------------------------------------------------
+ *
+ * predicate.c
+ * POSTGRES predicate locking
+ * to support full serializable transaction isolation
+ *
+ *
+ * The approach taken is to implement Serializable Snapshot Isolation (SSI)
+ * as initially described in this paper:
+ *
+ * Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
+ * Serializable isolation for snapshot databases.
+ * In SIGMOD ’08: Proceedings of the 2008 ACM SIGMOD
+ * international conference on Management of data,
+ * pages 729–738, New York, NY, USA. ACM.
+ * http://doi.acm.org/10.1145/1376616.1376690
+ *
+ * and further elaborated in Cahill's doctoral thesis:
+ *
+ * Michael James Cahill. 2009.
+ * Serializable Isolation for Snapshot Databases.
+ * Sydney Digital Theses.
+ * University of Sydney, School of Information Technologies.
+ * http://hdl.handle.net/2123/5353
+ *
+ *
+ * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
+ * locks, which are so different from normal locks that a distinct set of
+ * structures is required to handle them. They are needed to detect
+ * rw-conflicts when the read happens before the write. (When the write
+ * occurs first, the reading transaction can check for a conflict by
+ * examining the MVCC data.)
+ *
+ * (1) Besides tuples actually read, they must cover ranges of tuples
+ * which would have been read based on the predicate. This will
+ * require modelling the predicates through locks against database
+ * objects such as pages, index ranges, or entire tables.
+ *
+ * (2) They must be kept in RAM for quick access. Because of this, it
+ * isn't possible to always maintain tuple-level granularity -- when
+ * the space allocated to store these approaches exhaustion, a
+ * request for a lock may need to scan for situations where a single
+ * transaction holds many fine-grained locks which can be coalesced
+ * into a single coarser-grained lock.
+ *
+ * (3) They never block anything; they are more like flags than locks
+ * in that regard; although they refer to database objects and are
+ * used to identify rw-conflicts with normal write locks.
+ *
+ * (4) While they are associated with a transaction, they must survive
+ * a successful COMMIT of that transaction, and remain until all
+ * overlapping transactions complete. This even means that they
+ * must survive termination of the transaction's process. If a
+ * top level transaction is rolled back, however, it is immediately
+ * flagged so that it can be ignored, and its SIREAD locks can be
+ * released any time after that.
+ *
+ * (5) The only transactions which create SIREAD locks or check for
+ * conflicts with them are serializable transactions.
+ *
+ * (6) When a write lock for a top level transaction is found to cover
+ * an existing SIREAD lock for the same transaction, the SIREAD lock
+ * can be deleted.
+ *
+ * (7) A write from a serializable transaction must ensure that a xact
+ * record exists for the transaction, with the same lifespan (until
+ * all concurrent transaction complete or the transaction is rolled
+ * back) so that rw-dependencies to that transaction can be
+ * detected.
+ *
+ *
+ * Lightweight locks to manage access to the predicate locking shared
+ * memory objects must be taken in this order, and should be released in
+ * reverse order:
+ *
+ * SerializableFinishedListLock
+ * - Protects the list of transactions which have completed but which
+ * may yet matter because they overlap still-active transactions.
+ *
+ * SerializablePredicateLockListLock
+ * - Protects the linked list of locks held by a transaction. Note
+ * that the locks themselves are also covered by the partition
+ * locks of their respective lock targets; this lock only affects
+ * the linked list connecting the locks related to a transaction.
+ * - All transactions share this single lock (with no partitioning).
+ * - There is never a need for a process other than the one running
+ * a transaction to walk the list of locks held by that
+ * transaction.
+ * - It is relatively infrequent that another process needs to
+ * modify the list for a transaction, but it does happen for such
+ * things as index page splits for pages with predicate locks and
+ * freeing of predicate locked pages by a vacuum process. When
+ * removing a lock in such cases, the lock itself contains the
+ * pointers needed to remove it from the list. When adding a
+ * lock in such cases, the lock can be added using the anchor in
+ * the transaction structure.
+ * - Cleaning up the list for a terminated transaction is *not* done
+ * on a retail basis, so no lock is required there.
+ * - Due to the above, a process accessing its active transaction's
+ * list always uses a shared lock, regardless of whether it is
+ * walking or maintaining the list. This improves concurrency
+ * for the common access patterns.
+ * - A process which needs to alter the list of a transaction other
+ * than its own active transaction must acquire an exclusive
+ * lock.
+ *
+ * FirstPredicateLockMgrLock based partition locks
+ * - The same lock protects a target, all locks on that target, and
+ * the linked list of locks on the target..
+ * - When more than one is needed, acquire in ascending order.
+ *
+ * SerializableXactHashLock
+ * - Protects both PredTran and SerializableXidHash.
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ /*
+ * INTERFACE ROUTINES
+ *
+ * housekeeping for setting up shared memory predicate lock structures
+ * InitPredicateLocks(void)
+ * PredicateLockShmemSize(void)
+ *
+ * predicate lock reporting
+ * GetPredicateLockStatusData(void)
+ * PageIsPredicateLocked(Relation relation, BlockNumber blkno)
+ *
+ * predicate lock maintenance
+ * RegisterSerializableTransaction(Snapshot snapshot)
+ * RegisterPredicateLockingXid(void)
+ * PredicateLockRelation(Relation relation)
+ * PredicateLockPage(Relation relation, BlockNumber blkno)
+ * PredicateLockTuple(Relation relation, HeapTuple tuple)
+ * PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
+ * BlockNumber newblkno);
+ * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
+ * BlockNumber newblkno);
+ * ReleasePredicateLocks(bool isCommit)
+ *
+ * conflict detection (may also trigger rollback)
+ * CheckForSerializableConflictOut(bool valid, Relation relation,
+ * HeapTupleData *tup, Buffer buffer)
+ * CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
+ * Buffer buffer)
+ *
+ * final rollback checking
+ * PreCommit_CheckForSerializationFailure(void)
+ */
+
+ #include "postgres.h"
+
+ #include "access/subtrans.h"
+ #include "access/transam.h"
+ #include "access/twophase.h"
+ #include "access/xact.h"
+ #include "miscadmin.h"
+ #include "storage/bufmgr.h"
+ #include "storage/predicate.h"
+ #include "storage/predicate_internals.h"
+ #include "storage/procarray.h"
+ #include "utils/rel.h"
+ #include "utils/snapmgr.h"
+
+
+ /*
+ * Test the most selective fields first, for performance.
+ *
+ * a is covered by b if all of the following hold:
+ * 1) a.database = b.database
+ * 2) a.relation = b.relation
+ * 3) b.offset is invalid (b is page-granularity or higher)
+ * 4) either of the following:
+ * 4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
+ * or 4b) a.offset is invalid and b.page is invalid (a is
+ * page-granularity and b is relation-granularity
+ */
+ #define TargetTagIsCoveredBy(covered_target, covering_target) \
+ ((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */ \
+ GET_PREDICATELOCKTARGETTAG_RELATION(covering_target)) \
+ && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) == \
+ InvalidOffsetNumber) /* (3) */ \
+ && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) != \
+ InvalidOffsetNumber) /* (4a) */ \
+ && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
+ GET_PREDICATELOCKTARGETTAG_PAGE(covered_target))) \
+ || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
+ InvalidBlockNumber) /* (4b) */ \
+ && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target) \
+ != InvalidBlockNumber))) \
+ && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) == /* (1) */ \
+ GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
+
+ /*
+ * The predicate locking target and lock shared hash tables are partitioned to
+ * reduce contention. To determine which partition a given target belongs to,
+ * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
+ * apply one of these macros.
+ * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
+ */
+ #define PredicateLockHashPartition(hashcode) \
+ ((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
+ #define PredicateLockHashPartitionLock(hashcode) \
+ ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
+
+ #define NPREDICATELOCKTARGETENTS() \
+ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
+
+ #define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))
+
+ #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
+ #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
+ #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
+ #define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
+ #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
+ #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
+ #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+
+ #define SxactCommittedBefore(sxactPivotOut, sxactOther) \
+ ((!TransactionIdIsValid((sxactOther)->finishedBefore)) \
+ || TransactionIdPrecedesOrEquals((sxactPivotOut)->finishedBefore, \
+ (sxactOther)->finishedBefore))
+
+ /*
+ * When a public interface method is called for a split on an index relation,
+ * this is the test to see if we should do a quick return.
+ */
+ #define SkipSplitTracking(relation) \
+ (((relation)->rd_id < FirstBootstrapObjectId) \
+ || RelationUsesLocalBuffers(relation))
+
+ /*
+ * When a public interface method is called for serializing a relation within
+ * the current transaction, this is the test to see if we should do a quick
+ * return.
+ */
+ #define SkipSerialization(relation) \
+ ((!IsolationIsSerializable()) \
+ || ((MySerializableXact == InvalidSerializableXact)) \
+ || ReleasePredicateLocksIfROSafe() \
+ || SkipSplitTracking(relation))
+
+
+ /*
+ * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
+ *
+ * To avoid unnecessary recomputations of the hash code, we try to do this
+ * just once per function, and then pass it around as needed. Aside from
+ * passing the hashcode to hash_search_with_hash_value(), we can extract
+ * the lock partition number from the hashcode.
+ */
+ #define PredicateLockTargetTagHashCode(predicatelocktargettag) \
+ (tag_hash((predicatelocktargettag), sizeof(PREDICATELOCKTARGETTAG)))
+
+ /*
+ * Given a predicate lock tag, and the hash for its target,
+ * compute the lock hash.
+ *
+ * To make the hash code also depend on the transaction, we xor the sxid
+ * struct's address into the hash code, left-shifted so that the
+ * partition-number bits don't change. Since this is only a hash, we
+ * don't care if we lose high-order bits of the address; use an
+ * intermediate variable to suppress cast-pointer-to-int warnings.
+ */
+ #define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
+ ((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
+ << LOG2_NUM_PREDICATELOCK_PARTITIONS)
+
+
+ /* This configuration variable is used to set the predicate lock table size */
+ int max_predicate_locks_per_xact; /* set by guc.c */
+
+ /*
+ * This provides a list of objects in order to track transactions
+ * participating in predicate locking. Entries in the list are fixed size,
+ * and reside in shared memory. The memory address of an entry must remain
+ * fixed during its lifetime. The list will be protected from concurrent
+ * update externally; no provision is made in this code to manage that. The
+ * number of entries in the list, and the size allowed for each entry is
+ * fixed upon creation.
+ */
+ static PredTranList PredTran;
+
+ /*
+ * This provides a pool of RWConflict data elements to use in conflict lists
+ * between transactions.
+ */
+ static RWConflictPoolHeader RWConflictPool;
+
+ /*
+ * The predicate locking hash tables are in shared memory.
+ * Each backend keeps pointers to them.
+ */
+ static HTAB *SerializableXidHash;
+ static HTAB *PredicateLockTargetHash;
+ static HTAB *PredicateLockHash;
+ static SHM_QUEUE *FinishedSerializableTransactions;
+
+ /*
+ * The local hash table used to determine when to combine multiple fine-
+ * grained locks into a single courser-grained lock.
+ */
+ static HTAB *LocalPredicateLockHash = NULL;
+
+ /*
+ * Keep a pointer to the currently-running serializable transaction (if any)
+ * for quick reference.
+ * TODO SSI: Remove volatile qualifier and the then-unnecessary casts?
+ */
+ static volatile SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
+
+
+ /* local functions */
+
+ static SERIALIZABLEXACT *CreatePredTran(void);
+ static void ReleasePredTran(SERIALIZABLEXACT *sxact);
+ static SERIALIZABLEXACT *FirstPredTran(void);
+ static SERIALIZABLEXACT *NextPredTran(SERIALIZABLEXACT *sxact);
+
+ static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
+ static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
+ static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
+ static void ReleaseRWConflict(RWConflict conflict);
+ static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
+
+ static uint32 predicatelock_hash(const void *key, Size keysize);
+ static void RegisterSerializableTransactionInt(const Snapshot snapshot);
+ static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
+ static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
+ PREDICATELOCKTARGETTAG *parent);
+ static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
+ static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
+ static int PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
+ static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
+ static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
+ static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
+ static void SetNewSxactGlobalXmin(void);
+ static bool ReleasePredicateLocksIfROSafe(void);
+ static void ClearOldPredicateLocks(void);
+ static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial);
+ static bool XidIsConcurrent(TransactionId xid);
+ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
+ static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
+ static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
+ const SERIALIZABLEXACT *writer);
+
+
+ /*
+ * These functions are a simple implementation of a list for this specific
+ * type of struct. If there is ever a generalized shared memory list, we
+ * should probably switch to that.
+ */
+ static SERIALIZABLEXACT *
+ CreatePredTran(void)
+ {
+ PredTranListElement ptle;
+
+ ptle = (PredTranListElement)
+ SHMQueueNext(&PredTran->availableList,
+ &PredTran->availableList,
+ offsetof(PredTranListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ SHMQueueDelete(&ptle->link);
+ SHMQueueInsertBefore(&PredTran->activeList, &ptle->link);
+ return &ptle->sxact;
+ }
+
+ static void
+ ReleasePredTran(SERIALIZABLEXACT *sxact)
+ {
+ PredTranListElement ptle;
+
+ Assert(ShmemAddrIsValid(sxact));
+
+ ptle = (PredTranListElement)
+ (((char *) sxact)
+ - offsetof(PredTranListElementData, sxact)
+ +offsetof(PredTranListElementData, link));
+ SHMQueueDelete(&ptle->link);
+ SHMQueueInsertBefore(&PredTran->availableList, &ptle->link);
+ }
+
+ static SERIALIZABLEXACT *
+ FirstPredTran(void)
+ {
+ PredTranListElement ptle;
+
+ ptle = (PredTranListElement)
+ SHMQueueNext(&PredTran->activeList,
+ &PredTran->activeList,
+ offsetof(PredTranListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ return &ptle->sxact;
+ }
+
+ static SERIALIZABLEXACT *
+ NextPredTran(SERIALIZABLEXACT *sxact)
+ {
+ PredTranListElement ptle;
+
+ Assert(ShmemAddrIsValid(sxact));
+
+ ptle = (PredTranListElement)
+ (((char *) sxact)
+ - offsetof(PredTranListElementData, sxact)
+ +offsetof(PredTranListElementData, link));
+ ptle = (PredTranListElement)
+ SHMQueueNext(&PredTran->activeList,
+ &ptle->link,
+ offsetof(PredTranListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ return &ptle->sxact;
+ }
+
+
+ /*
+ * These functions manage primitive access to the RWConflict pool and lists.
+ */
+ static bool
+ RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
+ {
+ RWConflict conflict;
+
+ Assert(reader != writer);
+
+ /* Check the ends of the purported conflict first. */
+ if (SxactIsRolledBack(reader)
+ || SxactIsRolledBack(writer)
+ || SHMQueueEmpty(&reader->outConflicts)
+ || SHMQueueEmpty(&writer->inConflicts))
+ return false;
+
+ /* A conflict is possible; walk the list to find out. */
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->outConflicts,
+ &reader->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ if (conflict->sxactIn == writer)
+ return true;
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+
+ /* No conflict found. */
+ return false;
+ }
+
+ static void
+ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
+ {
+ RWConflict conflict;
+
+ Assert(reader != writer);
+ Assert(!RWConflictExists(reader, writer));
+
+ conflict = (RWConflict)
+ SHMQueueNext(&RWConflictPool->availableList,
+ &RWConflictPool->availableList,
+ offsetof(RWConflictData, outLink));
+ if (!conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough elements in RWConflictPool to record a rw-conflict")));
+
+ SHMQueueDelete(&conflict->outLink);
+
+ conflict->sxactOut = reader;
+ conflict->sxactIn = writer;
+ SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+ SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+ }
+
+ static void
+ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
+ SERIALIZABLEXACT *activeXact)
+ {
+ RWConflict conflict;
+
+ Assert(roXact != activeXact);
+ Assert(SxactIsReadOnly(roXact));
+ Assert(!SxactIsReadOnly(activeXact));
+
+ conflict = (RWConflict)
+ SHMQueueNext(&RWConflictPool->availableList,
+ &RWConflictPool->availableList,
+ offsetof(RWConflictData, outLink));
+ if (!conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough elements in RWConflictPool to record a potential conflict with a DEFERRABLE snapshot")));
+
+ SHMQueueDelete(&conflict->outLink);
+
+ conflict->sxactOut = activeXact;
+ conflict->sxactIn = roXact;
+ SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
+ &conflict->outLink);
+ SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
+ &conflict->inLink);
+ }
+
+ static void
+ ReleaseRWConflict(RWConflict conflict)
+ {
+ conflict->sxactOut = conflict->sxactIn = InvalidSerializableXact;
+ SHMQueueDelete(&conflict->inLink);
+ SHMQueueDelete(&conflict->outLink);
+ SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
+ }
+
+ static void
+ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
+ {
+ RWConflict conflict,
+ nextConflict;
+
+ Assert(SxactIsReadOnly(sxact));
+ Assert(!SxactIsROSafe(sxact));
+
+ sxact->flags |= SXACT_FLAG_RO_UNSAFE;
+
+ /*
+ * We know this isn't a safe snapshot, so we can stop looking for other
+ * potential conflicts.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext(&sxact->possibleUnsafeConflicts,
+ &sxact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext(&sxact->possibleUnsafeConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ Assert(!SxactIsReadOnly(conflict->sxactOut));
+ Assert(sxact == conflict->sxactIn);
+
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+ }
+
+ /*
+ * InitPredicateLocks -- Initialize the predicate locking data structures.
+ *
+ * This is called from CreateSharedMemoryAndSemaphores(), which see for
+ * more comments. In the normal postmaster case, the shared hash tables
+ * are created here. Backends inherit the pointers
+ * to the shared tables via fork(). In the EXEC_BACKEND case, each
+ * backend re-executes this code to obtain pointers to the already existing
+ * shared hash tables.
+ */
+ void
+ InitPredicateLocks(void)
+ {
+ HASHCTL info;
+ int hash_flags;
+ long init_table_size,
+ max_table_size;
+ Size requestSize;
+ bool found;
+
+ /*
+ * Compute init/max size to request for predicate lock target hashtable.
+ * Note these calculations must agree with PredicateLockShmemSize!
+ */
+ max_table_size = NPREDICATELOCKTARGETENTS();
+ init_table_size = max_table_size / 2;
+
+ /*
+ * Allocate hash table for PREDICATELOCKTARGET structs. This stores
+ * per-predicate-lock-target information.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(PREDICATELOCKTARGETTAG);
+ info.entrysize = sizeof(PREDICATELOCKTARGET);
+ info.hash = tag_hash;
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+
+ PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /* Assume an average of 2 xacts per target */
+ max_table_size *= 2;
+ init_table_size *= 2;
+
+ /*
+ * Allocate hash table for PREDICATELOCK structs. This stores per
+ * xact-lock-of-a-target information.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(PREDICATELOCKTAG);
+ info.entrysize = sizeof(PREDICATELOCK);
+ info.hash = predicatelock_hash;
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+
+ PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /*
+ * Compute init/max size to request for serializable transaction
+ * hashtable. Note these calculations must agree with
+ * PredicateLockShmemSize!
+ */
+ max_table_size = MaxBackends;
+ init_table_size = max_table_size / 2;
+
+ /*
+ * Allocate a list to hold information on transaction participating in
+ * predicate locking.
+ *
+ * Assume an average of 10 predicate locking transactions per backend.
+ * That may seem high, but each transaction must be kept until every
+ * overlapping predicate locking transaction has completed, so we have to
+ * tolerate the occassional long-running transaction.
+ */
+ max_table_size *= 10;
+ init_table_size *= 10;
+
+ PredTran = ShmemInitStruct("PredTranList",
+ PredTranListDataSize,
+ &found);
+ if (!found)
+ {
+ int i;
+
+ SHMQueueInit(&PredTran->availableList);
+ SHMQueueInit(&PredTran->activeList);
+ PredTran->SxactGlobalXmin = InvalidTransactionId;
+ PredTran->SxactGlobalXminCount = 0;
+ PredTran->WritableSxactCount = 0;
+ PredTran->LastSxactCommitSeqNo = 0;
+ PredTran->CanPartialClearThrough = 0;
+ PredTran->HavePartialClearedThrough = 0;
+ requestSize = mul_size((Size) max_table_size,
+ PredTranListElementDataSize);
+ PredTran->element = ShmemAlloc(requestSize);
+ if (PredTran->element == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for elements of data structure"
+ " \"%s\" (%lu bytes requested)",
+ "PredTranList", (unsigned long) requestSize)));
+ /* Add all elements to available list, clean. */
+ memset(PredTran->element, 0, requestSize);
+ for (i = 0; i < max_table_size; i++)
+ {
+ SHMQueueInsertBefore(&(PredTran->availableList),
+ &(PredTran->element[i].link));
+ }
+ }
+
+ /*
+ * Allocate hash table for SERIALIZABLEXID structs. This stores per-xid
+ * information for serializable transactions which have accessed data.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(SERIALIZABLEXIDTAG);
+ info.entrysize = sizeof(SERIALIZABLEXID);
+ info.hash = tag_hash;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION);
+
+ SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /*
+ * Allocate space for tracking rw-conflicts in lists attached to the
+ * transactions.
+ *
+ * TODO SSI: Assume an average of 5 conflicts per transaction. This is
+ * likely to need to be adjusted or configured by a GUC. Gotta start
+ * somewhere....
+ */
+ max_table_size *= 5;
+
+ RWConflictPool = ShmemInitStruct("RWConflictPool",
+ RWConflictPoolHeaderDataSize,
+ &found);
+ if (!found)
+ {
+ int i;
+
+ SHMQueueInit(&RWConflictPool->availableList);
+ requestSize = mul_size((Size) max_table_size,
+ PredTranListElementDataSize);
+ RWConflictPool->element = ShmemAlloc(requestSize);
+ if (RWConflictPool->element == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for elements of data structure"
+ " \"%s\" (%lu bytes requested)",
+ "RWConflictPool", (unsigned long) requestSize)));
+ /* Add all elements to available list, clean. */
+ memset(RWConflictPool->element, 0, requestSize);
+ for (i = 0; i < max_table_size; i++)
+ {
+ SHMQueueInsertBefore(&(RWConflictPool->availableList),
+ &(RWConflictPool->element[i].outLink));
+ }
+ }
+
+ /*
+ * Create or attach to the header for the list of finished serializable
+ * transactions.
+ */
+ FinishedSerializableTransactions = (SHM_QUEUE *)
+ ShmemInitStruct("FinishedSerializableTransactions",
+ sizeof(SHM_QUEUE),
+ &found);
+ if (!found)
+ SHMQueueInit(FinishedSerializableTransactions);
+ }
+
+ /*
+ * Estimate shared-memory space used for predicate lock table
+ */
+ Size
+ PredicateLockShmemSize(void)
+ {
+ Size size = 0;
+ long max_table_size;
+
+ /* predicate lock target hash table */
+ max_table_size = NPREDICATELOCKTARGETENTS();
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(PREDICATELOCKTARGET)));
+
+ /* predicate lock hash table */
+ max_table_size *= 2;
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(PREDICATELOCK)));
+
+ /*
+ * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
+ * margin.
+ */
+ size = add_size(size, size / 10);
+
+ /* transaction list */
+ max_table_size = MaxBackends;
+ max_table_size *= 10;
+ size = add_size(size, PredTranListDataSize);
+ size = add_size(size, mul_size((Size) max_table_size,
+ PredTranListElementDataSize));
+
+ /* transaction xid table */
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(SERIALIZABLEXID)));
+
+ /* Head for list of serializable transactions. */
+ size = add_size(size, sizeof(SHM_QUEUE));
+
+ return size;
+ }
+
+
+ /*
+ * Compute the hash code associated with a PREDICATELOCKTAG.
+ *
+ * Because we want to use just one set of partition locks for both the
+ * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
+ * that PREDICATELOCKs fall into the same partition number as their
+ * associated PREDICATELOCKTARGETs. dynahash.c expects the partition number
+ * to be the low-order bits of the hash code, and therefore a
+ * PREDICATELOCKTAG's hash code must have the same low-order bits as the
+ * associated PREDICATELOCKTARGETTAG's hash code. We achieve this with this
+ * specialized hash function.
+ */
+ static uint32
+ predicatelock_hash(const void *key, Size keysize)
+ {
+ const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
+ uint32 targethash;
+
+ Assert(keysize == sizeof(PREDICATELOCKTAG));
+
+ /* Look into the associated target object, and compute its hash code */
+ targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
+
+ return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
+ }
+
+
+ /*
+ * GetPredicateLockStatusData
+ * Return a table containing the internal state of the predicate
+ * lock manager for use in pg_lock_status.
+ *
+ * Like GetLockStatusData, this function tries to hold the partition LWLocks
+ * for as short a time as possible by returning two arrays that simply
+ * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
+ * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
+ * SERIALIZABLEXACT will likely appear.
+ */
+ PredicateLockData *
+ GetPredicateLockStatusData(void)
+ {
+ PredicateLockData *data;
+ int i;
+ int els,
+ el;
+ HASH_SEQ_STATUS seqstat;
+ PREDICATELOCK *predlock;
+
+ data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
+
+ /*
+ * Acquire locks. To ensure consistency, take simultaneous locks on
+ * SerializableFinishedListLock, all partition locks in ascending order,
+ * then SerializableXactHashLock.
+ *
+ * TODO SSI: Do we really need to lock SerializableFinishedListLock?
+ */
+ LWLockAcquire(SerializableFinishedListLock, LW_SHARED);
+ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
+ LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+
+ /* Get number of locks and allocate appropriately-sized arrays. */
+ els = hash_get_num_entries(PredicateLockHash);
+ data->nelements = els;
+ data->locktags = (PREDICATELOCKTARGETTAG *)
+ palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
+ data->xacts = (SERIALIZABLEXACT *)
+ palloc(sizeof(SERIALIZABLEXACT) * els);
+
+
+ /* Scan through PredicateLockHash and copy contents */
+ hash_seq_init(&seqstat, PredicateLockHash);
+
+ el = 0;
+
+ while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
+ {
+ data->locktags[el] = predlock->tag.myTarget->tag;
+ data->xacts[el] = *predlock->tag.myXact;
+ el++;
+ }
+
+ Assert(el == els);
+
+ /* Release locks in reverse order */
+ LWLockRelease(SerializableXactHashLock);
+ for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ LWLockRelease(FirstPredicateLockMgrLock + i);
+ LWLockRelease(SerializableFinishedListLock);
+
+ return data;
+ }
+
+
+ /*
+ * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
+ * It should be current for this process and be contained in PredTran.
+ */
+ void
+ RegisterSerializableTransaction(const Snapshot snapshot)
+ {
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ RegisterSerializableTransactionInt(snapshot);
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ static void
+ RegisterSerializableTransactionInt(const Snapshot snapshot)
+ {
+ PGPROC *proc;
+ SERIALIZABLEXACTTAG sxacttag;
+ SERIALIZABLEXACT *sxact,
+ *othersxact;
+ HASHCTL hash_ctl;
+
+ /* We only do this for serializable transactions. Once. */
+ Assert(IsolationIsSerializable());
+ Assert(MySerializableXact == InvalidSerializableXact);
+ Assert(LWLockHeldByMe(SerializableXactHashLock));
+
+ proc = MyProc;
+ Assert(proc != NULL);
+ GET_VXID_FROM_PGPROC(sxacttag.vxid, *proc);
+
+
+ /*
+ * If there are no serializable transactions which are not read-only, we
+ * can "opt out" of predicate locking and conflict checking for a
+ * read-only transaction.
+ *
+ * The reason this is safe is that a read-only transaction can only become
+ * part of a dangerous structure if it overlaps a writable transaction
+ * which in turn overlaps a writable transaction which committed before
+ * the read-only transaction started. A new writable transaction can
+ * overlap this one, but it can't meet the other condition of overlapping
+ * a transaction which committed before this one started.
+ */
+ if (XactReadOnly && PredTran->WritableSxactCount == 0)
+ return;
+
+ /* Maintain serializable global xmin info. */
+ if (!TransactionIdIsValid(PredTran->SxactGlobalXmin))
+ {
+ Assert(PredTran->SxactGlobalXminCount == 0);
+ PredTran->SxactGlobalXmin = snapshot->xmin;
+ PredTran->SxactGlobalXminCount = 1;
+ }
+ else if (TransactionIdEquals(snapshot->xmin, PredTran->SxactGlobalXmin))
+ {
+ Assert(PredTran->SxactGlobalXminCount > 0);
+ PredTran->SxactGlobalXminCount++;
+ }
+ else
+ {
+ Assert(TransactionIdFollows(snapshot->xmin, PredTran->SxactGlobalXmin));
+ }
+
+ /*
+ * Set up the serializable transaction information for predicate locking
+ * for the current transaction.
+ */
+ sxact = CreatePredTran();
+ /* TODO SSI: If null, push out committed tran to SLRU summary; retry? */
+ if (!sxact)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_connections.")));
+
+ /* Initialize the structure. */
+ sxact->tag = sxacttag;
+ sxact->SeqNo.lastCommitBeforeSnapshot = PredTran->LastSxactCommitSeqNo;
+ sxact->commitSeqNo = InvalidSerCommitSeqNo;
+ SHMQueueInit(&(sxact->outConflicts));
+ SHMQueueInit(&(sxact->inConflicts));
+ SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+ sxact->topXid = GetTopTransactionIdIfAny();
+ sxact->finishedBefore = InvalidTransactionId;
+ sxact->xmin = snapshot->xmin;
+ sxact->pid = MyProcPid;
+ SHMQueueInit(&(sxact->predicateLocks));
+ SHMQueueElemInit(&(sxact->finishedLink));
+ sxact->flags = 0;
+ if (XactReadOnly)
+ {
+ sxact->flags |= SXACT_FLAG_READ_ONLY;
+
+ /*
+ * Register all concurrent r/w transactions as possible conflicts; if
+ * all of them commit without any outgoing conflicts to earlier
+ * transactions then this snapshot can be deemed safe (and we can run
+ * without tracking predicate locks).
+ */
+ for (othersxact = FirstPredTran();
+ othersxact != NULL;
+ othersxact = NextPredTran(sxact))
+ {
+ if (!SxactIsOnFinishedList(othersxact) &&
+ !SxactIsReadOnly(othersxact))
+ {
+ SetPossibleUnsafeConflict(sxact, othersxact);
+ }
+ }
+ }
+ else
+ {
+ ++(PredTran->WritableSxactCount);
+ Assert(PredTran->WritableSxactCount <= MaxBackends);
+ }
+
+ MySerializableXact = sxact;
+
+ /* Initialized the backend-local hash table of parent locks */
+ Assert(LocalPredicateLockHash == NULL);
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
+ hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
+ hash_ctl.hash = tag_hash;
+ LocalPredicateLockHash = hash_create("Local predicate lock",
+ max_predicate_locks_per_xact,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION);
+ }
+
+ /*
+ * Register the top level XID in SerializableXidHash.
+ * Also store it for easy reference in MySerializableXact.
+ */
+ void
+ RegisterPredicateLockingXid(const TransactionId xid)
+ {
+ SERIALIZABLEXIDTAG sxidtag;
+ SERIALIZABLEXID *sxid;
+ bool found;
+
+ /*
+ * If we're not tracking predicate lock data for this transaction, we
+ * should ignore the request and return quickly.
+ */
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ /* This should only be done once per transaction. */
+ Assert(MySerializableXact->topXid == InvalidTransactionId);
+
+ /* We should have a valid XID and be at the top level. */
+ Assert(TransactionIdIsValid(xid));
+
+ MySerializableXact->topXid = xid;
+
+ sxidtag.xid = xid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
+ &sxidtag,
+ HASH_ENTER, &found);
+ if (!sxid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+
+ Assert(!found);
+
+ /* Initialize the structure. */
+ sxid->myXact = (SERIALIZABLEXACT *) MySerializableXact;
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+
+ /*
+ * Check whether there are any predicate locks held by any transaction
+ * for the page at the given block number.
+ *
+ * Note that the transaction may be completed but not yet subject to
+ * cleanup due to overlapping serializable transactions. This must
+ * return valid information regardless of transaction isolation level.
+ *
+ * Also note that this doesn't check for a conflicting relation lock,
+ * just a lock specifically on the given page.
+ *
+ * One use is to support proper behavior during GiST index vacuum.
+ */
+ bool
+ PageIsPredicateLocked(const Relation relation, const BlockNumber blkno)
+ {
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCKTARGET *target;
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ blkno);
+
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &targettag, targettaghash,
+ HASH_FIND, NULL);
+ LWLockRelease(partitionLock);
+
+ return (target != NULL);
+ }
+
+
+ /*
+ * Check whether a particular lock is held by this transaction.
+ */
+ static bool
+ PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
+ {
+ LOCALPREDICATELOCK *lock;
+
+ /* check local hash table */
+ lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
+ targettag,
+ HASH_FIND, NULL);
+
+ if (!lock)
+ return false;
+
+ /*
+ * Found entry in the table, but still need to check whether it's actually
+ * held -- it could just be a parent of some held lock.
+ */
+ return lock->held;
+ }
+
+ /*
+ * Return the parent lock tag in the lock hierarchy: the next coarser
+ * lock that covers the provided tag.
+ *
+ * Returns true and sets *parent to the parent tag if one exists,
+ * returns false if none exists.
+ */
+ static bool
+ GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
+ PREDICATELOCKTARGETTAG *parent)
+ {
+ switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
+ {
+ case PREDLOCKTAG_RELATION:
+ /* relation locks have no parent lock */
+ return false;
+
+ case PREDLOCKTAG_PAGE:
+ /* parent lock is relation lock */
+ SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
+ GET_PREDICATELOCKTARGETTAG_DB(*tag),
+ GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
+
+ return true;
+
+ case PREDLOCKTAG_TUPLE:
+ /* parent lock is page lock */
+ SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
+ GET_PREDICATELOCKTARGETTAG_DB(*tag),
+ GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
+ GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
+ return true;
+ }
+
+ /* not reachable */
+ Assert(false);
+ return false;
+ }
+
+ /*
+ * Check whether the lock we are considering is already covered by a
+ * coarser lock for our transaction.
+ */
+ static bool
+ CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
+ {
+ PREDICATELOCKTARGETTAG targettag,
+ parenttag;
+
+ targettag = *newtargettag;
+
+ /* check parents iteratively until no more */
+ while (GetParentPredicateLockTag(&targettag, &parenttag))
+ {
+ targettag = parenttag;
+ if (PredicateLockExists(&targettag))
+ return true;
+ }
+
+ /* no more parents to check; lock is not covered */
+ return false;
+ }
+
+
+ /*
+ * Delete child target locks owned by this process.
+ * This implementation is assuming that the usage of each target tag field
+ * is uniform. No need to make this hard if we don't have to.
+ *
+ * We aren't acquiring lightweight locks for the predicate lock or lock
+ * target structures associated with this transaction unless we're going
+ * to modify them, because no other process is permitted to modify our
+ * locks.
+ */
+ static void
+ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
+ {
+ SERIALIZABLEXACT *sxact;
+ PREDICATELOCK *predlock;
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ sxact = (SERIALIZABLEXACT *) MySerializableXact;
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+ while (predlock)
+ {
+ SHM_QUEUE *predlocksxactlink;
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCKTAG oldlocktag;
+ PREDICATELOCKTARGET *oldtarget;
+ PREDICATELOCKTARGETTAG oldtargettag;
+
+ predlocksxactlink = &(predlock->xactLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ predlocksxactlink,
+ offsetof(PREDICATELOCK, xactLink));
+
+ oldlocktag = predlock->tag;
+ Assert(oldlocktag.myXact == sxact);
+ oldtarget = oldlocktag.myTarget;
+ oldtargettag = oldtarget->tag;
+
+ if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
+ {
+ uint32 oldtargettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCK *rmpredlock;
+ PREDICATELOCKTARGET *rmtarget;
+
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ SHMQueueDelete(predlocksxactlink);
+ SHMQueueDelete(&(predlock->targetLink));
+ rmpredlock = hash_search_with_hash_value
+ (PredicateLockHash,
+ &oldlocktag,
+ PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
+ oldtargettaghash),
+ HASH_REMOVE, NULL);
+ Assert(rmpredlock == predlock);
+
+ if (SHMQueueEmpty(&oldtarget->predicateLocks))
+ {
+ rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag,
+ oldtargettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmtarget == oldtarget);
+ }
+
+ LWLockRelease(partitionLock);
+
+ DecrementParentLocks(&oldtargettag);
+ }
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializablePredicateLockListLock);
+ }
+
+ /*
+ * Returns the promotion threshold for a given predicate lock
+ * target. This is the number of descendant locks required to promote
+ * to the specified tag. Note that the threshold includes non-direct
+ * descendants, e.g. both tuples and pages for a relation lock.
+ *
+ * TODO SSI: We should do something more intelligent about what the
+ * thresholds are, either making it proportional to the number of
+ * tuples in a page & pages in a relation, or at least making it a
+ * GUC. Currently the threshold is 3 for a page lock, and
+ * max_predicate_locks_per_transaction/2 for a relation lock, chosen
+ * entirely arbitrarily (and without benchmarking).
+ */
+ static int
+ PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
+ {
+ switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
+ {
+ case PREDLOCKTAG_RELATION:
+ return max_predicate_locks_per_xact / 2;
+
+ case PREDLOCKTAG_PAGE:
+ return 3;
+
+ case PREDLOCKTAG_TUPLE:
+
+ /*
+ * not reachable: nothing is finer-granularity than a tuple, so we
+ * should never try to promote to it.
+ */
+ Assert(false);
+ return 0;
+ }
+
+ /* not reachable */
+ Assert(false);
+ return 0;
+ }
+
+ /*
+ * For all ancestors of a newly-acquired predicate lock, increment
+ * their child count in the parent hash table. If any of them have
+ * more descendants than their promotion threshold, acquire the
+ * coarsest such lock.
+ *
+ * Returns true if a parent lock was acquired and false otherwise.
+ */
+ static bool
+ CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
+ {
+ PREDICATELOCKTARGETTAG targettag,
+ nexttag,
+ promotiontag;
+ LOCALPREDICATELOCK *parentlock;
+ bool found,
+ promote;
+
+ promote = false;
+
+ targettag = *reqtag;
+
+ /* check parents iteratively */
+ while (GetParentPredicateLockTag(&targettag, &nexttag))
+ {
+ targettag = nexttag;
+ parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
+ &targettag,
+ HASH_ENTER,
+ &found);
+ if (!found)
+ {
+ parentlock->held = false;
+ parentlock->childLocks = 1;
+ }
+ else
+ parentlock->childLocks++;
+
+ if (parentlock->childLocks >=
+ PredicateLockPromotionThreshold(&targettag))
+ {
+ /*
+ * We should promote to this parent lock. Continue to check its
+ * ancestors, however, both to get their child counts right and to
+ * check whether we should just go ahead and promote to one of
+ * them.
+ */
+ promotiontag = targettag;
+ promote = true;
+ }
+ }
+
+ if (promote)
+ {
+ /* acquire coarsest ancestor eligible for promotion */
+ PredicateLockAcquire(&promotiontag);
+ return true;
+ }
+ else
+ return false;
+ }
+
+ /*
+ * When releasing a lock, decrement the child count on all ancestor
+ * locks.
+ *
+ * This is called only when releasing a lock via
+ * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
+ * we've acquired its parent, possibly due to promotion) or when a new
+ * MVCC write lock makes the predicate lock unnecessary. There's no
+ * point in calling it when locks are released at transaction end, as
+ * this information is no longer needed.
+ */
+ static void
+ DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
+ {
+ PREDICATELOCKTARGETTAG parenttag,
+ nexttag;
+
+ parenttag = *targettag;
+
+ while (GetParentPredicateLockTag(&parenttag, &nexttag))
+ {
+ uint32 targettaghash;
+ LOCALPREDICATELOCK *parentlock,
+ *rmlock;
+
+ parenttag = nexttag;
+ targettaghash = PredicateLockTargetTagHashCode(&parenttag);
+ parentlock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ &parenttag, targettaghash,
+ HASH_FIND, NULL);
+ Assert(parentlock != NULL);
+ parentlock->childLocks--;
+
+ Assert(parentlock->childLocks >= 0);
+
+ if ((parentlock->childLocks == 0) && (!parentlock->held))
+ {
+ rmlock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ &parenttag, targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmlock == parentlock);
+ }
+ }
+ }
+
+ /*
+ * Acquire a predicate lock on the specified target for the current
+ * connection if not already held. Create related serializable transaction
+ * and predicate lock target entries first if missing.
+ */
+ static void
+ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
+ {
+ uint32 targettaghash;
+ LWLockId partitionLock;
+ bool found;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTAG locktag;
+ PREDICATELOCK *lock;
+ LOCALPREDICATELOCK *locallock;
+
+ /* Do we have the lock already, or a covering lock? */
+ if (PredicateLockExists(targettag))
+ return;
+
+ if (CoarserLockCovers(targettag))
+ return;
+
+ /* the same hash and LW lock apply to the lock target and the local lock. */
+ targettaghash = PredicateLockTargetTagHashCode(targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ /* Acquire lock in local table */
+ locallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_ENTER, &found);
+ /* We should not hold the lock (but its entry might still exist) */
+ Assert(!found || !locallock->held);
+ locallock->held = true;
+ if (!found)
+ locallock->childLocks = 0;
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ /* Make sure that the target is represented. */
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ targettag, targettaghash,
+ HASH_ENTER, &found);
+ if (!target)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (!found)
+ SHMQueueInit(&(target->predicateLocks));
+
+ /* We've got the sxact and target, make sure they're joined. */
+ locktag.myTarget = target;
+ locktag.myXact = (SERIALIZABLEXACT *) MySerializableXact;
+ lock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash, &locktag,
+ PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
+ HASH_ENTER, &found);
+ if (!lock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+
+ if (!found)
+ {
+ SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
+ SHMQueueInsertBefore((SHM_QUEUE *) &(MySerializableXact->predicateLocks),
+ &(lock->xactLink));
+ }
+
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+
+ /*
+ * Lock has been acquired. Check whether it should be promoted to a
+ * coarser granularity, or whether there are finer-granularity locks to
+ * clean up.
+ */
+ if (CheckAndPromotePredicateLockRequest(targettag))
+ {
+ /*
+ * Lock request was promoted to a coarser-granularity lock, and that
+ * lock was acquired. It will delete this lock and any of its
+ * children, so we're done.
+ */
+ }
+ else
+ {
+ /* Clean up any finer-granularity locks */
+ if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
+ DeleteChildTargetLocks(targettag);
+ }
+ }
+
+
+ /*
+ * PredicateLockRelation
+ *
+ * Gets a predicate lock at the relation level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ * Clear any finer-grained predicate locks this session has on the relation.
+ */
+ void
+ PredicateLockRelation(const Relation relation)
+ {
+ PREDICATELOCKTARGETTAG tag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ SET_PREDICATELOCKTARGETTAG_RELATION(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ PredicateLockAcquire(&tag);
+ }
+
+ /*
+ * PredicateLockPage
+ *
+ * Gets a predicate lock at the page level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ * Skip if a coarser predicate lock already covers this page.
+ * Clear any finer-grained predicate locks this session has on the relation.
+ */
+ void
+ PredicateLockPage(const Relation relation, const BlockNumber blkno)
+ {
+ PREDICATELOCKTARGETTAG tag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ blkno);
+ PredicateLockAcquire(&tag);
+ }
+
+ /*
+ * PredicateLockTuple
+ *
+ * Gets a predicate lock at the tuple level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ */
+ void
+ PredicateLockTuple(const Relation relation, const HeapTuple tuple)
+ {
+ PREDICATELOCKTARGETTAG tag;
+ ItemPointer tid;
+
+ if (SkipSerialization(relation))
+ return;
+
+ /*
+ * If it's a heap tuple, return if this xact wrote it.
+ */
+ if (relation->rd_index == NULL)
+ {
+ TransactionId xid;
+
+ xid = HeapTupleHeaderGetXmin(tuple->t_data);
+ if (TransactionIdFollowsOrEquals(xid, TransactionXmin))
+ {
+ xid = SubTransGetTopmostTransaction(xid);
+ if (xid == GetTopTransactionIdIfAny())
+ {
+ /* We wrote it; we already have a write lock. */
+ return;
+ }
+ }
+ }
+
+ tid = &(tuple->t_self);
+ SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(tid),
+ ItemPointerGetOffsetNumber(tid));
+ PredicateLockAcquire(&tag);
+ }
+
+ /*
+ * PredicateLockPageSplit
+ *
+ * Copies any predicate locks for the old page to the new page.
+ * Skip if this is a temporary table or toast table.
+ *
+ * NOTE: A page split (or overflow) affects all serializable transactions,
+ * even if it occurs in the context of another transaction isolation level.
+ *
+ * NOTE: This currently leaves the local copy of the locks without
+ * information on the new lock which is in shared memory. This could cause
+ * problems if enough page splits occur on locked pages without the processes
+ * which hold the locks getting in and noticing.
+ */
+ void
+ PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno,
+ const BlockNumber newblkno)
+ {
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ uint32 oldtargettaghash;
+ LWLockId oldpartitionLock;
+ PREDICATELOCKTARGET *oldtarget;
+ uint32 newtargettaghash;
+ LWLockId newpartitionLock;
+
+ if (SkipSplitTracking(relation))
+ return;
+
+ Assert(oldblkno != newblkno);
+ Assert(BlockNumberIsValid(oldblkno));
+ Assert(BlockNumberIsValid(newblkno));
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ oldblkno);
+ SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ newblkno);
+
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
+ oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+ newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+
+ /*
+ * We must get the partition locks in ascending sequence to avoid
+ * deadlocks. If old and new partitions are the same, we must request the
+ * lock only once.
+ */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockAcquire(oldpartitionLock, LW_SHARED);
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(oldpartitionLock, LW_SHARED);
+ }
+ else
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+
+ /*
+ * Look for the old target. If not found, that's OK; no predicate locks
+ * are affected, so we can just clean up and return. If it does exist,
+ * walk its list of predicate locks and create new ones for the new block
+ * number.
+ */
+ oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag,
+ oldtargettaghash,
+ HASH_FIND, NULL);
+ if (oldtarget)
+ {
+ PREDICATELOCKTARGET *newtarget;
+ bool found;
+ PREDICATELOCK *oldpredlock;
+ PREDICATELOCKTAG newpredlocktag;
+
+ newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &newtargettag,
+ newtargettaghash,
+ HASH_ENTER, &found);
+ Assert(!found);
+ if (!newtarget)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ SHMQueueInit(&(newtarget->predicateLocks));
+
+ newpredlocktag.myTarget = newtarget;
+
+ oldpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ &(oldtarget->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ while (oldpredlock)
+ {
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *newpredlock;
+
+ predlocktargetlink = &(oldpredlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+ newpredlocktag.myXact = oldpredlock->tag.myXact;
+
+ newpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &newpredlocktag,
+ PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
+ newtargettaghash),
+ HASH_ENTER, &found);
+ if (!newpredlock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ Assert(!found);
+ SHMQueueInsertBefore(&(newtarget->predicateLocks),
+ &(newpredlock->targetLink));
+ SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
+ &(newpredlock->xactLink));
+
+ oldpredlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ /* Release partition locks in reverse order of acquisition. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockRelease(newpartitionLock);
+ LWLockRelease(oldpartitionLock);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockRelease(oldpartitionLock);
+ LWLockRelease(newpartitionLock);
+ }
+ else
+ LWLockRelease(newpartitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+ }
+
+ /*
+ * PredicateLockPageCombine
+ *
+ * Combines predicate locks for two existing pages.
+ * Skip if this is a temporary table or toast table.
+ *
+ * NOTE: A page combine affects all serializable transactions, even if it
+ * occurs in the context of another transaction isolation level.
+ */
+ void
+ PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno,
+ const BlockNumber newblkno)
+ {
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ uint32 oldtargettaghash;
+ LWLockId oldpartitionLock;
+ PREDICATELOCKTARGET *oldtarget;
+ uint32 newtargettaghash;
+ LWLockId newpartitionLock;
+
+ if (SkipSplitTracking(relation))
+ return;
+
+ Assert(oldblkno != newblkno);
+ Assert(BlockNumberIsValid(oldblkno));
+ Assert(BlockNumberIsValid(newblkno));
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ oldblkno);
+ SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ newblkno);
+
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
+ oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+ newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+
+ /*
+ * We must get the partition locks in ascending sequence to avoid
+ * deadlocks. If old and new partitions are the same, we must request the
+ * lock only once.
+ */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockAcquire(oldpartitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(oldpartitionLock, LW_EXCLUSIVE);
+ }
+ else
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+
+ /*
+ * Look for the old target. If not found, that's OK; no predicate locks
+ * are affected, so we can just clean up and return. If it does exist,
+ * walk its list of predicate locks and create new ones for the new block
+ * number, while deleting the old ones.
+ */
+ oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag,
+ oldtargettaghash,
+ HASH_FIND, NULL);
+ if (oldtarget)
+ {
+ PREDICATELOCKTARGET *newtarget;
+ PREDICATELOCK *oldpredlock;
+ PREDICATELOCKTAG newpredlocktag;
+
+ newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &newtargettag,
+ newtargettaghash,
+ HASH_FIND, NULL);
+ Assert(newtarget);
+
+ newpredlocktag.myTarget = newtarget;
+
+ oldpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ &(oldtarget->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ while (oldpredlock)
+ {
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *newpredlock;
+ bool found;
+
+ predlocktargetlink = &(oldpredlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+ newpredlocktag.myXact = oldpredlock->tag.myXact;
+
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &oldpredlock->tag,
+ PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
+ oldtargettaghash),
+ HASH_REMOVE, NULL);
+
+ newpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &newpredlocktag,
+ PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
+ newtargettaghash),
+ HASH_ENTER, &found);
+ if (!newpredlock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (!found)
+ {
+ SHMQueueInsertBefore(&(newtarget->predicateLocks),
+ &(newpredlock->targetLink));
+ SHMQueueInsertBefore((SHM_QUEUE *) &(newpredlocktag.myXact->predicateLocks),
+ &(newpredlock->xactLink));
+ }
+
+ oldpredlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+ Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag,
+ oldtargettaghash,
+ HASH_REMOVE, NULL);
+ }
+
+ /* Release partition locks in reverse order of acquisition. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockRelease(newpartitionLock);
+ LWLockRelease(oldpartitionLock);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockRelease(oldpartitionLock);
+ LWLockRelease(newpartitionLock);
+ }
+ else
+ LWLockRelease(newpartitionLock);
+
+ LWLockRelease(SerializablePredicateLockListLock);
+ }
+
+ /*
+ * Walk the hash table and find the new xmin.
+ */
+ static void
+ SetNewSxactGlobalXmin(void)
+ {
+ SERIALIZABLEXACT *sxact;
+
+ PredTran->SxactGlobalXmin = InvalidTransactionId;
+ PredTran->SxactGlobalXminCount = 0;
+
+ for (sxact = FirstPredTran(); sxact != NULL; sxact = NextPredTran(sxact))
+ {
+ if (!SxactIsRolledBack(sxact) && !SxactIsOnFinishedList(sxact))
+ {
+ if (!TransactionIdIsValid(PredTran->SxactGlobalXmin)
+ || TransactionIdPrecedes(sxact->xmin, PredTran->SxactGlobalXmin))
+ {
+ PredTran->SxactGlobalXmin = sxact->xmin;
+ PredTran->SxactGlobalXminCount = 1;
+ }
+ else if (TransactionIdEquals(sxact->xmin, PredTran->SxactGlobalXmin))
+ PredTran->SxactGlobalXminCount++;
+ }
+ }
+ }
+
+ /*
+ * ReleasePredicateLocks
+ *
+ * Releases predicate locks based on completion of the current
+ * transaction, whether committed or rolled back.
+ *
+ * We do nothing unless this is a serializable transaction.
+ *
+ * For a rollback, the current transaction's predicate locks could be
+ * immediately released; however, we may still have conflict pointers to
+ * our transaction which could be expensive to find and eliminate right
+ * now, so we flag it as rolled back so that it will be ignored, and let
+ * cleanup happen later.
+ *
+ * This method must ensure that shared memory hash tables are cleaned
+ * up in some relatively timely fashion.
+ *
+ * If this transaction is committing and is holding any predicate locks,
+ * it must be added to a list of completed serializable transaction still
+ * holding locks.
+ *
+ * TODO SSI: Some of what this function does should probably be moved to
+ * PreCommit_CheckForSerializationFailure so that it all happens under a
+ * single lock. Anything which needs to run on ROLLBACK, including and
+ * especially resource cleanup, must stay here.
+ */
+ void
+ ReleasePredicateLocks(const bool isCommit)
+ {
+ bool needToClear;
+ RWConflict conflict,
+ nextConflict,
+ possibleUnsafeConflict;
+ SERIALIZABLEXACT *roXact;
+
+ if (MySerializableXact == InvalidSerializableXact)
+ {
+ Assert(LocalPredicateLockHash == NULL);
+ return;
+ }
+
+ Assert(IsolationIsSerializable());
+
+ /* We'd better not already be on the cleanup list. */
+ Assert(!SxactIsOnFinishedList((SERIALIZABLEXACT *) MySerializableXact));
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ /*
+ * If it's not a commit it's a rollback, and we can clear our locks
+ * immediately.
+ */
+ if (isCommit)
+ {
+ Assert(!SxactIsRolledBack((SERIALIZABLEXACT *) MySerializableXact));
+ Assert(SxactIsCommitted((SERIALIZABLEXACT *) MySerializableXact));
+ MySerializableXact->commitSeqNo = ++(PredTran->LastSxactCommitSeqNo);
+ /* Recognize implicit read-only transaction (commit without write). */
+ if (!(MySerializableXact->flags & SXACT_FLAG_DID_WRITE))
+ MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
+ }
+ else
+ {
+ Assert(!SxactIsCommitted((SERIALIZABLEXACT *) MySerializableXact));
+ MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
+ }
+
+ if (!XactReadOnly)
+ {
+ Assert(PredTran->WritableSxactCount > 0);
+ if (--(PredTran->WritableSxactCount) == 0)
+ {
+ /*
+ * Release predicate locks and rw-conflicts in for all committed
+ * transactions. There are no longer any transactions which might
+ * conflict with the locks and no chance for new transactions to
+ * overlap. Similarly, existing conflicts in can't cause pivots,
+ * and any conflicts in which could have completed a dangerous
+ * structure would already have caused a rollback, so any
+ * remaining ones must be benign.
+ */
+ PredTran->CanPartialClearThrough = PredTran->LastSxactCommitSeqNo;
+ }
+
+ /*
+ * Remove ourselves from the list of possible conflicts for concurrent
+ * READ ONLY transactions, flagging them as unsafe if we have a
+ * conflict out. If any are waiting DEFERRABLE transactions, wake them
+ * up if they are known safe or known unsafe.
+ */
+ possibleUnsafeConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, outLink));
+ while (possibleUnsafeConflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ &possibleUnsafeConflict->outLink,
+ offsetof(RWConflictData, outLink));
+
+ roXact = possibleUnsafeConflict->sxactIn;
+ Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
+ Assert(SxactIsReadOnly(roXact));
+
+ /*
+ * Mark conflicted if necessary.
+ *
+ * TODO: Should be sufficient to only do so if out conflict is to
+ * an *earlier* snapshot, but we don't check that yet.
+ */
+ if (isCommit &&
+ (MySerializableXact->flags & SXACT_FLAG_DID_WRITE) &&
+ !(SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->outConflicts)))
+ {
+ /*
+ * This releases possibleUnsafeConflict (as well as all other
+ * possible conflicts for roXact)
+ */
+ FlagSxactUnsafe(roXact);
+ }
+ else
+ {
+ ReleaseRWConflict(possibleUnsafeConflict);
+
+ /*
+ * If we were the last possible conflict, flag it safe. The
+ * transaction can now safely release its predicate locks (but
+ * that transaction's backend has to do that itself).
+ */
+ if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
+ roXact->flags |= SXACT_FLAG_RO_SAFE;
+ }
+
+ /*
+ * Wake up the process for a waiting DEFERRABLE transaction if we
+ * now know it's either safe or conflicted.
+ */
+ if (SxactIsDeferrableWaiting(roXact) &&
+ (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+ ProcSendSignal(roXact->pid);
+
+ possibleUnsafeConflict = nextConflict;
+ }
+ }
+ else
+ {
+ /*
+ * Read-only transactions: clear the list of transactions that might
+ * make us unsafe. Note that we use 'inLink' for the iteration as
+ * opposed to 'outLink' for the r/w xacts.
+ */
+ possibleUnsafeConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, inLink));
+ while (possibleUnsafeConflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ &possibleUnsafeConflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
+ Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
+
+ ReleaseRWConflict(possibleUnsafeConflict);
+
+ possibleUnsafeConflict = nextConflict;
+ }
+ }
+
+ /*
+ * Release all outConflicts from committed transactions. If we're rolling
+ * back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to
+ * previously committed transactions.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ (SHM_QUEUE *) &MySerializableXact->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+
+ if (isCommit
+ && !SxactIsReadOnly(conflict->sxactIn)
+ && SxactIsCommitted(conflict->sxactIn))
+ {
+ if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
+ || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
+ MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
+ MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
+ }
+
+ if (!isCommit
+ || SxactIsCommitted(conflict->sxactIn)
+ || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredTran->LastSxactCommitSeqNo))
+ ReleaseRWConflict(conflict);
+
+ /* Keep track of highest commitSeqNo which wrote data. */
+ if (isCommit && (MySerializableXact->flags & SXACT_FLAG_DID_WRITE))
+ PredTran->LastWritingCommitSeqNo = PredTran->LastSxactCommitSeqNo;
+
+ conflict = nextConflict;
+ }
+
+ /*
+ * Release all inConflicts from committed transactions. If we're rolling
+ * back, clear them all.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ (SHM_QUEUE *) &MySerializableXact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ if (!isCommit || SxactIsCommitted(conflict->sxactOut))
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+
+ /* Add this to the list of transactions to check for later cleanup. */
+ if (isCommit)
+ SHMQueueInsertBefore(FinishedSerializableTransactions,
+ (SHM_QUEUE *) &(MySerializableXact->finishedLink));
+
+ /*
+ * Check whether it's time to clean up old transactions. This can only be
+ * done when the last serializable transaction with the oldest xmin among
+ * serializable transactions completes. We then find the "new oldest"
+ * xmin and purge any transactions which finished before this transaction
+ * was launched.
+ */
+ needToClear = false;
+ if (TransactionIdEquals(MySerializableXact->xmin, PredTran->SxactGlobalXmin))
+ {
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ Assert(PredTran->SxactGlobalXminCount > 0);
+ if (--(PredTran->SxactGlobalXminCount) == 0)
+ {
+ SetNewSxactGlobalXmin();
+ needToClear = true;
+ }
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ /*
+ * Reality check: There can't be an active serializable transaction older
+ * than the oldest active transaction.
+ */
+ Assert(!TransactionIdIsValid(PredTran->SxactGlobalXmin)
+ || TransactionIdFollowsOrEquals(PredTran->SxactGlobalXmin, RecentGlobalXmin));
+
+ LWLockRelease(SerializableFinishedListLock);
+
+ if (!isCommit)
+ ReleaseOneSerializableXact((SERIALIZABLEXACT *) MySerializableXact, false);
+
+ if (needToClear)
+ ClearOldPredicateLocks();
+
+ MySerializableXact = InvalidSerializableXact;
+
+ /* Delete per-transaction lock table */
+ hash_destroy(LocalPredicateLockHash);
+ LocalPredicateLockHash = NULL;
+ }
+
+ /*
+ * ReleasePredicateLocksIfROSafe
+ * Check if the current transaction is read only and operating on
+ * a safe snapshot. If so, release predicate locks and return
+ * true.
+ *
+ * A transaction is flagged as RO_SAFE if all concurrent R/W
+ * transactions commit without having conflicts out to an earlier
+ * snapshot, thus ensuring that no conflicts are possible for this
+ * transaction. Thus, we call this function as part of the
+ * SkipSerialization check on all public interface methods.
+ */
+ static bool
+ ReleasePredicateLocksIfROSafe(void)
+ {
+ if (SxactIsROSafe(MySerializableXact))
+ {
+ ReleasePredicateLocks(false);
+ return true;
+ }
+ else
+ return false;
+ }
+
+ /*
+ * Clear old predicate locks.
+ */
+ static void
+ ClearOldPredicateLocks(void)
+ {
+ SERIALIZABLEXACT *finishedSxact;
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+ finishedSxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ FinishedSerializableTransactions,
+ offsetof(SERIALIZABLEXACT, finishedLink));
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ while (finishedSxact)
+ {
+ SERIALIZABLEXACT *nextSxact;
+
+ nextSxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ &(finishedSxact->finishedLink),
+ offsetof(SERIALIZABLEXACT, finishedLink));
+ if (!TransactionIdIsValid(PredTran->SxactGlobalXmin)
+ || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
+ PredTran->SxactGlobalXmin))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ SHMQueueDelete(&(finishedSxact->finishedLink));
+ ReleaseOneSerializableXact(finishedSxact, false);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else if (finishedSxact->commitSeqNo > PredTran->HavePartialClearedThrough
+ && finishedSxact->commitSeqNo <= PredTran->CanPartialClearThrough)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ReleaseOneSerializableXact(finishedSxact, true);
+ PredTran->HavePartialClearedThrough = finishedSxact->commitSeqNo;
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else
+ break;
+ finishedSxact = nextSxact;
+ }
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(SerializableFinishedListLock);
+ }
+
+ /*
+ * This is the normal way to delete anything from any of the predicate
+ * locking hash tables. Given a transaction which we know can be deleted,
+ * delete all predicate locks held by that transaction, and any predicate
+ * lock targets which are now unreferenced by a lock; delete all conflicts
+ * for the transaction; delete all xid values for the transaction; then
+ * delete the transaction.
+ */
+ static void
+ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial)
+ {
+ PREDICATELOCK *predlock;
+ SERIALIZABLEXIDTAG sxidtag;
+ RWConflict conflict,
+ nextConflict;
+
+ Assert(sxact != NULL);
+ Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
+
+ LWLockAcquire(SerializablePredicateLockListLock,
+ partial ? LW_EXCLUSIVE : LW_SHARED);
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+ while (predlock)
+ {
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCKTAG tag;
+ SHM_QUEUE *targetLink;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(predlock->xactLink),
+ offsetof(PREDICATELOCK, xactLink));
+
+ if (partial)
+ SHMQueueDelete(&(predlock->xactLink));
+ /*
+ * Else no need to do retail removal of predicate locks from
+ * transaction object; it's going away.
+ */
+
+ tag = predlock->tag;
+ targetLink = &(predlock->targetLink);
+ target = tag.myTarget;
+ targettag = target->tag;
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+ SHMQueueDelete(targetLink);
+
+ hash_search_with_hash_value(PredicateLockHash, &tag,
+ PredicateLockHashCodeFromTargetHashCode(&tag,
+ targettaghash),
+ HASH_REMOVE, NULL);
+ if (SHMQueueEmpty(&target->predicateLocks))
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &targettag, targettaghash, HASH_REMOVE, NULL);
+ LWLockRelease(partitionLock);
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializablePredicateLockListLock);
+
+ sxidtag.xid = sxact->topXid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ if (!partial)
+ {
+ /* Release all outConflicts. */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ (SHM_QUEUE *) &MySerializableXact->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ ReleaseRWConflict(conflict);
+ conflict = nextConflict;
+ }
+ }
+
+ /* Release all inConflicts. */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ (SHM_QUEUE *) &MySerializableXact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+ ReleaseRWConflict(conflict);
+ conflict = nextConflict;
+ }
+
+ if (!partial)
+ {
+ /* Get rid of the xid and the record of the transaction itself. */
+ if (sxidtag.xid != InvalidTransactionId)
+ hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
+ ReleasePredTran(sxact);
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ /*
+ * Tests whether the given top level transaction is concurrent with
+ * (overlaps) our current transaction.
+ *
+ * We need to identify the top level transaction for SSI, anyway, so pass
+ * that to this function to save the overhead of checking the snapshot's
+ * subxip array.
+ */
+ static bool
+ XidIsConcurrent(TransactionId xid)
+ {
+ Snapshot snap;
+ uint32 i;
+
+ Assert(TransactionIdIsValid(xid));
+
+ /*
+ * We don't count our own transaction or its subtransactions as
+ * "concurrent".
+ */
+ if (xid == GetTopTransactionIdIfAny())
+ return false;
+
+ snap = GetTransactionSnapshot();
+
+ if (TransactionIdPrecedes(xid, snap->xmin))
+ return false;
+
+ if (TransactionIdFollowsOrEquals(xid, snap->xmax))
+ return true;
+
+ for (i = 0; i < snap->xcnt; i++)
+ {
+ if (xid == snap->xip[i])
+ return true;
+ }
+
+ return false;
+ }
+
+ /*
+ * CheckForSerializableConflictOut
+ * We are reading a tuple which has been modified. If it is visible to
+ * us but has been deleted, that indicates a rw-conflict out. If it's
+ * not visible and was created by a concurrent (overlapping)
+ * serializable transaction, that is also a rw-conflict out,
+ *
+ * The heap tables which we maintain for predicate locking will also be used
+ * to determine that the xmin from a row is related to a serializable
+ * transaction, and will provide a mapping to the top level transaction.
+ *
+ * This function should be called just about anywhere in heapam.c that a
+ * tuple has been read.
+ */
+ void
+ CheckForSerializableConflictOut(const bool valid, const Relation relation,
+ const HeapTuple tuple, const Buffer buffer)
+ {
+ TransactionId xid;
+ SERIALIZABLEXIDTAG sxidtag;
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXACT *sxact;
+
+ if (SkipSerialization(relation))
+ return;
+
+ if (valid)
+ {
+ /*
+ * A visible tuple has been modified. This is probably a conflict,
+ * but for updates we'll catch this on the new tuple -- for the sake
+ * of performance we don't want to check it twice. We return unless
+ * this is a tuple delete, in which case there is no new tuple to
+ * trigger the check.
+ */
+ if (!ItemPointerEquals(&(tuple->t_self), &(tuple->t_data->t_ctid)))
+ return;
+
+ /*
+ * We may bail out if previous xmax aborted, or if it committed but
+ * only locked the tuple without updating it.
+ */
+ if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED))
+ return;
+
+ /*
+ * If there's a valid xmax, it must be from a concurrent transaction,
+ * since it deleted a tuple which is visible to us.
+ */
+ xid = HeapTupleHeaderGetXmax(tuple->t_data);
+ if (!TransactionIdIsValid(xid))
+ return;
+ }
+ else
+ {
+ /*
+ * We would read this row, but it isn't visible to us.
+ */
+ xid = HeapTupleHeaderGetXmin(tuple->t_data);
+ }
+
+ /*
+ * Find top level xid. Bail out if xid is too early to be a conflict.
+ */
+ if (TransactionIdPrecedes(xid, TransactionXmin))
+ return;
+ xid = SubTransGetTopmostTransaction(xid);
+ if (TransactionIdPrecedes(xid, TransactionXmin))
+ return;
+
+ /*
+ * It's OK to look for conflicts with a share lock, and record them with
+ * an exclusive lock when found; we just have to release the shared lock
+ * before attempting to get the other lock, to prevent deadlocks. We will
+ * need to recheck that the entry still exists after getting the stronger
+ * lock, just in case it rolled back in the window where we weren't
+ * holding a lock.
+ */
+ sxidtag.xid = xid;
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ if (!sxid)
+ {
+ /* It's not serializable or otherwise not important. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+ sxact = sxid->myXact;
+ if (sxact == MySerializableXact || SxactIsRolledBack(sxact))
+ {
+ /* We can't conflict with our own transaction or one rolled back. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ /*
+ * If this is a read-only transaction and the writing transaction has
+ * committed, and it doesn't have a rw-conflict to a transaction which
+ * committed before it, no conflict.
+ */
+ if (SxactIsReadOnly(MySerializableXact)
+ && SxactIsCommitted(sxact)
+ && (!SxactHasConflictOut(sxact)
+ || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
+ {
+ /* Read-only transaction will appear to run first. No conflict. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ if (!sxid)
+ {
+ /* It must have been cleaned up, which means it wasn't useful. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+ Assert(sxid->myXact == sxact);
+ xid = sxact->topXid;
+ if (!XidIsConcurrent(xid))
+ {
+ /* This write was already in our snapshot; no conflict. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ if (RWConflictExists((SERIALIZABLEXACT *) MySerializableXact, sxact))
+ {
+ /* We don't want duplicate conflict records in the list. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ /*
+ * Flag the conflict. But first, if this conflict creates a dangerous
+ * structure, ereport an error.
+ */
+ FlagRWConflict((SERIALIZABLEXACT *) MySerializableXact, sxact);
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ /*
+ * Check a particular target for rw-dependency conflict in.
+ */
+ static void
+ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
+ {
+ uint32 targettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCK *predlock;
+
+ Assert(MySerializableXact != InvalidSerializableXact);
+
+ /* The same hash and LW lock apply to the lock target and the lock itself. */
+ targettaghash = PredicateLockTargetTagHashCode(targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ targettag, targettaghash,
+ HASH_FIND, NULL);
+ if (!target)
+ {
+ /* Nothing has this target locked; we're done here. */
+ LWLockRelease(partitionLock);
+ return;
+ }
+
+ /*
+ * Each lock for an overlapping transaction represents a conflict: a
+ * rw-dependency in to this transaction.
+ */
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ while (predlock)
+ {
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ SERIALIZABLEXACT *sxact;
+
+ predlocktargetlink = &(predlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+
+ sxact = predlock->tag.myXact;
+ if (sxact == MySerializableXact)
+ {
+ /*
+ * If we're getting a write lock on the tuple, we don't need a
+ * predicate (SIREAD) lock. At this point our transaction already
+ * has an ExclusiveRowLock on the relation, so we are OK to drop
+ * the predicate lock on the tuple, if found, without fearing that
+ * another write against the tuple will occur before the MVCC
+ * information makes it to the buffer.
+ */
+ if (GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
+ {
+ uint32 predlockhashcode;
+ PREDICATELOCKTARGET *rmtarget = NULL;
+ PREDICATELOCK *rmpredlock;
+ LOCALPREDICATELOCK *locallock,
+ *rmlocallock;
+
+ /*
+ * This is a tuple on which we have a tuple predicate lock. We
+ * only have shared LW locks now; release those, and get
+ * exclusive locks only while we modify things.
+ */
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ /*
+ * Remove the predicate lock from shared memory, if it wasn't
+ * removed while the locks were released. One way that could
+ * happen is from autovacuum cleaning up an index.
+ */
+ predlockhashcode = PredicateLockHashCodeFromTargetHashCode
+ (&(predlock->tag), targettaghash);
+ rmpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash,
+ &(predlock->tag),
+ predlockhashcode,
+ HASH_FIND, NULL);
+ if (rmpredlock)
+ {
+ Assert(rmpredlock == predlock);
+
+ SHMQueueDelete(predlocktargetlink);
+ SHMQueueDelete(&(predlock->xactLink));
+
+ rmpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash,
+ &(predlock->tag),
+ predlockhashcode,
+ HASH_REMOVE, NULL);
+ Assert(rmpredlock == predlock);
+
+ /*
+ * When a target is no longer used, remove it.
+ */
+ if (SHMQueueEmpty(&target->predicateLocks))
+ {
+ rmtarget = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ targettag,
+ targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmtarget == target);
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+
+ locallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_FIND, NULL);
+ Assert(locallock != NULL);
+ Assert(locallock->held);
+ locallock->held = false;
+
+ if (locallock->childLocks == 0)
+ {
+ rmlocallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmlocallock == locallock);
+ }
+
+ DecrementParentLocks(targettag);
+
+ /*
+ * If we've cleaned up the last of the predicate locks for
+ * the target, bail out before re-acquiring the locks.
+ */
+ if (rmtarget)
+ return;
+
+ /*
+ * The list has been altered. Start over at the front.
+ */
+ LWLockAcquire(partitionLock, LW_SHARED);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else
+ {
+ /*
+ * The predicate lock was cleared while we were attempting
+ * to upgrade our lightweight locks. Revert to the shared
+ * locks.
+ */
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ }
+ }
+ else if (!SxactIsRolledBack(sxact)
+ && (!SxactIsCommitted(sxact)
+ || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
+ sxact->finishedBefore))
+ && !RWConflictExists(sxact, (SERIALIZABLEXACT *) MySerializableXact))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ FlagRWConflict(sxact, (SERIALIZABLEXACT *) MySerializableXact);
+
+ LWLockRelease(SerializableXactHashLock);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ }
+
+ /*
+ * CheckForSerializableConflictIn
+ * We are writing the given tuple. If that indicates a rw-conflict
+ * in from another serializable transaction, take appropriate action.
+ *
+ * Skip checking for any granularity for which a parameter is missing.
+ *
+ * A tuple update or delete is in conflict if we have a predicate lock
+ * against the relation or page in which the tuple exists, or against the
+ * tuple itself. A tuple insert is in conflict only if there is a predicate
+ * lock against the entire relation.
+ *
+ * The call to this function also indicates that we need an entry in the
+ * serializable transaction hash table, so that this write's conflicts can
+ * be detected for the proper lifetime, which is until this transaction and
+ * all overlapping serializable transactions have completed.
+ */
+ void
+ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
+ const Buffer buffer)
+ {
+ PREDICATELOCKTARGETTAG targettag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ MySerializableXact->flags |= SXACT_FLAG_DID_WRITE;
+
+ /*
+ * It is important that we check for locks from the finest granularity to
+ * the coarsest granularity, so that granularity promotion doesn't cause
+ * us to miss a lock. The new (coarser) lock will be acquired before the
+ * old (finer) locks are released.
+ *
+ * It is not possible to take and hold a lock across the checks for all
+ * granularities because each target could be in a separate partition.
+ */
+ if (tuple != NULL)
+ {
+ SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
+ ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)));
+ CheckTargetForConflictsIn(&targettag);
+ }
+
+ if (BufferIsValid(buffer))
+ {
+ SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ BufferGetBlockNumber(buffer));
+ CheckTargetForConflictsIn(&targettag);
+ }
+
+ SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ CheckTargetForConflictsIn(&targettag);
+ }
+
+ /*
+ * Flag a rw-dependency between two serializable transactions.
+ *
+ * The caller is responsible for ensuring that we have a LW lock on
+ * the transaction hash table.
+ */
+ static void
+ FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
+ {
+ Assert(reader != writer);
+
+ /* First, see if this conflict causes failure. */
+ OnConflict_CheckForSerializationFailure(reader, writer);
+
+ /* Actually do the conflict flagging. */
+ SetRWConflict(reader, writer);
+ }
+
+ /*
+ * Check whether we should roll back one of these transactions
+ * instead of flagging a new rw-conflict.
+ */
+ static void
+ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
+ const SERIALIZABLEXACT *writer)
+ {
+ bool failure;
+ RWConflict conflict;
+
+ Assert(LWLockHeldByMe(SerializableXactHashLock));
+
+ failure = false;
+
+ /*
+ * Check for already-committed writer with rw-conflict out flagged. This
+ * means that the reader must immediately fail.
+ */
+ if (SxactIsCommitted(writer) && SxactHasConflictOut(writer))
+ failure = true;
+
+ /*
+ * Check whether the reader has become a pivot with a committed writer. If
+ * so, we must roll back unless every in-conflict either committed before
+ * the writer committed or is READ ONLY and overlaps the writer.
+ */
+ if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
+ {
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->inConflicts,
+ &reader->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ if (!SxactIsRolledBack(conflict->sxactOut)
+ && (!SxactIsCommitted(conflict->sxactOut)
+ || conflict->sxactOut->commitSeqNo >= writer->commitSeqNo)
+ && (!SxactIsReadOnly(conflict->sxactOut)
+ || conflict->sxactOut->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
+ {
+ failure = true;
+ break;
+ }
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+ }
+
+ /*
+ * Check whether the writer has become a pivot with an out-conflict
+ * committed transaction, while neither reader nor writer is committed. If
+ * the reader is a READ ONLY transaction, there is only a serialization
+ * failure if an out-conflict transaction causing the pivot committed
+ * before the reader acquired its snapshot. (That is, the reader must not
+ * have been concurrent with the out-conflict transaction.)
+ */
+ if (!failure && !SxactIsCommitted(writer))
+ {
+ conflict = (RWConflict)
+ SHMQueueNext(&writer->outConflicts,
+ &writer->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ if ((reader == conflict->sxactIn && SxactIsCommitted(reader))
+ || (SxactIsCommitted(conflict->sxactIn)
+ && !SxactIsCommitted(reader)
+ && (!SxactIsReadOnly(reader)
+ || conflict->sxactIn->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot)))
+ {
+ failure = true;
+ break;
+ }
+ conflict = (RWConflict)
+ SHMQueueNext(&writer->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+ }
+
+ if (failure)
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errhint("The transaction might succeed if retried.")));
+ }
+
+ /*
+ * PreCommit_CheckForSerializableConflicts
+ * Check for dangerous structures in a serializable transaction
+ * at commit.
+ *
+ * We're checking for a dangerous structure as each conflict is recorded.
+ * The only way we could have a problem at commit is if this is the "out"
+ * side of a pivot, and neither the "in" side nor the pivot has yet
+ * committed.
+ */
+ void
+ PreCommit_CheckForSerializationFailure(void)
+ {
+ bool failure;
+ RWConflict nearConflict;
+
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ Assert(IsolationIsSerializable());
+
+ failure = false;
+
+ /*
+ * TODO SSI: SHARED here and EXCLUSIVE below to modify? Would require new
+ * SerializableCommitLock for exclusive use around this method?
+ */
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ /* TODO SSI: check whether another transaction has cancelled us? */
+
+ nearConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ (SHM_QUEUE *) &MySerializableXact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (nearConflict)
+ {
+ if (!SxactIsCommitted(nearConflict->sxactOut)
+ && !SxactIsRolledBack(nearConflict->sxactOut))
+ {
+ RWConflict farConflict;
+
+ farConflict = (RWConflict)
+ SHMQueueNext(&nearConflict->sxactOut->inConflicts,
+ &nearConflict->sxactOut->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (farConflict)
+ {
+ if (farConflict->sxactOut == MySerializableXact
+ || (!SxactIsCommitted(farConflict->sxactOut)
+ && !SxactIsReadOnly(farConflict->sxactOut)
+ && !SxactIsRolledBack(farConflict->sxactOut)))
+ {
+ failure = true;
+ break;
+ }
+ farConflict = (RWConflict)
+ SHMQueueNext(&nearConflict->sxactOut->inConflicts,
+ &farConflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+ if (failure)
+ break;
+ }
+
+ nearConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ &nearConflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+
+ if (failure)
+ {
+ /*
+ * TODO SSI: cancel some *other* transaction(s) here, instead!
+ * CancelVirtualTransaction(VirtualTransactionId vxid,
+ * ProcSignalReason sigmode)
+ */
+ MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
+ MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errhint("The transaction might succeed if retried.")));
+ }
+
+ MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
+ MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
+
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ /*
+ * GetSafeSnapshot
+ * Obtain and register a snapshot for a READ ONLY DEFERRABLE
+ * transaction. Ensures that the snapshot is "safe", i.e. a
+ * read-only transaction running on it can execute serializably
+ * without further checks. This requires waiting for concurrent
+ * transactions to complete, and retrying with a new snapshot if
+ * one of them could possibly create a conflict.
+ */
+ Snapshot
+ GetSafeSnapshot(Snapshot snapshot)
+ {
+ Assert(XactReadOnly && XactDeferrable);
+
+ while (true)
+ {
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+
+ /* Get and register a snapshot */
+ snapshot = GetSnapshotData(snapshot);
+ snapshot = RegisterSnapshotOnOwner(snapshot,
+ TopTransactionResourceOwner);
+ RegisterSerializableTransactionInt(snapshot);
+ if (MySerializableXact == InvalidSerializableXact)
+ return snapshot; /* no concurrent r/w xacts; it's safe */
+
+ MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+
+ LWLockRelease(SerializableXactHashLock);
+
+ /*
+ * Wait for concurrent transactions to finish. Stop early if one of
+ * them marked us as conflicted.
+ */
+ while (!(SHMQueueEmpty((SHM_QUEUE *)
+ &MySerializableXact->possibleUnsafeConflicts) ||
+ SxactIsROUnsafe(MySerializableXact)))
+ ProcWaitForSignal();
+
+ MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+ if (!SxactIsROUnsafe(MySerializableXact))
+ break; /* success */
+
+ /* else, need to retry... */
+ ereport(WARNING,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("deferrable snapshot was unsafe; trying a new one")));
+ ReleasePredicateLocks(false);
+ UnregisterSnapshotFromOwner(snapshot,
+ TopTransactionResourceOwner);
+ }
+
+ /*
+ * Now we have a safe snapshot, so we don't need to do any further checks.
+ */
+ Assert(SxactIsROSafe(MySerializableXact));
+ ReleasePredicateLocks(false);
+ return snapshot;
+ }
*** a/src/backend/tcop/utility.c
--- b/src/backend/tcop/utility.c
***************
*** 373,378 **** standard_ProcessUtility(Node *parsetree,
--- 373,382 ----
SetPGVariable("transaction_read_only",
list_make1(item->arg),
true);
+ else if (strcmp(item->defname, "transaction_deferrable") == 0)
+ SetPGVariable("transaction_deferrable",
+ list_make1(item->arg),
+ true);
}
}
break;
*** a/src/backend/utils/adt/lockfuncs.c
--- b/src/backend/utils/adt/lockfuncs.c
***************
*** 15,20 ****
--- 15,21 ----
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
+ #include "storage/predicate_internals.h"
#include "storage/proc.h"
#include "utils/builtins.h"
***************
*** 32,42 **** static const char *const LockTagTypeNames[] = {
--- 33,52 ----
"advisory"
};
+ /* This must match enum PredicateLockTargetType (predicate_internals.h) */
+ static const char *const PredicateLockTagTypeNames[] = {
+ "relation",
+ "page",
+ "tuple"
+ };
+
/* Working status for pg_lock_status */
typedef struct
{
LockData *lockData; /* state data from lmgr */
int currIdx; /* current PROCLOCK index */
+ PredicateLockData *predLockData; /* state data for pred locks */
+ int predLockIdx; /* current index for pred lock */
} PG_Lock_Status;
***************
*** 69,74 **** pg_lock_status(PG_FUNCTION_ARGS)
--- 79,85 ----
FuncCallContext *funcctx;
PG_Lock_Status *mystatus;
LockData *lockData;
+ PredicateLockData *predLockData;
if (SRF_IS_FIRSTCALL())
{
***************
*** 126,131 **** pg_lock_status(PG_FUNCTION_ARGS)
--- 137,144 ----
mystatus->lockData = GetLockStatusData();
mystatus->currIdx = 0;
+ mystatus->predLockData = GetPredicateLockStatusData();
+ mystatus->predLockIdx = 0;
MemoryContextSwitchTo(oldcontext);
}
***************
*** 303,308 **** pg_lock_status(PG_FUNCTION_ARGS)
--- 316,387 ----
SRF_RETURN_NEXT(funcctx, result);
}
+ /*
+ * Have returned all regular locks. Now start on the SIREAD predicate
+ * locks.
+ */
+ predLockData = mystatus->predLockData;
+ if (mystatus->predLockIdx < predLockData->nelements)
+ {
+ PredicateLockTargetType lockType;
+
+ PREDICATELOCKTARGETTAG *predTag = &(predLockData->locktags[mystatus->predLockIdx]);
+ SERIALIZABLEXACT *xact = &(predLockData->xacts[mystatus->predLockIdx]);
+ Datum values[14];
+ bool nulls[14];
+ HeapTuple tuple;
+ Datum result;
+
+ mystatus->predLockIdx++;
+
+ /*
+ * Form tuple with appropriate data.
+ */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, false, sizeof(nulls));
+
+ /* lock type */
+ lockType = GET_PREDICATELOCKTARGETTAG_TYPE(*predTag);
+
+ values[0] = CStringGetTextDatum(PredicateLockTagTypeNames[lockType]);
+
+ /* lock target */
+ values[1] = GET_PREDICATELOCKTARGETTAG_DB(*predTag);
+ values[2] = GET_PREDICATELOCKTARGETTAG_RELATION(*predTag);
+ if (lockType == PREDLOCKTAG_TUPLE)
+ values[4] = GET_PREDICATELOCKTARGETTAG_OFFSET(*predTag);
+ else
+ nulls[4] = true;
+ if ((lockType == PREDLOCKTAG_TUPLE) ||
+ (lockType == PREDLOCKTAG_PAGE))
+ values[3] = GET_PREDICATELOCKTARGETTAG_PAGE(*predTag);
+ else
+ nulls[3] = true;
+
+ /* these fields are targets for other types of locks */
+ nulls[5] = true; /* virtualxid */
+ nulls[6] = true; /* transactionid */
+ nulls[7] = true; /* classid */
+ nulls[8] = true; /* objid */
+ nulls[9] = true; /* objsubid */
+
+ /* lock holder */
+ values[10] = VXIDGetDatum(xact->tag.vxid.backendId,
+ xact->tag.vxid.localTransactionId);
+ nulls[11] = true; /* pid */
+
+ /*
+ * Lock mode. Currently all predicate locks are SIReadLocks, which are
+ * always held (never waiting)
+ */
+ values[12] = CStringGetTextDatum("SIReadLock");
+ values[13] = BoolGetDatum(true);
+
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+
SRF_RETURN_DONE(funcctx);
}
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 59,64 ****
--- 59,65 ----
#include "storage/bufmgr.h"
#include "storage/standby.h"
#include "storage/fd.h"
+ #include "storage/predicate.h"
#include "tcop/tcopprot.h"
#include "tsearch/ts_cache.h"
#include "utils/builtins.h"
***************
*** 1098,1103 **** static struct config_bool ConfigureNamesBool[] =
--- 1099,1113 ----
false, assign_transaction_read_only, NULL
},
{
+ {"transaction_deferrable", PGC_USERSET, CLIENT_CONN_STATEMENT,
+ gettext_noop("Whether to defer a read-only serializable transaction until it can be executed with no possible serialization failures."),
+ NULL,
+ GUC_NO_RESET_ALL | GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE
+ },
+ &XactDeferrable,
+ false, assign_transaction_deferrable, NULL
+ },
+ {
{"check_function_bodies", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Check function bodies during CREATE FUNCTION."),
NULL
***************
*** 1697,1702 **** static struct config_int ConfigureNamesInt[] =
--- 1707,1723 ----
},
{
+ {"max_predicate_locks_per_transaction", PGC_POSTMASTER, LOCK_MANAGEMENT,
+ gettext_noop("Sets the maximum number of predicate locks per transaction."),
+ gettext_noop("The shared predicate lock table is sized on the assumption that "
+ "at most max_predicate_locks_per_transaction * max_connections distinct "
+ "objects will need to be locked at any one time.")
+ },
+ &max_predicate_locks_per_xact,
+ 64, 10, INT_MAX, NULL, NULL
+ },
+
+ {
{"authentication_timeout", PGC_SIGHUP, CONN_AUTH_SECURITY,
gettext_noop("Sets the maximum allowed time to complete client authentication."),
NULL,
***************
*** 3461,3466 **** InitializeGUCOptions(void)
--- 3482,3489 ----
PGC_POSTMASTER, PGC_S_OVERRIDE);
SetConfigOption("transaction_read_only", "no",
PGC_POSTMASTER, PGC_S_OVERRIDE);
+ SetConfigOption("transaction_deferrable", "no",
+ PGC_POSTMASTER, PGC_S_OVERRIDE);
/*
* For historical reasons, some GUC parameters can receive defaults from
***************
*** 5700,5705 **** ExecSetVariableStmt(VariableSetStmt *stmt)
--- 5723,5731 ----
else if (strcmp(item->defname, "transaction_read_only") == 0)
SetPGVariable("transaction_read_only",
list_make1(item->arg), stmt->is_local);
+ else if (strcmp(item->defname, "transaction_deferrable") == 0)
+ SetPGVariable("transaction_deferrable",
+ list_make1(item->arg), stmt->is_local);
else
elog(ERROR, "unexpected SET TRANSACTION element: %s",
item->defname);
*** a/src/backend/utils/resowner/resowner.c
--- b/src/backend/utils/resowner/resowner.c
***************
*** 22,27 ****
--- 22,28 ----
#include "access/hash.h"
#include "storage/bufmgr.h"
+ #include "storage/predicate.h"
#include "storage/proc.h"
#include "utils/memutils.h"
#include "utils/rel.h"
***************
*** 261,267 **** ResourceOwnerReleaseInternal(ResourceOwner owner,
--- 262,271 ----
* the top of the recursion.
*/
if (owner == TopTransactionResourceOwner)
+ {
ProcReleaseLocks(isCommit);
+ ReleasePredicateLocks(isCommit);
+ }
}
else
{
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
***************
*** 27,32 ****
--- 27,33 ----
#include "access/transam.h"
#include "access/xact.h"
+ #include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
***************
*** 126,131 **** GetTransactionSnapshot(void)
--- 127,151 ----
{
Assert(RegisteredSnapshots == 0);
+ /*
+ * A special optimization is available for SERIALIZABLE READ ONLY
+ * DEFERRABLE transactions -- we can wait for a suitable snapshot
+ * and thereby avoid all SSI overhead.
+ */
+ if (IsolationIsSerializable() && XactReadOnly && XactDeferrable)
+ {
+ /*
+ * Need to atomically acquire a snapshot and begin waiting
+ * to see if it's safe. The snapshot will already be registered
+ * when it is returned. The transaction should not be
+ * registered for SSI.
+ */
+ CurrentSnapshot = GetSafeSnapshot(&CurrentSnapshotData);
+ FirstSnapshotSet = true;
+ registered_xact_snapshot = true;
+ return CurrentSnapshot;
+ }
+
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
FirstSnapshotSet = true;
***************
*** 139,144 **** GetTransactionSnapshot(void)
--- 159,166 ----
CurrentSnapshot = RegisterSnapshotOnOwner(CurrentSnapshot,
TopTransactionResourceOwner);
registered_xact_snapshot = true;
+ if (IsolationIsSerializable())
+ RegisterSerializableTransaction(CurrentSnapshot);
}
return CurrentSnapshot;
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 11,24 ****
* script that reproduces the schema in terms of SQL that is understood
* by PostgreSQL
*
! * Note that pg_dump runs in a serializable transaction, so it sees a
! * consistent snapshot of the database including system catalogs.
! * However, it relies in part on various specialized backend functions
! * like pg_get_indexdef(), and those things tend to run on SnapshotNow
! * time, ie they look at the currently committed state. So it is
! * possible to get 'cache lookup failed' error if someone performs DDL
! * changes while a dump is happening. The window for this sort of thing
! * is from the beginning of the serializable transaction to
* getSchemaData() (when pg_dump acquires AccessShareLock on every
* table it intends to dump). It isn't very large, but it can happen.
*
--- 11,24 ----
* script that reproduces the schema in terms of SQL that is understood
* by PostgreSQL
*
! * Note that pg_dump runs in a transaction-snapshot mode transaction,
! * so it sees a consistent snapshot of the database including system
! * catalogs. However, it relies in part on various specialized backend
! * functions like pg_get_indexdef(), and those things tend to run on
! * SnapshotNow time, ie they look at the currently committed state. So
! * it is possible to get 'cache lookup failed' error if someone
! * performs DDL changes while a dump is happening. The window for this
! * sort of thing is from the acquisition of the transaction snapshot to
* getSchemaData() (when pg_dump acquires AccessShareLock on every
* table it intends to dump). It isn't very large, but it can happen.
*
***************
*** 134,139 **** static int disable_dollar_quoting = 0;
--- 134,140 ----
static int dump_inserts = 0;
static int column_inserts = 0;
static int no_security_label = 0;
+ static int serializable_deferrable = 0;
static void help(const char *progname);
***************
*** 314,319 **** main(int argc, char **argv)
--- 315,321 ----
{"no-tablespaces", no_argument, &outputNoTablespaces, 1},
{"quote-all-identifiers", no_argument, "e_all_identifiers, 1},
{"role", required_argument, NULL, 3},
+ {"serializable-deferrable", no_argument, &serializable_deferrable, 1},
{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
{"no-security-label", no_argument, &no_security_label, 1},
***************
*** 667,677 **** main(int argc, char **argv)
no_security_label = 1;
/*
! * Start serializable transaction to dump consistent data.
*/
do_sql_command(g_conn, "BEGIN");
!
! do_sql_command(g_conn, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
/* Select the appropriate subquery to convert user IDs to names */
if (g_fout->remoteVersion >= 80100)
--- 669,689 ----
no_security_label = 1;
/*
! * Start transaction-snapshot mode transaction to dump consistent data.
*/
do_sql_command(g_conn, "BEGIN");
! if (g_fout->remoteVersion >= 90100)
! {
! if (serializable_deferrable)
! do_sql_command(g_conn,
! "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, "
! "READ ONLY, DEFERRABLE");
! else
! do_sql_command(g_conn,
! "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
! }
! else
! do_sql_command(g_conn, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
/* Select the appropriate subquery to convert user IDs to names */
if (g_fout->remoteVersion >= 80100)
***************
*** 862,867 **** help(const char *progname)
--- 874,880 ----
printf(_(" --disable-triggers disable triggers during data-only restore\n"));
printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
printf(_(" --quote-all-identifiers quote all identifiers, even if not keywords\n"));
+ printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
printf(_(" --role=ROLENAME do SET ROLE before dump\n"));
printf(_(" --no-security-label do not dump security label assignments\n"));
printf(_(" --use-set-session-authorization\n"
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 82,89 **** extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
extern bool heap_fetch(Relation relation, Snapshot snapshot,
HeapTuple tuple, Buffer *userbuf, bool keep_buf,
Relation stats_relation);
! extern bool heap_hot_search_buffer(ItemPointer tid, Buffer buffer,
! Snapshot snapshot, bool *all_dead);
extern bool heap_hot_search(ItemPointer tid, Relation relation,
Snapshot snapshot, bool *all_dead);
--- 82,89 ----
extern bool heap_fetch(Relation relation, Snapshot snapshot,
HeapTuple tuple, Buffer *userbuf, bool keep_buf,
Relation stats_relation);
! extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
! Buffer buffer, Snapshot snapshot, bool *all_dead);
extern bool heap_hot_search(ItemPointer tid, Relation relation,
Snapshot snapshot, bool *all_dead);
*** a/src/include/access/xact.h
--- b/src/include/access/xact.h
***************
*** 32,46 **** extern int DefaultXactIsoLevel;
extern int XactIsoLevel;
/*
! * We only implement two isolation levels internally. This macro should
! * be used to check which one is selected.
*/
#define IsolationUsesXactSnapshot() (XactIsoLevel >= XACT_REPEATABLE_READ)
/* Xact read-only state */
extern bool DefaultXactReadOnly;
extern bool XactReadOnly;
/* Asynchronous commits */
extern bool XactSyncCommit;
--- 32,56 ----
extern int XactIsoLevel;
/*
! * We implement three isolation levels internally.
! * The two stronger ones use one snapshot per database transaction;
! * the others use one snapshot per statement.
! * Serializable uses predicate locks in addition to snapshots.
! * These macros should be used to check which isolation level is selected.
*/
#define IsolationUsesXactSnapshot() (XactIsoLevel >= XACT_REPEATABLE_READ)
+ #define IsolationIsSerializable() (XactIsoLevel == XACT_SERIALIZABLE)
/* Xact read-only state */
extern bool DefaultXactReadOnly;
extern bool XactReadOnly;
+ /*
+ * Xact is deferrable -- only meaningful (currently) for read only
+ * SERIALIZABLE transactions
+ */
+ extern bool XactDeferrable;
+
/* Asynchronous commits */
extern bool XactSyncCommit;
*** a/src/include/catalog/pg_am.h
--- b/src/include/catalog/pg_am.h
***************
*** 50,55 **** CATALOG(pg_am,2601)
--- 50,56 ----
bool amsearchnulls; /* can AM search for NULL/NOT NULL entries? */
bool amstorage; /* can storage type differ from column type? */
bool amclusterable; /* does AM support cluster command? */
+ bool ampredlocks; /* does AM handle predicate locks? */
Oid amkeytype; /* type of data in index, or InvalidOid */
regproc aminsert; /* "insert this tuple" function */
regproc ambeginscan; /* "prepare for index scan" function */
***************
*** 77,83 **** typedef FormData_pg_am *Form_pg_am;
* compiler constants for pg_am
* ----------------
*/
! #define Natts_pg_am 27
#define Anum_pg_am_amname 1
#define Anum_pg_am_amstrategies 2
#define Anum_pg_am_amsupport 3
--- 78,84 ----
* compiler constants for pg_am
* ----------------
*/
! #define Natts_pg_am 28
#define Anum_pg_am_amname 1
#define Anum_pg_am_amstrategies 2
#define Anum_pg_am_amsupport 3
***************
*** 91,126 **** typedef FormData_pg_am *Form_pg_am;
#define Anum_pg_am_amsearchnulls 11
#define Anum_pg_am_amstorage 12
#define Anum_pg_am_amclusterable 13
! #define Anum_pg_am_amkeytype 14
! #define Anum_pg_am_aminsert 15
! #define Anum_pg_am_ambeginscan 16
! #define Anum_pg_am_amgettuple 17
! #define Anum_pg_am_amgetbitmap 18
! #define Anum_pg_am_amrescan 19
! #define Anum_pg_am_amendscan 20
! #define Anum_pg_am_ammarkpos 21
! #define Anum_pg_am_amrestrpos 22
! #define Anum_pg_am_ambuild 23
! #define Anum_pg_am_ambulkdelete 24
! #define Anum_pg_am_amvacuumcleanup 25
! #define Anum_pg_am_amcostestimate 26
! #define Anum_pg_am_amoptions 27
/* ----------------
* initial contents of pg_am
* ----------------
*/
! DATA(insert OID = 403 ( btree 5 1 t f t t t t t t f t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions ));
DESCR("b-tree index access method");
#define BTREE_AM_OID 403
! DATA(insert OID = 405 ( hash 1 1 f f t f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions ));
DESCR("hash index access method");
#define HASH_AM_OID 405
! DATA(insert OID = 783 ( gist 0 8 f t f f t t t t t t 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions ));
DESCR("GiST index access method");
#define GIST_AM_OID 783
! DATA(insert OID = 2742 ( gin 0 5 f f f f t t f f t f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions ));
DESCR("GIN index access method");
#define GIN_AM_OID 2742
--- 92,128 ----
#define Anum_pg_am_amsearchnulls 11
#define Anum_pg_am_amstorage 12
#define Anum_pg_am_amclusterable 13
! #define Anum_pg_am_ampredlocks 14
! #define Anum_pg_am_amkeytype 15
! #define Anum_pg_am_aminsert 16
! #define Anum_pg_am_ambeginscan 17
! #define Anum_pg_am_amgettuple 18
! #define Anum_pg_am_amgetbitmap 19
! #define Anum_pg_am_amrescan 20
! #define Anum_pg_am_amendscan 21
! #define Anum_pg_am_ammarkpos 22
! #define Anum_pg_am_amrestrpos 23
! #define Anum_pg_am_ambuild 24
! #define Anum_pg_am_ambulkdelete 25
! #define Anum_pg_am_amvacuumcleanup 26
! #define Anum_pg_am_amcostestimate 27
! #define Anum_pg_am_amoptions 28
/* ----------------
* initial contents of pg_am
* ----------------
*/
! DATA(insert OID = 403 ( btree 5 1 t f t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions ));
DESCR("b-tree index access method");
#define BTREE_AM_OID 403
! DATA(insert OID = 405 ( hash 1 1 f f t f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions ));
DESCR("hash index access method");
#define HASH_AM_OID 405
! DATA(insert OID = 783 ( gist 0 8 f t f f t t t t t t f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions ));
DESCR("GiST index access method");
#define GIST_AM_OID 783
! DATA(insert OID = 2742 ( gin 0 5 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions ));
DESCR("GIN index access method");
#define GIN_AM_OID 2742
*** a/src/include/commands/variable.h
--- b/src/include/commands/variable.h
***************
*** 24,29 **** extern const char *show_log_timezone(void);
--- 24,31 ----
extern const char *assign_XactIsoLevel(const char *value,
bool doit, GucSource source);
extern const char *show_XactIsoLevel(void);
+ extern bool assign_transaction_deferrable(bool newval, bool doit,
+ GucSource source);
extern bool assign_random_seed(double value,
bool doit, GucSource source);
extern const char *show_random_seed(void);
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 27,32 ****
--- 27,36 ----
#define LOG2_NUM_LOCK_PARTITIONS 4
#define NUM_LOCK_PARTITIONS (1 << LOG2_NUM_LOCK_PARTITIONS)
+ /* Number of partitions the shared predicate lock tables are divided into */
+ #define LOG2_NUM_PREDICATELOCK_PARTITIONS 4
+ #define NUM_PREDICATELOCK_PARTITIONS (1 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
+
/*
* We have a number of predefined LWLocks, plus a bunch of LWLocks that are
* dynamically assigned (e.g., for shared buffers). The LWLock structures
***************
*** 70,81 **** typedef enum LWLockId
RelationMappingLock,
AsyncCtlLock,
AsyncQueueLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
/* must be last except for MaxDynamicLWLock: */
! NumFixedLWLocks = FirstLockMgrLock + NUM_LOCK_PARTITIONS,
MaxDynamicLWLock = 1000000000
} LWLockId;
--- 74,89 ----
RelationMappingLock,
AsyncCtlLock,
AsyncQueueLock,
+ SerializableXactHashLock,
+ SerializableFinishedListLock,
+ SerializablePredicateLockListLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
+ FirstPredicateLockMgrLock = FirstLockMgrLock + NUM_LOCK_PARTITIONS,
/* must be last except for MaxDynamicLWLock: */
! NumFixedLWLocks = FirstPredicateLockMgrLock + NUM_PREDICATELOCK_PARTITIONS,
MaxDynamicLWLock = 1000000000
} LWLockId;
*** /dev/null
--- b/src/include/storage/predicate.h
***************
*** 0 ****
--- 1,59 ----
+ /*-------------------------------------------------------------------------
+ *
+ * predicate.h
+ * POSTGRES public predicate locking definitions.
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef PREDICATE_H
+ #define PREDICATE_H
+
+ #include "utils/relcache.h"
+ #include "utils/snapshot.h"
+
+
+ /*
+ * GUC variables
+ */
+ extern int max_predicate_locks_per_xact;
+
+
+ /*
+ * function prototypes
+ */
+
+ /* housekeeping for shared memory predicate lock structures */
+ extern void InitPredicateLocks(void);
+ extern Size PredicateLockShmemSize(void);
+
+ /* predicate lock reporting */
+ extern bool PageIsPredicateLocked(const Relation relation, const BlockNumber blkno);
+
+ /* predicate lock maintenance */
+ extern void RegisterSerializableTransaction(const Snapshot snapshot);
+ extern void RegisterPredicateLockingXid(const TransactionId xid);
+ extern void PredicateLockRelation(const Relation relation);
+ extern void PredicateLockPage(const Relation relation, const BlockNumber blkno);
+ extern void PredicateLockTuple(const Relation relation, const HeapTuple tuple);
+ extern void PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
+ extern void PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
+ extern void ReleasePredicateLocks(const bool isCommit);
+
+ /* conflict detection (may also trigger rollback) */
+ extern void CheckForSerializableConflictOut(const bool valid, const Relation relation, const HeapTuple tuple, const Buffer buffer);
+ extern void CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, const Buffer buffer);
+
+ /* final rollback checking */
+ extern void PreCommit_CheckForSerializationFailure(void);
+
+ /* for READ ONLY DEFERRABLE transactions */
+ Snapshot GetSafeSnapshot(Snapshot snapshot);
+
+
+ #endif /* PREDICATE_H */
*** /dev/null
--- b/src/include/storage/predicate_internals.h
***************
*** 0 ****
--- 1,415 ----
+ /*-------------------------------------------------------------------------
+ *
+ * predicate_internals.h
+ * POSTGRES internal predicate locking definitions.
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef PREDICATE_INTERNALS_H
+ #define PREDICATE_INTERNALS_H
+
+ #include "storage/lock.h"
+
+ /*
+ * Commit number.
+ */
+ typedef uint64 SerCommitSeqNo;
+
+ #define InvalidSerCommitSeqNo ((SerCommitSeqNo) ((2^64)-1))
+
+
+ /*
+ * The SERIALIZABLEXACTTAG struct identifies a serializable transaction.
+ */
+ typedef struct SERIALIZABLEXACTTAG
+ {
+ VirtualTransactionId vxid; /* The executing process always has one of
+ * these. */
+ } SERIALIZABLEXACTTAG;
+
+ /*
+ * The SERIALIZABLEXACT struct conatins information needed for each
+ * serializable database transaction to support SSI techniques.
+ *
+ * A hash table is maintained in shared memory of these, keyed by the virtual
+ * transaction ID. An entry is created and added to the table when and if
+ * the serializable transaction acquires a snapshot. Unless the transaction
+ * is rolled back, this entry must remain until all concurrent transactions
+ * have completed. While it would be OK to clean up a transaction as soon as
+ * it is rolled back, for performance reasons this is generally deferred; a
+ * flag indicates whether a transaction has been rolled back, and such
+ * transactions should be ignored for purposes of detecting conflicts and
+ * serialization failures.
+ *
+ * Eligibility for cleanup of committed transactions is determined by
+ * comparing the transaction's finishedBefore field to SerializableGlobalXmin.
+ */
+ typedef struct SERIALIZABLEXACT
+ {
+ /* hash key */
+ SERIALIZABLEXACTTAG tag;
+
+ /* data */
+ SerCommitSeqNo commitSeqNo;
+ union /* these values are not both interesting at
+ * the same time */
+ {
+ SerCommitSeqNo earliestOutConflictCommit; /* when committed with
+ * conflict out */
+ SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or
+ * no conflict out */
+ } SeqNo;
+ SHM_QUEUE outConflicts; /* list of write transactions whose data we
+ * couldn't read. */
+ SHM_QUEUE inConflicts; /* list of read transactions which couldn't
+ * see our write. */
+ SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */
+ SHM_QUEUE finishedLink; /* list link in
+ * FinishedSerializableTransactions */
+ SHM_QUEUE possibleUnsafeConflicts;
+
+ /*
+ * for r/o transactions: list of concurrent r/w transactions that we could
+ * potentially have conflicts with, and vice versa for r/w transactions
+ */
+ TransactionId topXid; /* top level xid for the transaction, if one
+ * exists; else invalid */
+ TransactionId finishedBefore; /* invalid means still running; else
+ * the struct expires when no
+ * serializable xids are before this. */
+ TransactionId xmin; /* the transaction's snapshot xmin */
+ int flags; /* OR'd combination of values defined below */
+ int pid; /* pid of associated process */
+ } SERIALIZABLEXACT;
+
+ /* TODO SSI: What's the best technique for dealing with these flags? */
+ #define SXACT_FLAG_ROLLED_BACK 0x00000001
+ #define SXACT_FLAG_COMMITTED 0x00000002
+ #define SXACT_FLAG_CONFLICT_OUT 0x00000004
+ #define SXACT_FLAG_READ_ONLY 0x00000008
+ #define SXACT_FLAG_DID_WRITE 0x00000010
+ #define SXACT_FLAG_INTERRUPT 0x00000020
+ #define SXACT_FLAG_DEFERRABLE_WAITING 0x00000040
+ #define SXACT_FLAG_RO_SAFE 0x00000080
+ #define SXACT_FLAG_RO_UNSAFE 0x00000100
+
+ /*
+ * The following types are used to provide an ad hoc list for holding
+ * SERIALIZABLEXACT objects. An HTAB is overkill, since there is no need to
+ * access these by key -- there are direct pointers to these objects where
+ * needed. If a shared memory list is created, these types can probably be
+ * eliminated in favor of using the general solution.
+ */
+ typedef struct PredTranListElementData
+ {
+ SHM_QUEUE link;
+ SERIALIZABLEXACT sxact;
+ } PredTranListElementData;
+
+ typedef struct PredTranListElementData *PredTranListElement;
+
+ #define PredTranListElementDataSize \
+ ((Size)MAXALIGN(sizeof(PredTranListElementData)))
+
+ typedef struct PredTranListData
+ {
+ SHM_QUEUE availableList;
+ SHM_QUEUE activeList;
+
+ /*
+ * These global variables are maintained when registering and cleaning up
+ * serializable transactions. They must be global across all backends,
+ * but are not needed outside the predicate.c source file.
+ */
+ TransactionId SxactGlobalXmin; /* global xmin for active serializable
+ * transactions */
+ int SxactGlobalXminCount; /* how many active serializable
+ * transactions have this xmin */
+ int WritableSxactCount; /* how many non-read-only serializable
+ * transactions are active */
+ SerCommitSeqNo LastSxactCommitSeqNo; /* a strictly monotonically
+ * increasing number for
+ * commits of serialiable
+ * transactions */
+ SerCommitSeqNo LastWritingCommitSeqNo; /* The last commitSeqNo
+ * assigned at commit to a
+ * transaction which wrote
+ * data. */
+ SerCommitSeqNo CanPartialClearThrough; /* can clear predicate locks and
+ * inConflicts for committed
+ * transactions through this seq
+ * no */
+ SerCommitSeqNo HavePartialClearedThrough; /* have cleared through this
+ * seq no */
+
+ PredTranListElement element;
+ } PredTranListData;
+
+ typedef struct PredTranListData *PredTranList;
+
+ #define PredTranListDataSize \
+ ((Size)MAXALIGN(sizeof(PredTranListData)))
+
+
+ /*
+ * The following types are used to provide lists of rw-conflicts between
+ * pairs of transactions.
+ *
+ * The outList field doubles for an "available" list when the structure
+ * is not in use.
+ */
+ typedef struct RWConflictData
+ {
+ SHM_QUEUE outLink;
+ SHM_QUEUE inLink;
+ SERIALIZABLEXACT *sxactOut;
+ SERIALIZABLEXACT *sxactIn;
+ } RWConflictData;
+
+ typedef struct RWConflictData *RWConflict;
+
+ #define RWConflictDataSize \
+ ((Size)MAXALIGN(sizeof(RWConflictData)))
+
+ typedef struct RWConflictPoolHeaderData
+ {
+ SHM_QUEUE availableList;
+ RWConflict element;
+ } RWConflictPoolHeaderData;
+
+ typedef struct RWConflictPoolHeaderData *RWConflictPoolHeader;
+
+ #define RWConflictPoolHeaderDataSize \
+ ((Size)MAXALIGN(sizeof(RWConflictPoolHeaderData)))
+
+
+ /*
+ * The SERIALIZABLEXIDTAG struct identifies an xid assigned to a serializable
+ * transaction or any of its subtransactions.
+ */
+ typedef struct SERIALIZABLEXIDTAG
+ {
+ TransactionId xid;
+ } SERIALIZABLEXIDTAG;
+
+ /*
+ * The SERIALIZABLEXID struct provides a link from a TransactionId for a
+ * serializable transaction to the related SERIALIZABLEXACT record, even if
+ * the transaction has completed and its connection has been closed.
+ *
+ * A hash table of these objects is maintained in shared memory to provide a
+ * quick way to find the top level transaction information for a serializable
+ * transaction, Because a serializable transaction can acquire a snapshot
+ * and read information which requires a predicate lock before it has a
+ * TransactionId, it must be keyed by VirtualTransactionId; this hashmap
+ * allows a fast link from MVCC transaction IDs to the related serializable
+ * transaction hash table entry.
+ *
+ * These are created as new top level transaction IDs are first assigned to
+ * transactions which are participating in predicate locking. They are
+ * removed with their related serializable transaction objects.
+ *
+ * The SubTransGetTopmostTransaction method is used where necessary to get
+ * from an XID which might be from a subtransaction to the top level XID.
+ */
+ typedef struct SERIALIZABLEXID
+ {
+ /* hash key */
+ SERIALIZABLEXIDTAG tag;
+
+ /* data */
+ SERIALIZABLEXACT *myXact; /* pointer to the top level transaction data */
+ } SERIALIZABLEXID;
+
+
+ /*
+ * The PREDICATELOCKTARGETTAG struct identifies a database object which can
+ * be the target of predicate locks. It is designed to fit into 16 bytes
+ * with no padding. Note that this would need adjustment if we widen Oid or
+ * BlockNumber to more than 32 bits.
+ *
+ * TODO SSI: If we always use the same fields for the same type of value, we
+ * should rename these. Holding off until it's clear there are no exceptions.
+ * Since indexes are relations with blocks and tuples, it's looking likely that
+ * the rename will be possible. If not, we may need to divide the last field
+ * and use part of it for a target type, so that we know how to interpret the
+ * data..
+ */
+ typedef struct PREDICATELOCKTARGETTAG
+ {
+ uint32 locktag_field1; /* a 32-bit ID field */
+ uint32 locktag_field2; /* a 32-bit ID field */
+ uint32 locktag_field3; /* a 32-bit ID field */
+ uint16 locktag_field4; /* a 16-bit ID field */
+ uint16 locktag_field5; /* a 16-bit ID field */
+ } PREDICATELOCKTARGETTAG;
+
+ /*
+ * The PREDICATELOCKTARGET struct represents a database object on which there
+ * are predicate locks.
+ *
+ * A hash list of these objects is maintained in shared memory. An entry is
+ * added when a predicate lock is requested on an object which doesn't
+ * already have one. An entry is removed when the last lock is removed from
+ * its list.
+ */
+ typedef struct PREDICATELOCKTARGET
+ {
+ /* hash key */
+ PREDICATELOCKTARGETTAG tag; /* unique identifier of lockable object */
+
+ /* data */
+ SHM_QUEUE predicateLocks; /* list of PREDICATELOCK objects assoc. with
+ * predicate lock target */
+ } PREDICATELOCKTARGET;
+
+
+ /*
+ * The PREDICATELOCKTAG struct identifies an individual predicate lock.
+ *
+ * It is the combination of predicate lock target (which is a lockable
+ * object) and a serializable transaction which has acquired a lock on that
+ * target.
+ */
+ typedef struct PREDICATELOCKTAG
+ {
+ PREDICATELOCKTARGET *myTarget;
+ SERIALIZABLEXACT *myXact;
+ } PREDICATELOCKTAG;
+
+ /*
+ * The PREDICATELOCK struct represents an individual lock.
+ *
+ * An entry can be created here when the related database object is read, or
+ * by promotion of multiple finer-grained targets. All entries related to a
+ * serializable transaction are removed when that serializable transaction is
+ * cleaned up. Entries can also be removed when they are combined into a
+ * single coarser-grained lock entry.
+ */
+ typedef struct PREDICATELOCK
+ {
+ /* hash key */
+ PREDICATELOCKTAG tag; /* unique identifier of lock */
+
+ /* data */
+ SHM_QUEUE targetLink; /* list link in PREDICATELOCKTARGET's list of
+ * predicate locks */
+ SHM_QUEUE xactLink; /* list link in SERIALIZABLEXACT's list of
+ * predicate locks */
+ } PREDICATELOCK;
+
+
+ /*
+ * The LOCALPREDICATELOCK struct represents a local copy of data which is
+ * also present in the PREDICATELOCK table, organized for fast access without
+ * needing to acquire a LWLock. It is strictly for optimization.
+ *
+ * Each serializable transaction creates its own local hash table to hold a
+ * collection of these. This information is used to determine when a number
+ * of fine-grained locks should be promoted to a single coarser-grained lock.
+ * The information is maintained more-or-less in parallel to the
+ * PREDICATELOCK data, but because this data is not protected by locks and is
+ * only used in an optimization heuristic, it is allowed to drift in a few
+ * corner cases where maintaining exact data would be expensive.
+ *
+ * The hash table is created when the serializable transaction acquires its
+ * snapshot, and its memory is released upon completion of the transaction.
+ */
+ typedef struct LOCALPREDICATELOCK
+ {
+ /* hash key */
+ PREDICATELOCKTARGETTAG tag; /* unique identifier of lockable object */
+
+ /* data */
+ bool held; /* is lock held, or just its children? */
+ int childLocks; /* number of child locks currently held */
+ } LOCALPREDICATELOCK;
+
+
+ /*
+ * The types of predicate locks which can be acquired.
+ */
+ typedef enum PredicateLockTargetType
+ {
+ PREDLOCKTAG_RELATION,
+ PREDLOCKTAG_PAGE,
+ PREDLOCKTAG_TUPLE
+ /* TODO SSI: Other types may be needed for index locking */
+ } PredicateLockTargetType;
+
+
+ /*
+ * This structure is used to quickly capture a copy of all predicate
+ * locks. This is currently used only by the pg_lock_status function,
+ * which in turn is used by the pg_locks view.
+ */
+ typedef struct PredicateLockData
+ {
+ int nelements;
+ PREDICATELOCKTARGETTAG *locktags;
+ SERIALIZABLEXACT *xacts;
+ } PredicateLockData;
+
+
+ /*
+ * These macros define how we map logical IDs of lockable objects into the
+ * physical fields of PREDICATELOCKTARGETTAG. Use these to set up values,
+ * rather than accessing the fields directly. Note multiple eval of target!
+ */
+ #define SET_PREDICATELOCKTARGETTAG_RELATION(locktag,dboid,reloid) \
+ ((locktag).locktag_field1 = (dboid), \
+ (locktag).locktag_field2 = (reloid), \
+ (locktag).locktag_field3 = InvalidBlockNumber, \
+ (locktag).locktag_field4 = InvalidOffsetNumber, \
+ (locktag).locktag_field5 = 0)
+
+ #define SET_PREDICATELOCKTARGETTAG_PAGE(locktag,dboid,reloid,blocknum) \
+ ((locktag).locktag_field1 = (dboid), \
+ (locktag).locktag_field2 = (reloid), \
+ (locktag).locktag_field3 = (blocknum), \
+ (locktag).locktag_field4 = InvalidOffsetNumber, \
+ (locktag).locktag_field5 = 0)
+
+ #define SET_PREDICATELOCKTARGETTAG_TUPLE(locktag,dboid,reloid,blocknum,offnum) \
+ ((locktag).locktag_field1 = (dboid), \
+ (locktag).locktag_field2 = (reloid), \
+ (locktag).locktag_field3 = (blocknum), \
+ (locktag).locktag_field4 = (offnum), \
+ (locktag).locktag_field5 = 0)
+
+ #define GET_PREDICATELOCKTARGETTAG_DB(locktag) \
+ ((locktag).locktag_field1)
+ #define GET_PREDICATELOCKTARGETTAG_RELATION(locktag) \
+ ((locktag).locktag_field2)
+ #define GET_PREDICATELOCKTARGETTAG_PAGE(locktag) \
+ ((locktag).locktag_field3)
+ #define GET_PREDICATELOCKTARGETTAG_OFFSET(locktag) \
+ ((locktag).locktag_field4)
+ #define GET_PREDICATELOCKTARGETTAG_TYPE(locktag) \
+ (((locktag).locktag_field4 != InvalidOffsetNumber) ? PREDLOCKTAG_TUPLE : \
+ (((locktag).locktag_field3 != InvalidBlockNumber) ? PREDLOCKTAG_PAGE : \
+ PREDLOCKTAG_RELATION))
+
+
+ /*
+ * Define a macro to use for an "empty" SERIALIZABLEXACT reference.
+ */
+ typedef SERIALIZABLEXACT *SERIALIZABLEXACTPtr;
+
+ #define InvalidSerializableXact ((SERIALIZABLEXACTPtr) NULL)
+
+
+ /*
+ * Function definitions for functions needing awareness of predicate
+ * locking internals.
+ */
+ extern PredicateLockData *GetPredicateLockStatusData(void);
+
+
+ #endif /* PREDICATE_INTERNALS_H */
*** a/src/include/storage/shmem.h
--- b/src/include/storage/shmem.h
***************
*** 67,74 **** extern void SHMQueueInit(SHM_QUEUE *queue);
extern void SHMQueueElemInit(SHM_QUEUE *queue);
extern void SHMQueueDelete(SHM_QUEUE *queue);
extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
! extern Pointer SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem,
Size linkOffset);
! extern bool SHMQueueEmpty(SHM_QUEUE *queue);
#endif /* SHMEM_H */
--- 67,75 ----
extern void SHMQueueElemInit(SHM_QUEUE *queue);
extern void SHMQueueDelete(SHM_QUEUE *queue);
extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
! extern Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem,
Size linkOffset);
! extern bool SHMQueueEmpty(const SHM_QUEUE *queue);
! extern bool SHMQueueIsDetached(const SHM_QUEUE *queue);
#endif /* SHMEM_H */
*** a/src/test/regress/GNUmakefile
--- b/src/test/regress/GNUmakefile
***************
*** 138,143 **** tablespace-setup:
--- 138,160 ----
##
+ ## Prepare for dtester tests
+ ##
+ pg_dtester.py: pg_dtester.py.in GNUmakefile $(top_builddir)/src/Makefile.global
+ sed -e 's,@bindir@,$(bindir),g' \
+ -e 's,@libdir@,$(libdir),g' \
+ -e 's,@pkglibdir@,$(pkglibdir),g' \
+ -e 's,@datadir@,$(datadir),g' \
+ -e 's/@VERSION@/$(VERSION)/g' \
+ -e 's/@host_tuple@/$(host_tuple)/g' \
+ -e 's,@GMAKE@,$(MAKE),g' \
+ -e 's/@enable_shared@/$(enable_shared)/g' \
+ -e 's/@GCC@/$(GCC)/g' \
+ $< >$@
+ chmod a+x $@
+
+
+ ##
## Run tests
##
***************
*** 155,160 **** installcheck-parallel: all tablespace-setup
--- 172,182 ----
standbycheck: all
$(pg_regress_call) --psqldir=$(PSQLDIR) --schedule=$(srcdir)/standby_schedule --use-existing
+ dcheck: pg_dtester.py
+ ./pg_dtester.py --temp-install --top-builddir=$(top_builddir) \
+ --multibyte=$(MULTIBYTE) $(MAXCONNOPT) $(NOLOCALE)
+
+
# old interfaces follow...
runcheck: check
*** /dev/null
--- b/src/test/regress/pg_dtester.py.in
***************
*** 0 ****
--- 1,1608 ----
+ #!/usr/bin/python
+
+ #-------------------------------------------------------------------------
+ #
+ # dtester.py.in
+ #
+ # Sample test suite running two concurrent transactions, showing
+ # off some capabilities of dtester.
+ #
+ # Copyright (c) 2006-2010, Markus Wanner
+ #
+ #-------------------------------------------------------------------------
+
+ import re, os, sys, getopt
+ from twisted.internet import defer, reactor
+ from twisted.python import failure
+
+ from dtester.events import EventMatcher, EventSource, Event, \
+ ProcessOutputEvent, ProcessErrorEvent, ProcessEndedEvent
+ from dtester.exceptions import TestAborted, TestFailure
+ from dtester.test import TestSuite, BaseTest, SyncTest
+ from dtester.reporter import StreamReporter, CursesReporter, TapReporter
+ from dtester.runner import Runner, Timeout
+
+ # ****** definition of tests and suites ***********************************
+
+ class InstallationSuite(TestSuite):
+
+ setUpDescription = "creating temporary installation"
+ tearDownDescription = "removing temporary installation"
+
+ needs = (('shell', "IShell or something"),)
+
+ def setUp(self):
+ # inherit getConfig from the shell
+ setattr(self, 'getConfig', self.shell.getConfig)
+ setattr(self, 'runCommand', self.shell.runCommand)
+ setattr(self, 'recursive_remove', self.shell.recursive_remove)
+
+ # (re) create an installation directory
+ self.pg_inst_dir = self.shell.getConfig('inst_dir')
+ if os.path.exists(self.pg_inst_dir):
+ self.shell.recursive_remove(self.pg_inst_dir)
+ os.mkdir(self.pg_inst_dir)
+
+ # install into that directory
+ proc = self.shell.runCommand('make', 'make',
+ args=['make', '-C', self.shell.getConfig('top-builddir'),
+ 'DESTDIR=%s' % self.pg_inst_dir, 'install',
+ 'with_perl=no', 'with_python=no'],
+ lineBasedOutput=True)
+
+ d = self.waitFor(proc, EventMatcher(ProcessEndedEvent))
+ d.addCallback(self.makeTerminated)
+ proc.start()
+
+ # FIXME: how to properly handle these?
+ self.shell.addEnvPath(self.shell.getConfig('bindir'))
+ self.shell.addEnvLibraryPath(self.shell.getConfig('libdir'))
+ return d
+
+ def makeTerminated(self, event):
+ if event.exitCode != 0:
+ raise Exception("Initdb returned %d" % event.exitCode)
+ else:
+ return True
+
+ def tearDown(self):
+ # The installation procedure should be able to simply override any
+ # formerly installed files, so we save the time to clean up the
+ # installation directory.
+ return
+
+
+ class InitdbSuite(TestSuite):
+
+ args = (('number', int), )
+ needs = (('shell', "IShell or something"),)
+
+ def setUpDescription(self):
+ return "initializing database system %d" % self.number
+
+ def tearDownDescription(self):
+ return "removing database system %d" % self.number
+
+ def getNumber(self):
+ return self.number
+
+ def getDir(self):
+ return self.dbdir
+
+ def setUp(self):
+ self.dbdir = "%s%d" % \
+ (self.shell.getConfig('pgdata_prefix'), self.number)
+ proc = self.shell.runCommand(
+ 'initdb-%d' % self.number,
+ 'initdb', args = [
+ 'initdb', '-D', self.dbdir,
+ '-A', 'trust', '--noclean'],
+ lineBasedOutput=True)
+
+ d = defer.Deferred()
+ proc.addHook(EventMatcher(ProcessEndedEvent),
+ self.initdb_terminated, d)
+ proc.start()
+ return d
+
+ def initdb_terminated(self, event, d):
+ if event.exitCode != 0:
+ d.errback(Exception("Initdb returned %d" % event.exitCode))
+ else:
+ d.callback(True)
+
+ def tearDown(self):
+ self.shell.recursive_remove(
+ "%s%d" % (self.shell.getConfig('pgdata_prefix'), self.number))
+
+
+ class PostmasterSuite(TestSuite):
+
+ needs = (('shell', "IShell or something"),
+ ('dbdir', "IDatabaseDir"),)
+
+ def setUpDescription(self):
+ return "starting database system %d" % self.dbdir.getNumber()
+
+ def tearDownDescription(self):
+ return "stopping database system %d" % self.dbdir.getNumber()
+
+ def getPort(self):
+ return self.port
+
+ def setUp(self):
+ setattr(self, 'getNumber', self.dbdir.getNumber)
+
+ self.port = self.shell.getConfig('temp-port') + self.dbdir.getNumber()
+
+ args = ['postmaster', '-d5',
+ '-D', self.dbdir.getDir(),
+ '-i', '-p', str(self.port)]
+ if self.shell.getConfig('enable_cassert'):
+ args += "-A1"
+
+ self.postmaster = self.shell.runCommand(
+ 'postmaster%d' % self.dbdir.getNumber(),
+ 'postmaster',
+ args = args,
+ lineBasedOutput=True)
+
+ d = defer.Deferred()
+ self.readyHook = \
+ self.postmaster.addHook(EventMatcher(ProcessErrorEvent,
+ "database system is ready to accept connections"),
+ self.postmaster_ready, d)
+
+ self.unexpectedTerminationHook = \
+ self.postmaster.addHook(EventMatcher(ProcessEndedEvent),
+ self.postmaster_terminated)
+ self.postmaster.start()
+ return d
+
+ def postmaster_ready(self, event, d):
+ # it's sufficient if we're called once
+ self.postmaster.removeHook(self.readyHook)
+ d.callback(None)
+
+ def postmaster_terminated(self, event):
+ exitCode = 'undef'
+ if hasattr(event, 'exitCode'):
+ exitCode = event.exitCode
+ elif hasattr(event, 'data'):
+ exitCode = repr(event.data)
+ self.abort("postmaster %d unexpectedly terminated (exit code %s)" % \
+ (self.dbdir.getNumber(), exitCode))
+
+ def tearDown(self):
+ self.postmaster.removeHook(self.unexpectedTerminationHook)
+ if not self.aborted:
+ d = defer.Deferred()
+ self.postmaster.addHook(EventMatcher(ProcessEndedEvent),
+ lambda event: d.callback(None))
+ self.postmaster.stop()
+ return d
+ else:
+ return True
+
+
+ class TestDatabaseSuite(TestSuite):
+
+ args = (('dbname', str),)
+ needs = (('shell', "IShell or something"),
+ ('pg', "IPostmaster"),)
+
+ def setUpDescription(self):
+ return "creating database %s at server %d" % \
+ (self.dbname, self.pg.getNumber())
+
+ def tearDownDescription(self):
+ return "not (!) dropping database %s at server %d" % \
+ (self.dbname, self.pg.getNumber())
+
+ def getDbname(self):
+ return self.dbname
+
+ def setUp(self):
+ setattr(self, "getPort", self.pg.getPort)
+ setattr(self, "getNumber", self.pg.getNumber)
+
+ self.proc = self.shell.runCommand(
+ 'createdb%d' % self.pg.getNumber(),
+ 'createdb',
+ args = ['createdb',
+ '-p', str(self.getPort()), self.dbname],
+ lineBasedOutput=True)
+
+ d = defer.Deferred()
+ self.proc.addHook(EventMatcher(ProcessEndedEvent),
+ self.createdb_terminated, d)
+ self.proc.start()
+ return d
+
+ def createdb_terminated(self, event, d):
+ if event.exitCode != 0:
+ d.errback(Exception("createdb terminated with code %d" % \
+ event.exitCode))
+ else:
+ d.callback(None)
+
+ def tearDown(self):
+ if self.pg.aborted:
+ return True
+
+ # Hm.. this interferes with the postmaster suites, which need
+ # to be started and stopped several times on top of a test database,
+ # however, creating and dropping it certainly depends on a running
+ # postmaster. Not sure how to solve this, at the moment I'm just
+ # skipping cleanup, i.e. dropdb.
+ return True
+
+ self.proc = self.shell.runCommand(
+ 'dropdb%d' % self.pg.getNumber(),
+ 'dropdb',
+ args = ['dropdb',
+ '-p', str(self.getPort()), self.dbname],
+ lineBasedOutput=True)
+
+ d = defer.Deferred()
+ self.proc.addHook(EventMatcher(ProcessEndedEvent),
+ self.dropdb_terminated, d)
+ self.proc.start()
+ return d
+
+ def dropdb_terminated(self, event, d):
+ if event.exitCode != 0:
+ d.errback(Exception("dropdb returned with %d" % \
+ event.exitCode))
+ else:
+ d.callback(None)
+
+
+ class SqlConnectionSuite(TestSuite):
+
+ args = (('dbname', str),)
+ needs = (('shell', "IShell or something"),
+ ('db', "IPostmaster"))
+
+ def setUpDescription(self):
+ return "connecting to database %s at server %d" % \
+ (self.dbname, self.db.getNumber())
+ def tearDownDescription(self):
+ return "disconnecting from database %s at server %d" % \
+ (self.dbname, self.db.getNumber())
+
+ def getDbname(self):
+ return self.dbname
+
+ def setUp(self):
+ self.psql = self.shell.runCommand(
+ 'psql%d' % self.db.getNumber(),
+ 'psql',
+ args=['psql', '-AEn',
+ '--pset=pager=off', '--pset=columns=0',
+ '-p', str(self.db.getPort()),
+ self.dbname])
+
+ # initialize the output buffer and attach a first output collector
+ # *before* the process is started.
+ self.output_buffer = ""
+ d = defer.Deferred()
+ self.outputCollectorDeferred = d
+ self.outputCollectorHook = self.psql.addHook(
+ EventMatcher(ProcessOutputEvent), self.outputCollector,
+ None, d)
+
+ # Mark as being in used, until we get to the commandline
+ self.inUse = True
+ self.workQueue = []
+
+ # also add a termination hook
+ self.unexpectedTerminationHook = self.psql.addHook(
+ EventMatcher(ProcessEndedEvent), self.psql_terminated)
+
+ # then schedule start of the psql process and return the deferred
+ # *before* starting the process.
+ reactor.callLater(0.0, self.psql.start)
+ return d
+
+ def psql_terminated(self, event):
+ exitCode = "undef"
+ if hasattr(event, 'exitCode'):
+ exitCode = event.exitCode
+ elif hasattr(event, 'data'):
+ exitCode = repr(event.data)
+
+ # If there's an outputCollectorHook, the abort method won't catch
+ # and we have to wait for the timeout to trigger, instead of
+ # acting on process termination. We thus save the outputCollector
+ # deferred and send it an errback with the failure.
+ if self.outputCollectorHook:
+ self.outputCollectorDeferred.errback( \
+ TestAborted("psql to server %d unexpectedly terminated (exit code %s)" % ( \
+ self.db.getNumber(), exitCode)))
+ self.abort(
+ "psql to server %d unexpectedly terminated (exit code %s)" % ( \
+ self.db.getNumber(), exitCode))
+
+ def tearDown(self):
+ self.psql.removeHook(self.unexpectedTerminationHook)
+
+ d = defer.Deferred()
+ self.psql.addHook(EventMatcher(ProcessEndedEvent),
+ lambda event: d.callback(None))
+ reactor.callLater(0.0, self.psql.write, "\\q\n")
+ reactor.callLater(5.0, self.psql.stop)
+ return d
+
+ def outputCollector(self, event, query, d):
+ self.output_buffer += event.data
+
+ cmdprompt = self.dbname + '=#'
+ cpos = self.output_buffer.find(cmdprompt)
+
+ if cpos >= 0:
+ self.psql.removeHook(self.outputCollectorHook)
+ self.outputCollectorHook = False
+ result = self.output_buffer[:cpos]
+ self.output_buffer = self.output_buffer[cpos + len(cmdprompt):]
+ if len(self.output_buffer) > 0 and self.output_buffer != ' ':
+ print "rest: %s" % repr(self.output_buffer)
+ if d:
+ # remove the command prompt at the end
+ result = result[:cpos]
+
+ if query:
+ # remove the query string at the beginning
+ query_len = len(query)
+ if result[:query_len] != query:
+ raise Exception("Query not found at beginning of psql answer.")
+
+ result = result[query_len:]
+ while (len(result) > 1) and (result[0] in ("\n", "\r", " ")):
+ result = result[1:]
+ reactor.callLater(0.0, d.callback, result)
+
+ self.inUse = False
+ if len(self.workQueue) > 0:
+ assert not self.inUse
+ job = self.workQueue.pop()
+ d1 = job['method'](*job['args'])
+ d1.chainDeferred(job['deferred'])
+
+ def query(self, query):
+ if self.inUse:
+ d = defer.Deferred()
+ self.workQueue.append({'deferred': d,
+ 'method': self.query,
+ 'args': (query,)})
+ return d
+
+ assert not self.inUse
+ assert not self.outputCollectorHook
+
+ self.inUse = True
+ self.output_buffer = ""
+ d = defer.Deferred()
+ self.outputCollectorHook = self.psql.addHook(
+ EventMatcher(ProcessOutputEvent), self.outputCollector, query, d)
+ d.addCallback(self.parseQueryResult)
+
+ # defer writing to the process, so that the caller has the
+ # opportunity to add callbacks to the deferred we return.
+ reactor.callLater(0.0, self.psql.write, query + "\n")
+
+ return d
+
+ def parseQueryResult(self, result):
+ rawlines = result.split('\n')
+
+ lines = []
+ for line in rawlines:
+ line = line.strip()
+ if line.startswith("ROLLBACK"):
+ raise Exception("transaction rolled back (%s)" % query)
+ if line.startswith("message type"):
+ raise Exception("protocol error: %s" % line)
+ if len(line) > 0 and not line.startswith("NOTICE:") \
+ and not line.startswith("ROLLBACK"):
+ lines.append(line)
+
+ try:
+ assert len(lines) >= 2
+
+ lines = map(lambda x: x.strip(), lines)
+ headLine = lines[0]
+ tailLine = lines[-1]
+
+ fields = headLine.split('|')
+ rows = []
+ for row in lines[1:-1]:
+ attrs = row.split('|')
+ assert len(attrs) == len(fields)
+ x = {}
+ for i in range(len(attrs)):
+ x[fields[i]] = attrs[i].strip()
+ rows.append(x)
+
+ x = re.compile("\((\d+) rows?\)").search(tailLine)
+ if x:
+ if not int(x.group(1)) == len(rows):
+ raise Exception("number of rows doesn't match: %s vs %d for: '%s'" % (
+ x.group(1), len(rows), lines))
+ else:
+ raise Exception("final number of rows line doesn't match.\n------------\n%s\n---------------\n" % lines)
+ return rows
+ except Exception, e:
+ import traceback
+ print "error parsing query result: %s" % e
+ traceback.print_exc()
+ raise e
+ # return []
+
+ def operation(self, query, expResult=None):
+ if self.inUse:
+ d = defer.Deferred()
+ self.workQueue.append({'deferred': d,
+ 'method': self.operation,
+ 'args': (query, expResult)})
+ return d
+
+ assert not self.inUse
+ assert not self.outputCollectorHook
+
+ self.inUse = True
+ self.output_buffer = ""
+ d = defer.Deferred()
+ self.outputCollectorDeferred = d
+ self.outputCollectorHook = self.psql.addHook(
+ EventMatcher(ProcessOutputEvent), self.outputCollector, query, d)
+ d.addCallback(self.checkQueryResult, query, expResult)
+
+ # defer writing to the process, so that the caller has the
+ # opportunity to add callbacks to the deferred we return.
+ reactor.callLater(0.0, self.psql.write, query + "\n")
+
+ return d
+
+ def checkQueryResult(self, result, query, expResult):
+ lines = []
+ for line in result.split("\n"):
+ line = line.strip()
+ if len(line) > 0 and not line.startswith("WARNING:") \
+ and not line.startswith("NOTICE:"):
+ lines.append(line)
+ lines = "\n".join(lines)
+ if expResult:
+ if isinstance(expResult, str):
+ self.assertEqual(expResult, lines,
+ "didn't get expected result for query '%s'" % query)
+ elif isinstance(expResult, list):
+ if not lines in expResult:
+ raise TestFailure("didn't get expected result",
+ "no result matches, got:\n%s\nfor query: '%s'\n" % (lines, query))
+ return lines
+
+
+ class TestDatabaseConnection(BaseTest):
+
+ needs = (('conn', "ISqlConnection"),)
+
+ description = "database connection"
+
+ def run(self):
+ return self.conn.query("SELECT 1 AS test;")
+
+
+ # FIXME: that's not actually a test, but it modifies the database state
+ class PopulateTestDatabase(BaseTest):
+
+ needs = (('conn', "ISqlConnection"),)
+
+ description = "populate test database"
+
+ def run(self):
+ conn = self.conn
+
+ # Create a test table for use in TestConcurrentUpdates and fill it
+ # with two test tuples.
+ d = conn.operation("CREATE TABLE test (i int PRIMARY KEY, t text);",
+ "CREATE TABLE")
+ d.addCallback(lambda x: conn.operation(
+ "INSERT INTO test VALUES (5, 'apple');",
+ "INSERT 0 1"))
+ d.addCallback(lambda x: conn.operation(
+ "INSERT INTO test VALUES (7, 'pear');",
+ "INSERT 0 1"))
+ d.addCallback(lambda x: conn.operation(
+ "INSERT INTO test VALUES (11, 'banana');",
+ "INSERT 0 1"))
+ return d
+
+
+ class PermutationTest(SyncTest):
+ """ Abstract class for testing a set of steps in all permutations of execution order.
+ This counts as a single test, although a subclass may accumulate counts which may be of
+ interest, and should therefore be shown regardless of success or failure of the test.
+ """
+
+ # stepDictionary maps a step ID to a function to run for that step.
+ stepDictionary = {}
+
+ # stepThreading is a list of lists.
+ # All permutations of interleaving of steps from the sublists will be generated.
+ # Steps from within each sublist are kept in order; only the interleaving is variable.
+ stepThreading = [[]]
+
+ # Override this to provide any per-iteration (permutation) setup.
+ def setUpIteration(self, stepIdList):
+ pass
+
+ # Override this to provide any per-iteration (permutation) teardown.
+ def tearDownIteration(self, stepIdList):
+ pass
+
+ def runIterationStep(self, stepId):
+ p = self.stepDictionary[stepId]
+ p()
+
+ def runIterationSteps(self, stepIdList):
+ try:
+ self.setUpIteration(stepIdList)
+ for stepId in stepIdList:
+ self.runIterationStep(stepId)
+ finally:
+ self.tearDownIteration(stepIdList)
+
+ def runPermutations(self, a):
+ self.runPermutations_recurse([], a)
+
+ def runPermutations_recurse(self, p, a):
+ found = False
+ for i in range(len(a)):
+ if len(a[i]) > 0:
+ found = True
+ r = p[:]
+ b = a[:]
+ r.append(b[i][0])
+ b[i] = b[i][1:]
+ self.runPermutations_recurse(r, b)
+ if not found:
+ self.runIterationSteps(p)
+
+ # If the dictionary is set up in this method, there can be references
+ # to class methods and fields.
+ def populateStepDictionary(self):
+ pass
+
+ def run(self):
+ self.populateStepDictionary()
+ self.runPermutations(self.stepThreading)
+
+
+ class DummyPermutationTest(PermutationTest):
+ """ Simple test of the PermutationTest abstract class.
+ """
+
+ description = "simple test of the PermutationTest abstract class"
+
+ stepThreading = [['r1x','c1'],['r2x','c2']]
+
+ def setUpIteration(self, stepIdList):
+ print stepIdList
+
+ def tearDownIteration(self, stepIdList):
+ print
+
+ def printStepId(self, stepId):
+ print stepId,
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'r1x': lambda : self.printStepId('r1x'),
+ 'c1': lambda : self.printStepId('c1'),
+ 'r2x': lambda : self.printStepId('r2x'),
+ 'c2': lambda : self.printStepId('c2')
+ }
+
+
+ class DatabasePermutationTest(PermutationTest):
+ """ Abstract class to provide framework for using an IterativeTest for database queries.
+ """
+
+ commitRequiredCount = 0
+ commitRequiredOK = 0
+ rollbackRequiredCount = 0
+ rollbackRequiredOK = 0
+ commitPreferredCount = 0
+ commitPreferredOK = 0
+
+ serializationFailure = False
+
+ def commitRequired(self, stepIdList):
+ return True
+
+ def rollbackRequired(self, stepIdList):
+ return False
+
+ def countProgress(self, stepIdList):
+ if self.rollbackRequired(stepIdList):
+ self.rollbackRequiredCount += 1
+ if self.serializationFailure:
+ self.rollbackRequiredOK += 1
+ else:
+ if self.commitRequired(stepIdList):
+ self.commitRequiredCount += 1
+ if not self.serializationFailure:
+ self.commitRequiredOK += 1
+ else:
+ self.commitPreferredCount += 1
+ if not self.serializationFailure:
+ self.commitPreferredOK += 1
+
+ def runIterationSteps(self, stepIdList):
+ try:
+ self.setUpIteration(stepIdList)
+ for stepId in stepIdList:
+ self.runIterationStep(stepId)
+ self.countProgress(stepIdList)
+ finally:
+ self.tearDownIteration(stepIdList)
+
+ def tryOperation(self, conn, sql):
+ result = self.syncCall(10, conn.operation, sql),
+ for line in result:
+ if len(line) > 0 and line.startswith("ERROR: could not serialize"):
+ self.serializationFailure = True
+ else:
+ if (len(line) > 0
+ and line.startswith("ERROR:")
+ and not line.startswith("ERROR: current transaction is aborted")):
+ raise TestFailure("failure other than serializable encountered: " + line, line)
+
+ def printStatistics(self):
+ print '# rollback required: ', self.rollbackRequiredOK, '/', self.rollbackRequiredCount
+ print '# commit required: ', self.commitRequiredOK, '/', self.commitRequiredCount
+ print '# commit preferred: ', self.commitPreferredOK, '/', self.commitPreferredCount
+
+ def run(self):
+ self.populateStepDictionary()
+ self.runPermutations(self.stepThreading)
+ self.printStatistics()
+ if self.rollbackRequiredOK < self.rollbackRequiredCount:
+ raise TestFailure("serialization anomalies incorrectly allowed",
+ "Database integrity not protected.")
+ if self.commitRequiredOK < self.commitRequiredCount:
+ raise TestFailure("serialization failure occurred when it should not have",
+ "Transactions we thought we knew how to recognize as safe resulted in a rollback..")
+
+ def printStepResults(self, stepIdList):
+ print stepIdList,
+ if self.serializationFailure:
+ if self.commitRequired(stepIdList):
+ print 'rolled back ??'
+ else:
+ if not self.rollbackRequired(stepIdList):
+ print 'rolled back ?'
+ else:
+ print 'rolled back'
+ else:
+ if self.rollbackRequired(stepIdList):
+ print 'committed ***'
+ else:
+ print 'committed'
+
+
+ class SimpleWriteSkewTest(DatabasePermutationTest):
+ """ Write skew test.
+ This test has two serializable transactions: one which updates all
+ 'apple' rows to 'pear' and one which updates all 'pear' rows to
+ 'apple'. If these were serialized (run one at a time) either
+ value could be present, but not both. One must be rolled back to
+ prevent the write skew anomaly.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ description = "write skew test"
+
+ stepThreading = [['rwx1','c1'],['rwx2','c2']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'rwx1': lambda : self.tryOperation(self.conn1, "UPDATE test SET t = 'apple' WHERE t = 'pear';"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'rwx2': lambda : self.tryOperation(self.conn2, "UPDATE test SET t = 'pear' WHERE t = 'apple';"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "UPDATE test SET t = 'apple' WHERE i = 5;", "UPDATE 1")
+ self.syncCall(10, self.conn1.operation, "UPDATE test SET t = 'pear' WHERE i = 7;", "UPDATE 1")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return (stepIdList.index('c1') < stepIdList.index('rwx2')
+ or stepIdList.index('c2') < stepIdList.index('rwx1'))
+
+ def rollbackRequired(self, stepIdList):
+ return not self.commitRequired(stepIdList)
+
+
+ class ReceiptReportTest(DatabasePermutationTest):
+ """ Daily Report of Receipts test.
+ This test doesn't persist a bad state in the database; rather, it
+ provides a view of the data which is not consistent with any
+ order of execution of the serializable transactions. It
+ demonstrates a situation where the deposit date for receipts could
+ be changed and a report of the closed day's receipts subsequently
+ run which will miss a receipt from the date which has been closed.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'),
+ ('conn3', 'ISqlConnection'))
+
+ description = "daily report of receipts test"
+
+ stepThreading = [['rxwy1','c1'],['wx2','c2'],['rx3','ry3','c3']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'rxwy1': lambda : self.tryOperation(self.conn1, "INSERT INTO receipt VALUES (3, (SELECT deposit_date FROM ctl WHERE k = 'receipt'), 4.00);"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'wx2': lambda : self.tryOperation(self.conn2, "UPDATE ctl SET deposit_date = DATE '2008-12-23' WHERE k = 'receipt';"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;"),
+ 'rx3': lambda : self.tryOperation(self.conn3, "SELECT * FROM ctl WHERE k = 'receipt';"),
+ 'ry3': lambda : self.tryOperation(self.conn3, "SELECT * FROM receipt WHERE deposit_date = DATE '2008-12-22';"),
+ 'c3': lambda : self.tryOperation(self.conn3, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "DROP TABLE IF EXISTS ctl, receipt;")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE ctl (k text NOT NULL PRIMARY KEY, deposit_date date NOT NULL);")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO ctl VALUES ('receipt', DATE '2008-12-22');")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE receipt (receipt_no int NOT NULL PRIMARY KEY, deposit_date date NOT NULL, amount numeric(13,2));")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO receipt VALUES (1, (SELECT deposit_date FROM ctl WHERE k = 'receipt'), 1.00);")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO receipt VALUES (2, (SELECT deposit_date FROM ctl WHERE k = 'receipt'), 2.00);")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn3.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY;", "BEGIN")
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn3.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return ( (stepIdList.index('c1') < stepIdList.index('wx2')
+ and stepIdList.index('c1') < stepIdList.index('rx3'))
+ or (stepIdList.index('c2') < stepIdList.index('rxwy1')
+ and stepIdList.index('c2') < stepIdList.index('rx3'))
+ or (stepIdList.index('c3') < stepIdList.index('rxwy1')
+ and stepIdList.index('c3') < stepIdList.index('wx2'))
+ or (stepIdList.index('c2') < stepIdList.index('rxwy1')
+ and stepIdList.index('c3') < stepIdList.index('rxwy1'))
+ or (stepIdList.index('c1') < stepIdList.index('wx2')
+ and stepIdList.index('c3') < stepIdList.index('wx2'))
+ or (stepIdList.index('c1') < stepIdList.index('rx3')
+ and stepIdList.index('c2') < stepIdList.index('rx3')))
+
+ def rollbackRequired(self, stepIdList):
+ return ((stepIdList.index('c2') < stepIdList.index('c1')
+ and stepIdList.index('c2') < stepIdList.index('c3')
+ and stepIdList.index('rxwy1') < stepIdList.index('c2')
+ and stepIdList.index('rx3') < stepIdList.index('c1')
+ #############################################################
+ # The following test excludes some rows from rollback
+ # required for which we know our current SSI algorithm
+ # requires a rollback, but which don't, in fact, cause
+ # any anomaly. If we determine that we can allow pivots
+ # in which conflictIn and conflictOut are separate and
+ # overlapping transactions, these can be committed.
+ # To include these permutations in the "rollback required"
+ # count, comment out the following line.
+ and stepIdList.index('c2') < stepIdList.index('rx3')
+ #############################################################
+ )
+
+ #############################################################
+ # An anomaly can't actually occur based on the following
+ # "or" clause, but we know that our algorithm can't
+ # currently detect that, because T2's conflictIn is set
+ # to a self-reference because of multiple conflicts.
+ # To count these in the "rollback required" list, uncomment
+ # this section; otherwise they are "commit preferred"..
+ # or (stepIdList.index('rxwy1') < stepIdList.index('c1')
+ # and stepIdList.index('rxwy1') < stepIdList.index('c2')
+ # and stepIdList.index('rxwy1') < stepIdList.index('c3')
+ # and stepIdList.index('wx2') < stepIdList.index('c1')
+ # and stepIdList.index('wx2') < stepIdList.index('c2')
+ # and stepIdList.index('wx2') < stepIdList.index('c3')
+ # and stepIdList.index('rx3') < stepIdList.index('c1')
+ # and stepIdList.index('rx3') < stepIdList.index('c2')
+ # and stepIdList.index('rx3') < stepIdList.index('c3')
+ # )
+ #############################################################
+ )
+
+
+ class TemporalRangeIntegrityTest(DatabasePermutationTest):
+ """ Temporal range integrity test.
+ Snapshot integrity fails with simple referential integrity tests,
+ but those don't make for good demonstrations because people just
+ say that foreign key definitions should be used instead. There
+ are many integrity tests which are conceptually very similar but
+ don't have built-in support which will fail when used in triggers.
+ This is intended to illustrate such cases. It is obviously very
+ hard to exercise all these permutations when the code is actually
+ in a trigger; this test pulls what would normally be inside of
+ triggers out to the top level to control the permutations.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ description = "temporal range integrity test"
+
+ stepThreading = [['rx1','wy1','c1'],['ry2','wx2','c2']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'rx1': lambda : self.tryOperation(self.conn1, "SELECT count(*) FROM statute WHERE statute_cite = '123.45(1)a' AND eff_date <= DATE '2009-05-15' AND (exp_date IS NULL OR exp_date > DATE '2009-05-15');"),
+ 'wy1': lambda : self.tryOperation(self.conn1, "INSERT INTO offense VALUES (1, '123.45(1)a', DATE '2009-05-15');"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'ry2': lambda : self.tryOperation(self.conn2, "SELECT count(*) FROM offense WHERE statute_cite = '123.45(1)a' AND offense_date >= DATE '2008-01-01';"),
+ 'wx2': lambda : self.tryOperation(self.conn2, "DELETE FROM statute WHERE statute_cite = '123.45(1)a' AND eff_date = DATE '2008-01-01';"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "DROP TABLE IF EXISTS statute, offense;", "DROP TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE statute (statute_cite text NOT NULL, eff_date date NOT NULL, exp_date date, CONSTRAINT statute_pkey PRIMARY KEY (statute_cite, eff_date));", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO statute VALUES ('123.45(1)a', DATE '2008-01-01', NULL);", "INSERT 0 1")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE offense (offense_no int NOT NULL, statute_cite text NOT NULL, offense_date date NOT NULL, CONSTRAINT offense_pkey PRIMARY KEY (offense_no));", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return ( stepIdList.index('c1') < stepIdList.index('ry2')
+ or stepIdList.index('c2') < stepIdList.index('rx1'))
+
+ def rollbackRequired(self, stepIdList):
+ return not self.commitRequired(stepIdList)
+
+
+ class ProjectManagerTest(DatabasePermutationTest):
+ """ Project manager test.
+ Ensure that the person who is on the project as a manager
+ is flagged as a project manager in the person table.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ description = "project manager test"
+
+ stepThreading = [['rx1','wy1','c1'],['ry2','wx2','c2']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'rx1': lambda : self.tryOperation(self.conn1, "SELECT count(*) FROM person WHERE person_id = 1 AND is_project_manager;"),
+ 'wy1': lambda : self.tryOperation(self.conn1, "INSERT INTO project VALUES (101, 'Build Great Wall', 1);"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'ry2': lambda : self.tryOperation(self.conn2, "SELECT count(*) FROM project WHERE project_manager = 1;"),
+ 'wx2': lambda : self.tryOperation(self.conn2, "UPDATE person SET is_project_manager = false WHERE person_id = 1;"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "DROP TABLE IF EXISTS person, project;", "DROP TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE person (person_id int NOT NULL PRIMARY KEY, name text NOT NULL, is_project_manager bool NOT NULL);", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO person VALUES (1, 'Robert Haas', true);", "INSERT 0 1")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE project (project_no int NOT NULL PRIMARY KEY, description text NOT NULL, project_manager int NOT NULL);", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return ( stepIdList.index('c1') < stepIdList.index('ry2')
+ or stepIdList.index('c2') < stepIdList.index('rx1'))
+
+ def rollbackRequired(self, stepIdList):
+ return not self.commitRequired(stepIdList)
+
+
+ class ClassroomSchedulingTest(DatabasePermutationTest):
+ """ Classroom scheduling test.
+ Ensure that the classroom is not scheduled more than once
+ for any moment in time.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ description = "classroom scheduling test"
+
+ stepThreading = [['rx1','wy1','c1'],['ry2','wx2','c2']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'rx1': lambda : self.tryOperation(self.conn1, "SELECT count(*) FROM room_reservation WHERE room_id = '101' AND start_time < TIMESTAMP WITH TIME ZONE '2010-04-01 14:00' AND end_time > TIMESTAMP WITH TIME ZONE '2010-04-01 13:00';"),
+ 'wy1': lambda : self.tryOperation(self.conn1, "INSERT INTO room_reservation VALUES ('101', TIMESTAMP WITH TIME ZONE '2010-04-01 13:00', TIMESTAMP WITH TIME ZONE '2010-04-01 14:00', 'Carol');"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'ry2': lambda : self.tryOperation(self.conn2, "SELECT count(*) FROM room_reservation WHERE room_id = '101' AND start_time < TIMESTAMP WITH TIME ZONE '2010-04-01 14:30' AND end_time > TIMESTAMP WITH TIME ZONE '2010-04-01 13:30';"),
+ 'wx2': lambda : self.tryOperation(self.conn2, "UPDATE room_reservation SET start_time = TIMESTAMP WITH TIME ZONE '2010-04-01 13:30', end_time = TIMESTAMP WITH TIME ZONE '2010-04-01 14:30' WHERE room_id = '101' AND start_time = TIMESTAMP WITH TIME ZONE '2010-04-01 10:00';"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "DROP TABLE IF EXISTS room_reservation;", "DROP TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE room_reservation (room_id text NOT NULL, start_time timestamp with time zone NOT NULL, end_time timestamp with time zone NOT NULL, description text NOT NULL, CONSTRAINT room_reservation_pkey PRIMARY KEY (room_id, start_time));", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO room_reservation VALUES ('101', TIMESTAMP WITH TIME ZONE '2010-04-01 10:00', TIMESTAMP WITH TIME ZONE '2010-04-01 11:00', 'Bob');", "INSERT 0 1")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return ( stepIdList.index('c1') < stepIdList.index('ry2')
+ or stepIdList.index('c2') < stepIdList.index('rx1'))
+
+ def rollbackRequired(self, stepIdList):
+ return not self.commitRequired(stepIdList)
+
+
+ class TotalCashTest(DatabasePermutationTest):
+ """ Total cash test.
+ Another famous test of snapshot isolation anomaly.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ description = "total cash test"
+
+ stepThreading = [['wx1','rxy1','c1'],['wy2','rxy2','c2']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'wx1': lambda : self.tryOperation(self.conn1, "UPDATE accounts SET balance = balance - 200 WHERE accountid = 'checking';"),
+ 'rxy1': lambda : self.tryOperation(self.conn1, "SELECT SUM(balance) FROM accounts;"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'wy2': lambda : self.tryOperation(self.conn2, "UPDATE accounts SET balance = balance - 200 WHERE accountid = 'savings';"),
+ 'rxy2': lambda : self.tryOperation(self.conn2, "SELECT SUM(balance) FROM accounts;"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "DROP TABLE IF EXISTS accounts;", "DROP TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE accounts (accountid text NOT NULL PRIMARY KEY, balance numeric not null);", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO accounts VALUES ('checking', 600),('savings',600);", "INSERT 0 2")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return ( stepIdList.index('c1') < stepIdList.index('wy2')
+ or stepIdList.index('c2') < stepIdList.index('wx1'))
+
+ def rollbackRequired(self, stepIdList):
+ return not self.commitRequired(stepIdList)
+
+
+ class ReferentialIntegrityTest(DatabasePermutationTest):
+ """ Referential integrity test.
+ The assumption here is that the application code issuing the SELECT
+ to test for the presence or absence of a related record would do the
+ right thing -- this script doesn't include that logic.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ description = "referential integrity test"
+
+ stepThreading = [['rx1','wy1','c1'],['rx2','ry2','wx2','c2']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'rx1': lambda : self.tryOperation(self.conn1, "SELECT i FROM a WHERE i = 1;"),
+ 'wy1': lambda : self.tryOperation(self.conn1, "INSERT INTO b VALUES (1);"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'rx2': lambda : self.tryOperation(self.conn2, "SELECT i FROM a WHERE i = 1;"),
+ 'ry2': lambda : self.tryOperation(self.conn2, "SELECT a_id FROM b WHERE a_id = 1;"),
+ 'wx2': lambda : self.tryOperation(self.conn2, "DELETE FROM a WHERE i = 1;"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "DROP TABLE IF EXISTS a, b;", "DROP TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE a (i int PRIMARY KEY);", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE b (a_id int);", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO a VALUES (1);", "INSERT 0 1")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return ( stepIdList.index('c1') < stepIdList.index('rx2')
+ or stepIdList.index('c2') < stepIdList.index('rx1'))
+
+ def rollbackRequired(self, stepIdList):
+ return not self.commitRequired(stepIdList)
+
+
+ class RITriggerTest(DatabasePermutationTest):
+ """ Referential integrity trigger test.
+ """
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ description = "referential integrity trigger test"
+
+ stepThreading = [['wxry1','c1'],['r2','wyrx2','c2']]
+
+ def populateStepDictionary(self):
+ self.stepDictionary = {
+ 'wxry1': lambda : self.tryOperation(self.conn1, "INSERT INTO child (parent_id) VALUES (0);"),
+ 'c1': lambda : self.tryOperation(self.conn1, "COMMIT;"),
+ 'r2': lambda : self.tryOperation(self.conn2, "SELECT TRUE;"),
+ 'wyrx2': lambda : self.tryOperation(self.conn2, "DELETE FROM parent WHERE parent_id = 0;"),
+ 'c2': lambda : self.tryOperation(self.conn2, "COMMIT;")
+ }
+
+ def setUpIteration(self, stepIdList):
+ self.serializationFailure = False
+ self.syncCall(10, self.conn1.operation, "DROP TABLE IF EXISTS parent, child;", "DROP TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE parent (parent_id SERIAL NOT NULL PRIMARY KEY);", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE TABLE child (child_id SERIAL NOT NULL PRIMARY KEY, parent_id INTEGER NOT NULL);", "CREATE TABLE")
+ self.syncCall(10, self.conn1.operation, "CREATE OR REPLACE FUNCTION ri_parent() RETURNS TRIGGER AS $body$\
+ BEGIN\
+ PERFORM TRUE FROM child WHERE parent_id = OLD.parent_id;\
+ IF FOUND THEN\
+ RAISE SQLSTATE '23503' USING MESSAGE = 'Parent ' || OLD.parent_id || ' still referenced during ' || TG_OP;\
+ END IF;\
+ RETURN NULL;\
+ END;\
+ $body$ LANGUAGE PLPGSQL VOLATILE;", "CREATE FUNCTION")
+ self.syncCall(10, self.conn1.operation, "CREATE TRIGGER ri_parent AFTER UPDATE OR DELETE ON parent FOR EACH ROW EXECUTE PROCEDURE ri_parent();", "CREATE TRIGGER")
+ self.syncCall(10, self.conn1.operation, "CREATE OR REPLACE FUNCTION ri_child() RETURNS TRIGGER AS $body$\
+ BEGIN\
+ PERFORM TRUE FROM parent WHERE parent_id = NEW.parent_id;\
+ IF NOT FOUND THEN\
+ RAISE SQLSTATE '23503' USING MESSAGE = 'Parent ' || NEW.parent_id || ' does not exist during ' || TG_OP;\
+ END IF;\
+ RETURN NULL;\
+ END;\
+ $body$ LANGUAGE PLPGSQL VOLATILE;", "CREATE FUNCTION")
+ self.syncCall(10, self.conn1.operation, "CREATE TRIGGER ri_child AFTER INSERT OR UPDATE ON child FOR EACH ROW EXECUTE PROCEDURE ri_child();", "CREATE TRIGGER")
+ self.syncCall(10, self.conn1.operation, "INSERT INTO parent VALUES(0);", "INSERT 0 1")
+ self.syncCall(10, self.conn1.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+ self.syncCall(10, self.conn2.operation, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "BEGIN")
+
+ # Override the normal method to allow failures generated by the trigger code
+ # to be considered "success". Just so we can count things up.
+ def tryOperation(self, conn, sql):
+ result = self.syncCall(10, conn.operation, sql),
+ for line in result:
+ if len(line) > 0 and line.startswith("ERROR: could not serialize"):
+ self.serializationFailure = True
+ else:
+ if (len(line) > 0 and line.startswith("ERROR:")
+ and len(line) > 0 and not line.startswith("ERROR: Parent 0 ")):
+ raise TestFailure("failure other than serializable encountered: " + line, line)
+
+ def tearDownIteration(self, stepIdList):
+ self.syncCall(10, self.conn1.operation, "ROLLBACK;")
+ self.syncCall(10, self.conn2.operation, "ROLLBACK;")
+ self.printStepResults(stepIdList)
+
+ def commitRequired(self, stepIdList):
+ return ( stepIdList.index('c1') < stepIdList.index('r2')
+ or stepIdList.index('c2') < stepIdList.index('wxry1'))
+
+ def rollbackRequired(self, stepIdList):
+ return not self.commitRequired(stepIdList)
+
+
+ class TestTrueSerializabilityConcurrentUpdates(SyncTest):
+ """ Runs three transactions concurrently, each reading from what the
+ other writes in turn. Should raise a serialization failure, but
+ instead leads to wrong results, ATM.
+ """
+
+ description = "concurrent updates"
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'),
+ ('conn3', 'ISqlConnection'))
+
+ def execOnAllConnections(self, sql, expRes=None):
+ deferreds = []
+ for conn in self.connections:
+ d = conn.operation(sql, expRes)
+ deferreds.append(d)
+
+ d = defer.DeferredList(deferreds,
+ consumeErrors=True, fireOnOneErrback=True)
+ return d
+
+ def readValueThenWrite(self, conn, readFromId, writeToId):
+ d = conn.query("SELECT t FROM test WHERE i = %d;" % readFromId)
+ d.addCallback(self.writeValueBack, conn, writeToId)
+ return d
+
+ def writeValueBack(self, result, conn, writeToId):
+ self.assertEqual(1, len(result),
+ "expected exactly one result row")
+ row = result[0]
+ self.assertEqual(1, len(row),
+ "expected exactly one column")
+ value = row['t']
+ d = conn.operation("UPDATE test SET t = '%s' WHERE i = %d;" % (value, writeToId),
+ "UPDATE")
+ return d
+
+ def startConcurrentOperations(self):
+ d1 = self.readValueThenWrite(self.conn1, readFromId=5, writeToId=7)
+ d2 = self.readValueThenWrite(self.conn2, readFromId=7, writeToId=11)
+ d3 = self.readValueThenWrite(self.conn3, readFromId=11, writeToId=5)
+ return defer.DeferredList([d1, d2, d3],
+ consumeErrors=True, fireOnOneErrback=True)
+
+ def run(self):
+ try:
+ self.sub_run()
+ finally:
+ self.syncCall(10, self.execOnAllConnections, "ROLLBACK;")
+
+ def sub_run(self):
+ self.connections = [
+ self.conn1,
+ self.conn2,
+ self.conn3]
+
+ # begin a transaction on all three connections
+ self.syncCall(10, self.execOnAllConnections,
+ "BEGIN;", "BEGIN")
+
+ # set their isolation level to SERIALIZABLE
+ self.syncCall(10, self.execOnAllConnections,
+ "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "SET")
+
+ # concurrently let each of the three transactions read a value and
+ # write that to another tuple, wait for all the UPDATEs to complete
+ # before trying to commit any of the transactions
+ self.syncCall(10, self.startConcurrentOperations)
+
+ # try to commit all three transactions (accepting both COMMIT or
+ # ERROR, we check the result later on).
+ self.syncCall(10, self.execOnAllConnections,
+ "COMMIT;", "COMMIT|ERROR");
+
+ # count the occurrance of each fruit
+ result = self.syncCall(10, self.conn1.query,
+ "SELECT t FROM test WHERE i IN (5, 7, 11);")
+ counters = {'banana': 0, 'apple': 0, 'pear': 0}
+ for row in result:
+ counters[row['t']] += 1
+
+ # you currently get one fruit each, as no transaction gets aborted,
+ # which is impossible if the transactions had been executed one
+ # after another.
+ if counters.values() == [1, 1, 1]:
+ raise TestFailure("conflict not detected",
+ "All transactions committed, so the conflict hasn't been detected.")
+
+ class TestTrueSerializabilityConcurrentInsert(BaseTest):
+ """ Runs two transactions, both doing an insert, first, then select
+ all the relevant rows (within the range 100 <= i < 110). We let the
+ first transaction commit before creating the cyclic dependency,
+ which forces transaction 2 to abort.
+ """
+
+ description = "concurrent insert"
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ def execOnAllConnections(self, sql, expRes=None):
+ deferreds = []
+ for conn in self.connections:
+ d = conn.operation(sql, expRes)
+ deferreds.append(d)
+
+ d = defer.DeferredList(deferreds,
+ consumeErrors=True, fireOnOneErrback=True)
+ return d
+
+ def run(self):
+ self.connections = [
+ self.conn1,
+ self.conn2]
+
+ # begin a transaction on all three connections
+ d = self.execOnAllConnections("BEGIN;", "BEGIN")
+
+ # set their isolation level to SERIALIZABLE
+ d.addCallback(lambda x:
+ self.execOnAllConnections(
+ "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "SET"))
+
+ # let transaction 1 do an insert (so it acquires a snapshot)
+ d.addCallback(lambda x:
+ self.conn1.operation(
+ "INSERT INTO test (i, t) VALUES (101, 'orange');", "INSERT 0 1"))
+
+ # then same for transaction 2
+ d.addCallback(lambda x:
+ self.conn2.operation(
+ "INSERT INTO test (i, t) VALUES (102, 'grapefruit');", "INSERT 0 1"))
+
+ # let transaction 1 read the relevant rows, so it acquires an SIREAD
+ # lock on the predicate. (The result is discarded).
+ d.addCallback(lambda x:
+ self.conn2.query("SELECT t FROM test WHERE i >= 100 AND i < 110;"))
+
+ # then commit transaction 1 (which should still succeed)
+ d.addCallback(lambda x:
+ self.conn1.operation(
+ "COMMIT;", "COMMIT"))
+
+ # try to read all rows with the second transaction's snapshot (which
+ # doesn't see the update of transaction 1)
+ d.addCallback(lambda x:
+ self.conn2.query("SELECT t FROM test WHERE i >= 100 AND i < 110;"))
+
+ # With SSI in place, this should lock the same predicate with an
+ # SIREAD lock, which should bomb out on the orange (tuple i = 101)
+ # from transaction 1.
+ #
+ # dtester FIXME: Hm.. this could need some "expect to fail" help
+ # from dtester
+ d.addCallback(self.checkResult)
+
+ # cleanup both transactions, especially in case of failure
+ d.addBoth(self.cleanup)
+
+ return d
+
+ def checkResult(self, result):
+ if not isinstance(result, failure.Failure):
+ raise TestFailure("conflict not detected",
+ "SELECT should raise a serialization error")
+ return result
+
+ def cleanup(self, result):
+ d = self.execOnAllConnections("ROLLBACK;")
+
+ # ignore errors above, but instead make sure we return the result
+ # we got here, especially if it was an error.
+ d.addBoth(lambda x: result)
+ return d
+
+ class TestTrueSerializabilityConcurrentInsert2(BaseTest):
+ """ Pretty similar to the above test, except that the first transaction
+ doesn't read (and thus predicate lock) the relevant rows. This still
+ leaves a possible serialization ordering, even if it doesn't match
+ the real commit ordering.
+
+ Uses rows 200 <= i < 210
+ """
+
+ description = "concurrent insert"
+
+ needs = (('conn1', 'ISqlConnection'),
+ ('conn2', 'ISqlConnection'))
+
+ def execOnAllConnections(self, sql, expRes=None):
+ deferreds = []
+ for conn in self.connections:
+ d = conn.operation(sql, expRes)
+ deferreds.append(d)
+
+ d = defer.DeferredList(deferreds,
+ consumeErrors=True, fireOnOneErrback=True)
+ return d
+
+ def run(self):
+ self.connections = [
+ self.conn1,
+ self.conn2]
+
+ # begin a transaction on all three connections
+ d = self.execOnAllConnections("BEGIN;", "BEGIN")
+
+ # set their isolation level to SERIALIZABLE
+ d.addCallback(lambda x:
+ self.execOnAllConnections(
+ "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;", "SET"))
+
+ # let transaction 1 do an insert (so it acquires a snapshot)
+ d.addCallback(lambda x:
+ self.conn1.operation(
+ "INSERT INTO test (i, t) VALUES (201, 'orange');", "INSERT 0 1"))
+
+ # then same for transaction 2
+ d.addCallback(lambda x:
+ self.conn2.operation(
+ "INSERT INTO test (i, t) VALUES (202, 'grapefruit');", "INSERT 0 1"))
+
+ # no SELECT here, so transaction 1 doesn't acquire any SIREAD lock
+
+ # then commit transaction 1 (which should succeed)
+ d.addCallback(lambda x:
+ self.conn1.operation(
+ "COMMIT;", "COMMIT"))
+
+ # try to read all rows with the second transaction's snapshot (which
+ # doesn't see the update of transaction 1)
+ d.addCallback(lambda x:
+ self.conn2.query("SELECT t FROM test WHERE i >= 200 AND i < 210;"))
+
+ # With SSI in place, this should lock the same predicate as abover
+ # with an SIREAD lock. This includes the row just written by the
+ # first transaction.
+ #
+ # As long as there are no other edges, this still leaves a possible
+ # serialization ordering: if we executed the second transaction
+ # *before* the first one, the second didn't see the 'orange' row
+ # inserted "later" by the first transaction. That's the result we
+ # expect.
+ d.addCallback(self.checkResult)
+
+ # commit transaction 2
+ d.addCallback(lambda x:
+ self.conn2.operation(
+ "COMMIT;", "COMMIT"))
+
+ # add a cleanup handler
+ d.addErrback(self.cleanup)
+
+ return d
+
+ def checkResult(self, result):
+ self.assertEqual(len(result), 1,
+ "Expected exactly one row, got %d (%s)" % (
+ len(result), repr(result)))
+ self.assertEqual(result[0], {"t": "grapefruit"},
+ "Expected to read the grapefruit row, but got %s" % (result[0],))
+
+ return result
+
+ def cleanup(self, result):
+ d = self.execOnAllConnections("ROLLBACK;")
+
+ # ignore errors above, but instead make sure we return the result
+ # we got here, especially if it was an error.
+ d.addBoth(lambda x: result)
+ return d
+
+
+ # ****** test running code ************************************************
+
+ class Logger(object):
+ """ A simplistic logger that just writes it all into one single file.
+ """
+ def __init__(self, logFileName):
+ self.logfile = open(logFileName, 'w')
+
+ def __del__(self):
+ self.logfile.close()
+
+ def callback(self, event):
+ self.logfile.write(str(event) + "\n")
+ self.logfile.flush()
+
+ def main(argv):
+ print "Postgres dtester suite Copyright (c) 2004-2010, by Markus Wanner\n"
+
+ postgres_configure_args = "@configure_args@"
+
+ config = {
+ 'temp-port': 65432,
+
+ # by default, use the same installation directory as make check
+ 'inst_dir': os.path.join(os.getcwd(), 'tmp_check/install'),
+
+ # and a similar prefix
+ 'pgdata_prefix': os.path.join(os.getcwd(), 'tmp_check/data-dtester'),
+ 'logfile' : os.path.join(os.getcwd(), 'dtester.log'),
+
+ 'enable_cassert': 'enable_cassert' in postgres_configure_args
+ }
+
+ try:
+ opts, args = getopt.getopt(argv,
+ "h",
+ ["help", "temp-install", "top-builddir=", "temp-port=",
+ "multibyte="])
+ except getopt.GetoptError:
+ usage()
+ sys.exit(2)
+
+ for opt, arg in opts:
+ if opt in ("-h", "--help"):
+ usage()
+ sys.exit()
+ elif opt in ("--temp-install"):
+ config["temp-install"] = True
+ elif opt in ("--temp-port"):
+ try:
+ arg = int(arg)
+ if arg >= 1024 and arg <= 65535:
+ config["temp-port"] = arg
+ else:
+ print "temp-port out of range."
+ sys.exit(2)
+ except ValueError:
+ print "Fatal: invalid temp-port specified"
+ sys.exit(2)
+ elif opt in ("--top-builddir"):
+ config["top-builddir"] = arg
+
+
+ if not config.has_key('bindir'):
+ bindir = '@bindir@'
+ if bindir[0] == '/':
+ bindir = bindir[1:]
+ config['bindir'] = os.path.join(config['inst_dir'], bindir)
+ if not config.has_key('libdir'):
+ libdir = '@libdir@'
+ if libdir[0] == '/':
+ libdir = libdir[1:]
+ config['libdir'] = os.path.join(config['inst_dir'], libdir)
+ if not config.has_key('datadir'):
+ datadir = '@datadir@'
+ if datadir[0] == '/':
+ datadir = datadir[1:]
+ config['datadir'] = os.path.join(config['inst_dir'], datadir)
+
+
+ # FIXME: should not have to be here
+ logger = Logger(config['logfile'])
+ config['main_logging_hook'] = (EventMatcher(Event), logger.callback)
+
+
+ # definition of tests and suites, including their dependencies
+ tdef = {
+ # runs 'make install' to make sure the installation is up to date
+ 'temp_install': {'class': InstallationSuite,
+ 'uses': ('__system__',)},
+
+ # runs initdb, providing the Postgres data directory
+ 'initdb-0': {'class': InitdbSuite,
+ 'uses': ('temp_install',),
+ 'args': (0,)},
+
+ # runs a postmaster on the created database directory
+ 'pg-0': {'class': PostmasterSuite,
+ 'uses': ('temp_install', 'initdb-0')},
+
+ # creates a test database on pg-0
+ 'testdb': {'class': TestDatabaseSuite,
+ 'uses': ('temp_install', 'pg-0'),
+ 'args': ('testdb',)},
+
+ # open two connections
+ 'conn-0A': {'class': SqlConnectionSuite,
+ 'uses': ('temp_install', 'pg-0'),
+ 'args': ('testdb',),
+ 'depends': ('testdb',)},
+ 'conn-0B': {'class': SqlConnectionSuite,
+ 'uses': ('temp_install', 'pg-0'),
+ 'args': ('testdb',),
+ 'depends': ('testdb',)},
+ 'conn-0C': {'class': SqlConnectionSuite,
+ 'uses': ('temp_install', 'pg-0'),
+ 'args': ('testdb',),
+ 'depends': ('testdb',)},
+
+ # test the connections
+ 'test-conn-0A': {'class': TestDatabaseConnection,
+ 'uses': ('conn-0A',)},
+ 'test-conn-0B': {'class': TestDatabaseConnection,
+ 'uses': ('conn-0B',)},
+ 'test-conn-0C': {'class': TestDatabaseConnection,
+ 'uses': ('conn-0C',)},
+
+ # 'dummy-recursion': {'class': DummyPermutationTest},
+
+ # populate the test database
+ 'populate-testdb': {'class': PopulateTestDatabase,
+ 'uses': ('conn-0A',),
+ 'onlyAfter': ('test-conn-0A', 'test-conn-0B',
+ 'test-conn-0C')},
+
+ 'simple-write-skew': {'class': SimpleWriteSkewTest,
+ 'uses': ('conn-0A', 'conn-0B'),
+ 'onlyAfter': ('populate-testdb',)},
+
+ 'receipt-report': {'class': ReceiptReportTest,
+ 'uses': ('conn-0A', 'conn-0B', 'conn-0C'),
+ 'onlyAfter': ('simple-write-skew',)},
+
+ 'temporal-range': {'class': TemporalRangeIntegrityTest,
+ 'uses': ('conn-0A', 'conn-0B'),
+ 'onlyAfter': ('receipt-report',)},
+
+ 'project-manager': {'class': ProjectManagerTest,
+ 'uses': ('conn-0A', 'conn-0B'),
+ 'onlyAfter': ('temporal-range',)},
+
+ 'classroom-scheduling': {'class': ClassroomSchedulingTest,
+ 'uses': ('conn-0A', 'conn-0B'),
+ 'onlyAfter': ('project-manager',)},
+
+ 'total-cash': {'class': TotalCashTest,
+ 'uses': ('conn-0A', 'conn-0B'),
+ 'onlyAfter': ('classroom-scheduling',)},
+
+ 'referential-integrity': {'class': ReferentialIntegrityTest,
+ 'uses': ('conn-0A', 'conn-0B'),
+ 'onlyAfter': ('total-cash',)},
+
+ 'ri-trigger': {'class': RITriggerTest,
+ 'uses': ('conn-0A', 'conn-0B'),
+ 'onlyAfter': ('referential-integrity',)}
+
+ # 'ser-updates': {'class': TestTrueSerializabilityConcurrentUpdates,
+ # 'uses': ('conn-0A', 'conn-0B', 'conn-0C'),
+ # 'onlyAfter': ('populate-testdb',),
+ # 'xfail': True},
+ #
+ # 'ser-insert': {'class': TestTrueSerializabilityConcurrentInsert,
+ # 'uses': ('conn-0A', 'conn-0B'),
+ # 'onlyAfter': ('ser-updates',),
+ # 'xfail': True},
+ #
+ # 'ser-insert2': {'class': TestTrueSerializabilityConcurrentInsert2,
+ # 'uses': ('conn-0A', 'conn-0B'),
+ # 'onlyAfter': ('ser-insert',)}
+ }
+
+
+ runner = Runner(reporter=TapReporter(sys.stdout, sys.stderr, showTimingInfo=True),
+ testTimeout=600, suiteTimeout=3600)
+ runner.run(tdef, config)
+
+
+ if __name__ == "__main__":
+ main(sys.argv[1:])
+
I wrote:
Dan and I have now implemented most of the mitigation techniques
..., and I now feel confident I have a good grasp of how long each
type of data is useful. (By useful I mean that to maintain data
integrity without them it will be necessary to roll back some
transactions which could have been allowed to commit had the data
been available.)
I think that we need to be able keep something on the order of 10
times the max_connections number of SERIALIZABLEXACT structures in
shared memory for mitigation techniques to have a chance to work
well for most workloads. When that fills, we will start pushing the
oldest committed transactions out to make room, and fall back on the
graceful degradation. Heikki said in a previous post that he didn't
care if it 10 times or 100 times so long as it was finite and there
was graceful degradation after that, so it would appear that unless
someone else objects, this should fly. This structure fits (barely)
into 128 bytes when pointers are 64-bits.
(1) An active read only transaction needs to be able to recognize
when it is reading a tuple which was written by an overlapping
transaction which has committed, but only if that read write
transaction has a rw-conflict out to a transaction committed
before the read only transaction acquired its snapshot.
(2) An active read write transaction needs to be able to
recognize when it is reading a tuple which was written by an
overlapping transaction which has committed, and to know whether
that committed transaction had any rw-conflict(s) out to
previously committed transaction(s).
When committed transactions which have written to permanent tables
need to be pushed from the main structures, I think that keeping the
xid and the 64 bit commit seq no of the earliest rw-conflict out is
needed. Zero would mean no conflict. Such a list could address
these two needs. We could keep track of min and max xid values on
the list to avoid searches for values out of range. This seems like
a reasonable fit for the SLRU technique suggested by Heikki. Read
only transactions (declared or de facto) don't need to be included
in this list.
(3) An active read write transaction needs to be able to detect
when one of its writes conflicts with a predicate lock from an
overlapping transaction which has committed. There's no need to
know which one, but by the definition of a rw-conflict, it must
have overlapped.
I think the cleanest way to handle this need is to have a "dummy"
SERIALIZABLEXACT structure sitting around to represent displaced
committed transactions and to move predicate locks to that as
transactions are pushed out of the primary structures. We would add
a commit seq no field to the predicate lock structure which would
only be used when locks were moved here. Duplicate locks (locks on
the same target) would collapse to a single lock and would use the
latest commit seq no. This is conceptually very similar to Heikki's
initial suggestion on this topic.
(4) An active read write transaction needs to know that it had a
rw-conflict out to a committed transaction. There's no need to
know which one, but by the definition of a rw-conflict, it must
have overlapped.(5) An active read write transaction needs to know that it had a
rw-conflict in from a committed transaction. There's no need to
know which one, but by the definition of a rw-conflict, it must
have overlapped.
These two are easy -- we can define a couple more flag bits for
active transactions to check when determining whether a new
rw-conflict has created a dangerous structure which must be rolled
back.
Any comments welcome. Barring surprises, I start coding on this
tomorrow.
-Kevin
On 26.12.2010 21:40, Kevin Grittner wrote:
To recap, I've had an open question on the Serializable Wiki page[1]
since January about how we should handle long-running transactions.
The algorithm published by Cahill et al requires keeping some
transaction information in memory for all committed transactions
which overlapped a still-running transaction. Since we need to keep
this in shared memory, and the structures must have a finite
allocation, there's an obvious looming limit, even if the allocation
is relatively generous.
Looking at the predicate lock splitting, it occurs to me that it's
possible for a non-serializable transaction to be canceled if it needs
to split a predicate lock held by a concurrent serializable transaction,
and you run out of space in the shared memory predicate lock area. Any
chance of upgrading the lock to a relation lock, or killing the
serializable transaction instead?
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> wrote:
Looking at the predicate lock splitting, it occurs to me that
it's possible for a non-serializable transaction to be canceled if
it needs to split a predicate lock held by a concurrent
serializable transaction, and you run out of space in the shared
memory predicate lock area.
Good point. We don't want that, for sure.
Any chance of upgrading the lock to a relation lock, or killing
the serializable transaction instead?
Absolutely. Good suggestion. Thanks!
-Kevin
"Kevin Grittner" <Kevin.Grittner@wicourts.gov> wrote:
Any chance of upgrading the lock to a relation lock, or killing
the serializable transaction instead?Absolutely. Good suggestion. Thanks!
I pushed a TODO SSI comment at the appropriate point with my ideas
on how best to fix this. I want to stick with the SLRU changes for
now, rather than risk flushing "brain cache" on the topic just now.
If Dan (or anyone else, for that matter) wants to fix this, feel
free; just post first, as will I if nobody beats me to it.
There are actually two spots in PredicateLockPageSplit and one in
PredicateLockPageCombine where this needs to be addressed. I can't
think of any other functions where we're vulnerable to having an
impact on non-serializable transactions. We sure want to plug those
-- I see it as critical to acceptance that we can honor the promise
of no impact on any transactions at REPEATABLE READ or less strict
isolation levels.
-Kevin