[GOSC' 17][Performance report] Eliminate O(N^2) scaling from rw-conflict tracking in serializable transactions
Hi, all. I wrote a performance report to conclude what I've done so far.
https://docs.google.com/document/d/16A6rfJnQSTkd0SHK-2XzDiLj7aZ5nC189jGPcfVdhMQ/edit?usp=sharing
Three patch are attached. I used the benchmark below to test the performance.
https://github.com/liumx10/pg-bench
It requires golang (>= 1.6) if you want to reproduce the code.
NOTE:
1. The reason why hash table or skip list didn't improve the performance is that
maintaining the conflict list became slower even though conflict tracking was faster.
So far, skip list is the most promising way. But the code is a little tricky.
BTW, if there is a case in which inserting an conflict element is rare but conflict checking is common,
my patch may be better.
2. Reducing contention on global lock has a better performance in this evaluation.
But two weeks ago when I used another machine, it has a worse performance.
It's hard to explain why...
You can reply directly if you have any questions or comments.
--
Sincerely
Mengxing Liu
Attachments:
HTAB-for-conflict-tracking.patchapplication/octet-stream; name=HTAB-for-conflict-tracking.patchDownload
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..1f60f13 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -374,12 +374,6 @@ int max_predicate_locks_per_page; /* set by guc.c */
static PredXactList PredXact;
/*
- * This provides a pool of RWConflict data elements to use in conflict lists
- * between transactions.
- */
-static RWConflictPoolHeader RWConflictPool;
-
-/*
* The predicate locking hash tables are in shared memory.
* Each backend keeps pointers to them.
*/
@@ -471,6 +465,15 @@ static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *read
/*------------------------------------------------------------------------*/
/*
+ * Test whether a hash table is empty.
+ * I didn't find any function in dynamic hash supports the requirement.
+ */
+#define hash_table_empty(hashp) \
+ (hash_get_num_entries(hashp) == 0)
+
+/*------------------------------------------------------------------------*/
+
+/*
* Does this relation participate in predicate locking? Temporary and system
* relations are exempt, as are materialized views.
*/
@@ -573,6 +576,14 @@ CreatePredXact(void)
SHMQueueDelete(&ptle->link);
SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
+
+ /*
+ * NOTE: We don't need to clean the HTAB, because all of its elements
+ * has been released before.
+ */
+ Assert(hash_table_empty(ptle->sxact.inConflictsTab));
+ Assert(hash_table_empty(ptle->sxact.outConflictsTab));
+
return &ptle->sxact;
}
@@ -635,60 +646,55 @@ NextPredXact(SERIALIZABLEXACT *sxact)
static bool
RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
{
- RWConflict conflict;
+ bool found;
Assert(reader != writer);
/* Check the ends of the purported conflict first. */
if (SxactIsDoomed(reader)
- || SxactIsDoomed(writer)
- || SHMQueueEmpty(&reader->outConflicts)
- || SHMQueueEmpty(&writer->inConflicts))
+ || SxactIsDoomed(writer))
return false;
- /* A conflict is possible; walk the list to find out. */
- conflict = (RWConflict)
- SHMQueueNext(&reader->outConflicts,
- &reader->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
- {
- if (conflict->sxactIn == writer)
- return true;
- conflict = (RWConflict)
- SHMQueueNext(&reader->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
- }
+ hash_search(reader->outConflictsTab,
+ &writer,
+ HASH_FIND,
+ &found);
- /* No conflict found. */
- return false;
+ return found;
}
static void
SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
{
RWConflict conflict;
+ bool found;
Assert(reader != writer);
Assert(!RWConflictExists(reader, writer));
- conflict = (RWConflict)
- SHMQueueNext(&RWConflictPool->availableList,
- &RWConflictPool->availableList,
- offsetof(RWConflictData, outLink));
- if (!conflict)
- ereport(ERROR,
+ conflict = (RWConflict)hash_search(reader->outConflictsTab,
+ &writer,
+ HASH_ENTER_NULL,
+ &found);
+ if (!conflict){
+ ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
- errhint("You might need to run fewer transactions at a time or increase max_connections.")));
-
- SHMQueueDelete(&conflict->outLink);
+ errmsg("insert to outRWconflicts hash table failed")));
+ }
+ conflict->sxactOut = reader;
+ conflict->sxactIn = writer;
+ conflict = (RWConflict)hash_search(writer->inConflictsTab,
+ &reader,
+ HASH_ENTER_NULL,
+ &found);
+ if (!conflict){
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("insert to inRWconflicts hash table failed")));
+ }
conflict->sxactOut = reader;
conflict->sxactIn = writer;
- SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
- SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
}
static void
@@ -696,44 +702,83 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
SERIALIZABLEXACT *activeXact)
{
RWConflict conflict;
+ bool found;
Assert(roXact != activeXact);
Assert(SxactIsReadOnly(roXact));
Assert(!SxactIsReadOnly(activeXact));
- conflict = (RWConflict)
- SHMQueueNext(&RWConflictPool->availableList,
- &RWConflictPool->availableList,
- offsetof(RWConflictData, outLink));
- if (!conflict)
- ereport(ERROR,
+ conflict = (RWConflict)hash_search(roXact->possibleUnsafeConflictsTab,
+ &activeXact,
+ HASH_ENTER_NULL,
+ &found);
+ if (!found || conflict == NULL)
+ {
+ ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
- errhint("You might need to run fewer transactions at a time or increase max_connections.")));
-
- SHMQueueDelete(&conflict->outLink);
+ errmsg("insert to possibleUnsafeConflicts hash table failed")));
+ }
+ conflict->sxactOut = activeXact;
+ conflict->sxactIn = roXact;
+ conflict = (RWConflict)hash_search(activeXact->possibleUnsafeConflictsTab,
+ &roXact,
+ HASH_ENTER_NULL,
+ &found);
+ if (!found || conflict == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("insert to possibleUnsafeConflicts hash table failed")));
+ }
conflict->sxactOut = activeXact;
conflict->sxactIn = roXact;
- SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
- &conflict->outLink);
- SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
- &conflict->inLink);
}
+/*
+ * PossibleUnsafeConflicts are stored in a different hash table. Thus we need a new function
+ * NOTE: the two functions below could be merged.
+ */
+
static void
ReleaseRWConflict(RWConflict conflict)
{
- SHMQueueDelete(&conflict->inLink);
- SHMQueueDelete(&conflict->outLink);
- SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
+ bool found;
+
+ hash_search(conflict->sxactOut->outConflictsTab,
+ &conflict->sxactIn,
+ HASH_REMOVE,
+ &found);
+
+ hash_search(conflict->sxactIn->inConflictsTab,
+ &conflict->sxactOut,
+ HASH_REMOVE,
+ &found);
+}
+
+static void
+ReleaseUnsafeRWConflict(RWConflict conflict)
+{
+ bool found;
+
+ hash_search(conflict->sxactOut->possibleUnsafeConflictsTab,
+ &conflict->sxactIn,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+
+ hash_search(conflict->sxactIn->possibleUnsafeConflictsTab,
+ &conflict->sxactOut,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
}
static void
FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
{
- RWConflict conflict,
- nextConflict;
+ RWConflict conflict;
+ HASH_SEQ_STATUS seqstat;
Assert(SxactIsReadOnly(sxact));
Assert(!SxactIsROSafe(sxact));
@@ -744,23 +789,13 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
* We know this isn't a safe snapshot, so we can stop looking for other
* potential conflicts.
*/
- conflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &sxact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ hash_seq_init(&seqstat, sxact->possibleUnsafeConflictsTab);
+ while ((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
-
Assert(!SxactIsReadOnly(conflict->sxactOut));
Assert(sxact == conflict->sxactIn);
- ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
+ ReleaseUnsafeRWConflict(conflict);
}
}
@@ -1181,6 +1216,9 @@ InitPredicateLocks(void)
{
int i;
+ /* 50 is enough for a hash table name */
+ const char name[50];
+
SHMQueueInit(&PredXact->availableList);
SHMQueueInit(&PredXact->activeList);
PredXact->SxactGlobalXmin = InvalidTransactionId;
@@ -1194,21 +1232,52 @@ InitPredicateLocks(void)
PredXact->element = ShmemAlloc(requestSize);
/* Add all elements to available list, clean. */
memset(PredXact->element, 0, requestSize);
+
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(SERIALIZABLEXACT*);
+ info.entrysize = sizeof(RWConflictData);
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+
for (i = 0; i < max_table_size; i++)
{
SHMQueueInsertBefore(&(PredXact->availableList),
&(PredXact->element[i].link));
+
+ snprintf((char*)name, 50, "PredXact inConflictsTab %d", i);
+ PredXact->element[i].sxact.inConflictsTab = ShmemInitHash(name,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ &info,
+ HASH_ELEM | HASH_BLOBS |
+ HASH_PARTITION | HASH_FIXED_SIZE);
+
+ snprintf((char*)name, 50, "PredXact outConflictsTab %d", i);
+ PredXact->element[i].sxact.outConflictsTab = ShmemInitHash(name,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ &info,
+ HASH_ELEM | HASH_BLOBS |
+ HASH_PARTITION | HASH_FIXED_SIZE);
+
+ snprintf((char*)name, 50, "PredXact possibleUnsafeConflictsTab %d", i);
+ PredXact->element[i].sxact.possibleUnsafeConflictsTab = ShmemInitHash(name,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ &info,
+ HASH_ELEM | HASH_BLOBS |
+ HASH_PARTITION | HASH_FIXED_SIZE);
}
+
PredXact->OldCommittedSxact = CreatePredXact();
SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
PredXact->OldCommittedSxact->prepareSeqNo = 0;
PredXact->OldCommittedSxact->commitSeqNo = 0;
PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
- SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
- SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
+ //SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+ //SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
- SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
+ //SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
@@ -1246,26 +1315,6 @@ InitPredicateLocks(void)
*/
max_table_size *= 5;
- RWConflictPool = ShmemInitStruct("RWConflictPool",
- RWConflictPoolHeaderDataSize,
- &found);
- if (!found)
- {
- int i;
-
- SHMQueueInit(&RWConflictPool->availableList);
- requestSize = mul_size((Size) max_table_size,
- RWConflictDataSize);
- RWConflictPool->element = ShmemAlloc(requestSize);
- /* Add all elements to available list, clean. */
- memset(RWConflictPool->element, 0, requestSize);
- for (i = 0; i < max_table_size; i++)
- {
- SHMQueueInsertBefore(&(RWConflictPool->availableList),
- &(RWConflictPool->element[i].outLink));
- }
- }
-
/*
* Create or attach to the header for the list of finished serializable
* transactions.
@@ -1320,13 +1369,23 @@ PredicateLockShmemSize(void)
size = add_size(size, mul_size((Size) max_table_size,
PredXactListElementDataSize));
+ /* Hash table in Sxact
+ *
+ * NOTE: It should be 3*max_table_size*hash_table_size. Because for each Sxact, there are
+ 3 HTAB: inConflictsTab, outConflictsTab, possibleUnsafeConflictsTab.
+ * But I don't know why this setting will cause "out of memory". So I set it as 6.
+ */
+ size = add_size(size, mul_size((Size) 6*max_table_size,
+ hash_estimate_size(SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ sizeof(RWConflict))));
+
/* transaction xid table */
size = add_size(size, hash_estimate_size(max_table_size,
sizeof(SERIALIZABLEXID)));
/* rw-conflict pool */
max_table_size *= 5;
- size = add_size(size, RWConflictPoolHeaderDataSize);
+
size = add_size(size, mul_size((Size) max_table_size,
RWConflictDataSize));
@@ -1522,7 +1581,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
* them marked us as conflicted.
*/
MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
- while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+ while (!(hash_table_empty(MySerializableXact->possibleUnsafeConflictsTab) ||
SxactIsROUnsafe(MySerializableXact)))
{
LWLockRelease(SerializableXactHashLock);
@@ -1569,6 +1628,7 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
{
int num_written = 0;
SERIALIZABLEXACT *sxact;
+ HASH_SEQ_STATUS seqstat;
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
@@ -1585,18 +1645,11 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
RWConflict possibleUnsafeConflict;
/* Traverse the list of possible unsafe conflicts collecting PIDs. */
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &sxact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (possibleUnsafeConflict != NULL && num_written < output_size)
+ hash_seq_init(&seqstat, sxact->possibleUnsafeConflictsTab);
+ while ((possibleUnsafeConflict = hash_seq_search(&seqstat))!=NULL)
{
output[num_written++] = possibleUnsafeConflict->sxactOut->pid;
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
@@ -1796,9 +1849,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
sxact->prepareSeqNo = InvalidSerCommitSeqNo;
sxact->commitSeqNo = InvalidSerCommitSeqNo;
- SHMQueueInit(&(sxact->outConflicts));
- SHMQueueInit(&(sxact->inConflicts));
- SHMQueueInit(&(sxact->possibleUnsafeConflicts));
sxact->topXid = GetTopTransactionIdIfAny();
sxact->finishedBefore = InvalidTransactionId;
sxact->xmin = snapshot->xmin;
@@ -3250,9 +3300,9 @@ ReleasePredicateLocks(bool isCommit)
{
bool needToClear;
RWConflict conflict,
- nextConflict,
possibleUnsafeConflict;
SERIALIZABLEXACT *roXact;
+ HASH_SEQ_STATUS seqstat;
/*
* We can't trust XactReadOnly here, because a transaction which started
@@ -3361,23 +3411,15 @@ ReleasePredicateLocks(bool isCommit)
* make us unsafe. Note that we use 'inLink' for the iteration as
* opposed to 'outLink' for the r/w xacts.
*/
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &MySerializableXact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (possibleUnsafeConflict)
- {
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->inLink,
- offsetof(RWConflictData, inLink));
+ hash_seq_init(&seqstat, MySerializableXact->possibleUnsafeConflictsTab);
+ while ((possibleUnsafeConflict = hash_seq_search(&seqstat)) != NULL)
+
+ {
Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
- ReleaseRWConflict(possibleUnsafeConflict);
-
- possibleUnsafeConflict = nextConflict;
+ ReleaseUnsafeRWConflict(possibleUnsafeConflict);
}
}
@@ -3400,17 +3442,9 @@ ReleasePredicateLocks(bool isCommit)
* back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to
* previously committed transactions.
*/
- conflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->outConflicts,
- &MySerializableXact->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ hash_seq_init(&seqstat, MySerializableXact->outConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
-
if (isCommit
&& !SxactIsReadOnly(MySerializableXact)
&& SxactIsCommitted(conflict->sxactIn))
@@ -3425,33 +3459,22 @@ ReleasePredicateLocks(bool isCommit)
|| SxactIsCommitted(conflict->sxactIn)
|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
}
/*
* Release all inConflicts from committed and read-only transactions. If
* we're rolling back, clear them all.
*/
- conflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &MySerializableXact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ hash_seq_init(&seqstat, MySerializableXact->inConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
-
if (!isCommit
|| SxactIsCommitted(conflict->sxactOut)
|| SxactIsReadOnly(conflict->sxactOut))
ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
}
+
if (!topLevelIsDeclaredReadOnly)
{
/*
@@ -3460,17 +3483,10 @@ ReleasePredicateLocks(bool isCommit)
* conflict out. If any are waiting DEFERRABLE transactions, wake them
* up if they are known safe or known unsafe.
*/
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &MySerializableXact->possibleUnsafeConflicts,
- offsetof(RWConflictData, outLink));
- while (possibleUnsafeConflict)
+
+ hash_seq_init(&seqstat, MySerializableXact->possibleUnsafeConflictsTab);
+ while ((possibleUnsafeConflict = hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->outLink,
- offsetof(RWConflictData, outLink));
-
roXact = possibleUnsafeConflict->sxactIn;
Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
Assert(SxactIsReadOnly(roXact));
@@ -3490,14 +3506,14 @@ ReleasePredicateLocks(bool isCommit)
}
else
{
- ReleaseRWConflict(possibleUnsafeConflict);
+ ReleaseUnsafeRWConflict(possibleUnsafeConflict);
/*
* If we were the last possible conflict, flag it safe. The
* transaction can now safely release its predicate locks (but
* that transaction's backend has to do that itself).
*/
- if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
+ if (hash_table_empty(roXact->possibleUnsafeConflictsTab))
roXact->flags |= SXACT_FLAG_RO_SAFE;
}
@@ -3509,7 +3525,6 @@ ReleasePredicateLocks(bool isCommit)
(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
ProcSendSignal(roXact->pid);
- possibleUnsafeConflict = nextConflict;
}
}
@@ -3726,8 +3741,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
{
PREDICATELOCK *predlock;
SERIALIZABLEXIDTAG sxidtag;
- RWConflict conflict,
- nextConflict;
+ RWConflict conflict;
+ HASH_SEQ_STATUS seqstat;
Assert(sxact != NULL);
Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
@@ -3823,41 +3838,25 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
sxidtag.xid = sxact->topXid;
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
- /* Release all outConflicts (unless 'partial' is true) */
+ /* Release all outConflictsTab (unless 'partial' is true) */
if (!partial)
{
- conflict = (RWConflict)
- SHMQueueNext(&sxact->outConflicts,
- &sxact->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ hash_seq_init(&seqstat, sxact->outConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
if (summarize)
conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
ReleaseRWConflict(conflict);
- conflict = nextConflict;
}
}
- /* Release all inConflicts. */
- conflict = (RWConflict)
- SHMQueueNext(&sxact->inConflicts,
- &sxact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ /* Release all inConflictsTab. */
+ hash_seq_init(&seqstat, sxact->inConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
if (summarize)
conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
ReleaseRWConflict(conflict);
- conflict = nextConflict;
}
/* Finally, get rid of the xid and the record of the transaction itself. */
@@ -4035,7 +4034,8 @@ CheckForSerializableConflictOut(bool visible, Relation relation,
errhint("The transaction might succeed if retried.")));
if (SxactHasSummaryConflictIn(MySerializableXact)
- || !SHMQueueEmpty(&MySerializableXact->inConflicts))
+ // || !SHMQueueEmpty(&MySerializableXact->inConflicts))
+ || !hash_table_empty(MySerializableXact->inConflictsTab))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to read/write dependencies among transactions"),
@@ -4520,6 +4520,7 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
{
bool failure;
RWConflict conflict;
+ HASH_SEQ_STATUS seqstat;
Assert(LWLockHeldByMe(SerializableXactHashLock));
@@ -4567,29 +4568,24 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
conflict = NULL;
}
else
- conflict = (RWConflict)
- SHMQueueNext(&writer->outConflicts,
- &writer->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
{
- SERIALIZABLEXACT *t2 = conflict->sxactIn;
-
- if (SxactIsPrepared(t2)
- && (!SxactIsCommitted(reader)
- || t2->prepareSeqNo <= reader->commitSeqNo)
- && (!SxactIsCommitted(writer)
- || t2->prepareSeqNo <= writer->commitSeqNo)
- && (!SxactIsReadOnly(reader)
- || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
+ hash_seq_init(&seqstat, writer->outConflictsTab);
+ while ((conflict = hash_seq_search(&seqstat))!=NULL)
{
- failure = true;
- break;
+ SERIALIZABLEXACT *t2 = conflict->sxactIn;
+
+ if (SxactIsPrepared(t2)
+ && (!SxactIsCommitted(reader)
+ || t2->prepareSeqNo <= reader->commitSeqNo)
+ && (!SxactIsCommitted(writer)
+ || t2->prepareSeqNo <= writer->commitSeqNo)
+ && (!SxactIsReadOnly(reader)
+ || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
+ {
+ failure = true;
+ break;
+ }
}
- conflict = (RWConflict)
- SHMQueueNext(&writer->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
}
}
@@ -4614,27 +4610,22 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
conflict = NULL;
}
else
- conflict = (RWConflict)
- SHMQueueNext(&reader->inConflicts,
- &reader->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
{
- SERIALIZABLEXACT *t0 = conflict->sxactOut;
-
- if (!SxactIsDoomed(t0)
- && (!SxactIsCommitted(t0)
- || t0->commitSeqNo >= writer->prepareSeqNo)
- && (!SxactIsReadOnly(t0)
- || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+ hash_seq_init(&seqstat, reader->inConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- failure = true;
- break;
+ SERIALIZABLEXACT *t0 = conflict->sxactOut;
+
+ if (!SxactIsDoomed(t0)
+ && (!SxactIsCommitted(t0)
+ || t0->commitSeqNo >= writer->prepareSeqNo)
+ && (!SxactIsReadOnly(t0)
+ || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+ {
+ failure = true;
+ break;
+ }
}
- conflict = (RWConflict)
- SHMQueueNext(&reader->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
@@ -4693,6 +4684,7 @@ void
PreCommit_CheckForSerializationFailure(void)
{
RWConflict nearConflict;
+ HASH_SEQ_STATUS nearseqstat;
if (MySerializableXact == InvalidSerializableXact)
return;
@@ -4712,22 +4704,18 @@ PreCommit_CheckForSerializationFailure(void)
errhint("The transaction might succeed if retried.")));
}
- nearConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &MySerializableXact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (nearConflict)
+ hash_seq_init(&nearseqstat, MySerializableXact->inConflictsTab);
+ while((nearConflict = hash_seq_search(&nearseqstat))!=NULL)
{
if (!SxactIsCommitted(nearConflict->sxactOut)
&& !SxactIsDoomed(nearConflict->sxactOut))
{
- RWConflict farConflict;
+ RWConflict farConflict;
+ HASH_SEQ_STATUS farseqstat;
+
+ hash_seq_init(&farseqstat, nearConflict->sxactOut->inConflictsTab);
- farConflict = (RWConflict)
- SHMQueueNext(&nearConflict->sxactOut->inConflicts,
- &nearConflict->sxactOut->inConflicts,
- offsetof(RWConflictData, inLink));
- while (farConflict)
+ while((farConflict = hash_seq_search(&farseqstat))!=NULL)
{
if (farConflict->sxactOut == MySerializableXact
|| (!SxactIsCommitted(farConflict->sxactOut)
@@ -4752,17 +4740,8 @@ PreCommit_CheckForSerializationFailure(void)
nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
break;
}
- farConflict = (RWConflict)
- SHMQueueNext(&nearConflict->sxactOut->inConflicts,
- &farConflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
-
- nearConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &nearConflict->inLink,
- offsetof(RWConflictData, inLink));
}
MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
@@ -4948,7 +4927,6 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* recovered xact started are still active, except possibly other
* prepared xacts and we don't care whether those are RO_SAFE or not.
*/
- SHMQueueInit(&(sxact->possibleUnsafeConflicts));
SHMQueueInit(&(sxact->predicateLocks));
SHMQueueElemInit(&(sxact->finishedLink));
@@ -4969,8 +4947,6 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* we'll conservatively assume that it had both a conflict in and a
* conflict out, and represent that with the summary conflict flags.
*/
- SHMQueueInit(&(sxact->outConflicts));
- SHMQueueInit(&(sxact->inConflicts));
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3cb0ab9..948827d 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -38,6 +38,12 @@ typedef uint64 SerCommitSeqNo;
#define FirstNormalSerCommitSeqNo ((SerCommitSeqNo) 2)
/*
+ * Hash table size in SERIALIZABLEXACT
+ */
+#define SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE 200
+
+
+/*
* The SERIALIZABLEXACT struct contains information needed for each
* serializable database transaction to support SSI techniques.
*
@@ -83,10 +89,10 @@ typedef struct SERIALIZABLEXACT
SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or
* no conflict out */
} SeqNo;
- SHM_QUEUE outConflicts; /* list of write transactions whose data we
- * couldn't read. */
- SHM_QUEUE inConflicts; /* list of read transactions which couldn't
- * see our write. */
+
+ HTAB* outConflictsTab; /* Table of write transactions whose data we couldn't read */
+ HTAB* inConflictsTab; /* Table of read transactions which couldn't see our write. */
+
SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */
SHM_QUEUE finishedLink; /* list link in
* FinishedSerializableTransactions */
@@ -95,7 +101,7 @@ typedef struct SERIALIZABLEXACT
* for r/o transactions: list of concurrent r/w transactions that we could
* potentially have conflicts with, and vice versa for r/w transactions
*/
- SHM_QUEUE possibleUnsafeConflicts;
+ HTAB* possibleUnsafeConflictsTab;
TransactionId topXid; /* top level xid for the transaction, if one
* exists; else invalid */
@@ -205,18 +211,6 @@ typedef struct RWConflictData *RWConflict;
#define RWConflictDataSize \
((Size)MAXALIGN(sizeof(RWConflictData)))
-typedef struct RWConflictPoolHeaderData
-{
- SHM_QUEUE availableList;
- RWConflict element;
-} RWConflictPoolHeaderData;
-
-typedef struct RWConflictPoolHeaderData *RWConflictPoolHeader;
-
-#define RWConflictPoolHeaderDataSize \
- ((Size)MAXALIGN(sizeof(RWConflictPoolHeaderData)))
-
-
/*
* The SERIALIZABLEXIDTAG struct identifies an xid assigned to a serializable
* transaction or any of its subtransactions.
skip-list-for-conflict-tracking.patchapplication/octet-stream; name=skip-list-for-conflict-tracking.patchDownload
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..d75eee6 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -628,6 +628,141 @@ NextPredXact(SERIALIZABLEXACT *sxact)
}
/*------------------------------------------------------------------------*/
+/*
+ * These functions manage access to the skip list
+ */
+
+static inline bool
+outConflictsSkipListFindElem(const SERIALIZABLEXACT* reader, const SERIALIZABLEXACT* writer,
+ SHM_QUEUE** topLinkPreElem, SHM_QUEUE** belowLinkPreElem)
+{
+ unsigned long value, nextValue;
+ SHM_QUEUE *belowCurElem;
+ SHM_QUEUE *topCurElem;
+ RWConflict nextConflict;
+
+ value = (unsigned long)writer;
+ topCurElem = &reader->outConflictsTopLink;
+
+ nextConflict = (RWConflict)SHMQueueNext(&reader->outConflictsTopLink,
+ topCurElem,
+ offsetof(RWConflictData, outTopLink));
+ while (nextConflict)
+ {
+ nextValue = (unsigned long)nextConflict->sxactIn;
+ if (nextValue == value)
+ {
+ /* If the function is called by inserting an item, the element cann't exist
+ * So we don't have to set the topLinkPreElem and belowLinkPreElem
+ */
+ return true;
+ }
+ if (nextValue > value)
+ {
+ topCurElem = nextConflict->outTopLink.prev;
+ break;
+ }
+ nextConflict = (RWConflict)SHMQueueNext(&reader->outConflictsTopLink,
+ &nextConflict->outTopLink,
+ offsetof(RWConflictData, outTopLink));
+ }
+
+ belowCurElem = (SHM_QUEUE*)((char*)(topCurElem) + sizeof(SHM_QUEUE));
+ nextConflict = (RWConflict)SHMQueueNext(&reader->outConflicts,
+ belowCurElem,
+ offsetof(RWConflictData, outLink));
+ while (nextConflict)
+ {
+ nextValue = (unsigned long)nextConflict->sxactIn;
+ if (nextValue == value)
+ return true;
+ if (nextValue > value)
+ {
+ belowCurElem = nextConflict->outLink.prev;
+ break;
+ }
+ nextConflict = (RWConflict)SHMQueueNext(&reader->outConflicts,
+ &nextConflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+ if (topLinkPreElem)
+ *topLinkPreElem = topCurElem;
+ if (belowLinkPreElem)
+ *belowLinkPreElem = belowCurElem;
+ return false;
+}
+
+static inline void
+addAnTopElemForSkipList(const SERIALIZABLEXACT* sxact)
+{
+ RWConflict curElem;
+ int i;
+
+ curElem = (RWConflict)SHMQueueNext(&sxact->outConflicts,
+ &sxact->outConflicts,
+ offsetof(RWConflictData, outLink));
+ i = 0;
+ while (curElem && i < 3)
+ {
+ curElem = (RWConflict)SHMQueueNext(&curElem->outLink,
+ &curElem->outLink,
+ offsetof(RWConflictData, outLink));
+ i++;
+ }
+ Assert(curElem);
+ Assert(sxact->outConflictsTopLink.next == &sxact->outConflictsTopLink);
+ SHMQueueInsertAfter(&sxact->outConflictsTopLink, &curElem->outTopLink);
+}
+
+static inline void
+InsertOutConflictSkipList(SERIALIZABLEXACT* reader, RWConflict conflict)
+{
+ SHM_QUEUE *topLinkPreElem, *belowLinkPreElem;
+ bool find;
+
+ topLinkPreElem = belowLinkPreElem = NULL;
+ find = outConflictsSkipListFindElem(reader, conflict->sxactIn,
+ &topLinkPreElem, &belowLinkPreElem);
+ Assert(!find);
+ Assert(topLinkPreElem);
+ Assert(belowLinkPreElem);
+
+ if (rand()%3 == 0)
+ SHMQueueInsertAfter(topLinkPreElem, &conflict->outTopLink);
+ SHMQueueInsertAfter(belowLinkPreElem, &conflict->outLink);
+
+}
+
+static inline void
+InsertOutConflict(SERIALIZABLEXACT* reader, RWConflict conflict)
+{
+ RWConflict curElem;
+
+ if (reader->conflictOutUseList)
+ {
+ curElem = (RWConflict)SHMQueueNext(&reader->outConflicts,
+ &reader->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (curElem)
+ {
+ if ((unsigned long)curElem->sxactIn > (unsigned long)conflict->sxactIn)
+ {
+ SHMQueueInsertBefore(&curElem->outLink, &conflict->outLink);
+ return;
+ }
+ curElem = (RWConflict)SHMQueueNext(&reader->outConflicts,
+ &curElem->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+ SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+ }
+ else
+ {
+ InsertOutConflictSkipList(reader, conflict);
+ }
+}
+
+/*------------------------------------------------------------------------*/
/*
* These functions manage primitive access to the RWConflict pool and lists.
@@ -636,7 +771,10 @@ static bool
RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
{
RWConflict conflict;
-
+ Size valueOffset;
+ bool result;
+
+ result= false;
Assert(reader != writer);
/* Check the ends of the purported conflict first. */
@@ -645,24 +783,30 @@ RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
|| SHMQueueEmpty(&reader->outConflicts)
|| SHMQueueEmpty(&writer->inConflicts))
return false;
-
+
/* A conflict is possible; walk the list to find out. */
- conflict = (RWConflict)
- SHMQueueNext(&reader->outConflicts,
- &reader->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
- {
- if (conflict->sxactIn == writer)
- return true;
- conflict = (RWConflict)
+ if (reader->outConflictsNum > 10)
+ {
+ result = outConflictsSkipListFindElem(reader, writer, NULL, NULL);
+ }else{
+ conflict = (RWConflict)
SHMQueueNext(&reader->outConflicts,
- &conflict->outLink,
+ &reader->outConflicts,
offsetof(RWConflictData, outLink));
- }
-
- /* No conflict found. */
- return false;
+ while (conflict)
+ {
+ if (conflict->sxactIn == writer){
+ // return true;
+ result = true;
+ break;
+ }
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+ }
+ return result;
}
static void
@@ -684,11 +828,24 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
errhint("You might need to run fewer transactions at a time or increase max_connections.")));
SHMQueueDelete(&conflict->outLink);
+ SHMQueueInit(&conflict->outTopLink);
+ SHMQueueInit(&conflict->inTopLink);
conflict->sxactOut = reader;
conflict->sxactIn = writer;
- SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+
+ /* because we only need check out conflicts of reader;
+ * So it is not necessary to maintain the skip list of inConflicts
+ */
SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+ InsertOutConflict(reader, conflict);
+
+ reader->outConflictsNum++;
+ if (reader->conflictOutUseList && reader->outConflictsNum > 6){
+ /* Now we have 8 out Conflicts; we'd better add an item in the top link */
+ addAnTopElemForSkipList(reader);
+ reader->conflictOutUseList = false;
+ }
}
static void
@@ -726,6 +883,10 @@ ReleaseRWConflict(RWConflict conflict)
{
SHMQueueDelete(&conflict->inLink);
SHMQueueDelete(&conflict->outLink);
+ if (conflict->inTopLink.prev)
+ SHMQueueDelete(&conflict->inTopLink);
+ if (conflict->outTopLink.prev)
+ SHMQueueDelete(&conflict->outTopLink);
SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
}
@@ -1204,7 +1365,15 @@ InitPredicateLocks(void)
PredXact->OldCommittedSxact->prepareSeqNo = 0;
PredXact->OldCommittedSxact->commitSeqNo = 0;
PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
+
+ PredXact->OldCommittedSxact->conflictOutUseList = true;
+ PredXact->OldCommittedSxact->conflictInUseList = true;
+ PredXact->OldCommittedSxact->outConflictsNum = 0;
+ PredXact->OldCommittedSxact->inConflictsNum = 0;
+
+ SHMQueueInit(&PredXact->OldCommittedSxact->outConflictsTopLink);
SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+ SHMQueueInit(&PredXact->OldCommittedSxact->inConflictsTopLink);
SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
@@ -1796,8 +1965,16 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
sxact->prepareSeqNo = InvalidSerCommitSeqNo;
sxact->commitSeqNo = InvalidSerCommitSeqNo;
+
+ sxact->conflictOutUseList = true;
+ sxact->conflictInUseList = true;
+ sxact->outConflictsNum = 0;
+ SHMQueueInit(&(sxact->outConflictsTopLink));
SHMQueueInit(&(sxact->outConflicts));
+ sxact->inConflictsNum = 0;
+ SHMQueueInit(&(sxact->inConflictsTopLink));
SHMQueueInit(&(sxact->inConflicts));
+
SHMQueueInit(&(sxact->possibleUnsafeConflicts));
sxact->topXid = GetTopTransactionIdIfAny();
sxact->finishedBefore = InvalidTransactionId;
@@ -3423,8 +3600,11 @@ ReleasePredicateLocks(bool isCommit)
if (!isCommit
|| SxactIsCommitted(conflict->sxactIn)
- || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
+ || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo)){
+ MySerializableXact->outConflictsNum--;
+ conflict->sxactIn->inConflictsNum--;
ReleaseRWConflict(conflict);
+ }
conflict = nextConflict;
}
@@ -3446,8 +3626,11 @@ ReleasePredicateLocks(bool isCommit)
if (!isCommit
|| SxactIsCommitted(conflict->sxactOut)
- || SxactIsReadOnly(conflict->sxactOut))
+ || SxactIsReadOnly(conflict->sxactOut)){
+ MySerializableXact->inConflictsNum--;
+ conflict->sxactOut->outConflictsNum--;
ReleaseRWConflict(conflict);
+ }
conflict = nextConflict;
}
@@ -3839,6 +4022,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
if (summarize)
conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
ReleaseRWConflict(conflict);
+ sxact->outConflictsNum--;
+ conflict->sxactIn->inConflictsNum--;
conflict = nextConflict;
}
}
@@ -3857,6 +4042,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
if (summarize)
conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
ReleaseRWConflict(conflict);
+ sxact->inConflictsNum--;
+ conflict->sxactOut->outConflictsNum--;
conflict = nextConflict;
}
@@ -4209,6 +4396,7 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
* transaction may have flagged a conflict.
*/
if (!SxactIsDoomed(sxact)
+ && !SxactIsDoomed(MySerializableXact)
&& (!SxactIsCommitted(sxact)
|| TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
sxact->finishedBefore))
@@ -4969,7 +5157,9 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* we'll conservatively assume that it had both a conflict in and a
* conflict out, and represent that with the summary conflict flags.
*/
+ SHMQueueInit(&(sxact->outConflictsTopLink));
SHMQueueInit(&(sxact->outConflicts));
+ SHMQueueInit(&(sxact->inConflictsTopLink));
SHMQueueInit(&(sxact->inConflicts));
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3cb0ab9..be68d4f 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -83,8 +83,15 @@ typedef struct SERIALIZABLEXACT
SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or
* no conflict out */
} SeqNo;
+ int outConflictsNum;
+ bool conflictOutUseList;
+ SHM_QUEUE outConflictsTopLink;
SHM_QUEUE outConflicts; /* list of write transactions whose data we
* couldn't read. */
+
+ int inConflictsNum;
+ bool conflictInUseList;
+ SHM_QUEUE inConflictsTopLink;
SHM_QUEUE inConflicts; /* list of read transactions which couldn't
* see our write. */
SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */
@@ -182,6 +189,15 @@ typedef struct PredXactListData *PredXactList;
((Size)MAXALIGN(sizeof(PredXactListData)))
+/*
+ *
+ */
+typedef struct SHM_SKIPLIST
+{
+ SHM_QUEUE topLink;
+ SHM_QUEUE belowLink;
+}SHM_SKIPLIST;
+
/*
* The following types are used to provide lists of rw-conflicts between
* pairs of transactions. Since exactly the same information is needed,
@@ -194,7 +210,9 @@ typedef struct PredXactListData *PredXactList;
*/
typedef struct RWConflictData
{
+ SHM_QUEUE outTopLink;
SHM_QUEUE outLink; /* link for list of conflicts out from a sxact */
+ SHM_QUEUE inTopLink;
SHM_QUEUE inLink; /* link for list of conflicts in to a sxact */
SERIALIZABLEXACT *sxactOut;
SERIALIZABLEXACT *sxactIn;
reduce-contention-on-FinishedListLock.patchapplication/octet-stream; name=reduce-contention-on-FinishedListLock.patchDownload
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..26e76a7 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -3569,11 +3569,17 @@ ClearOldPredicateLocks(void)
SERIALIZABLEXACT *finishedSxact;
PREDICATELOCK *predlock;
- /*
+ /* Clear at most 200 finishedSxact once*/
+ int releasedNum = 0;
+ SERIALIZABLEXACT* releasedSxact[200];
+
+ /*
* Loop through finished transactions. They are in commit order, so we can
* stop as soon as we find one that's still interesting.
*/
- LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+ if (!LWLockConditionalAcquire(SerializableFinishedListLock, LW_EXCLUSIVE)){
+ return;
+ }
finishedSxact = (SERIALIZABLEXACT *)
SHMQueueNext(FinishedSerializableTransactions,
FinishedSerializableTransactions,
@@ -3595,10 +3601,8 @@ ClearOldPredicateLocks(void)
* This transaction committed before any in-progress transaction
* took its snapshot. It's no longer interesting.
*/
- LWLockRelease(SerializableXactHashLock);
SHMQueueDelete(&(finishedSxact->finishedLink));
- ReleaseOneSerializableXact(finishedSxact, false, false);
- LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ releasedSxact[releasedNum++] = finishedSxact;
}
else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
&& finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
@@ -3614,8 +3618,8 @@ ClearOldPredicateLocks(void)
{
/* A read-only transaction can be removed entirely */
SHMQueueDelete(&(finishedSxact->finishedLink));
- ReleaseOneSerializableXact(finishedSxact, false, false);
- }
+ releasedSxact[releasedNum++] = finishedSxact;
+ }
else
{
/*
@@ -3635,6 +3639,11 @@ ClearOldPredicateLocks(void)
break;
}
finishedSxact = nextSxact;
+
+ if (releasedNum == 200){
+ printf("more than 200 released num\n");
+ break;
+ }
}
LWLockRelease(SerializableXactHashLock);
@@ -3698,7 +3707,14 @@ ClearOldPredicateLocks(void)
}
LWLockRelease(SerializablePredicateLockListLock);
- LWLockRelease(SerializableFinishedListLock);
+ LWLockRelease(SerializableFinishedListLock);
+
+ int i;
+ for(i=0; i<releasedNum; i++){
+ ReleaseOneSerializableXact(releasedSxact[i], false, false);
+ }
+
+
}
/*
@@ -3732,7 +3748,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
Assert(sxact != NULL);
Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
Assert(partial || !SxactIsOnFinishedList(sxact));
- Assert(LWLockHeldByMe(SerializableFinishedListLock));
+// Assert(LWLockHeldByMe(SerializableFinishedListLock));
/*
* First release all the predicate locks held by this xact (or transfer