[GSOC][weekly report 8] Eliminate O(N^2) scaling from rw-conflict tracking in serializable transactions

Started by Mengxing Liuover 8 years ago1 messages
#1Mengxing Liu
liu-mx15@mails.tsinghua.edu.cn
1 attachment(s)

In the last week, I focused on tuning the performance of skip list and fixed several bugs.
1. As only out-conflicts are checked in RWConflictExists, I removed all modification concerned with in-conflicts;
2. If the conflict list is too short, I inserted an item just like inserting into an ordered linked list, that is,
comparing items existing in the list one by one to find the right position. Using skip list is slow when the list is short.
3. Not using the abstract skip list interface; I was afraid that it would bring a little overhead. But results showed that
it had no influence. I will roll it back if necessary.

Sadly, The performance is not better than the original version. But it's highest one among all efforts I did before.
It likes hash table. Tracking conflict is faster but inserting conflicts is slower.
In a quick test, cpu cycles consumed by two functions RWConflictExists/SetRWConflict is as below.
| | RWConflictExists | SetRWConflict |
| linked list | 1.39% | 0.14% |
| skip list | 1.15% | 0.35% |

According to the suggestions of Alvaro, I'll give a detailed performance report tomorrow.

--

Sincerely

Mengxing Liu

Attachments:

skip-list-for-conflict-tracking.patchapplication/octet-stream; name=skip-list-for-conflict-tracking.patchDownload
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..d75eee6 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -628,6 +628,141 @@ NextPredXact(SERIALIZABLEXACT *sxact)
 }
 
 /*------------------------------------------------------------------------*/
+/*
+ * These functions manage access to the skip list
+ */
+
+static inline bool 
+outConflictsSkipListFindElem(const SERIALIZABLEXACT* reader, const SERIALIZABLEXACT* writer,
+							 SHM_QUEUE** topLinkPreElem, SHM_QUEUE** belowLinkPreElem)
+{
+	unsigned long value, nextValue;
+	SHM_QUEUE *belowCurElem;
+	SHM_QUEUE *topCurElem;
+	RWConflict nextConflict;
+
+	value = (unsigned long)writer;
+	topCurElem = &reader->outConflictsTopLink;
+
+	nextConflict = (RWConflict)SHMQueueNext(&reader->outConflictsTopLink,
+										topCurElem,
+										offsetof(RWConflictData, outTopLink));
+	while (nextConflict)
+	{
+		nextValue = (unsigned long)nextConflict->sxactIn;
+		if (nextValue == value)
+		{
+			/* If the function is called by inserting an item, the element cann't exist 
+			 * So we don't have to set the topLinkPreElem and belowLinkPreElem
+			 */
+			return true;
+		}
+		if (nextValue > value)
+		{
+			topCurElem = nextConflict->outTopLink.prev;
+			break;
+		}
+		nextConflict = (RWConflict)SHMQueueNext(&reader->outConflictsTopLink,
+										&nextConflict->outTopLink,
+										offsetof(RWConflictData, outTopLink));
+	}
+
+	belowCurElem = (SHM_QUEUE*)((char*)(topCurElem) + sizeof(SHM_QUEUE));
+	nextConflict = (RWConflict)SHMQueueNext(&reader->outConflicts,
+										belowCurElem,
+										offsetof(RWConflictData, outLink));
+	while (nextConflict)
+	{
+		nextValue = (unsigned long)nextConflict->sxactIn;
+		if (nextValue == value)
+			return true;
+		if (nextValue > value)
+		{
+			belowCurElem = nextConflict->outLink.prev;
+			break;
+		}
+		nextConflict = (RWConflict)SHMQueueNext(&reader->outConflicts,
+											&nextConflict->outLink,
+											offsetof(RWConflictData, outLink));
+	}
+	if (topLinkPreElem)
+		*topLinkPreElem = topCurElem;
+	if (belowLinkPreElem)
+		*belowLinkPreElem = belowCurElem;
+	return false;
+}
+
+static inline void
+addAnTopElemForSkipList(const SERIALIZABLEXACT* sxact)
+{
+	RWConflict curElem;
+	int i;
+
+	curElem = (RWConflict)SHMQueueNext(&sxact->outConflicts, 
+									   &sxact->outConflicts,
+									   offsetof(RWConflictData, outLink));
+	i = 0;
+	while (curElem && i < 3)
+	{
+		curElem = (RWConflict)SHMQueueNext(&curElem->outLink, 
+										   &curElem->outLink,
+										   offsetof(RWConflictData, outLink));
+		i++;
+	}
+	Assert(curElem);
+	Assert(sxact->outConflictsTopLink.next == &sxact->outConflictsTopLink);
+	SHMQueueInsertAfter(&sxact->outConflictsTopLink, &curElem->outTopLink);
+}
+
+static inline void
+InsertOutConflictSkipList(SERIALIZABLEXACT* reader, RWConflict conflict)
+{
+	SHM_QUEUE *topLinkPreElem, *belowLinkPreElem;
+	bool find;
+
+	topLinkPreElem = belowLinkPreElem = NULL;
+	find = outConflictsSkipListFindElem(reader, conflict->sxactIn, 
+									&topLinkPreElem, &belowLinkPreElem);
+	Assert(!find);
+	Assert(topLinkPreElem);
+	Assert(belowLinkPreElem);
+
+	if (rand()%3 == 0)
+		SHMQueueInsertAfter(topLinkPreElem, &conflict->outTopLink);
+	SHMQueueInsertAfter(belowLinkPreElem, &conflict->outLink);
+
+}
+
+static inline void
+InsertOutConflict(SERIALIZABLEXACT* reader, RWConflict conflict)
+{
+	RWConflict curElem;
+
+	if (reader->conflictOutUseList)
+	{
+		curElem = (RWConflict)SHMQueueNext(&reader->outConflicts, 
+										   &reader->outConflicts,
+										   offsetof(RWConflictData, outLink));
+		while (curElem)
+		{
+			if ((unsigned long)curElem->sxactIn > (unsigned long)conflict->sxactIn)
+			{
+				SHMQueueInsertBefore(&curElem->outLink, &conflict->outLink);
+				return;
+			}
+			curElem = (RWConflict)SHMQueueNext(&reader->outConflicts, 
+											   &curElem->outLink,
+											   offsetof(RWConflictData, outLink));
+		}
+		SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+	}
+	else
+	{
+		InsertOutConflictSkipList(reader, conflict);
+	}
+}
+
+/*------------------------------------------------------------------------*/
 
 /*
  * These functions manage primitive access to the RWConflict pool and lists.
@@ -636,7 +771,10 @@ static bool
 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
 {
 	RWConflict	conflict;
-
+	Size 		valueOffset;
+    bool result;
+    
+    result=  false;
 	Assert(reader != writer);
 
 	/* Check the ends of the purported conflict first. */
