[GSOC][weekly report 7] Eliminate O(N^2) scaling from rw-conflict tracking in serializable transactions
Over the last week, I focused on:
1) Creating an independent skip list data structure and related interfaces.
For now it has only two levels, so that I don't have to modify too much existing code, but it could easily be extended to any number of levels if necessary. Unfortunately, its performance is not good: in some cases it reaches only half the performance of the original version. This reminded me that even though conflict tracking does not consume much CPU time, it is on the critical path and runs between a lock acquire and release. Slower conflict tracking therefore results in more lock contention, which makes performance drop quickly.
2) Using some tricks to improve its performance.
For example, I found that when the conflict list is short (around 10 entries or fewer), the original linked list is faster
than the skip list. So I use a hybrid: if a transaction has more than 10 conflicts, the skip list is used; otherwise the linked list is used.
Now the performance is approximately equal to that of the original version across different benchmarks.
But I haven't found a case in which the new version is much faster.
The patch is attached.
So far, I have tried: 1) using hash table for conflict tracking.
2) reducing the global lock contention
3) using skip list for conflict tracking.
But none of them improved the performance, so I'm a little confused about what to do next.
Could you please give me any suggestions?
--
Sincerely
Mengxing Liu
Attachments:
skip-list-for-conflict-tracking.patch (application/octet-stream)
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..e97e151 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -628,6 +628,103 @@ NextPredXact(SERIALIZABLEXACT *sxact)
}
/*------------------------------------------------------------------------*/
+/*
+ * These functions manage access to the skip list
+ */
+
+/*
+ * GET_SKIPLIST_VALUE
+ *		Read the sort key of a skip-list element: the unsigned long stored
+ *		"offset" bytes past the address "elem".
+ */
+#define GET_SKIPLIST_VALUE(elem, offset) \
+ (*(unsigned long*)(((char*)(elem)) + (offset)))
+/*
+ * SKIPLIST_VALUE_OFFSET
+ *		Byte distance from member "skiplist_field" to member "value_field"
+ *		inside "type"; the result is what GET_SKIPLIST_VALUE expects as its
+ *		"offset" argument when "elem" is the address of skiplist_field.
+ */
+#define SKIPLIST_VALUE_OFFSET(type, skiplist_field, value_field) \
+ (offsetof(type, value_field) - offsetof(type, skiplist_field))
+
+
+/*
+ * SkipListFindElem
+ *		Look for an element whose key equals "value" in a two-level skip list.
+ *
+ * skiplist is the list head; valueOffset is the byte offset from an element
+ * to its key (see GET_SKIPLIST_VALUE).  The top level is walked first, then
+ * the search continues on the bottom level from the last top-level element
+ * whose key is smaller than value (or from the head if there is none).
+ *
+ * Both walks stop at the first key greater than value, so each level must be
+ * kept in ascending key order; SkipListInsert maintains that by linking new
+ * elements right after the predecessors reported here.
+ *
+ * topLinkPreElem / belowLinkPreElem may be NULL.  When non-NULL they receive
+ * the element after which a new entry with this key would be linked on the
+ * top / bottom level.  If the key is found on the top level, both are set to
+ * the matching element itself rather than to its predecessor.
+ *
+ * Returns true if an element with the given key exists, false otherwise.
+ */
+static inline bool
+SkipListFindElem(SHM_SKIPLIST* skiplist, Size valueOffset, unsigned long value,
+ SHM_SKIPLIST** topLinkPreElem, SHM_SKIPLIST** belowLinkPreElem)
+{
+ bool result;
+ unsigned long curValue;
+ SHM_SKIPLIST* curElem, *nextElem;
+
+ /* Phase 1: walk the top level, starting at the list head. */
+ curElem = skiplist;
+ result = false;
+ nextElem = (SHM_SKIPLIST*) SHMQueueNext(&(skiplist->topLink),
+ &(curElem->topLink),
+ offsetof(SHM_SKIPLIST, topLink));
+ /* presumably SHMQueueNext yields NULL once the walk is back at the head */
+ while (nextElem)
+ {
+ curValue = GET_SKIPLIST_VALUE(nextElem, valueOffset);
+ if (curValue == value)
+ {
+ /* Exact hit on the top level: report the match and stop early. */
+ if (topLinkPreElem)
+ *topLinkPreElem = nextElem;
+ if (belowLinkPreElem)
+ *belowLinkPreElem = nextElem;
+ return true;
+ }
+ /* Went past the target key; curElem is the top-level predecessor. */
+ if (curValue > value)
+ break;
+ curElem = nextElem;
+ nextElem = (SHM_SKIPLIST*)SHMQueueNext(&skiplist->topLink,
+ &nextElem->topLink,
+ offsetof(SHM_SKIPLIST, topLink));
+ }
+ if (topLinkPreElem)
+ *topLinkPreElem = curElem;
+
+ /* Phase 2: continue on the bottom level from the top-level predecessor. */
+ nextElem = (SHM_SKIPLIST*)SHMQueueNext(&(skiplist->belowLink),
+ &(curElem->belowLink),
+ offsetof(SHM_SKIPLIST, belowLink));
+ while (nextElem)
+ {
+ curValue = GET_SKIPLIST_VALUE(nextElem, valueOffset);
+ if (curValue == value)
+ {
+ result = true;
+ break;
+ }
+ if (curValue > value)
+ break;
+ curElem = nextElem;
+ nextElem = (SHM_SKIPLIST*) SHMQueueNext(&skiplist->belowLink,
+ &nextElem->belowLink,
+ offsetof(SHM_SKIPLIST, belowLink));
+ }
+
+ /* curElem is now the last bottom-level element with a smaller key. */
+ if (belowLinkPreElem)
+ *belowLinkPreElem = curElem;
+ return result;
+}
+
+/*
+ * SkipListInsert
+ *		Link "elem" into the two-level skip list headed by "skiplist".
+ *
+ * valueOffset is the byte offset from an element to its unsigned long key
+ * (see GET_SKIPLIST_VALUE).  The key must not already be present in the
+ * list; this is checked only in assert-enabled builds.
+ *
+ * Every element is linked on the bottom level; roughly one in six is also
+ * linked on the top level.  An element that is not promoted keeps whatever
+ * state the caller left in elem->topLink.
+ */
+static inline void
+SkipListInsert(SHM_SKIPLIST* skiplist, Size valueOffset, SHM_SKIPLIST* elem)
+{
+ unsigned long elemValue;
+ SHM_SKIPLIST* topLinkPreElem = NULL;
+ SHM_SKIPLIST* belowLinkPreElem = NULL;
+ /* only read inside Assert(); avoids a set-but-unused warning otherwise */
+ bool find PG_USED_FOR_ASSERTS_ONLY;
+
+ elemValue = GET_SKIPLIST_VALUE(elem, valueOffset);
+
+ /*
+ * The search must run in all builds because it yields the insertion
+ * points, so it cannot be folded into the Assert() below.
+ */
+ find = SkipListFindElem(skiplist, valueOffset, elemValue, &topLinkPreElem, &belowLinkPreElem);
+ Assert(find == false);
+ Assert(topLinkPreElem);
+ Assert(belowLinkPreElem);
+
+ /* Promote about one element in six to the top level. */
+ if (rand()%6 == 0)
+ {
+     SHMQueueInsertAfter(&topLinkPreElem->topLink, &elem->topLink);
+ }
+
+ /* The bottom level always contains every element. */
+ SHMQueueInsertAfter(&belowLinkPreElem->belowLink, &elem->belowLink);
+}
+
+/*
+ * SkipListExist
+ *		Return true if the skip list headed by "skiplist" contains an element
+ *		whose key (the unsigned long found valueOffset bytes into the element)
+ *		equals "value".  Convenience wrapper around SkipListFindElem for
+ *		callers that do not need the predecessor elements.
+ */
+static inline bool
+SkipListExist(SHM_SKIPLIST* skiplist, Size valueOffset, unsigned long value)
+{
+ return SkipListFindElem(skiplist, valueOffset, value, NULL, NULL);
+}
+
+
+
+/*------------------------------------------------------------------------*/
/*
* These functions manage primitive access to the RWConflict pool and lists.
@@ -636,6 +733,7 @@ static bool
RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
{
RWConflict conflict;
+ Size valueOffset;
Assert(reader != writer);
@@ -647,14 +745,23 @@ RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
return false;
/* A conflict is possible; walk the list to find out. */
- conflict = (RWConflict)
+ /*
+ * Long conflict lists are searched through the skip list.  Out-conflicts
+ * are keyed by the address stored in their sxactIn field, so the key to
+ * look up is the writer's address, not bytes read out of the reader.
+ */
+ if (reader->outConflictsNum > 10 )
+ {
+ valueOffset = SKIPLIST_VALUE_OFFSET(RWConflictData, outTopLink, sxactIn);
+ return SkipListExist((SHM_SKIPLIST*)&reader->outConflictsTopLink,
+ valueOffset,
+ (unsigned long) writer);
+ }
+
+ conflict = (RWConflict)
SHMQueueNext(&reader->outConflicts,
&reader->outConflicts,
offsetof(RWConflictData, outLink));
while (conflict)
{
- if (conflict->sxactIn == writer)
+ if (conflict->sxactIn == writer){
return true;
+ }
conflict = (RWConflict)
SHMQueueNext(&reader->outConflicts,
&conflict->outLink,
@@ -684,11 +791,24 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
errhint("You might need to run fewer transactions at a time or increase max_connections.")));
SHMQueueDelete(&conflict->outLink);
+ SHMQueueInit(&conflict->outTopLink);
+ SHMQueueInit(&conflict->inTopLink);
conflict->sxactOut = reader;
conflict->sxactIn = writer;
- SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
- SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+
+// SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+// SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+ SkipListInsert((SHM_SKIPLIST*)&reader->outConflictsTopLink,
+ SKIPLIST_VALUE_OFFSET(RWConflictData, outTopLink, sxactIn),
+ (SHM_SKIPLIST*)&conflict->outTopLink);
+ SkipListInsert((SHM_SKIPLIST*)&writer->inConflictsTopLink,
+ SKIPLIST_VALUE_OFFSET(RWConflictData, inTopLink, sxactOut),
+ (SHM_SKIPLIST*)&conflict->inTopLink);
+
+ reader->outConflictsNum++;
+ writer->inConflictsNum++;
+
}
static void
@@ -726,6 +846,10 @@ ReleaseRWConflict(RWConflict conflict)
{
SHMQueueDelete(&conflict->inLink);
SHMQueueDelete(&conflict->outLink);
+ if (conflict->inTopLink.prev)
+ SHMQueueDelete(&conflict->inTopLink);
+ if (conflict->outTopLink.prev)
+ SHMQueueDelete(&conflict->outTopLink);
SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
}
@@ -1204,7 +1328,13 @@ InitPredicateLocks(void)
PredXact->OldCommittedSxact->prepareSeqNo = 0;
PredXact->OldCommittedSxact->commitSeqNo = 0;
PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
+
+ PredXact->OldCommittedSxact->outConflictsNum = 0;
+ PredXact->OldCommittedSxact->inConflictsNum = 0;
+
+ SHMQueueInit(&PredXact->OldCommittedSxact->outConflictsTopLink);
SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+ SHMQueueInit(&PredXact->OldCommittedSxact->inConflictsTopLink);
SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
@@ -1796,8 +1926,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
sxact->prepareSeqNo = InvalidSerCommitSeqNo;
sxact->commitSeqNo = InvalidSerCommitSeqNo;
+
+ sxact->outConflictsNum = 0;
+ SHMQueueInit(&(sxact->outConflictsTopLink));
SHMQueueInit(&(sxact->outConflicts));
+ sxact->inConflictsNum = 0;
+ SHMQueueInit(&(sxact->inConflictsTopLink));
SHMQueueInit(&(sxact->inConflicts));
+
SHMQueueInit(&(sxact->possibleUnsafeConflicts));
sxact->topXid = GetTopTransactionIdIfAny();
sxact->finishedBefore = InvalidTransactionId;
@@ -3423,8 +3559,11 @@ ReleasePredicateLocks(bool isCommit)
if (!isCommit
|| SxactIsCommitted(conflict->sxactIn)
- || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
+ || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo)){
+ MySerializableXact->outConflictsNum--;
+ conflict->sxactIn->inConflictsNum--;
ReleaseRWConflict(conflict);
+ }
conflict = nextConflict;
}
@@ -3446,8 +3585,11 @@ ReleasePredicateLocks(bool isCommit)
if (!isCommit
|| SxactIsCommitted(conflict->sxactOut)
- || SxactIsReadOnly(conflict->sxactOut))
+ || SxactIsReadOnly(conflict->sxactOut)){
+ MySerializableXact->inConflictsNum--;
+ conflict->sxactOut->outConflictsNum--;
ReleaseRWConflict(conflict);
+ }
conflict = nextConflict;
}
@@ -3839,6 +3981,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
if (summarize)
conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
ReleaseRWConflict(conflict);
+ sxact->outConflictsNum--;
+ conflict->sxactIn->inConflictsNum--;
conflict = nextConflict;
}
}
@@ -3857,6 +4001,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
if (summarize)
conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
ReleaseRWConflict(conflict);
+ sxact->inConflictsNum--;
+ conflict->sxactOut->outConflictsNum--;
conflict = nextConflict;
}
@@ -4969,7 +5115,9 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* we'll conservatively assume that it had both a conflict in and a
* conflict out, and represent that with the summary conflict flags.
*/
+ SHMQueueInit(&(sxact->outConflictsTopLink));
SHMQueueInit(&(sxact->outConflicts));
+ SHMQueueInit(&(sxact->inConflictsTopLink));
SHMQueueInit(&(sxact->inConflicts));
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3cb0ab9..dbc5bcf 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -83,8 +83,13 @@ typedef struct SERIALIZABLEXACT
SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or
* no conflict out */
} SeqNo;
+ int outConflictsNum;
+ SHM_QUEUE outConflictsTopLink;
SHM_QUEUE outConflicts; /* list of write transactions whose data we
* couldn't read. */
+
+ int inConflictsNum;
+ SHM_QUEUE inConflictsTopLink;
SHM_QUEUE inConflicts; /* list of read transactions which couldn't
* see our write. */
SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */
@@ -182,6 +187,15 @@ typedef struct PredXactListData *PredXactList;
((Size)MAXALIGN(sizeof(PredXactListData)))
+/*
+ * SHM_SKIPLIST
+ *
+ * Link pair used both as the head and as each node of a two-level skip list
+ * built from SHM_QUEUE lists: topLink chains the element on the upper level,
+ * belowLink on the lower level.  Users obtain a SHM_SKIPLIST pointer by
+ * casting the address of a "...TopLink" SHM_QUEUE field that is immediately
+ * followed by the corresponding base link field, so such field pairs must
+ * stay adjacent and in this order.
+ */
+typedef struct SHM_SKIPLIST
+{
+ SHM_QUEUE topLink;
+ SHM_QUEUE belowLink;
+}SHM_SKIPLIST;
+
/*
* The following types are used to provide lists of rw-conflicts between
* pairs of transactions. Since exactly the same information is needed,
@@ -194,7 +208,9 @@ typedef struct PredXactListData *PredXactList;
*/
typedef struct RWConflictData
{
+ SHM_QUEUE outTopLink;
SHM_QUEUE outLink; /* link for list of conflicts out from a sxact */
+ SHM_QUEUE inTopLink;
SHM_QUEUE inLink; /* link for list of conflicts in to a sxact */
SERIALIZABLEXACT *sxactOut;
SERIALIZABLEXACT *sxactIn;