/*

create function qsort_comparisons(N int)
  returns table (N int, groups int,
                 avg_cmps int8, min_cmps int8, max_cmps int8,
                 note text)
strict volatile language c as '/path/to/count_comparisons.so';

select * from qsort_comparisons(10000);

 */

#include "postgres.h"

#include <math.h>

#include "common/pg_prng.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/tuplestore.h"

PG_MODULE_MAGIC;

/*
 * Three-way integer comparator with the qsort_arg signature.
 *
 * a, b: pointers to the two int values being compared.
 * arg:  pointer to a size_t counter, which is incremented on every call so
 *       that the caller can learn how many comparisons a sort performed.
 *
 * Returns -1, 0 or 1 as *a is less than, equal to, or greater than *b.
 */
static int
cmp_func(const void *a, const void *b, void *arg)
{
	const int  *lhs = (const int *) a;
	const int  *rhs = (const int *) b;

	/* account for this comparison before doing anything else */
	*((size_t *) arg) += 1;

	/* (greater) - (less) yields 1, 0 or -1 without risk of overflow */
	return (*lhs > *rhs) - (*lhs < *rhs);
}

/*
 * Put the n integers in x[] into random order, in place.
 *
 * This is the Fisher-Yates algorithm: walk the array from the front, and at
 * each position exchange the current element with one picked at random from
 * the current position through the end of the array (possibly itself).
 * Random numbers come from pg_global_prng_state.
 */
static void
shuffle(int *x, int n)
{
	int			pos = 0;

	while (pos < n)
	{
		int			pick = (int) pg_prng_uint64_range(&pg_global_prng_state,
													  pos, n - 1);
		int			chosen = x[pick];

		x[pick] = x[pos];
		x[pos] = chosen;
		pos++;
	}
}

/*
 * Test qsort for the given test case, and emit one result row.
 *
 * In each of "trials" rounds the array is shuffled and then sorted with
 * qsort_arg, using a comparator that counts how often it is called.  The
 * row appended to the tuplestore contains, in order: n, the number of groups
 * of equal values, the average, minimum and maximum number of comparisons
 * per round, and the caller-supplied note.
 *
 * array:     the n integers to test with; reordered in place
 * n:         number of elements in array
 * note:      free-form description of the test case, copied into the row
 * trials:    number of shuffle-and-sort rounds to run
 * tupstore:  tuplestore receiving the result row
 * attinmeta: input-conversion metadata matching the result row type
 *
 * If trials is not positive, no sorting is done and all three comparison
 * counts are reported as zero.  Note that in that case the array is left
 * unsorted, so the group count is the number of runs of equal adjacent
 * values rather than the number of distinct values.
 */
static void
qsort_test_case(int *array, int n, char *note, int trials,
				Tuplestorestate *tupstore, AttInMetadata *attinmeta)
{
	size_t		tot_count = 0;
	size_t		min_count = SIZE_MAX;
	size_t		max_count = 0;
	size_t		avg_count = 0;
	int			groups;
	HeapTuple	tuple;
	char	   *values[6];
	char		v0[32];
	char		v1[32];
	char		v2[32];
	char		v3[32];
	char		v4[32];

	for (int t = 0; t < trials; t++)
	{
		size_t		count = 0;

		/* each round can take a while, so allow query cancel in between */
		CHECK_FOR_INTERRUPTS();
		shuffle(array, n);
		qsort_arg(array, n, sizeof(int), cmp_func, &count);
		tot_count += count;
		min_count = Min(min_count, count);
		max_count = Max(max_count, count);
	}

	if (trials > 0)
		avg_count = tot_count / (size_t) trials;
	else
		min_count = 0;			/* don't report the SIZE_MAX sentinel */

	/* count groups in sorted array; an empty array has no groups */
	groups = (n > 0) ? 1 : 0;
	for (int i = 1; i < n; i++)
	{
		if (array[i - 1] != array[i])
			groups++;
	}

	/* the counters are size_t, hence unsigned: print them with %zu */
	snprintf(v0, sizeof(v0), "%d", n);
	snprintf(v1, sizeof(v1), "%d", groups);
	snprintf(v2, sizeof(v2), "%zu", avg_count);
	snprintf(v3, sizeof(v3), "%zu", min_count);
	snprintf(v4, sizeof(v4), "%zu", max_count);
	values[0] = v0;
	values[1] = v1;
	values[2] = v2;
	values[3] = v3;
	values[4] = v4;
	values[5] = note;

	tuple = BuildTupleFromCStrings(attinmeta, values);
	tuplestore_puttuple(tupstore, tuple);
	heap_freetuple(tuple);
}

/* zipfian code borrowed from pgbench */

/*
 * Generate a Zipfian-distributed random value using the rejection method
 * described in "Non-Uniform Random Variate Generation", Luc Devroye,
 * p. 550-551, Springer 1986.
 *
 * state: random number generator state to draw from
 * n:     upper bound; candidates larger than n are rejected
 * s:     distribution parameter
 *
 * This works for s > 1.0, but may perform badly for s very close to 1.0.
 * When n <= 1 the result is 1 and no random numbers are consumed.
 */