@@ -645,24 +783,30 @@ RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
 		|| SHMQueueEmpty(&reader->outConflicts)
 		|| SHMQueueEmpty(&writer->inConflicts))
 		return false;
-
+    
 	/* A conflict is possible; walk the list to find out. */
-	conflict = (RWConflict)
-		SHMQueueNext(&reader->outConflicts,
-					 &reader->outConflicts,
-					 offsetof(RWConflictData, outLink));
-	while (conflict)
-	{
-		if (conflict->sxactIn == writer)
-			return true;
-		conflict = (RWConflict)
+    if (reader->outConflictsNum > 10)
+    {
+		result = outConflictsSkipListFindElem(reader, writer, NULL, NULL);
+	}else{
+	    conflict = (RWConflict)
 			SHMQueueNext(&reader->outConflicts,
-						 &conflict->outLink,
+						 &reader->outConflicts,
 						 offsetof(RWConflictData, outLink));
-	}
-
-	/* No conflict found. */
-	return false;
+		while (conflict)
+		{
+			if (conflict->sxactIn == writer){
+			//	return true;
+	            result = true;
+	            break;
+			}
+			conflict = (RWConflict)
+				SHMQueueNext(&reader->outConflicts,
+							 &conflict->outLink,
+							 offsetof(RWConflictData, outLink));
+		}
+    }
+    return result;
 }
 
 static void
@@ -684,11 +828,24 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
 				 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
 
 	SHMQueueDelete(&conflict->outLink);
+	SHMQueueInit(&conflict->outTopLink);
+	SHMQueueInit(&conflict->inTopLink);
 
 	conflict->sxactOut = reader;
 	conflict->sxactIn = writer;
-	SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+
+	/* because we only need check out conflicts of reader; 
+	 * So it is not necessary to maintain the skip list of inConflicts
+	 */
 	SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+	InsertOutConflict(reader, conflict);
+
+	reader->outConflictsNum++;
+	if (reader->conflictOutUseList && reader->outConflictsNum > 6){
+		/* Now we have 8 out Conflicts; we'd better add an item in the top link */
+		addAnTopElemForSkipList(reader);
+		reader->conflictOutUseList = false;
+	}
 }
 
 static void
@@ -726,6 +883,10 @@ ReleaseRWConflict(RWConflict conflict)
 {
 	SHMQueueDelete(&conflict->inLink);
 	SHMQueueDelete(&conflict->outLink);
+	if (conflict->inTopLink.prev)
+		SHMQueueDelete(&conflict->inTopLink);
+	if (conflict->outTopLink.prev)
+		SHMQueueDelete(&conflict->outTopLink);
 	SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
 }
 
@@ -1204,7 +1365,15 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->prepareSeqNo = 0;
 		PredXact->OldCommittedSxact->commitSeqNo = 0;
 		PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
+
+		PredXact->OldCommittedSxact->conflictOutUseList = true;
+		PredXact->OldCommittedSxact->conflictInUseList = true;
+		PredXact->OldCommittedSxact->outConflictsNum = 0;
+		PredXact->OldCommittedSxact->inConflictsNum = 0;
+		
+		SHMQueueInit(&PredXact->OldCommittedSxact->outConflictsTopLink);
 		SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+		SHMQueueInit(&PredXact->OldCommittedSxact->inConflictsTopLink);
 		SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
 		SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
 		SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
