[GSOC][weekly report 3] Eliminate O(N^2) scaling from rw-conflict tracking in serializable transactions
Hi, all. In the last week, I fixed some bugs and replace the list of “PossibleUnsafeConflicts” with hash table.
It passed the existing 178 tests. The patch is attached.
But in my benchmark, the throughput decrease by 15% after the modification.
Can you help me do a quick review to find if there is anything wrong?
I also attached the flame graph before/after the modification for reference.
Here is my questions:
1. Is there any function in HTAB like “clear” that can make itself empty quickly?
In this patch, when releasing a transaction object, I need to scan the hash table and
use “hash_search” to remove entries one by one. It would make releasing operation slower.
In a previous email, I reported that many backends wait for the lock “SerializableFinishedListLock”;
If we don't implement functions like “ReleaseOneSerializableXact” quickly, they will be the bottleneck.
2. Is the HTAB thread-safe? I would focus on removing some unnecessary locks if possible.
My plan for the next:
I will add a TPC-B benchmark to represent classic OLTP benchmark.
--
Sincerely
Mengxing Liu
Attachments:
patchapplication/octet-stream; name=patchDownload
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..09c757f 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -374,12 +374,6 @@ int max_predicate_locks_per_page; /* set by guc.c */
static PredXactList PredXact;
/*
- * This provides a pool of RWConflict data elements to use in conflict lists
- * between transactions.
- */
-static RWConflictPoolHeader RWConflictPool;
-
-/*
* The predicate locking hash tables are in shared memory.
* Each backend keeps pointers to them.
*/
@@ -471,6 +465,15 @@ static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *read
/*------------------------------------------------------------------------*/
/*
+ * Test whether a hash table is empty.
+ * I didn't find any function in dynamic hash supports the requirement.
+ */
+#define hash_table_empty(hashp) \
+ (hash_get_num_entries(hashp) == 0)
+
+/*------------------------------------------------------------------------*/
+
+/*
* Does this relation participate in predicate locking? Temporary and system
* relations are exempt, as are materialized views.
*/
@@ -573,6 +576,14 @@ CreatePredXact(void)
SHMQueueDelete(&ptle->link);
SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
+
+ /*
+ * NOTE: We don't need to clean the HTAB, because all of its elements
+ * has been released before.
+ */
+ Assert(hash_table_empty(ptle->inConflictsTab));
+ Assert(hash_table_empty(ptle->outConflictsTab));
+
return &ptle->sxact;
}
@@ -635,60 +646,55 @@ NextPredXact(SERIALIZABLEXACT *sxact)
static bool
RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
{
- RWConflict conflict;
+ bool found;
Assert(reader != writer);
/* Check the ends of the purported conflict first. */
if (SxactIsDoomed(reader)
- || SxactIsDoomed(writer)
- || SHMQueueEmpty(&reader->outConflicts)
- || SHMQueueEmpty(&writer->inConflicts))
+ || SxactIsDoomed(writer))
return false;
- /* A conflict is possible; walk the list to find out. */
- conflict = (RWConflict)
- SHMQueueNext(&reader->outConflicts,
- &reader->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
- {
- if (conflict->sxactIn == writer)
- return true;
- conflict = (RWConflict)
- SHMQueueNext(&reader->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
- }
+ hash_search(reader->outConflictsTab,
+ &writer,
+ HASH_FIND,
+ &found);
- /* No conflict found. */
- return false;
+ return found;
}
static void
SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
{
RWConflict conflict;
+ bool found;
Assert(reader != writer);
Assert(!RWConflictExists(reader, writer));
- conflict = (RWConflict)
- SHMQueueNext(&RWConflictPool->availableList,
- &RWConflictPool->availableList,
- offsetof(RWConflictData, outLink));
- if (!conflict)
- ereport(ERROR,
+ conflict = (RWConflict)hash_search(reader->outConflictsTab,
+ &writer,
+ HASH_ENTER_NULL,
+ &found);
+ if (!conflict){
+ ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
- errhint("You might need to run fewer transactions at a time or increase max_connections.")));
-
- SHMQueueDelete(&conflict->outLink);
+ errmsg("insert to outRWconflicts hash table failed")));
+ }
+ conflict->sxactOut = reader;
+ conflict->sxactIn = writer;
+ conflict = (RWConflict)hash_search(writer->inConflictsTab,
+ &reader,
+ HASH_ENTER_NULL,
+ &found);
+ if (!conflict){
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("insert to inRWconflicts hash table failed")));
+ }
conflict->sxactOut = reader;
conflict->sxactIn = writer;
- SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
- SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
}
static void
@@ -696,44 +702,85 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
SERIALIZABLEXACT *activeXact)
{
RWConflict conflict;
+ bool found;
Assert(roXact != activeXact);
Assert(SxactIsReadOnly(roXact));
Assert(!SxactIsReadOnly(activeXact));
- conflict = (RWConflict)
- SHMQueueNext(&RWConflictPool->availableList,
- &RWConflictPool->availableList,
- offsetof(RWConflictData, outLink));
- if (!conflict)
- ereport(ERROR,
+ conflict = (RWConflict)hash_search(roXact->possibleUnsafeConflictsTab,
+ &activeXact,
+ HASH_ENTER_NULL,
+ &found);
+ if (!found || conflict == NULL)
+ {
+ ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
- errhint("You might need to run fewer transactions at a time or increase max_connections.")));
-
- SHMQueueDelete(&conflict->outLink);
+ errmsg("insert to possibleUnsafeConflicts hash table failed")));
+ }
+ conflict->sxactOut = activeXact;
+ conflict->sxactIn = roXact;
+ conflict = (RWConflict)hash_search(activeXact->possibleUnsafeConflictsTab,
+ &roXact,
+ HASH_ENTER_NULL,
+ &found);
+ if (!found || conflict == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("insert to possibleUnsafeConflicts hash table failed")));
+ }
conflict->sxactOut = activeXact;
conflict->sxactIn = roXact;
- SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
- &conflict->outLink);
- SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
- &conflict->inLink);
}
+/*
+ * PossibleUnsafeConflicts are stored in a different hash table. Thus we need a new function
+ * NOTE: the two functions below could be merged.
+ */
+
static void
ReleaseRWConflict(RWConflict conflict)
{
- SHMQueueDelete(&conflict->inLink);
- SHMQueueDelete(&conflict->outLink);
- SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
+ bool found;
+
+ hash_search(conflict->sxactOut->outConflictsTab,
+ &conflict->sxactIn,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+
+ hash_search(conflict->sxactIn->inConflictsTab,
+ &conflict->sxactOut,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+}
+
+static void
+ReleaseUnsafeRWConflict(RWConflict conflict)
+{
+ bool found;
+
+ hash_search(conflict->sxactOut->possibleUnsafeConflictsTab,
+ &conflict->sxactIn,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+
+ hash_search(conflict->sxactIn->possibleUnsafeConflictsTab,
+ &conflict->sxactOut,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
}
static void
FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
{
- RWConflict conflict,
- nextConflict;
+ RWConflict conflict;
+ HASH_SEQ_STATUS seqstat;
Assert(SxactIsReadOnly(sxact));
Assert(!SxactIsROSafe(sxact));
@@ -744,23 +791,13 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
* We know this isn't a safe snapshot, so we can stop looking for other
* potential conflicts.
*/
- conflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &sxact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ hash_seq_init(&seqstat, sxact->possibleUnsafeConflictsTab);
+ while ((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
-
Assert(!SxactIsReadOnly(conflict->sxactOut));
Assert(sxact == conflict->sxactIn);
- ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
+ ReleaseUnsafeRWConflict(conflict);
}
}
@@ -1181,6 +1218,9 @@ InitPredicateLocks(void)
{
int i;
+ /* 50 is enough for a hash table name */
+ const char name[50];
+
SHMQueueInit(&PredXact->availableList);
SHMQueueInit(&PredXact->activeList);
PredXact->SxactGlobalXmin = InvalidTransactionId;
@@ -1194,21 +1234,52 @@ InitPredicateLocks(void)
PredXact->element = ShmemAlloc(requestSize);
/* Add all elements to available list, clean. */
memset(PredXact->element, 0, requestSize);
+
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(SERIALIZABLEXACT*);
+ info.entrysize = sizeof(RWConflictData);
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+
for (i = 0; i < max_table_size; i++)
{
SHMQueueInsertBefore(&(PredXact->availableList),
&(PredXact->element[i].link));
+
+ snprintf((char*)name, 50, "PredXact inConflictsTab %d", i);
+ PredXact->element[i].sxact.inConflictsTab = ShmemInitHash(name,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ &info,
+ HASH_ELEM | HASH_BLOBS |
+ HASH_PARTITION | HASH_FIXED_SIZE);
+
+ snprintf((char*)name, 50, "PredXact outConflictsTab %d", i);
+ PredXact->element[i].sxact.outConflictsTab = ShmemInitHash(name,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ &info,
+ HASH_ELEM | HASH_BLOBS |
+ HASH_PARTITION | HASH_FIXED_SIZE);
+
+ snprintf((char*)name, 50, "PredXact possibleUnsafeConflictsTab %d", i);
+ PredXact->element[i].sxact.possibleUnsafeConflictsTab = ShmemInitHash(name,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ &info,
+ HASH_ELEM | HASH_BLOBS |
+ HASH_PARTITION | HASH_FIXED_SIZE);
}
+
PredXact->OldCommittedSxact = CreatePredXact();
SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
PredXact->OldCommittedSxact->prepareSeqNo = 0;
PredXact->OldCommittedSxact->commitSeqNo = 0;
PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
- SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
- SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
+ //SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+ //SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
- SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
+ //SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
@@ -1246,26 +1317,6 @@ InitPredicateLocks(void)
*/
max_table_size *= 5;
- RWConflictPool = ShmemInitStruct("RWConflictPool",
- RWConflictPoolHeaderDataSize,
- &found);
- if (!found)
- {
- int i;
-
- SHMQueueInit(&RWConflictPool->availableList);
- requestSize = mul_size((Size) max_table_size,
- RWConflictDataSize);
- RWConflictPool->element = ShmemAlloc(requestSize);
- /* Add all elements to available list, clean. */
- memset(RWConflictPool->element, 0, requestSize);
- for (i = 0; i < max_table_size; i++)
- {
- SHMQueueInsertBefore(&(RWConflictPool->availableList),
- &(RWConflictPool->element[i].outLink));
- }
- }
-
/*
* Create or attach to the header for the list of finished serializable
* transactions.
@@ -1320,13 +1371,23 @@ PredicateLockShmemSize(void)
size = add_size(size, mul_size((Size) max_table_size,
PredXactListElementDataSize));
+ /* Hash table in Sxact
+ *
+ * NOTE: It should be 3*max_table_size*hash_table_size. Because for each Sxact, there are
+ 3 HTAB: inConflictsTab, outConflictsTab, possibleUnsafeConflictsTab.
+ * But I don't know why this setting will cause "out of memory". So I set it as 6.
+ */
+ size = add_size(size, mul_size((Size) 6*max_table_size,
+ hash_estimate_size(SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE,
+ sizeof(RWConflict))));
+
/* transaction xid table */
size = add_size(size, hash_estimate_size(max_table_size,
sizeof(SERIALIZABLEXID)));
/* rw-conflict pool */
max_table_size *= 5;
- size = add_size(size, RWConflictPoolHeaderDataSize);
+
size = add_size(size, mul_size((Size) max_table_size,
RWConflictDataSize));
@@ -1522,7 +1583,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
* them marked us as conflicted.
*/
MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
- while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+ while (!(hash_table_empty(MySerializableXact->possibleUnsafeConflictsTab) ||
SxactIsROUnsafe(MySerializableXact)))
{
LWLockRelease(SerializableXactHashLock);
@@ -1569,6 +1630,7 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
{
int num_written = 0;
SERIALIZABLEXACT *sxact;
+ HASH_SEQ_STATUS seqstat;
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
@@ -1585,18 +1647,11 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
RWConflict possibleUnsafeConflict;
/* Traverse the list of possible unsafe conflicts collecting PIDs. */
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &sxact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (possibleUnsafeConflict != NULL && num_written < output_size)
+ hash_seq_init(&seqstat, sxact->possibleUnsafeConflictsTab);
+ while ((possibleUnsafeConflict = hash_seq_search(&seqstat))!=NULL)
{
output[num_written++] = possibleUnsafeConflict->sxactOut->pid;
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
@@ -1796,9 +1851,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
sxact->prepareSeqNo = InvalidSerCommitSeqNo;
sxact->commitSeqNo = InvalidSerCommitSeqNo;
- SHMQueueInit(&(sxact->outConflicts));
- SHMQueueInit(&(sxact->inConflicts));
- SHMQueueInit(&(sxact->possibleUnsafeConflicts));
sxact->topXid = GetTopTransactionIdIfAny();
sxact->finishedBefore = InvalidTransactionId;
sxact->xmin = snapshot->xmin;
@@ -3250,9 +3302,9 @@ ReleasePredicateLocks(bool isCommit)
{
bool needToClear;
RWConflict conflict,
- nextConflict,
possibleUnsafeConflict;
SERIALIZABLEXACT *roXact;
+ HASH_SEQ_STATUS seqstat;
/*
* We can't trust XactReadOnly here, because a transaction which started
@@ -3361,23 +3413,15 @@ ReleasePredicateLocks(bool isCommit)
* make us unsafe. Note that we use 'inLink' for the iteration as
* opposed to 'outLink' for the r/w xacts.
*/
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &MySerializableXact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (possibleUnsafeConflict)
- {
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->inLink,
- offsetof(RWConflictData, inLink));
+ hash_seq_init(&seqstat, MySerializableXact->possibleUnsafeConflictsTab);
+ while ((possibleUnsafeConflict = hash_seq_search(&seqstat)) != NULL)
+
+ {
Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
- ReleaseRWConflict(possibleUnsafeConflict);
-
- possibleUnsafeConflict = nextConflict;
+ ReleaseUnsafeRWConflict(possibleUnsafeConflict);
}
}
@@ -3400,17 +3444,9 @@ ReleasePredicateLocks(bool isCommit)
* back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to
* previously committed transactions.
*/
- conflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->outConflicts,
- &MySerializableXact->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ hash_seq_init(&seqstat, MySerializableXact->outConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
-
if (isCommit
&& !SxactIsReadOnly(MySerializableXact)
&& SxactIsCommitted(conflict->sxactIn))
@@ -3425,33 +3461,22 @@ ReleasePredicateLocks(bool isCommit)
|| SxactIsCommitted(conflict->sxactIn)
|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
}
/*
* Release all inConflicts from committed and read-only transactions. If
* we're rolling back, clear them all.
*/
- conflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &MySerializableXact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ hash_seq_init(&seqstat, MySerializableXact->inConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
-
if (!isCommit
|| SxactIsCommitted(conflict->sxactOut)
|| SxactIsReadOnly(conflict->sxactOut))
ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
}
+
if (!topLevelIsDeclaredReadOnly)
{
/*
@@ -3460,17 +3485,10 @@ ReleasePredicateLocks(bool isCommit)
* conflict out. If any are waiting DEFERRABLE transactions, wake them
* up if they are known safe or known unsafe.
*/
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &MySerializableXact->possibleUnsafeConflicts,
- offsetof(RWConflictData, outLink));
- while (possibleUnsafeConflict)
+
+ hash_seq_init(&seqstat, MySerializableXact->possibleUnsafeConflictsTab);
+ while ((possibleUnsafeConflict = hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->outLink,
- offsetof(RWConflictData, outLink));
-
roXact = possibleUnsafeConflict->sxactIn;
Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
Assert(SxactIsReadOnly(roXact));
@@ -3490,14 +3508,14 @@ ReleasePredicateLocks(bool isCommit)
}
else
{
- ReleaseRWConflict(possibleUnsafeConflict);
+ ReleaseUnsafeRWConflict(possibleUnsafeConflict);
/*
* If we were the last possible conflict, flag it safe. The
* transaction can now safely release its predicate locks (but
* that transaction's backend has to do that itself).
*/
- if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
+ if (hash_table_empty(roXact->possibleUnsafeConflictsTab))
roXact->flags |= SXACT_FLAG_RO_SAFE;
}
@@ -3509,7 +3527,6 @@ ReleasePredicateLocks(bool isCommit)
(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
ProcSendSignal(roXact->pid);
- possibleUnsafeConflict = nextConflict;
}
}
@@ -3726,8 +3743,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
{
PREDICATELOCK *predlock;
SERIALIZABLEXIDTAG sxidtag;
- RWConflict conflict,
- nextConflict;
+ RWConflict conflict;
+ HASH_SEQ_STATUS seqstat;
Assert(sxact != NULL);
Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
@@ -3823,41 +3840,25 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
sxidtag.xid = sxact->topXid;
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
- /* Release all outConflicts (unless 'partial' is true) */
+ /* Release all outConflictsTab (unless 'partial' is true) */
if (!partial)
{
- conflict = (RWConflict)
- SHMQueueNext(&sxact->outConflicts,
- &sxact->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ hash_seq_init(&seqstat, sxact->outConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
if (summarize)
conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
ReleaseRWConflict(conflict);
- conflict = nextConflict;
}
}
- /* Release all inConflicts. */
- conflict = (RWConflict)
- SHMQueueNext(&sxact->inConflicts,
- &sxact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ /* Release all inConflictsTab. */
+ hash_seq_init(&seqstat, sxact->inConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
if (summarize)
conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
ReleaseRWConflict(conflict);
- conflict = nextConflict;
}
/* Finally, get rid of the xid and the record of the transaction itself. */
@@ -4035,7 +4036,8 @@ CheckForSerializableConflictOut(bool visible, Relation relation,
errhint("The transaction might succeed if retried.")));
if (SxactHasSummaryConflictIn(MySerializableXact)
- || !SHMQueueEmpty(&MySerializableXact->inConflicts))
+ // || !SHMQueueEmpty(&MySerializableXact->inConflicts))
+ || !hash_table_empty(MySerializableXact->inConflictsTab))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to read/write dependencies among transactions"),
@@ -4520,6 +4522,7 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
{
bool failure;
RWConflict conflict;
+ HASH_SEQ_STATUS seqstat;
Assert(LWLockHeldByMe(SerializableXactHashLock));
@@ -4567,29 +4570,24 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
conflict = NULL;
}
else
- conflict = (RWConflict)
- SHMQueueNext(&writer->outConflicts,
- &writer->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
{
- SERIALIZABLEXACT *t2 = conflict->sxactIn;
-
- if (SxactIsPrepared(t2)
- && (!SxactIsCommitted(reader)
- || t2->prepareSeqNo <= reader->commitSeqNo)
- && (!SxactIsCommitted(writer)
- || t2->prepareSeqNo <= writer->commitSeqNo)
- && (!SxactIsReadOnly(reader)
- || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
+ hash_seq_init(&seqstat, writer->outConflictsTab);
+ while ((conflict = hash_seq_search(&seqstat))!=NULL)
{
- failure = true;
- break;
+ SERIALIZABLEXACT *t2 = conflict->sxactIn;
+
+ if (SxactIsPrepared(t2)
+ && (!SxactIsCommitted(reader)
+ || t2->prepareSeqNo <= reader->commitSeqNo)
+ && (!SxactIsCommitted(writer)
+ || t2->prepareSeqNo <= writer->commitSeqNo)
+ && (!SxactIsReadOnly(reader)
+ || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
+ {
+ failure = true;
+ break;
+ }
}
- conflict = (RWConflict)
- SHMQueueNext(&writer->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
}
}
@@ -4614,27 +4612,22 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
conflict = NULL;
}
else
- conflict = (RWConflict)
- SHMQueueNext(&reader->inConflicts,
- &reader->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
{
- SERIALIZABLEXACT *t0 = conflict->sxactOut;
-
- if (!SxactIsDoomed(t0)
- && (!SxactIsCommitted(t0)
- || t0->commitSeqNo >= writer->prepareSeqNo)
- && (!SxactIsReadOnly(t0)
- || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+ hash_seq_init(&seqstat, reader->inConflictsTab);
+ while((conflict = (RWConflict)hash_seq_search(&seqstat))!=NULL)
{
- failure = true;
- break;
+ SERIALIZABLEXACT *t0 = conflict->sxactOut;
+
+ if (!SxactIsDoomed(t0)
+ && (!SxactIsCommitted(t0)
+ || t0->commitSeqNo >= writer->prepareSeqNo)
+ && (!SxactIsReadOnly(t0)
+ || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+ {
+ failure = true;
+ break;
+ }
}
- conflict = (RWConflict)
- SHMQueueNext(&reader->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
@@ -4693,6 +4686,7 @@ void
PreCommit_CheckForSerializationFailure(void)
{
RWConflict nearConflict;
+ HASH_SEQ_STATUS nearseqstat;
if (MySerializableXact == InvalidSerializableXact)
return;
@@ -4712,22 +4706,18 @@ PreCommit_CheckForSerializationFailure(void)
errhint("The transaction might succeed if retried.")));
}
- nearConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &MySerializableXact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (nearConflict)
+ hash_seq_init(&nearseqstat, MySerializableXact->inConflictsTab);
+ while((nearConflict = hash_seq_search(&nearseqstat))!=NULL)
{
if (!SxactIsCommitted(nearConflict->sxactOut)
&& !SxactIsDoomed(nearConflict->sxactOut))
{
- RWConflict farConflict;
+ RWConflict farConflict;
+ HASH_SEQ_STATUS farseqstat;
+
+ hash_seq_init(&farseqstat, nearConflict->sxactOut->inConflictsTab);
- farConflict = (RWConflict)
- SHMQueueNext(&nearConflict->sxactOut->inConflicts,
- &nearConflict->sxactOut->inConflicts,
- offsetof(RWConflictData, inLink));
- while (farConflict)
+ while((farConflict = hash_seq_search(&farseqstat))!=NULL)
{
if (farConflict->sxactOut == MySerializableXact
|| (!SxactIsCommitted(farConflict->sxactOut)
@@ -4752,17 +4742,8 @@ PreCommit_CheckForSerializationFailure(void)
nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
break;
}
- farConflict = (RWConflict)
- SHMQueueNext(&nearConflict->sxactOut->inConflicts,
- &farConflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
-
- nearConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &nearConflict->inLink,
- offsetof(RWConflictData, inLink));
}
MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
@@ -4948,7 +4929,6 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* recovered xact started are still active, except possibly other
* prepared xacts and we don't care whether those are RO_SAFE or not.
*/
- SHMQueueInit(&(sxact->possibleUnsafeConflicts));
SHMQueueInit(&(sxact->predicateLocks));
SHMQueueElemInit(&(sxact->finishedLink));
@@ -4969,8 +4949,6 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* we'll conservatively assume that it had both a conflict in and a
* conflict out, and represent that with the summary conflict flags.
*/
- SHMQueueInit(&(sxact->outConflicts));
- SHMQueueInit(&(sxact->inConflicts));
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3cb0ab9..948827d 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -38,6 +38,12 @@ typedef uint64 SerCommitSeqNo;
#define FirstNormalSerCommitSeqNo ((SerCommitSeqNo) 2)
/*
+ * Hash table size in SERIALIZABLEXACT
+ */
+#define SERIALIZABLEXACT_CONFLICT_HASHTAB_SIZE 200
+
+
+/*
* The SERIALIZABLEXACT struct contains information needed for each
* serializable database transaction to support SSI techniques.
*
@@ -83,10 +89,10 @@ typedef struct SERIALIZABLEXACT
SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or
* no conflict out */
} SeqNo;
- SHM_QUEUE outConflicts; /* list of write transactions whose data we
- * couldn't read. */
- SHM_QUEUE inConflicts; /* list of read transactions which couldn't
- * see our write. */
+
+ HTAB* outConflictsTab; /* Table of write transactions whose data we couldn't read */
+ HTAB* inConflictsTab; /* Table of read transactions which couldn't see our write. */
+
SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */
SHM_QUEUE finishedLink; /* list link in
* FinishedSerializableTransactions */
@@ -95,7 +101,7 @@ typedef struct SERIALIZABLEXACT
* for r/o transactions: list of concurrent r/w transactions that we could
* potentially have conflicts with, and vice versa for r/w transactions
*/
- SHM_QUEUE possibleUnsafeConflicts;
+ HTAB* possibleUnsafeConflictsTab;
TransactionId topXid; /* top level xid for the transaction, if one
* exists; else invalid */
@@ -205,18 +211,6 @@ typedef struct RWConflictData *RWConflict;
#define RWConflictDataSize \
((Size)MAXALIGN(sizeof(RWConflictData)))
-typedef struct RWConflictPoolHeaderData
-{
- SHM_QUEUE availableList;
- RWConflict element;
-} RWConflictPoolHeaderData;
-
-typedef struct RWConflictPoolHeaderData *RWConflictPoolHeader;
-
-#define RWConflictPoolHeaderDataSize \
- ((Size)MAXALIGN(sizeof(RWConflictPoolHeaderData)))
-
-
/*
* The SERIALIZABLEXIDTAG struct identifies an xid assigned to a serializable
* transaction or any of its subtransactions.
On 06/20/2017 06:51 AM, Mengxing Liu wrote:
But in my benchmark, the throughput decrease by 15% after the modification.
Can you help me do a quick review to find if there is anything wrong?
I also attached the flame graph before/after the modification for reference.
Hmm. The hash table ought to speed up the RWConflictExists() function
right? Where in the flame graph is RWConflictExists()? If it only
accounts for a small amount of the overall runtime, even drastic speedup
there won't make much difference to the total.
Here is my questions:
1. Is there any function in HTAB like 锟斤拷clear锟斤拷 that can make itself empty quickly?
In this patch, when releasing a transaction object, I need to scan the hash table and
use 锟斤拷hash_search锟斤拷 to remove entries one by one. It would make releasing operation slower.
Nope, there is no such function, I'm afraid.
In a previous email, I reported that many backends wait for the lock 锟斤拷SerializableFinishedListLock锟斤拷;
If we don't implement functions like 锟斤拷ReleaseOneSerializableXact锟斤拷 quickly, they will be the bottleneck.
Yeah, that's probably what's causing the decrease in throughput you're
seeing.
You might need to also keep the linked lists, and use the hash table to
only look up particular items in the linked list faster.
I was surprised to see that you're creating a lot of smallish hash
tables, three for each PredXact entry. I would've expected all the
PredXact entries to share the same hash tables, i.e. have only three
hash tables in total. That would be more flexible in allocating
resources among entries. (It won't help with the quick-release, though)
2. Is the HTAB thread-safe? I would focus on removing some unnecessary locks if possible.
Nope, you need to hold a lock while searching/manipulating a HTAB hash
table. If the hash table gets big and you start to see lock contention,
you can partition it so that each operation only needs to lock the one
partition covering the search key.
- Heikki
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
From: "Heikki Linnakangas" <hlinnaka@iki.fi>
Hmm. The hash table ought to speed up the RWConflictExists() function
right? Where in the flame graph is RWConflictExists()? If it only
accounts for a small amount of the overall runtime, even drastic speedup
there won't make much difference to the total.
Yes. It is much smaller than we have expected. I did a small test for HTAB and linked list:
when the set size is smaller than 10, linked list is as quick as HTAB. Here is the result.
(x microseconds running 10 million searching)
setSize: 5, LIST: 423569, HTAB: 523681
setSize: 10, LIST: 540579, HTAB: 430111
setSize: 15, LIST: 752879, HTAB: 429519
setSize: 20, LIST: 940792, HTAB: 431388
setSize: 25, LIST: 1163760, HTAB: 446043
setSize: 30, LIST: 1352990, HTAB: 429057
setSize: 35, LIST: 1524097, HTAB: 431547
setSize: 40, LIST: 1714976, HTAB: 429023
As we can see, the hash table can only benefit in a very extreme situation.
However, even if there are 100 concurrent connections, the average length of conflict
list is about 10. linked list is not the bottleneck.
Nope, there is no such function, I'm afraid.
Oh, that's really a bad news.
In a previous email, I reported that many backends wait for the lock “SerializableFinishedListLock”;
If we don't implement functions like “ReleaseOneSerializableXact” quickly, they will be the bottleneck.Yeah, that's probably what's causing the decrease in throughput you're
seeing.
An new evidence: I use "SELECT wait_event_type, wait_event FROM pg_stat_activity;" and sum by event_type to analyze the wait event.
The result of original version:
SerializableXactHashLock 27
predicate_lock_manager 512
WALWriteLock 3
SerializableFinishedListLock 402
The result of new version:
SerializableXactHashLock 38
predicate_lock_manager 473
WALWriteLock 3
SerializableFinishedListLock 1068
Obviously, more backends are blocked by SerializableFinishedListLock.
You might need to also keep the linked lists, and use the hash table to
only look up particular items in the linked list faster.I was surprised to see that you're creating a lot of smallish hash
tables, three for each PredXact entry. I would've expected all the
PredXact entries to share the same hash tables, i.e. have only three
hash tables in total. That would be more flexible in allocating
resources among entries. (It won't help with the quick-release, though)
Yes, it would looks more elegant and require less memory resources.
( because hash table objects also consume memory )
But just for performance, it would be less efficient than my patch.
Because it has to maintain linked lists, besides hash tables. In other words,
it does more works than my patch.
Another point is that removing linked list may have more opportunities to reduce
lock contentions. Otherwise, we need a global lock to protect the linked list.
--
Sincerely
Mengxing Liu
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers