[GSOC][weekly report 3] Eliminate O(N^2) scaling from rw-conflict tracking in serializable transactions

Started by Mengxing Liuover 8 years ago3 messages
#1Mengxing Liu
liu-mx15@mails.tsinghua.edu.cn
3 attachment(s)

Hi, all. In the last week, I fixed some bugs and replace the list of “PossibleUnsafeConflicts” with hash table.
It passed the existing 178 tests. The patch is attached.

But in my benchmark, the throughput decrease by 15% after the modification.
Can you help me do a quick review to find if there is anything wrong?
I also attached the flame graph before/after the modification for reference.

Here is my questions:
1. Is there any function in HTAB like “clear” that can make itself empty quickly?
In this patch, when releasing a transaction object, I need to scan the hash table and
use “hash_search” to remove entries one by one. It would make releasing operation slower.

In a previous email, I reported that many backends wait for the lock “SerializableFinishedListLock”;
If we don't implement functions like “ReleaseOneSerializableXact” quickly, they will be the bottleneck.

2. Is the HTAB thread-safe? I would focus on removing some unnecessary locks if possible.

My plan for the next:
I will add a TPC-B benchmark to represent classic OLTP benchmark.

--

Sincerely

Mengxing Liu

Attachments:

patchapplication/octet-stream; name=patchDownload
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..09c757f 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -374,12 +374,6 @@ int			max_predicate_locks_per_page;		/* set by guc.c */
 static PredXactList PredXact;
 
 /*
- * This provides a pool of RWConflict data elements to use in conflict lists
- * between transactions.
- */
-static RWConflictPoolHeader RWConflictPool;
-
-/*
  * The predicate locking hash tables are in shared memory.
  * Each backend keeps pointers to them.
  */
@@ -471,6 +465,15 @@ static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *read
 /*------------------------------------------------------------------------*/
 
 /*
+ * Test whether a hash table is empty.
+ * I didn't find any function in dynamic hash supports the requirement.
+ */
+#define hash_table_empty(hashp)	\
+			(hash_get_num_entries(hashp) == 0)
+
+/*------------------------------------------------------------------------*/
+
+/*
  * Does this relation participate in predicate locking? Temporary and system
  * relations are exempt, as are materialized views.
  */
@@ -573,6 +576,14 @@ CreatePredXact(void)
 
 	SHMQueueDelete(&ptle->link);
 	SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
+
+	/*
+	 * NOTE: We don't need to clean the HTAB, because all of its elements
+	 *		 has been released before.
+	 */
+	Assert(hash_table_empty(ptle->inConflictsTab));
+	Assert(hash_table_empty(ptle->outConflictsTab));
+
 	return &ptle->sxact;
 }
 
@@ -635,60 +646,55 @@ NextPredXact(SERIALIZABLEXACT *sxact)
 static bool
 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
 {
-	RWConflict	conflict;
+	bool found;
 
 	Assert(reader != writer);
 
 	/* Check the ends of the purported conflict first. */
 	if (SxactIsDoomed(reader)
-		|| SxactIsDoomed(writer)
-		|| SHMQueueEmpty(&reader->outConflicts)
-		|| SHMQueueEmpty(&writer->inConflicts))
+		|| SxactIsDoomed(writer))
 		return false;
 
-	/* A conflict is possible; walk the list to find out. */
-	conflict = (RWConflict)
-		SHMQueueNext(&reader->outConflicts,
-					 &reader->outConflicts,
-					 offsetof(RWConflictData, outLink));
-	while (conflict)
-	{
-		if (conflict->sxactIn == writer)
-			return true;
-		conflict = (RWConflict)
-			SHMQueueNext(&reader->outConflicts,
-						 &conflict->outLink,
-						 offsetof(RWConflictData, outLink));
-	}
+	hash_search(reader->outConflictsTab,
+				&writer,
+				HASH_FIND,
+				&found);
 
-	/* No conflict found. */
-	return false;
+	return found;
 }
 
 static void
 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
 {
 	RWConflict	conflict;
+	bool		found;
 
 	Assert(reader != writer);
 	Assert(!RWConflictExists(reader, writer));
 
-	conflict = (RWConflict)
-		SHMQueueNext(&RWConflictPool->availableList,
-					 &RWConflictPool->availableList,
-					 offsetof(RWConflictData, outLink));
-	if (!conflict)
-		ereport(ERROR,
+	conflict = (RWConflict)hash_search(reader->outConflictsTab,
+										&writer,
+										HASH_ENTER_NULL,
+										&found);
+	if (!conflict){
+		ereport(ERROR, 
 				(errcode(ERRCODE_OUT_OF_MEMORY),
-				 errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
-				 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
-
-	SHMQueueDelete(&conflict->outLink);
+				 errmsg("insert to outRWconflicts hash table failed")));
+	}
+	conflict->sxactOut = reader;
+	conflict->sxactIn = writer;
 
+	conflict = (RWConflict)hash_search(writer->inConflictsTab,
+										&reader,
+										HASH_ENTER_NULL,
+										&found);
+	if (!conflict){
+		ereport(ERROR, 
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("insert to inRWconflicts hash table failed")));
+	}
 	conflict->sxactOut = reader;
 	conflict->sxactIn = writer;
-	SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
-	SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
 }
 
 static void