@@ -1796,8 +1965,16 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
 	sxact->prepareSeqNo = InvalidSerCommitSeqNo;
 	sxact->commitSeqNo = InvalidSerCommitSeqNo;
+
+	sxact->conflictOutUseList = true;
+	sxact->conflictInUseList = true;
+	sxact->outConflictsNum = 0;
+	SHMQueueInit(&(sxact->outConflictsTopLink));
 	SHMQueueInit(&(sxact->outConflicts));
+	sxact->inConflictsNum = 0;
+	SHMQueueInit(&(sxact->inConflictsTopLink));
 	SHMQueueInit(&(sxact->inConflicts));
+
 	SHMQueueInit(&(sxact->possibleUnsafeConflicts));
 	sxact->topXid = GetTopTransactionIdIfAny();
 	sxact->finishedBefore = InvalidTransactionId;
@@ -3423,8 +3600,11 @@ ReleasePredicateLocks(bool isCommit)
 
 		if (!isCommit
 			|| SxactIsCommitted(conflict->sxactIn)
-			|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
+			|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo)){
+			MySerializableXact->outConflictsNum--;
+            conflict->sxactIn->inConflictsNum--;
 			ReleaseRWConflict(conflict);
+		}
 
 		conflict = nextConflict;
 	}
@@ -3446,8 +3626,11 @@ ReleasePredicateLocks(bool isCommit)
 
 		if (!isCommit
 			|| SxactIsCommitted(conflict->sxactOut)
-			|| SxactIsReadOnly(conflict->sxactOut))
+			|| SxactIsReadOnly(conflict->sxactOut)){
+			MySerializableXact->inConflictsNum--;
+            conflict->sxactOut->outConflictsNum--;
 			ReleaseRWConflict(conflict);
+		}
 
 		conflict = nextConflict;
 	}
@@ -3839,6 +4022,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
 			if (summarize)
 				conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
 			ReleaseRWConflict(conflict);
+			sxact->outConflictsNum--;
+            conflict->sxactIn->inConflictsNum--;
 			conflict = nextConflict;
 		}
 	}
@@ -3857,6 +4042,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
 		if (summarize)
 			conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
 		ReleaseRWConflict(conflict);
+		sxact->inConflictsNum--;
+        conflict->sxactOut->outConflictsNum--;
 		conflict = nextConflict;
 	}
 
@@ -4209,6 +4396,7 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 			 * transaction may have flagged a conflict.
 			 */
 			if (!SxactIsDoomed(sxact)
+				&& !SxactIsDoomed(MySerializableXact)
 				&& (!SxactIsCommitted(sxact)
 					|| TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
 											 sxact->finishedBefore))
@@ -4969,7 +5157,9 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		 * we'll conservatively assume that it had both a conflict in and a
 		 * conflict out, and represent that with the summary conflict flags.
 		 */
+		SHMQueueInit(&(sxact->outConflictsTopLink));
 		SHMQueueInit(&(sxact->outConflicts));
+		SHMQueueInit(&(sxact->inConflictsTopLink));
 		SHMQueueInit(&(sxact->inConflicts));
 		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
 		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3cb0ab9..be68d4f 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -83,8 +83,15 @@ typedef struct SERIALIZABLEXACT
 		SerCommitSeqNo lastCommitBeforeSnapshot;		/* when not committed or
 														 * no conflict out */
 	}			SeqNo;
+	int 		outConflictsNum;
+	bool 		conflictOutUseList;
+	SHM_QUEUE	outConflictsTopLink;
 	SHM_QUEUE	outConflicts;	/* list of write transactions whose data we
 								 * couldn't read. */
+
+	int 		inConflictsNum;
+	bool 		conflictInUseList;
+	SHM_QUEUE	inConflictsTopLink;
 	SHM_QUEUE	inConflicts;	/* list of read transactions which couldn't
 								 * see our write. */
 	SHM_QUEUE	predicateLocks; /* list of associated PREDICATELOCK objects */
@@ -182,6 +189,15 @@ typedef struct PredXactListData *PredXactList;
 		((Size)MAXALIGN(sizeof(PredXactListData)))
 
 
+/* 
+ * 
+ */
+typedef struct SHM_SKIPLIST
+{
+	SHM_QUEUE topLink;
+	SHM_QUEUE belowLink;
+}SHM_SKIPLIST;
+
 /*
  * The following types are used to provide lists of rw-conflicts between
  * pairs of transactions.  Since exactly the same information is needed,
@@ -194,7 +210,9 @@ typedef struct PredXactListData *PredXactList;
  */
 typedef struct RWConflictData
 {
+	SHM_QUEUE	outTopLink;
 	SHM_QUEUE	outLink;		/* link for list of conflicts out from a sxact */
+	SHM_QUEUE	inTopLink;
 	SHM_QUEUE	inLink;			/* link for list of conflicts in to a sxact */
 	SERIALIZABLEXACT *sxactOut;
 	SERIALIZABLEXACT *sxactIn;