static int64
computeIterativeZipfian(pg_prng_state *state, int64 n, double s)
{
	double		b;

	/* Ensure n is sane */
	if (n <= 1)
		return 1;

	b = pow(2.0, s - 1.0);

	for (;;)
	{
		/* random variates: two draws per attempt, u first and then v */
		double		u = pg_prng_double(state);
		double		v = pg_prng_double(state);

		/* candidate value, and the quantity used by the acceptance test */
		double		x = floor(pow(u, -1.0 / (s - 1.0)));
		double		t = pow(1.0 + 1.0 / x, s - 1.0);

		/* accept unless too large or out of bound, else try again */
		if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
			return (int64) x;
	}
}

/*
 * Random number generator: zipfian distribution from min to max inclusive.
 *
 * state: random number generator state to draw from
 * s:     distribution parameter, passed through to computeIterativeZipfian
 */
static int64
getZipfianRand(pg_prng_state *state, int64 min, int64 max, double s)
{
	int64		span = max - min + 1;
	int64		rank = computeIterativeZipfian(state, span, s);

	/* rank counts from 1, so shift it down to make rank 1 map onto min */
	return min - 1 + rank;
}

/*
 * qsort_comparisons(N int) returns table
 */
PG_FUNCTION_INFO_V1(qsort_comparisons);
Datum
qsort_comparisons(PG_FUNCTION_ARGS)
{
	int32		N = PG_GETARG_INT32(0);
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	Tuplestorestate *tupstore;
	TupleDesc	tupdesc;
	AttInMetadata *attinmeta;
	int		   *array;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not allowed in this context")));

	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;

	/* get a tuple descriptor for our result type */
	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
	{
		case TYPEFUNC_COMPOSITE:
			/* success */
			break;
		case TYPEFUNC_RECORD:
			/* failed to determine actual type of RECORD */
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("function returning record called in context "
							"that cannot accept type record")));
			break;
		default:
			/* result type isn't composite */
			ereport(ERROR,
					(errcode(ERRCODE_DATATYPE_MISMATCH),
					 errmsg("return type must be a row type")));
			break;
	}

	/* make sure we have a persistent copy of the result tupdesc */
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupdesc = CreateTupleDescCopy(tupdesc);

	/* initialize our tuplestore in long-lived context */
	tupstore =
		tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
							  false, work_mem);

	MemoryContextSwitchTo(oldcontext);

	/* Set up to construct tuples */
	attinmeta = TupleDescGetAttInMetadata(tupdesc);

	/* Allocate test array */
	array = (int *) palloc(N * sizeof(int));

	/* All-the-same */
	for (int i = 0; i < N; i++)
		array[i] = 1;
	qsort_test_case(array, N, "all values the same", 10,
					tupstore, attinmeta);

	/* All different */
	for (int i = 0; i < N; i++)
		array[i] = i;
	qsort_test_case(array, N, "all values distinct", 1000,
					tupstore, attinmeta);

	/* Various uniform group sizes */
	for (int i = 0; i < N; i++)
		array[i] = i % 2;
	qsort_test_case(array, N, "2 distinct values", 1000,
					tupstore, attinmeta);
	for (int i = 0; i < N; i++)
		array[i] = i % 16;
	qsort_test_case(array, N, "16 distinct values", 1000,
					tupstore, attinmeta);
	for (int i = 0; i < N; i++)
		array[i] = i % 64;
	qsort_test_case(array, N, "64 distinct values", 1000,
					tupstore, attinmeta);
	for (int i = 0; i < N; i++)
		array[i] = i % 256;
	qsort_test_case(array, N, "256 distinct values", 1000,
					tupstore, attinmeta);
	for (int i = 0; i < N; i++)
		array[i] = i % 1024;
	qsort_test_case(array, N, "1024 distinct values", 1000,
					tupstore, attinmeta);

	/* Zipfian */
	for (int i = 0; i < N; i++)
		array[i] = (int) getZipfianRand(&pg_global_prng_state, 0, 1000000,
										1.1);
	qsort_test_case(array, N, "Zipfian, parameter 1.1", 1000,
					tupstore, attinmeta);
	for (int i = 0; i < N; i++)
		array[i] = (int) getZipfianRand(&pg_global_prng_state, 0, 1000000,
										1.5);
	qsort_test_case(array, N, "Zipfian, parameter 1.5", 1000,
					tupstore, attinmeta);
	for (int i = 0; i < N; i++)
		array[i] = (int) getZipfianRand(&pg_global_prng_state, 0, 1000000,
										3.0);
	qsort_test_case(array, N, "Zipfian, parameter 3.0", 1000,
					tupstore, attinmeta);

	/* let the caller know we're sending back a tuplestore */
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	return (Datum) 0;
}