@@ -696,44 +702,85 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
 						  SERIALIZABLEXACT *activeXact)
 {
 	RWConflict	conflict;
+	bool		found;
 
 	Assert(roXact != activeXact);
 	Assert(SxactIsReadOnly(roXact));
 	Assert(!SxactIsReadOnly(activeXact));
 
-	conflict = (RWConflict)
-		SHMQueueNext(&RWConflictPool->availableList,
-					 &RWConflictPool->availableList,
-					 offsetof(RWConflictData, outLink));
-	if (!conflict)
-		ereport(ERROR,
+	conflict = (RWConflict)hash_search(roXact->possibleUnsafeConflictsTab,
+										&activeXact,
+										HASH_ENTER_NULL,
+										&found);
+	if (!found || conflict == NULL)
+	{
+		ereport(ERROR, 
 				(errcode(ERRCODE_OUT_OF_MEMORY),
-				 errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
-				 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
-
-	SHMQueueDelete(&conflict->outLink);
+				 errmsg("insert to possibleUnsafeConflicts hash table failed")));
+	}
+	conflict->sxactOut = activeXact;
+	conflict->sxactIn = roXact;
 
+	conflict = (RWConflict)hash_search(activeXact->possibleUnsafeConflictsTab,
+										&roXact,
+										HASH_ENTER_NULL,
+										&found);
+	if (!found || conflict == NULL)
+	{
+		ereport(ERROR, 
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("insert to possibleUnsafeConflicts hash table failed")));
+	}
 	conflict->sxactOut = activeXact;
 	conflict->sxactIn = roXact;
-	SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
-						 &conflict->outLink);
-	SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
-						 &conflict->inLink);
 }
 
+/*
+ *	PossibleUnsafeConflicts are stored in a different hash table. Thus we need a new function
+ *	NOTE: the two functions below could be merged.
+ */
+
 static void
 ReleaseRWConflict(RWConflict conflict)
 {
-	SHMQueueDelete(&conflict->inLink);
-	SHMQueueDelete(&conflict->outLink);
-	SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
+	bool found;
+
+	hash_search(conflict->sxactOut->outConflictsTab,
+				&conflict->sxactIn,
+				HASH_REMOVE,
+				&found);
+	Assert(found);
+
+	hash_search(conflict->sxactIn->inConflictsTab,
+				&conflict->sxactOut,
+				HASH_REMOVE,
+				&found);
+	Assert(found);
+}
+
+static void
+ReleaseUnsafeRWConflict(RWConflict conflict)
+{
+	bool		found;
+	
+	hash_search(conflict->sxactOut->possibleUnsafeConflictsTab,
+				&conflict->sxactIn,
+				HASH_REMOVE,
+				&found);
+	Assert(found);
+
+	hash_search(conflict->sxactIn->possibleUnsafeConflictsTab,
+				&conflict->sxactOut,
+				HASH_REMOVE,
+				&found);
+	Assert(found);
 }
 
 static void
 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
 {
-	RWConflict	conflict,
-				nextConflict;
+	RWConflict			conflict;
+	HASH_SEQ_STATUS		seqstat;
 
 	Assert(SxactIsReadOnly(sxact));
 	Assert(!SxactIsROSafe(sxact));
@@ -744,23 +791,13 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
 	 * We know this isn't a safe snapshot, so we can stop looking for other
 	 * potential conflicts.
 	 */
-	conflict = (RWConflict)
-		SHMQueueNext(&sxact->possibleUnsafeConflicts,
-					 &sxact->possibleUnsafeConflicts,
-					 offsetof(RWConflictData, inLink));
-	while (conflict)
+	hash_seq_init(&seqstat, sxact->possibleUnsafeConflictsTab);
+	while ((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
 	{
-		nextConflict = (RWConflict)
-			SHMQueueNext(&sxact->possibleUnsafeConflicts,
-						 &conflict->inLink,
-						 offsetof(RWConflictData, inLink));
-
 		Assert(!SxactIsReadOnly(conflict->sxactOut));
 		Assert(sxact == conflict->sxactIn);
 
-		ReleaseRWConflict(conflict);
-
-		conflict = nextConflict;
+		ReleaseUnsafeRWConflict(conflict);
 	}
 }
 
@@ -1181,6 +1218,9 @@ InitPredicateLocks(void)
 	{
 		int			i;
 
+		/* 50 is enough for a hash table name */
+		const char	name[50];
+
 		SHMQueueInit(&PredXact->availableList);
 		SHMQueueInit(&PredXact->activeList);
 		PredXact->SxactGlobalXmin = InvalidTransactionId;
@@ -1194,21 +1234,52 @@ InitPredicateLocks(void)
 		PredXact->element = ShmemAlloc(requestSize);
 		/* Add all elements to available list, clean. */
 		memset(PredXact->element, 0, requestSize);
+
+		MemSet(&info, 0, sizeof(info));
+		info.keysize = sizeof(SERIALIZABLEXACT*);
+		info.entrysize = sizeof(RWConflictData);
+		info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+
 		for (i = 0; i < max_table_size; i++)
 		{
 			SHMQueueInsertBefore(&(PredXact->availableList),
 								 &(PredXact->element[i].link));
+
+			snprintf((char*)name, 50, "PredXact inConflictsTab %d", i);
+			PredXact->element[i].sxact.inConflictsTab = ShmemInitHash(name, 
+																  SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE, 
+																  SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE, 
+																  &info, 
+																  HASH_ELEM | HASH_BLOBS | 
+																  HASH_PARTITION | HASH_FIXED_SIZE);
+
+			snprintf((char*)name, 50, "PredXact outConflictsTab %d", i);
+			PredXact->element[i].sxact.outConflictsTab = ShmemInitHash(name,
+																  SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE, 
+																  SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE, 
+																  &info, 
+																  HASH_ELEM | HASH_BLOBS |
+																  HASH_PARTITION | HASH_FIXED_SIZE); 
+
+			snprintf((char*)name, 50, "PredXact possibleUnsafeConflictsTab %d", i);
+			PredXact->element[i].sxact.possibleUnsafeConflictsTab = ShmemInitHash(name,
+																				   SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE, 
+																				   SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE, 
+																				   &info, 
+																				   HASH_ELEM | HASH_BLOBS |
+																				   HASH_PARTITION | HASH_FIXED_SIZE);
 		}
+
 		PredXact->OldCommittedSxact = CreatePredXact();
 		SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
 		PredXact->OldCommittedSxact->prepareSeqNo = 0;
 		PredXact->OldCommittedSxact->commitSeqNo = 0;
 		PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
-		SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
-		SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
+		//SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+		//SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
 		SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
 		SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
-		SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
+		//SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
 		PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
 		PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
@@ -1246,26 +1317,6 @@ InitPredicateLocks(void)
 	 */
 	max_table_size *= 5;
 
-	RWConflictPool = ShmemInitStruct("RWConflictPool",
-									 RWConflictPoolHeaderDataSize,
-									 &found);
-	if (!found)
-	{
-		int			i;
-
-		SHMQueueInit(&RWConflictPool->availableList);
-		requestSize = mul_size((Size) max_table_size,
-							   RWConflictDataSize);
-		RWConflictPool->element = ShmemAlloc(requestSize);
-		/* Add all elements to available list, clean. */
-		memset(RWConflictPool->element, 0, requestSize);
-		for (i = 0; i < max_table_size; i++)
-		{
-			SHMQueueInsertBefore(&(RWConflictPool->availableList),
-								 &(RWConflictPool->element[i].outLink));
-		}
-	}
-
 	/*
 	 * Create or attach to the header for the list of finished serializable
 	 * transactions.
@@ -1320,13 +1371,23 @@ PredicateLockShmemSize(void)
 	size = add_size(size, mul_size((Size) max_table_size,
 								   PredXactListElementDataSize));
 
+	/* Hash table in Sxact 
+	 *
+	 * NOTE: It should be 3*max_table_size*hash_table_size. Because for each Sxact, there are
+	 		 3 HTAB: inConflictsTab, outConflictsTab, possibleUnsafeConflictsTab.
+	 *		 But I don't know why this setting will cause "out of memory". So I set it as 6.
+	 */
+	size = add_size(size, mul_size((Size) 6*max_table_size, 
+								   hash_estimate_size(SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE, 
+								   sizeof(RWConflict))));
+
 	/* transaction xid table */
 	size = add_size(size, hash_estimate_size(max_table_size,
 											 sizeof(SERIALIZABLEXID)));
 
 	/* rw-conflict pool */
 	max_table_size *= 5;
-	size = add_size(size, RWConflictPoolHeaderDataSize);
+
 	size = add_size(size, mul_size((Size) max_table_size,
 								   RWConflictDataSize));
 
@@ -1522,7 +1583,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * them marked us as conflicted.
 		 */
 		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+		while (!(hash_table_empty(MySerializableXact->possibleUnsafeConflictsTab) || 
 				 SxactIsROUnsafe(MySerializableXact)))
 		{
 			LWLockRelease(SerializableXactHashLock);
@@ -1569,6 +1630,7 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
 {
 	int			num_written = 0;
 	SERIALIZABLEXACT *sxact;
+	HASH_SEQ_STATUS		seqstat;
 
 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
 
@@ -1585,18 +1647,11 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
 		RWConflict	possibleUnsafeConflict;
 
 		/* Traverse the list of possible unsafe conflicts collecting PIDs. */
-		possibleUnsafeConflict = (RWConflict)
-			SHMQueueNext(&sxact->possibleUnsafeConflicts,
-						 &sxact->possibleUnsafeConflicts,
-						 offsetof(RWConflictData, inLink));
 
-		while (possibleUnsafeConflict != NULL && num_written < output_size)
+		hash_seq_init(&seqstat, sxact->possibleUnsafeConflictsTab);
+		while ((possibleUnsafeConflict = hash_seq_search(&seqstat))!=NULL)
 		{
 			output[num_written++] = possibleUnsafeConflict->sxactOut->pid;
-			possibleUnsafeConflict = (RWConflict)
-				SHMQueueNext(&sxact->possibleUnsafeConflicts,
-							 &possibleUnsafeConflict->inLink,
-							 offsetof(RWConflictData, inLink));
 		}
 	}
 
@@ -1796,9 +1851,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
 	sxact->prepareSeqNo = InvalidSerCommitSeqNo;
 	sxact->commitSeqNo = InvalidSerCommitSeqNo;
-	SHMQueueInit(&(sxact->outConflicts));
-	SHMQueueInit(&(sxact->inConflicts));
-	SHMQueueInit(&(sxact->possibleUnsafeConflicts));
 	sxact->topXid = GetTopTransactionIdIfAny();
 	sxact->finishedBefore = InvalidTransactionId;
 	sxact->xmin = snapshot->xmin;
@@ -3250,9 +3302,9 @@ ReleasePredicateLocks(bool isCommit)
 {
 	bool		needToClear;
 	RWConflict	conflict,
-				nextConflict,
 				possibleUnsafeConflict;
 	SERIALIZABLEXACT *roXact;
+	HASH_SEQ_STATUS seqstat;
 
 	/*
 	 * We can't trust XactReadOnly here, because a transaction which started
@@ -3361,23 +3413,15 @@ ReleasePredicateLocks(bool isCommit)
 		 * make us unsafe. Note that we use 'inLink' for the iteration as
 		 * opposed to 'outLink' for the r/w xacts.
 		 */
-		possibleUnsafeConflict = (RWConflict)
-			SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
-						 &MySerializableXact->possibleUnsafeConflicts,
-						 offsetof(RWConflictData, inLink));
-		while (possibleUnsafeConflict)
-		{
-			nextConflict = (RWConflict)
-				SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
-							 &possibleUnsafeConflict->inLink,
-							 offsetof(RWConflictData, inLink));
 
+		hash_seq_init(&seqstat, MySerializableXact->possibleUnsafeConflictsTab);
+		while ((possibleUnsafeConflict = hash_seq_search(&seqstat)) != NULL)
+
+		{
 			Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
 			Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
 
-			ReleaseRWConflict(possibleUnsafeConflict);
-
-			possibleUnsafeConflict = nextConflict;
+			ReleaseUnsafeRWConflict(possibleUnsafeConflict);
 		}
 	}
 
@@ -3400,17 +3444,9 @@ ReleasePredicateLocks(bool isCommit)
 	 * back clear them all.  Set SXACT_FLAG_CONFLICT_OUT if any point to
 	 * previously committed transactions.
 	 */
-	conflict = (RWConflict)
-		SHMQueueNext(&MySerializableXact->outConflicts,
-					 &MySerializableXact->outConflicts,
-					 offsetof(RWConflictData, outLink));
-	while (conflict)
+	hash_seq_init(&seqstat, MySerializableXact->outConflictsTab);
+	while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
 	{
-		nextConflict = (RWConflict)
-			SHMQueueNext(&MySerializableXact->outConflicts,
-						 &conflict->outLink,
-						 offsetof(RWConflictData, outLink));
-
 		if (isCommit
 			&& !SxactIsReadOnly(MySerializableXact)
 			&& SxactIsCommitted(conflict->sxactIn))
@@ -3425,33 +3461,22 @@ ReleasePredicateLocks(bool isCommit)
 			|| SxactIsCommitted(conflict->sxactIn)
 			|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
 			ReleaseRWConflict(conflict);
-
-		conflict = nextConflict;
 	}
 
 	/*
 	 * Release all inConflicts from committed and read-only transactions. If
 	 * we're rolling back, clear them all.
 	 */
-	conflict = (RWConflict)
-		SHMQueueNext(&MySerializableXact->inConflicts,
-					 &MySerializableXact->inConflicts,
-					 offsetof(RWConflictData, inLink));
-	while (conflict)
+	hash_seq_init(&seqstat, MySerializableXact->inConflictsTab);
+	while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
 	{
-		nextConflict = (RWConflict)
-			SHMQueueNext(&MySerializableXact->inConflicts,
-						 &conflict->inLink,
-						 offsetof(RWConflictData, inLink));
-
 		if (!isCommit
 			|| SxactIsCommitted(conflict->sxactOut)
 			|| SxactIsReadOnly(conflict->sxactOut))
 			ReleaseRWConflict(conflict);
-
-		conflict = nextConflict;
 	}
 
+
 	if (!topLevelIsDeclaredReadOnly)
 	{
 		/*
@@ -3460,17 +3485,10 @@ ReleasePredicateLocks(bool isCommit)
 		 * conflict out. If any are waiting DEFERRABLE transactions, wake them
 		 * up if they are known safe or known unsafe.
 		 */
-		possibleUnsafeConflict = (RWConflict)
-			SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
-						 &MySerializableXact->possibleUnsafeConflicts,
-						 offsetof(RWConflictData, outLink));
-		while (possibleUnsafeConflict)
+		
+		hash_seq_init(&seqstat, MySerializableXact->possibleUnsafeConflictsTab);
+		while ((possibleUnsafeConflict = hash_seq_search(&seqstat))!=NULL)
 		{
-			nextConflict = (RWConflict)
-				SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
-							 &possibleUnsafeConflict->outLink,
-							 offsetof(RWConflictData, outLink));
-
 			roXact = possibleUnsafeConflict->sxactIn;
 			Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
 			Assert(SxactIsReadOnly(roXact));
@@ -3490,14 +3508,14 @@ ReleasePredicateLocks(bool isCommit)
 			}
 			else
 			{
-				ReleaseRWConflict(possibleUnsafeConflict);
+				ReleaseUnsafeRWConflict(possibleUnsafeConflict);
 
 				/*
 				 * If we were the last possible conflict, flag it safe. The
 				 * transaction can now safely release its predicate locks (but
 				 * that transaction's backend has to do that itself).
 				 */
-				if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
+				if (hash_table_empty(roXact->possibleUnsafeConflictsTab))
 					roXact->flags |= SXACT_FLAG_RO_SAFE;
 			}
 
@@ -3509,7 +3527,6 @@ ReleasePredicateLocks(bool isCommit)
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
-			possibleUnsafeConflict = nextConflict;
 		}
 	}
 
