#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>

#include <libpq-fe.h>


// Comment this out to make the serialisation anomalies go away.
#define WITH_INDEX


// I have seen the problem with as few as 3 threads. 32 threads make
// the issue appear much sooner.
#define NR_THREADS (32)
#define NR_RUNS (1024 * 1024)

static PGconn *conns[NR_THREADS];

// Thread entry point: makes one attempt to take the '{42}' lock using the
// connection passed in `arg` (a PGconn *), inside a single SERIALIZABLE
// transaction.
//
// If the SELECT already returns a '{42}' row, the transaction is simply
// committed. Otherwise a '{42}' row is inserted and the transaction is
// committed; the results of that INSERT and COMMIT are discarded without
// being inspected.
//
// Always returns NULL.
static void* try_acquire_lock(void *arg)
{
        PGconn *conn = arg;
        PGresult *reply;

        reply = PQexec(conn, "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE");
        assert(PQresultStatus(reply) == PGRES_COMMAND_OK);
        PQclear(reply);

        reply = PQexec(conn, "SELECT * FROM locks WHERE path = '{42}'");
        assert(PQresultStatus(reply) == PGRES_TUPLES_OK);
        const int lock_is_free = PQntuples(reply) <= 0;
        PQclear(reply);

        if (lock_is_free) {
                // No lock row was returned by the SELECT: try to take the
                // lock. Neither result is checked here.
                // NOTE(review): presumably because a serialization failure
                // is an expected outcome for the losing threads -- confirm.
                PQclear(PQexec(conn, "INSERT INTO locks(path) VALUES('{42}')"));
                PQclear(PQexec(conn, "COMMIT"));
                return NULL;
        }

        // The lock row is already there: just end the transaction.
        reply = PQexec(conn, "COMMIT");
        assert(PQresultStatus(reply) == PGRES_COMMAND_OK);
        PQclear(reply);

        return NULL;
}

// Runs one round of the experiment.
//
// Starts NR_THREADS threads, each running try_acquire_lock() on its own
// connection from conns[], and waits for all of them. It then checks through
// conns[0] that exactly one '{42}' row ended up in `locks`; any other count
// is printed and trips the assert. Finally the table is truncated and a
// '{666}' row is inserted before returning.
//
// Thread creation/join failures abort the process: joining a pthread_t that
// pthread_create() never filled in would be undefined behaviour, so these
// checks are explicit rather than asserts that vanish under NDEBUG.
static void test_once(void)
{
        PGconn *c = conns[0];
        PGresult *res;
        int ntuples;
        int rc;

        pthread_t thrs[NR_THREADS];
        for (int i = 0; i < NR_THREADS; ++i) {
                rc = pthread_create(&thrs[i], NULL, &try_acquire_lock,
                                    conns[i]);
                if (rc != 0) {
                        fprintf(stderr,
                                "pthread_create failed for thread %d: error %d\n",
                                i, rc);
                        abort();
                }
        }
        for (int i = 0; i < NR_THREADS; ++i) {
                rc = pthread_join(thrs[i], NULL);
                if (rc != 0) {
                        fprintf(stderr,
                                "pthread_join failed for thread %d: error %d\n",
                                i, rc);
                        abort();
                }
        }

        // All workers are done, so conns[0] is free to be used here again.
        res = PQexec(c, "SELECT * FROM locks WHERE path = '{42}'");
        assert(PQresultStatus(res) == PGRES_TUPLES_OK);
        ntuples = PQntuples(res);
        PQclear(res);

        // Exactly one thread is expected to have inserted the lock row.
        if (ntuples != 1)
                printf("ntuples = %d\n", ntuples);
        assert(ntuples == 1);

        // Reset the table for the next round.
        res = PQexec(c, "TRUNCATE TABLE locks");
        assert(PQresultStatus(res) == PGRES_COMMAND_OK);
        PQclear(res);

        // NOTE(review): the purpose of this unrelated '{666}' row is not
        // stated in the source -- presumably it keeps the table/index
        // non-empty between rounds; confirm with the author.
        res = PQexec(c, "INSERT INTO locks VALUES ('{666}')");
        assert(PQresultStatus(res) == PGRES_COMMAND_OK);
        PQclear(res);
}

// (Re)creates the `locks` table through conns[0]: drops any existing table,
// creates it with a single `path int[] NOT NULL` column and, when WITH_INDEX
// is defined, adds a GIN index on `path`.
//
// Every statement is expected to succeed; a failure trips an assert.
static void prepare_db(void)
{
        PGconn *c = conns[0];
        PGresult *res;

        // IF EXISTS turns the expected "table is not there yet" case into a
        // success, so the result can be checked like every other statement
        // instead of being silently discarded along with real errors.
        res = PQexec(c, "DROP TABLE IF EXISTS locks");
        assert(PQresultStatus(res) == PGRES_COMMAND_OK);
        PQclear(res);

        res = PQexec(c, "CREATE TABLE locks (path int[] NOT NULL)");
        assert(PQresultStatus(res) == PGRES_COMMAND_OK);
        PQclear(res);

#ifdef WITH_INDEX
        res = PQexec(c, "CREATE INDEX ON locks USING gin(path)");
        assert(PQresultStatus(res) == PGRES_COMMAND_OK);
        PQclear(res);
#endif
}

// Program entry point.
//
// Opens NR_THREADS connections using the conninfo string from the DB_CONNSTR
// environment variable (falling back to "dbname=postgres"), prepares the
// schema, runs test_once() NR_RUNS times and closes all connections.
//
// Returns 0 on completion, or EXIT_FAILURE if a connection cannot be
// established (after printing libpq's error message to stderr).
int main(void)
{
        const char *connstr = getenv("DB_CONNSTR");
        if (connstr == NULL)
                connstr = "dbname=postgres";

        for (int i = 0; i < NR_THREADS; ++i) {
                conns[i] = PQconnectdb(connstr);
                if (PQstatus(conns[i]) != CONNECTION_OK) {
                        // libpq error messages carry their own trailing
                        // newline, so none is added here.
                        fprintf(stderr, "connection %d failed: %s",
                                i, PQerrorMessage(conns[i]));
                        // Release everything opened so far, including the
                        // failed connection object itself.
                        for (int j = 0; j <= i; ++j)
                                PQfinish(conns[j]);
                        return EXIT_FAILURE;
                }
        }
        prepare_db();

        for (int i = 0; i < NR_RUNS; ++i)
                test_once();

        for (int i = 0; i < NR_THREADS; ++i)
                PQfinish(conns[i]);

        return 0;
}
