// Standard C++ headers
#include <iostream>
#include <sstream>
#include <stdexcept>

// PostgreSQL libpq headers
#include "libpq-fe.h"
#include "libpq/libpq-fs.h"

// Intel TBB header for threading
#include <tbb/tbb_thread.h>
#include <tbb/spin_mutex.h>

#include <cstdlib>

#if defined(_WIN32)
#  include <windows.h>
#  define sleep Sleep
#else
#  include <unistd.h>
#endif

// Exception type thrown by the worker functors in this file when a database
// call fails. Carries the message supplied at construction.
class TestException: public std::exception
{
  std::string m_what; // message handed to the constructor
public:
  // what: human-readable description of the failure; copied into the exception.
  TestException(const std::string& what) : m_what(what) { }
  ~TestException() throw() { }

  // Returns the message supplied at construction. The pointer stays valid for
  // as long as this exception object is alive.
  virtual const char* what() const throw()
  {
    return m_what.c_str();
  }
};

// Flag used to control thread exit.
// The worker loops poll it (`while (!terminate)`), and main() sets it to true
// once the optional run timer has elapsed.
// NOTE(review): this is a plain bool read and written from different threads
// with no synchronization; consider an atomic flag if the toolchain allows it.
static bool terminate = false;

// This class does an insert into the test table
struct Inserter
{
  void operator()()
  {
    // Establish a connection
    PGconn* conn = PQconnectdb("user=postgres password=1234");

    // Insert data into table indefinitely
    int i=0;
    while(!terminate)
    {
      // SQL Statement
      std::stringstream insert;
      insert << "INSERT INTO tmp (value) VALUES (" << i%250 << ");";
      std::string insertStr = insert.str();
      const char* c_str     = insertStr.c_str();

      // Execute query
      PGresult* res=PQexec(conn,c_str);
      if (PQresultStatus(res) == PGRES_FATAL_ERROR)
      {
        std::cerr << "Error in inserting data:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn);
        PQclear(res);
        PQfinish(conn);
        throw TestException( "Inserter::PQexec() failed." );
      }
      PQclear(res);

      // Increment index
      i++;
    }
    PQfinish(conn);
  }
};

struct Queryer
{
  void operator()()
  {
    // Establish a connection
    PGconn* conn = PQconnectdb("user=postgres password=1234");

    // Retrieve data from the test table indefinitely
    int i=1;
    while (!terminate)
    {
      // SQL statement - read the top 1000 values off `tmp'
      std::stringstream query;
      query << "SELECT * FROM tmp WHERE id > (SELECT last_value - 1000 FROM tmp_id_seq);";
      std::string queryStr = query.str();
      const char* c_str    = queryStr.c_str();

      // Execute query
      PGresult* res=PQexec(conn, c_str);
      if (PQresultStatus(res) == PGRES_FATAL_ERROR)
      {
        std::cerr << "Error in searching data:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn);
        PQclear(res);
        PQfinish(conn);
        throw TestException( "Queryer::PQexec() failed." );
      }
      PQclear(res);

      // Increment index
      i++;
    }
    PQfinish(conn);
  }

};

int main(int argc, char * argv[])
{
  int sleep_time = 0;
  if (argc > 1) {
    sleep_time = atoi(argv[1]);
  }

  std::cerr << std::string("Libpq is ") + (PQisthreadsafe() ? "" : "not ") + "threadsafe" << std::endl;

  // Establish a connection
  PGconn* conn = PQconnectdb("user=postgres password=1234");

  // Create the test table
  std::cout << "Creating table...\n";
  PGresult* res=PQexec(conn,"DROP TABLE IF EXISTS tmp; CREATE TABLE tmp (id SERIAL8 PRIMARY KEY,value INT);");
  if (PQresultStatus(res) == PGRES_FATAL_ERROR)
  {
    std::cerr << "Error in Creating table:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn);
    PQclear(res);
    PQfinish(conn);
    return 1;
  }
  // Clear and close the current connection
  PQclear(res);
  PQfinish(conn);

  // Launch thread that does INSERT
  std::cout << "Starting table filling thread...\n";
  Inserter inserter;
  tbb::tbb_thread inserter_thread(inserter);

  // Launch thread that does SELECT ...This thread causes memory in postgres.exe to slowly go up indefinitely
  std::cout << "Starting table searching thread...\n";
  Queryer selector;
  tbb::tbb_thread selector_thread(selector);

  // run for timer
  if (sleep_time) {
    sleep( sleep_time );
    terminate = true;
  }

  inserter_thread.join();
  selector_thread.join();

  // and terminate
  return 0;
}