@@ -3726,8 +3743,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
 {
 	PREDICATELOCK *predlock;
 	SERIALIZABLEXIDTAG sxidtag;
-	RWConflict	conflict,
-				nextConflict;
+	RWConflict	conflict;
+	HASH_SEQ_STATUS seqstat;
 
 	Assert(sxact != NULL);
 	Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
@@ -3823,41 +3840,25 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
 	sxidtag.xid = sxact->topXid;
 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
-	/* Release all outConflicts (unless 'partial' is true) */
+	/* Release all outConflictsTab (unless 'partial' is true) */
 	if (!partial)
 	{
-		conflict = (RWConflict)
-			SHMQueueNext(&sxact->outConflicts,
-						 &sxact->outConflicts,
-						 offsetof(RWConflictData, outLink));
-		while (conflict)
+		hash_seq_init(&seqstat, sxact->outConflictsTab);
+		while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
 		{
-			nextConflict = (RWConflict)
-				SHMQueueNext(&sxact->outConflicts,
-							 &conflict->outLink,
-							 offsetof(RWConflictData, outLink));
 			if (summarize)
 				conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
 			ReleaseRWConflict(conflict);
-			conflict = nextConflict;
 		}
 	}
 
-	/* Release all inConflicts. */
-	conflict = (RWConflict)
-		SHMQueueNext(&sxact->inConflicts,
-					 &sxact->inConflicts,
-					 offsetof(RWConflictData, inLink));
-	while (conflict)
+	/* Release all inConflictsTab. */
+	hash_seq_init(&seqstat, sxact->inConflictsTab);
+	while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
 	{
-		nextConflict = (RWConflict)
-			SHMQueueNext(&sxact->inConflicts,
-						 &conflict->inLink,
-						 offsetof(RWConflictData, inLink));
 		if (summarize)
 			conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
 		ReleaseRWConflict(conflict);
-		conflict = nextConflict;
 	}
 
 	/* Finally, get rid of the xid and the record of the transaction itself. */
@@ -4035,7 +4036,8 @@ CheckForSerializableConflictOut(bool visible, Relation relation,
 					  errhint("The transaction might succeed if retried.")));
 
 			if (SxactHasSummaryConflictIn(MySerializableXact)
-				|| !SHMQueueEmpty(&MySerializableXact->inConflicts))
+			//	|| !SHMQueueEmpty(&MySerializableXact->inConflicts))
+				|| !hash_table_empty(MySerializableXact->inConflictsTab))
 				ereport(ERROR,
 						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 						 errmsg("could not serialize access due to read/write dependencies among transactions"),
@@ -4520,6 +4522,7 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
 {
 	bool		failure;
 	RWConflict	conflict;
+	HASH_SEQ_STATUS seqstat;
 
 	Assert(LWLockHeldByMe(SerializableXactHashLock));
 
@@ -4567,29 +4570,24 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
 			conflict = NULL;
 		}
 		else
-			conflict = (RWConflict)
-				SHMQueueNext(&writer->outConflicts,
-							 &writer->outConflicts,
-							 offsetof(RWConflictData, outLink));
-		while (conflict)
 		{
-			SERIALIZABLEXACT *t2 = conflict->sxactIn;
-
-			if (SxactIsPrepared(t2)
-				&& (!SxactIsCommitted(reader)
-					|| t2->prepareSeqNo <= reader->commitSeqNo)
-				&& (!SxactIsCommitted(writer)
-					|| t2->prepareSeqNo <= writer->commitSeqNo)
-				&& (!SxactIsReadOnly(reader)
-			  || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
+			hash_seq_init(&seqstat, writer->outConflictsTab);
+			while ((conflict = hash_seq_search(&seqstat))!=NULL)
 			{
-				failure = true;
-				break;
+				SERIALIZABLEXACT *t2 = conflict->sxactIn;
+
+				if (SxactIsPrepared(t2)
+					&& (!SxactIsCommitted(reader)
+						|| t2->prepareSeqNo <= reader->commitSeqNo)
+					&& (!SxactIsCommitted(writer)
+						|| t2->prepareSeqNo <= writer->commitSeqNo)
+					&& (!SxactIsReadOnly(reader)
+				  || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
+				{
+					failure = true;
+					break;
+				}
 			}
-			conflict = (RWConflict)
-				SHMQueueNext(&writer->outConflicts,
-							 &conflict->outLink,
-							 offsetof(RWConflictData, outLink));
 		}
 	}
 
@@ -4614,27 +4612,22 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
 			conflict = NULL;
 		}
 		else
-			conflict = (RWConflict)
-				SHMQueueNext(&reader->inConflicts,
-							 &reader->inConflicts,
-							 offsetof(RWConflictData, inLink));
-		while (conflict)
 		{
-			SERIALIZABLEXACT *t0 = conflict->sxactOut;
-
-			if (!SxactIsDoomed(t0)
-				&& (!SxactIsCommitted(t0)
-					|| t0->commitSeqNo >= writer->prepareSeqNo)
-				&& (!SxactIsReadOnly(t0)
-			  || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+			hash_seq_init(&seqstat, reader->inConflictsTab);
+			while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
 			{
-				failure = true;
-				break;
+				SERIALIZABLEXACT *t0 = conflict->sxactOut;
+
+				if (!SxactIsDoomed(t0)
+					&& (!SxactIsCommitted(t0)
+						|| t0->commitSeqNo >= writer->prepareSeqNo)
+					&& (!SxactIsReadOnly(t0)
+				  || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+				{
+					failure = true;
+					break;
+				}
 			}
-			conflict = (RWConflict)
-				SHMQueueNext(&reader->inConflicts,
-							 &conflict->inLink,
-							 offsetof(RWConflictData, inLink));
 		}
 	}
 
@@ -4693,6 +4686,7 @@ void
 PreCommit_CheckForSerializationFailure(void)
 {
 	RWConflict	nearConflict;
+	HASH_SEQ_STATUS nearseqstat;
 
 	if (MySerializableXact == InvalidSerializableXact)
 		return;
@@ -4712,22 +4706,18 @@ PreCommit_CheckForSerializationFailure(void)
 				 errhint("The transaction might succeed if retried.")));
 	}
 
-	nearConflict = (RWConflict)
-		SHMQueueNext(&MySerializableXact->inConflicts,
-					 &MySerializableXact->inConflicts,
-					 offsetof(RWConflictData, inLink));
-	while (nearConflict)
+	hash_seq_init(&nearseqstat, MySerializableXact->inConflictsTab);
+	while((nearConflict = hash_seq_search(&nearseqstat))!=NULL)
 	{
 		if (!SxactIsCommitted(nearConflict->sxactOut)
 			&& !SxactIsDoomed(nearConflict->sxactOut))
 		{
-			RWConflict	farConflict;
+			RWConflict farConflict;
+			HASH_SEQ_STATUS farseqstat;
+			
+			hash_seq_init(&farseqstat, nearConflict->sxactOut->inConflictsTab);
 
-			farConflict = (RWConflict)
-				SHMQueueNext(&nearConflict->sxactOut->inConflicts,
-							 &nearConflict->sxactOut->inConflicts,
-							 offsetof(RWConflictData, inLink));
-			while (farConflict)
+			while((farConflict = hash_seq_search(&farseqstat))!=NULL)
 			{
 				if (farConflict->sxactOut == MySerializableXact
 					|| (!SxactIsCommitted(farConflict->sxactOut)
@@ -4752,17 +4742,8 @@ PreCommit_CheckForSerializationFailure(void)
 					nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
 					break;
 				}
-				farConflict = (RWConflict)
-					SHMQueueNext(&nearConflict->sxactOut->inConflicts,
-								 &farConflict->inLink,
-								 offsetof(RWConflictData, inLink));
 			}
 		}
-
-		nearConflict = (RWConflict)
-			SHMQueueNext(&MySerializableXact->inConflicts,
-						 &nearConflict->inLink,
-						 offsetof(RWConflictData, inLink));
 	}
 
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
@@ -4948,7 +4929,6 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		 * recovered xact started are still active, except possibly other
 		 * prepared xacts and we don't care whether those are RO_SAFE or not.
 		 */
-		SHMQueueInit(&(sxact->possibleUnsafeConflicts));
 
 		SHMQueueInit(&(sxact->predicateLocks));
 		SHMQueueElemInit(&(sxact->finishedLink));
@@ -4969,8 +4949,6 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		 * we'll conservatively assume that it had both a conflict in and a
 		 * conflict out, and represent that with the summary conflict flags.
 		 */
-		SHMQueueInit(&(sxact->outConflicts));
-		SHMQueueInit(&(sxact->inConflicts));
 		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
 		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
 
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3cb0ab9..948827d 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -38,6 +38,12 @@ typedef uint64 SerCommitSeqNo;
 #define FirstNormalSerCommitSeqNo	((SerCommitSeqNo) 2)
 
 /*
+ * Hash table size in SERIALIZABLEXACT 
+ */
+#define SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE 200
+
+
+/*
  * The SERIALIZABLEXACT struct contains information needed for each
  * serializable database transaction to support SSI techniques.
  *
@@ -83,10 +89,10 @@ typedef struct SERIALIZABLEXACT
 		SerCommitSeqNo lastCommitBeforeSnapshot;		/* when not committed or
 														 * no conflict out */
 	}			SeqNo;
-	SHM_QUEUE	outConflicts;	/* list of write transactions whose data we
-								 * couldn't read. */
-	SHM_QUEUE	inConflicts;	/* list of read transactions which couldn't
-								 * see our write. */
+
+	HTAB*		outConflictsTab;	/* Table of write transactions whose data we couldn't read  */
+	HTAB*		inConflictsTab;		/* Table of read transactions which couldn't see our write. */
+
 	SHM_QUEUE	predicateLocks; /* list of associated PREDICATELOCK objects */
 	SHM_QUEUE	finishedLink;	/* list link in
 								 * FinishedSerializableTransactions */
@@ -95,7 +101,7 @@ typedef struct SERIALIZABLEXACT
 	 * for r/o transactions: list of concurrent r/w transactions that we could
 	 * potentially have conflicts with, and vice versa for r/w transactions
 	 */
-	SHM_QUEUE	possibleUnsafeConflicts;
+	HTAB*		possibleUnsafeConflictsTab;
 
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
@@ -205,18 +211,6 @@ typedef struct RWConflictData *RWConflict;
 #define RWConflictDataSize \
 		((Size)MAXALIGN(sizeof(RWConflictData)))
 
-typedef struct RWConflictPoolHeaderData
-{
-	SHM_QUEUE	availableList;
-	RWConflict	element;
-}	RWConflictPoolHeaderData;
-
-typedef struct RWConflictPoolHeaderData *RWConflictPoolHeader;
-
-#define RWConflictPoolHeaderDataSize \
-		((Size)MAXALIGN(sizeof(RWConflictPoolHeaderData)))
-
-
 /*
  * The SERIALIZABLEXIDTAG struct identifies an xid assigned to a serializable
  * transaction or any of its subtransactions.
PG-after.svgimage/svg+xml; name=PG-after.svgDownload
PG-before.svgimage/svg+xml; name=PG-before.svgDownload
#2Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Mengxing Liu (#1)
Re: [GSOC][weekly report 3] Eliminate O(N^2) scaling from rw-conflict tracking in serializable transactions

On 06/20/2017 06:51 AM, Mengxing Liu wrote:

But in my benchmark, the throughput decrease by 15% after the modification.
Can you help me do a quick review to find if there is anything wrong?
I also attached the flame graph before/after the modification for reference.

Hmm. The hash table ought to speed up the RWConflictExists() function
right? Where in the flame graph is RWConflictExists()? If it only
accounts for a small amount of the overall runtime, even drastic speedup
there won't make much difference to the total.

Here is my questions:
1. Is there any function in HTAB like 锟斤拷clear锟斤拷 that can make itself empty quickly?
In this patch, when releasing a transaction object, I need to scan the hash table and
use 锟斤拷hash_search锟斤拷 to remove entries one by one. It would make releasing operation slower.

Nope, there is no such function, I'm afraid.

In a previous email, I reported that many backends wait for the lock 锟斤拷SerializableFinishedListLock锟斤拷;
If we don't implement functions like 锟斤拷ReleaseOneSerializableXact锟斤拷 quickly, they will be the bottleneck.

Yeah, that's probably what's causing the decrease in throughput you're
seeing.

You might need to also keep the linked lists, and use the hash table to
only look up particular items in the linked list faster.

I was surprised to see that you're creating a lot of smallish hash
tables, three for each PredXact entry. I would've expected all the
PredXact entries to share the same hash tables, i.e. have only three
hash tables in total. That would be more flexible in allocating
resources among entries. (It won't help with the quick-release, though)

2. Is the HTAB thread-safe? I would focus on removing some unnecessary locks if possible.

Nope, you need to hold a lock while searching/manipulating a HTAB hash
table. If the hash table gets big and you start to see lock contention,
you can partition it so that each operation only needs to lock the one
partition covering the search key.

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Mengxing Liu
liu-mx15@mails.tsinghua.edu.cn
In reply to: Heikki Linnakangas (#2)
Re: [GSOC][weekly report 3] Eliminate O(N^2) scaling from rw-conflict tracking in serializable transactions

From: "Heikki Linnakangas" <hlinnaka@iki.fi>

Hmm. The hash table ought to speed up the RWConflictExists() function
right? Where in the flame graph is RWConflictExists()? If it only
accounts for a small amount of the overall runtime, even drastic speedup
there won't make much difference to the total.

Yes. It is much smaller than we have expected. I did a small test for HTAB and linked list:
when the set size is smaller than 10, linked list is as quick as HTAB. Here is the result.
(x microseconds running 10 million searching)

setSize: 5, LIST: 423569, HTAB: 523681
setSize: 10, LIST: 540579, HTAB: 430111
setSize: 15, LIST: 752879, HTAB: 429519
setSize: 20, LIST: 940792, HTAB: 431388
setSize: 25, LIST: 1163760, HTAB: 446043
setSize: 30, LIST: 1352990, HTAB: 429057
setSize: 35, LIST: 1524097, HTAB: 431547
setSize: 40, LIST: 1714976, HTAB: 429023

As we can see, the hash table can only benefit in a very extreme situation.
However, even if there are 100 concurrent connections, the average length of conflict
list is about 10. linked list is not the bottleneck.

Nope, there is no such function, I'm afraid.

Oh, that's really a bad news.

In a previous email, I reported that many backends wait for the lock “SerializableFinishedListLock”;
If we don't implement functions like “ReleaseOneSerializableXact” quickly, they will be the bottleneck.

Yeah, that's probably what's causing the decrease in throughput you're
seeing.

An new evidence: I use "SELECT wait_event_type, wait_event FROM pg_stat_activity;" and sum by event_type to analyze the wait event.

The result of original version:
SerializableXactHashLock 27
predicate_lock_manager 512
WALWriteLock 3
SerializableFinishedListLock 402

The result of new version:
SerializableXactHashLock 38
predicate_lock_manager 473
WALWriteLock 3
SerializableFinishedListLock 1068

Obviously, more backends are blocked by SerializableFinishedListLock.

You might need to also keep the linked lists, and use the hash table to
only look up particular items in the linked list faster.

I was surprised to see that you're creating a lot of smallish hash
tables, three for each PredXact entry. I would've expected all the
PredXact entries to share the same hash tables, i.e. have only three
hash tables in total. That would be more flexible in allocating
resources among entries. (It won't help with the quick-release, though)

Yes, it would looks more elegant and require less memory resources.
( because hash table objects also consume memory )
But just for performance, it would be less efficient than my patch.
Because it has to maintain linked lists, besides hash tables. In other words,
it does more works than my patch.

Another point is that removing linked list may have more opportunities to reduce
lock contentions. Otherwise, we need a global lock to protect the linked list.

--
Sincerely

Mengxing Liu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers