/*
 * dblink.c
 *
 * Functions returning results from a remote database
 *
 * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
 * 
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 * 
 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * 
 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

#include "dblink.h"

PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
	PGconn			*conn = NULL;
	PGresult		*res = NULL;
	dblink_results	*results;
	char			*fldsep;
	char			*optstr;
	char			*sqlstatement;
	char			*curstr = "DECLARE mycursor CURSOR FOR ";
	char			*execstatement;
	char			*msg;
	int				ntuples = 0;
	int				nfields = 0;
	ReturnSetInfo 	*rsi;
	char			*result;
	text			*result_text;
	int				j = 0;
	int				text_len = 0;

	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) {
		elog(ERROR, "dblink: NULL arguments are not permitted");
	}

	if (fcinfo->resultinfo == NULL || ! IsA(fcinfo->resultinfo, ReturnSetInfo)) {
		elog(ERROR, "dblink: function called in context that does not accept a set result");
	}

	optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
	sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
	fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(2))));

	if (fcinfo->flinfo->fn_extra == NULL) {

		conn = PQconnectdb(optstr);
		if (PQstatus(conn) == CONNECTION_BAD)
		{
			msg = PQerrorMessage(conn);
			PQfinish(conn);
			elog(ERROR, "dblink: connection error: %s", msg);
		}

		res = PQexec(conn, "BEGIN");
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			msg = PQerrorMessage(conn);
			PQclear(res);
			PQfinish(conn);
			elog(ERROR, "dblink: begin error: %s", msg);
		}
		PQclear(res);


		execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
		if (execstatement != NULL) {
			strcpy(execstatement, curstr);
			strcat(execstatement, sqlstatement);
			strcat(execstatement, "\0");
		} else {
			elog(ERROR, "dblink: insufficient memory" );
		}

		res = PQexec(conn, execstatement);
		if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
		{
			msg = PQerrorMessage(conn);
			PQclear(res);
			PQfinish(conn);
			elog(ERROR, "dblink: sql error: %s", msg);
		} else {
			/*
			 * got results, start fetching them
			 */
		    PQclear(res);

		    res = PQexec(conn, "FETCH ALL in mycursor");
		    if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) {
				msg = PQerrorMessage(conn);
				PQclear(res);
				PQfinish(conn);
				elog(ERROR, "dblink: sql error: %s", msg);
		    }

			ntuples = PQntuples(res);

			if (ntuples > 0) {

				results = init_dblink_results(fcinfo->flinfo->fn_mcxt);

				results->fldsep = fldsep;
				results->tup_num = 0;
				results->res = res;
				res = NULL;

				(dblink_results *) fcinfo->flinfo->fn_extra = results;

				results = NULL;
				results = fcinfo->flinfo->fn_extra;

				nfields = PQnfields(results->res);

				for (j = 0; j < nfields; j++) {
					text_len += PQgetlength(results->res, results->tup_num, j);
				}

				text_len += (nfields - 1) * strlen(results->fldsep);
				result = (char *) palloc(text_len + 1);

				if (result != NULL) {
					strcpy(result, PQgetvalue(results->res, results->tup_num, 0));
					for (j = 1; j < nfields; j++) {
						result = strcat(result, results->fldsep);
						result = strcat(result, PQgetvalue(results->res, results->tup_num, j));
					}
					strcat(result, "\0");
				} else {
					elog(ERROR, "dblink: insufficient memory" );
				}

				result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));

			    /* close the cursor */
			    res = PQexec(conn, "CLOSE mycursor");
			    PQclear(res);

			    /* commit the transaction */
			    res = PQexec(conn, "COMMIT");
			    PQclear(res);

			    /* close the connection to the database and cleanup */
			    PQfinish(conn);

				rsi = (ReturnSetInfo *)fcinfo->resultinfo;
				rsi->isDone = ExprMultipleResult;

				PG_RETURN_TEXT_P(result_text);

			} else {

			    PQclear(res);

			    /* close the cursor */
			    res = PQexec(conn, "CLOSE mycursor");
			    PQclear(res);

			    /* commit the transaction */
			    res = PQexec(conn, "COMMIT");
			    PQclear(res);

			    /* close the connection to the database and cleanup */
			    PQfinish(conn);

				rsi = (ReturnSetInfo *)fcinfo->resultinfo;
				rsi->isDone = ExprEndResult ;

				PG_RETURN_NULL();
			}
		}
	} else {
		/*
		 * check for more results
		 */

		results = fcinfo->flinfo->fn_extra;

		ntuples = PQntuples(results->res);
		results->tup_num++;

		if (results->tup_num < ntuples) {

			/*
			 * fetch them if available
			 */

			nfields = PQnfields(results->res);

			for (j = 0; j < nfields; j++) {
				text_len += PQgetlength(results->res, results->tup_num, j);
			}

			text_len += (nfields - 1) * strlen(results->fldsep);
			result = (char *) palloc(text_len + 1);

			if (result != NULL) {
				strcpy(result, PQgetvalue(results->res, results->tup_num, 0));
				for (j = 1; j < nfields; j++) {
					result = strcat(result, results->fldsep);
					result = strcat(result, PQgetvalue(results->res, results->tup_num, j));
				}
				strcat(result, "\0");
			} else {
				elog(ERROR, "dblink: insufficient memory" );
			}

			result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));

			rsi = (ReturnSetInfo *)fcinfo->resultinfo;
			rsi->isDone = ExprMultipleResult;

			PG_RETURN_TEXT_P(result_text);


		} else {

			/*
			 * or if no more, clean things up
			 */

			results = fcinfo->flinfo->fn_extra;

		    PQclear(results->res);

			rsi = (ReturnSetInfo *)fcinfo->resultinfo;
			rsi->isDone = ExprEndResult ;

			PG_RETURN_NULL();
		}
	}
	PG_RETURN_NULL();
}


/*
 * dblink_tok
 * parse dblink output string
 * return fldnum item (0 based)
 * based on provided field separator
 */

PG_FUNCTION_INFO_V1(dblink_tok);
Datum
dblink_tok(PG_FUNCTION_ARGS)
{
	char		*fldtext;
	char		*fldsep;
	int			fldnum;
	char		*buffer = "\0";
	int			j = 0;
	text		*result_text;


	if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
		elog(ERROR, "dblink: NULL arguments are not permitted");
	}

	fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
	fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
	fldnum = PG_GETARG_INT32(2);

	if (fldnum < 0) {
		elog(ERROR, "dblink: field number < 0 not permitted");
	}

	buffer = strtok(fldtext, fldsep);
	for (j = 1; j < fldnum + 1; j++) {
		buffer = strtok(NULL, fldsep);
		if (buffer == NULL) {
			elog(ERROR, "dblink: field number %d does not exist", fldnum);
		}
	} 

	result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
	PG_RETURN_TEXT_P(result_text);
}


/*
 * internal functions
 */


/*
 * init_dblink_results
 *	 - create an empty dblink_results data structure
 */
dblink_results *
init_dblink_results(MemoryContext fn_mcxt)
{
	MemoryContext	oldcontext;
	dblink_results	*retval;

	oldcontext = MemoryContextSwitchTo(fn_mcxt);

	retval = (dblink_results *) palloc(sizeof(dblink_results));
	MemSet(retval, 0, sizeof(dblink_results));

	retval->tup_num = -1;
	retval->fldsep = NULL;
	retval->res = NULL;

	MemoryContextSwitchTo(oldcontext);

	return retval;
}
