plpq

Started by Darko Prenosilabout 23 years ago4 messages
#1Darko Prenosil
darko.prenosil@finteh.hr
4 attachment(s)

Tree weeks later than I promised, but it is finished (I hope).

In attachment are files:
dblink.c
dblink.h
dblink.sql.in
pqtest.sql

In file pqtest.sql is sample queries and results. It seem OK to me.

There are two reasons why I did not make a diff.

1. The source I started from is 7.3b1, not the latest.
2. I would like You to check the code, especially the part that touches memory
management.
I can say that it works, but I do not know exactly why, and this can be
dangerous. With my knowledge of postgres internals this is
as far I can go at the moment. And once more sorry for bad English !

Regards !

Attachments:

dblink.sql.intext/plain; charset=us-ascii; name=dblink.sql.inDownload
-- Uncomment the following 9 lines to use original DEPRECATED functions
--CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
--  AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
--  WITH (isstrict);
--CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
--  AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
--  WITH (isstrict);
--CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
--  AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
--  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_connect (text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_connect' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_disconnect () RETURNS text
  AS 'MODULE_PATHNAME','dblink_disconnect' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_open (text,text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_open' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_fetch (text,int) RETURNS setof record
  AS 'MODULE_PATHNAME','dblink_fetch' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_close (text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_close' LANGUAGE 'c'
  WITH (isstrict);

-- Note: if this is a first time install of dblink, the following DROP
-- FUNCTION line is expected to fail.
-- Comment out the following 4 lines if the DEPRECATED functions are used.
DROP FUNCTION dblink (text,text);
CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof record
  AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink (text) RETURNS setof record
  AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_exec (text,text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_exec (text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
  WITH (isstrict);

CREATE TYPE dblink_pkey_results AS (position int4, colname text);

CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof dblink_pkey_results
  AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_build_sql_insert (text, int2vector, int2, _text, _text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_build_sql_delete (text, int2vector, int2, _text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_build_sql_update (text, int2vector, int2, _text, _text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
  AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';

-- ******************************************************************************
CREATE OR REPLACE FUNCTION PQconnectdb (text) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Connectdb' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQsetdbLogin (text,text,text,text,text,text,text) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_SetdbLogin' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQfinish (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Finish' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQreset (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Reset' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQstatus (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Status' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQstatusStr (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_StatusStr' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQdb (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_Db' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQuser (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_User' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQpass (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_Password' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQhost (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_Host' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQport (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_Port' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQtty (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_Tty' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQerrorMessage (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_ErrorMessage' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQbackendPID (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_BackendPID' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQexec (int,text) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Exec' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQresultStatus (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_ResultStatus' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQresStatus (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_ResStatus' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQresultErrorMessage (int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_ResultErrorMessage' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQclear (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Clear' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQescapeString (text) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_EscapeString' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQntuples (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Ntuples' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQnfields (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Nfields' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQfname (int,int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_Fname' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQfnumber (int,text) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Fnumber' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQftype (int,int) RETURNS oid
  AS 'MODULE_PATHNAME','dblink_pq_Ftype' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQfmod (int,int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Fmod' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQfsize (int,int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Fsize' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQBinaryTuples (int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_BinaryTuples' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQgetvalue(int,int,int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_GetValue' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQgetisnull(int,int,int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Getisnull' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQgetlength(int,int,int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_Getlength' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQcmdStatus(int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_CmdStatus' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQcmdTuples(int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_CmdTuples' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQoidValue(int) RETURNS int
  AS 'MODULE_PATHNAME','dblink_pq_OidValue' LANGUAGE 'c'
  WITH (isstrict);

CREATE OR REPLACE FUNCTION PQoidStatus(int) RETURNS text
  AS 'MODULE_PATHNAME','dblink_pq_OidStatus' LANGUAGE 'c'
  WITH (isstrict);
dblink.htext/x-chdr; charset=us-ascii; name=dblink.hDownload
dblink.ctext/x-csrc; charset=us-ascii; name=dblink.cDownload
/*
 * dblink.c
 *
 * Functions returning results from a remote database
 *
 * Joe Conway <mail@joeconway.com>
 *
 * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
 * ALL RIGHTS RESERVED;
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 *
 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
#include "postgres.h"

#include "libpq-fe.h"

#include "fmgr.h"
#include "funcapi.h"
#include "access/tupdesc.h"
#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/namespace.h"
#include "catalog/pg_index.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"

#include "dblink.h"

/*
 * Internal declarations
 */
static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
static public_pq_connections *init_public_connections();
static public_pq_results *init_public_results();
static char **get_pkey_attnames(Oid relid, int16 *numatts);
static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *quote_literal_cstr(char *rawstr);
static char *quote_ident_cstr(char *rawstr);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid	get_relid_from_relname(text *relname_text);
static dblink_results *get_res_ptr(int32 res_id_index);
static void append_res_ptr(dblink_results * results);
static void remove_res_ptr(dblink_results * results);
static TupleDesc pgresultGetTupleDesc(PGresult *res);

static public_pq_connections *get_public_conn(int32 public_conn_id_index);
static void append_public_conn(public_pq_connections * connections);
static void remove_public_conn(public_pq_connections * connections);

static public_pq_results *get_public_res(int32 public_res_id_index);
static void append_public_res(public_pq_results * results);
static void remove_public_res(public_pq_results * results);

static PGresult* getResultPointer(int32 nResId, bool removePointer);
static PGconn* getConnPointer(int32 nConnId,bool removePointer);


/* Global */
List	   *res_id = NIL;
int			res_id_index = 0;

List	   *public_conn_id = NIL;
int			public_conn_id_index = 0;
List	   *public_res_id = NIL;
int			public_res_id_index = 0;

PGconn	   *persistent_conn = NULL;

#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
#define xpfree(var_) \
	do { \
		if (var_ != NULL) \
		{ \
			pfree(var_); \
			var_ = NULL; \
		} \
	} while (0)


/*
 * Create a persistent connection to another database
 */
PG_FUNCTION_INFO_V1(dblink_connect);
Datum
dblink_connect(PG_FUNCTION_ARGS)
{
	char	   *connstr = GET_STR(PG_GETARG_TEXT_P(0));
	char	   *msg;
	text	   *result_text;
	MemoryContext oldcontext;

	if (persistent_conn != NULL)
		PQfinish(persistent_conn);

	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
	persistent_conn = PQconnectdb(connstr);
	MemoryContextSwitchTo(oldcontext);

	if (PQstatus(persistent_conn) == CONNECTION_BAD)
	{
		msg = pstrdup(PQerrorMessage(persistent_conn));
		PQfinish(persistent_conn);
		persistent_conn = NULL;
		elog(ERROR, "dblink_connect: connection error: %s", msg);
	}

	result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
	PG_RETURN_TEXT_P(result_text);
}

/*
 * Clear a persistent connection to another database
 */
PG_FUNCTION_INFO_V1(dblink_disconnect);
Datum
dblink_disconnect(PG_FUNCTION_ARGS)
{
	text	   *result_text;

	if (persistent_conn != NULL)
		PQfinish(persistent_conn);

	persistent_conn = NULL;

	result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
	PG_RETURN_TEXT_P(result_text);
}

/*
 * opens a cursor using a persistent connection
 */
PG_FUNCTION_INFO_V1(dblink_open);
Datum
dblink_open(PG_FUNCTION_ARGS)
{
	char	   *msg;
	PGresult   *res = NULL;
	PGconn	   *conn = NULL;
	text	   *result_text;
	char	   *curname = GET_STR(PG_GETARG_TEXT_P(0));
	char	   *sql = GET_STR(PG_GETARG_TEXT_P(1));
	StringInfo	str = makeStringInfo();

	if (persistent_conn != NULL)
		conn = persistent_conn;
	else
		elog(ERROR, "dblink_open: no connection available");

	res = PQexec(conn, "BEGIN");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		msg = pstrdup(PQerrorMessage(conn));
		PQclear(res);

		PQfinish(conn);
		persistent_conn = NULL;

		elog(ERROR, "dblink_open: begin error: %s", msg);
	}
	PQclear(res);

	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
	res = PQexec(conn, str->data);
	if (!res ||
		(PQresultStatus(res) != PGRES_COMMAND_OK &&
		 PQresultStatus(res) != PGRES_TUPLES_OK))
	{
		msg = pstrdup(PQerrorMessage(conn));

		PQclear(res);

		PQfinish(conn);
		persistent_conn = NULL;

		elog(ERROR, "dblink: sql error: %s", msg);
	}

	result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
	PG_RETURN_TEXT_P(result_text);
}

/*
 * closes a cursor
 */
PG_FUNCTION_INFO_V1(dblink_close);
Datum
dblink_close(PG_FUNCTION_ARGS)
{
	PGconn	   *conn = NULL;
	PGresult   *res = NULL;
	char	   *curname = GET_STR(PG_GETARG_TEXT_P(0));
	StringInfo	str = makeStringInfo();
	text	   *result_text;
	char	   *msg;

	if (persistent_conn != NULL)
		conn = persistent_conn;
	else
		elog(ERROR, "dblink_close: no connection available");

	appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));

	/* close the cursor */
	res = PQexec(conn, str->data);
	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		msg = pstrdup(PQerrorMessage(conn));
		PQclear(res);

		PQfinish(persistent_conn);
		persistent_conn = NULL;

		elog(ERROR, "dblink_close: sql error: %s", msg);
	}

	PQclear(res);

	/* commit the transaction */
	res = PQexec(conn, "COMMIT");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		msg = pstrdup(PQerrorMessage(conn));
		PQclear(res);

		PQfinish(persistent_conn);
		persistent_conn = NULL;

		elog(ERROR, "dblink_close: commit error: %s", msg);
	}
	PQclear(res);

	result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
	PG_RETURN_TEXT_P(result_text);
}

/*
 * Fetch results from an open cursor
 */
PG_FUNCTION_INFO_V1(dblink_fetch);
Datum
dblink_fetch(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	TupleDesc	tupdesc = NULL;
	int			call_cntr;
	int			max_calls;
	TupleTableSlot *slot;
	AttInMetadata *attinmeta;
	char	   *msg;
	PGresult   *res = NULL;
	MemoryContext oldcontext;

	/* stuff done only on the first call of the function */
	if (SRF_IS_FIRSTCALL())
	{
		Oid			functypeid;
		char		functyptype;
		Oid			funcid = fcinfo->flinfo->fn_oid;
		PGconn	   *conn = NULL;
		StringInfo	str = makeStringInfo();
		char	   *curname = GET_STR(PG_GETARG_TEXT_P(0));
		int			howmany = PG_GETARG_INT32(1);

		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();

		/*
		 * switch to memory context appropriate for multiple function
		 * calls
		 */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		if (persistent_conn != NULL)
			conn = persistent_conn;
		else
			elog(ERROR, "dblink_fetch: no connection available");

		appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));

		res = PQexec(conn, str->data);
		if (!res ||
			(PQresultStatus(res) != PGRES_COMMAND_OK &&
			 PQresultStatus(res) != PGRES_TUPLES_OK))
		{
			msg = pstrdup(PQerrorMessage(conn));
			PQclear(res);

			PQfinish(persistent_conn);
			persistent_conn = NULL;

			elog(ERROR, "dblink_fetch: sql error: %s", msg);
		}
		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
		{
			/* cursor does not exist - closed already or bad name */
			PQclear(res);
			elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
		}

		funcctx->max_calls = PQntuples(res);

		/* got results, keep track of them */
		funcctx->user_fctx = res;

		/* fast track when no results */
		if (funcctx->max_calls < 1)
			SRF_RETURN_DONE(funcctx);

		/* check typtype to see if we have a predetermined return type */
		functypeid = get_func_rettype(funcid);
		functyptype = get_typtype(functypeid);

		if (functyptype == 'c')
			tupdesc = TypeGetTupleDesc(functypeid, NIL);
		else if (functyptype == 'p' && functypeid == RECORDOID)
			tupdesc = pgresultGetTupleDesc(res);
		else if (functyptype == 'b')
			elog(ERROR, "dblink_fetch: invalid kind of return type specified for function");
		else
			elog(ERROR, "dblink_fetch: unknown kind of return type specified for function");

		/* store needed metadata for subsequent calls */
		slot = TupleDescGetSlot(tupdesc);
		funcctx->slot = slot;
		attinmeta = TupleDescGetAttInMetadata(tupdesc);
		funcctx->attinmeta = attinmeta;

		MemoryContextSwitchTo(oldcontext);
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	/*
	 * initialize per-call variables
	 */
	call_cntr = funcctx->call_cntr;
	max_calls = funcctx->max_calls;

	slot = funcctx->slot;

	res = (PGresult *) funcctx->user_fctx;
	attinmeta = funcctx->attinmeta;
	tupdesc = attinmeta->tupdesc;

	if (call_cntr < max_calls)	/* do when there is more left to send */
	{
		char	  **values;
		HeapTuple	tuple;
		Datum		result;
		int			i;
		int			nfields = PQnfields(res);

		values = (char **) palloc(nfields * sizeof(char *));
		for (i = 0; i < nfields; i++)
		{
			if (PQgetisnull(res, call_cntr, i) == 0)
				values[i] = PQgetvalue(res, call_cntr, i);
			else
				values[i] = NULL;
		}

		/* build the tuple */
		tuple = BuildTupleFromCStrings(attinmeta, values);

		/* make the tuple into a datum */
		result = TupleGetDatum(slot, tuple);

		SRF_RETURN_NEXT(funcctx, result);
	}
	else
/* do when there is no more left */
	{
		PQclear(res);
		SRF_RETURN_DONE(funcctx);
	}
}

/*
 * Note: this is the new preferred version of dblink
 */
PG_FUNCTION_INFO_V1(dblink_record);
Datum
dblink_record(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	TupleDesc	tupdesc = NULL;
	int			call_cntr;
	int			max_calls;
	TupleTableSlot *slot;
	AttInMetadata *attinmeta;
	char	   *msg;
	PGresult   *res = NULL;
	bool		is_sql_cmd = false;
	char	   *sql_cmd_status = NULL;
	MemoryContext oldcontext;

	/* stuff done only on the first call of the function */
	if (SRF_IS_FIRSTCALL())
	{
		Oid			functypeid;
		char		functyptype;
		Oid			funcid = fcinfo->flinfo->fn_oid;
		PGconn	   *conn = NULL;
		char	   *connstr = NULL;
		char	   *sql = NULL;

		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();

		/*
		 * switch to memory context appropriate for multiple function
		 * calls
		 */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		if (fcinfo->nargs == 2)
		{
			connstr = GET_STR(PG_GETARG_TEXT_P(0));
			sql = GET_STR(PG_GETARG_TEXT_P(1));

			conn = PQconnectdb(connstr);
			if (PQstatus(conn) == CONNECTION_BAD)
			{
				msg = pstrdup(PQerrorMessage(conn));
				PQfinish(conn);
				elog(ERROR, "dblink: connection error: %s", msg);
			}
		}
		else if (fcinfo->nargs == 1)
		{
			sql = GET_STR(PG_GETARG_TEXT_P(0));

			if (persistent_conn != NULL)
				conn = persistent_conn;
			else
				elog(ERROR, "dblink: no connection available");
		}
		else
			elog(ERROR, "dblink: wrong number of arguments");

		res = PQexec(conn, sql);
		if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
		{
			msg = pstrdup(PQerrorMessage(conn));
			PQclear(res);
			PQfinish(conn);
			if (fcinfo->nargs == 1)
				persistent_conn = NULL;

			elog(ERROR, "dblink: sql error: %s", msg);
		}
		else
		{
			if (PQresultStatus(res) == PGRES_COMMAND_OK)
			{
				is_sql_cmd = true;

				/* need a tuple descriptor representing one TEXT column */
				tupdesc = CreateTemplateTupleDesc(1, false);
				TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
								   TEXTOID, -1, 0, false);

				/*
				 * and save a copy of the command status string to return
				 * as our result tuple
				 */
				sql_cmd_status = PQcmdStatus(res);
				funcctx->max_calls = 1;
			}
			else
				funcctx->max_calls = PQntuples(res);

			/* got results, keep track of them */
			funcctx->user_fctx = res;

			/* if needed, close the connection to the database and cleanup */
			if (fcinfo->nargs == 2)
				PQfinish(conn);
		}

		/* fast track when no results */
		if (funcctx->max_calls < 1)
			SRF_RETURN_DONE(funcctx);

		/* check typtype to see if we have a predetermined return type */
		functypeid = get_func_rettype(funcid);
		functyptype = get_typtype(functypeid);

		if (!is_sql_cmd)
		{
			if (functyptype == 'c')
				tupdesc = TypeGetTupleDesc(functypeid, NIL);
			else if (functyptype == 'p' && functypeid == RECORDOID)
				tupdesc = pgresultGetTupleDesc(res);
			else if (functyptype == 'b')
				elog(ERROR, "Invalid kind of return type specified for function");
			else
				elog(ERROR, "Unknown kind of return type specified for function");
		}

		/* store needed metadata for subsequent calls */
		slot = TupleDescGetSlot(tupdesc);
		funcctx->slot = slot;
		attinmeta = TupleDescGetAttInMetadata(tupdesc);
		funcctx->attinmeta = attinmeta;

		MemoryContextSwitchTo(oldcontext);
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	/*
	 * initialize per-call variables
	 */
	call_cntr = funcctx->call_cntr;
	max_calls = funcctx->max_calls;

	slot = funcctx->slot;

	res = (PGresult *) funcctx->user_fctx;
	attinmeta = funcctx->attinmeta;
	tupdesc = attinmeta->tupdesc;

	if (call_cntr < max_calls)	/* do when there is more left to send */
	{
		char	  **values;
		HeapTuple	tuple;
		Datum		result;

		if (!is_sql_cmd)
		{
			int			i;
			int			nfields = PQnfields(res);

			values = (char **) palloc(nfields * sizeof(char *));
			for (i = 0; i < nfields; i++)
			{
				if (PQgetisnull(res, call_cntr, i) == 0)
					values[i] = PQgetvalue(res, call_cntr, i);
				else
					values[i] = NULL;
			}
		}
		else
		{
			values = (char **) palloc(1 * sizeof(char *));
			values[0] = sql_cmd_status;
		}

		/* build the tuple */
		tuple = BuildTupleFromCStrings(attinmeta, values);

		/* make the tuple into a datum */
		result = TupleGetDatum(slot, tuple);

		SRF_RETURN_NEXT(funcctx, result);
	}
	else
/* do when there is no more left */
	{
		PQclear(res);
		SRF_RETURN_DONE(funcctx);
	}
}

/*
 * Execute an SQL non-SELECT command
 */
PG_FUNCTION_INFO_V1(dblink_exec);
Datum
dblink_exec(PG_FUNCTION_ARGS)
{
	char	   *msg;
	PGresult   *res = NULL;
	char	   *sql_cmd_status = NULL;
	TupleDesc	tupdesc = NULL;
	text	   *result_text;
	PGconn	   *conn = NULL;
	char	   *connstr = NULL;
	char	   *sql = NULL;

	if (fcinfo->nargs == 2)
	{
		connstr = GET_STR(PG_GETARG_TEXT_P(0));
		sql = GET_STR(PG_GETARG_TEXT_P(1));

		conn = PQconnectdb(connstr);
		if (PQstatus(conn) == CONNECTION_BAD)
		{
			msg = pstrdup(PQerrorMessage(conn));
			PQfinish(conn);
			elog(ERROR, "dblink_exec: connection error: %s", msg);
		}
	}
	else if (fcinfo->nargs == 1)
	{
		sql = GET_STR(PG_GETARG_TEXT_P(0));

		if (persistent_conn != NULL)
			conn = persistent_conn;
		else
			elog(ERROR, "dblink_exec: no connection available");
	}
	else
		elog(ERROR, "dblink_exec: wrong number of arguments");


	res = PQexec(conn, sql);
	if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
	{
		msg = pstrdup(PQerrorMessage(conn));
		PQclear(res);
		PQfinish(conn);
		if (fcinfo->nargs == 1)
			persistent_conn = NULL;

		elog(ERROR, "dblink_exec: sql error: %s", msg);
	}
	else
	{
		if (PQresultStatus(res) == PGRES_COMMAND_OK)
		{
			/* need a tuple descriptor representing one TEXT column */
			tupdesc = CreateTemplateTupleDesc(1, false);
			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
							   TEXTOID, -1, 0, false);

			/*
			 * and save a copy of the command status string to return as
			 * our result tuple
			 */
			sql_cmd_status = PQcmdStatus(res);
		}
		else
			elog(ERROR, "dblink_exec: queries returning results not allowed");
	}
	PQclear(res);

	/* if needed, close the connection to the database and cleanup */
	if (fcinfo->nargs == 2)
		PQfinish(conn);

	result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status)));
	PG_RETURN_TEXT_P(result_text);
}

/*
 * Note: this original version of dblink is DEPRECATED;
 * it *will* be removed in favor of the new version on next release
 */
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
	PGconn	   *conn = NULL;
	PGresult   *res = NULL;
	dblink_results *results;
	char	   *optstr;
	char	   *sqlstatement;
	char	   *execstatement;
	char	   *msg;
	int			ntuples = 0;
	ReturnSetInfo *rsi;

	if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
		elog(ERROR, "dblink: function called in context that does not accept a set result");

	optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
	sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));

	if (fcinfo->flinfo->fn_extra == NULL)
	{

		conn = PQconnectdb(optstr);
		if (PQstatus(conn) == CONNECTION_BAD)
		{
			msg = pstrdup(PQerrorMessage(conn));
			PQfinish(conn);
			elog(ERROR, "dblink: connection error: %s", msg);
		}

		execstatement = (char *) palloc(strlen(sqlstatement) + 1);
		if (execstatement != NULL)
		{
			strcpy(execstatement, sqlstatement);
			strcat(execstatement, "\0");
		}
		else
			elog(ERROR, "dblink: insufficient memory");

		res = PQexec(conn, execstatement);
		if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
		{
			msg = pstrdup(PQerrorMessage(conn));
			PQclear(res);
			PQfinish(conn);
			elog(ERROR, "dblink: sql error: %s", msg);
		}
		else
		{
			/*
			 * got results, start fetching them
			 */
			ntuples = PQntuples(res);

			/*
			 * increment resource index
			 */
			res_id_index++;

			results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
			results->tup_num = 0;
			results->res_id_index = res_id_index;
			results->res = res;

			/*
			 * Append node to res_id to hold pointer to results. Needed by
			 * dblink_tok to access the data
			 */
			append_res_ptr(results);

			/*
			 * save pointer to results for the next function manager call
			 */
			fcinfo->flinfo->fn_extra = (void *) results;

			/* close the connection to the database and cleanup */
			PQfinish(conn);

			rsi = (ReturnSetInfo *) fcinfo->resultinfo;
			rsi->isDone = ExprMultipleResult;

			PG_RETURN_INT32(res_id_index);
		}
	}
	else
	{
		/*
		 * check for more results
		 */
		results = fcinfo->flinfo->fn_extra;

		results->tup_num++;
		res_id_index = results->res_id_index;
		ntuples = PQntuples(results->res);

		if (results->tup_num < ntuples)
		{
			/*
			 * fetch them if available
			 */

			rsi = (ReturnSetInfo *) fcinfo->resultinfo;
			rsi->isDone = ExprMultipleResult;

			PG_RETURN_INT32(res_id_index);
		}
		else
		{
			/*
			 * or if no more, clean things up
			 */
			results = fcinfo->flinfo->fn_extra;

			remove_res_ptr(results);
			PQclear(results->res);
			pfree(results);
			fcinfo->flinfo->fn_extra = NULL;

			rsi = (ReturnSetInfo *) fcinfo->resultinfo;
			rsi->isDone = ExprEndResult;

			PG_RETURN_NULL();
		}
	}
	PG_RETURN_NULL();
}

/*
 * Note: dblink_tok is DEPRECATED;
 * it *will* be removed in favor of the new version on next release
 *
 * dblink_tok
 * parse dblink output string
 * return fldnum item (0 based)
 * based on provided field separator
 */
PG_FUNCTION_INFO_V1(dblink_tok);
Datum
dblink_tok(PG_FUNCTION_ARGS)
{
	dblink_results *results;
	int			fldnum;
	text	   *result_text;
	char	   *result;
	int			nfields = 0;
	int			text_len = 0;

	results = get_res_ptr(PG_GETARG_INT32(0));
	if (results == NULL)
	{
		if (res_id != NIL)
		{
			freeList(res_id);
			res_id = NIL;
			res_id_index = 0;
		}

		elog(ERROR, "dblink_tok: function called with invalid resource id");
	}

	fldnum = PG_GETARG_INT32(1);
	if (fldnum < 0)
		elog(ERROR, "dblink_tok: field number < 0 not permitted");

	nfields = PQnfields(results->res);
	if (fldnum > (nfields - 1))
		elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);

	if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
		PG_RETURN_NULL();
	else
	{
		text_len = PQgetlength(results->res, results->tup_num, fldnum);

		result = (char *) palloc(text_len + 1);

		if (result != NULL)
		{
			strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum));
			strcat(result, "\0");
		}
		else
			elog(ERROR, "dblink: insufficient memory");

		result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));

		PG_RETURN_TEXT_P(result_text);
	}
}

/*
 * dblink_get_pkey
 *
 * Return list of primary key fields for the supplied relation,
 * or NULL if none exists.
 */
PG_FUNCTION_INFO_V1(dblink_get_pkey);
Datum
dblink_get_pkey(PG_FUNCTION_ARGS)
{
	int16		numatts;
	Oid			relid;
	char	  **results;
	FuncCallContext *funcctx;
	int32		call_cntr;
	int32		max_calls;
	TupleTableSlot *slot;
	AttInMetadata *attinmeta;
	MemoryContext oldcontext;

	/* stuff done only on the first call of the function */
	if (SRF_IS_FIRSTCALL())
	{
		TupleDesc	tupdesc = NULL;

		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();

		/*
		 * switch to memory context appropriate for multiple function
		 * calls
		 */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		/* convert relname to rel Oid */
		relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
		if (!OidIsValid(relid))
			elog(ERROR, "dblink_get_pkey: relation does not exist");

		/*
		 * need a tuple descriptor representing one INT and one TEXT
		 * column
		 */
		tupdesc = CreateTemplateTupleDesc(2, false);
		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
						   INT4OID, -1, 0, false);
		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
						   TEXTOID, -1, 0, false);

		/* allocate a slot for a tuple with this tupdesc */
		slot = TupleDescGetSlot(tupdesc);

		/* assign slot to function context */
		funcctx->slot = slot;

		/*
		 * Generate attribute metadata needed later to produce tuples from
		 * raw C strings
		 */
		attinmeta = TupleDescGetAttInMetadata(tupdesc);
		funcctx->attinmeta = attinmeta;

		/* get an array of attnums */
		results = get_pkey_attnames(relid, &numatts);

		if ((results != NULL) && (numatts > 0))
		{
			funcctx->max_calls = numatts;

			/* got results, keep track of them */
			funcctx->user_fctx = results;
		}
		else
/* fast track when no results */
			SRF_RETURN_DONE(funcctx);

		MemoryContextSwitchTo(oldcontext);
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	/*
	 * initialize per-call variables
	 */
	call_cntr = funcctx->call_cntr;
	max_calls = funcctx->max_calls;

	slot = funcctx->slot;

	results = (char **) funcctx->user_fctx;
	attinmeta = funcctx->attinmeta;

	if (call_cntr < max_calls)	/* do when there is more left to send */
	{
		char	  **values;
		HeapTuple	tuple;
		Datum		result;

		values = (char **) palloc(2 * sizeof(char *));
		values[0] = (char *) palloc(12);		/* sign, 10 digits, '\0' */

		sprintf(values[0], "%d", call_cntr + 1);

		values[1] = results[call_cntr];

		/* build the tuple */
		tuple = BuildTupleFromCStrings(attinmeta, values);

		/* make the tuple into a datum */
		result = TupleGetDatum(slot, tuple);

		SRF_RETURN_NEXT(funcctx, result);
	}
	else
/* do when there is no more left */
		SRF_RETURN_DONE(funcctx);
}

/*
 * Note: dblink_last_oid is DEPRECATED;
 * it *will* be removed on next release
 *
 * dblink_last_oid
 * return last inserted oid
 */
PG_FUNCTION_INFO_V1(dblink_last_oid);
Datum
dblink_last_oid(PG_FUNCTION_ARGS)
{
	dblink_results *results;

	results = get_res_ptr(PG_GETARG_INT32(0));
	if (results == NULL)
	{
		if (res_id != NIL)
		{
			freeList(res_id);
			res_id = NIL;
			res_id_index = 0;
		}

		elog(ERROR, "dblink_tok: function called with invalid resource id");
	}

	PG_RETURN_OID(PQoidValue(results->res));
}


/*
 * dblink_build_sql_insert
 *
 * Used to generate an SQL insert statement
 * based on an existing tuple in a local relation.
 * This is useful for selectively replicating data
 * to another server via dblink.
 *
 * API:
 * <relname> - name of local table of interest
 * <pkattnums> - an int2vector of attnums which will be used
 * to identify the local tuple of interest
 * <pknumatts> - number of attnums in pkattnums
 * <src_pkattvals_arry> - text array of key values which will be used
 * to identify the local tuple of interest
 * <tgt_pkattvals_arry> - text array of key values which will be used
 * to build the string for execution remotely. These are substituted
 * for their counterparts in src_pkattvals_arry
 */
PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)
{
	Oid			relid;
	text	   *relname_text;
	int16	   *pkattnums;
	int16		pknumatts;
	char	  **src_pkattvals;
	char	  **tgt_pkattvals;
	ArrayType  *src_pkattvals_arry;
	ArrayType  *tgt_pkattvals_arry;
	int			src_ndim;
	int		   *src_dim;
	int			src_nitems;
	int			tgt_ndim;
	int		   *tgt_dim;
	int			tgt_nitems;
	int			i;
	char	   *ptr;
	char	   *sql;
	text	   *sql_text;
	int16		typlen;
	bool		typbyval;
	char		typalign;

	relname_text = PG_GETARG_TEXT_P(0);

	/*
	 * Convert relname to rel OID.
	 */
	relid = get_relid_from_relname(relname_text);
	if (!OidIsValid(relid))
		elog(ERROR, "dblink_build_sql_insert: relation does not exist");

	pkattnums = (int16 *) PG_GETARG_POINTER(1);
	pknumatts = PG_GETARG_INT16(2);

	/*
	 * There should be at least one key attribute
	 */
	if (pknumatts == 0)
		elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");

	src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
	tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);

	/*
	 * Source array is made up of key values that will be used to locate
	 * the tuple of interest from the local system.
	 */
	src_ndim = ARR_NDIM(src_pkattvals_arry);
	src_dim = ARR_DIMS(src_pkattvals_arry);
	src_nitems = ArrayGetNItems(src_ndim, src_dim);

	/*
	 * There should be one source array key value for each key attnum
	 */
	if (src_nitems != pknumatts)
		elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");

	/*
	 * get array of pointers to c-strings from the input source array
	 */
	Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
	get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
						 &typlen, &typbyval, &typalign);

	src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
	ptr = ARR_DATA_PTR(src_pkattvals_arry);
	for (i = 0; i < src_nitems; i++)
	{
		src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
		ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
		ptr = (char *) att_align(ptr, typalign);
	}

	/*
	 * Target array is made up of key values that will be used to build
	 * the SQL string for use on the remote system.
	 */
	tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
	tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
	tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);

	/*
	 * There should be one target array key value for each key attnum
	 */
	if (tgt_nitems != pknumatts)
		elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");

	/*
	 * get array of pointers to c-strings from the input target array
	 */
	Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
	get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
						 &typlen, &typbyval, &typalign);

	tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
	ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
	for (i = 0; i < tgt_nitems; i++)
	{
		tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
		ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
		ptr = (char *) att_align(ptr, typalign);
	}

	/*
	 * Prep work is finally done. Go get the SQL string.
	 */
	sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);

	/*
	 * Make it into TEXT for return to the client
	 */
	sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));

	/*
	 * And send it
	 */
	PG_RETURN_TEXT_P(sql_text);
}


/*
 * dblink_build_sql_delete
 *
 * Used to generate an SQL delete statement.
 * This is useful for selectively replicating a
 * delete to another server via dblink.
 *
 * API:
 * <relname> - name of remote table of interest
 * <pkattnums> - an int2vector of attnums which will be used
 * to identify the remote tuple of interest
 * <pknumatts> - number of attnums in pkattnums
 * <tgt_pkattvals_arry> - text array of key values which will be used
 * to build the string for execution remotely.
 */
PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)
{
	Oid			relid;
	text	   *relname_text;
	int16	   *pkattnums;
	int16		pknumatts;
	char	  **tgt_pkattvals;
	ArrayType  *tgt_pkattvals_arry;
	int			tgt_ndim;
	int		   *tgt_dim;
	int			tgt_nitems;
	int			i;
	char	   *ptr;
	char	   *sql;
	text	   *sql_text;
	int16		typlen;
	bool		typbyval;
	char		typalign;

	relname_text = PG_GETARG_TEXT_P(0);

	/*
	 * Convert relname to rel OID.
	 */
	relid = get_relid_from_relname(relname_text);
	if (!OidIsValid(relid))
		elog(ERROR, "dblink_build_sql_delete: relation does not exist");

	pkattnums = (int16 *) PG_GETARG_POINTER(1);
	pknumatts = PG_GETARG_INT16(2);

	/*
	 * There should be at least one key attribute
	 */
	if (pknumatts == 0)
		elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");

	tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);

	/*
	 * Target array is made up of key values that will be used to build
	 * the SQL string for use on the remote system.
	 */
	tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
	tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
	tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);

	/*
	 * There should be one target array key value for each key attnum
	 */
	if (tgt_nitems != pknumatts)
		elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");

	/*
	 * get array of pointers to c-strings from the input target array
	 */
	Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
	get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
						 &typlen, &typbyval, &typalign);

	tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
	ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
	for (i = 0; i < tgt_nitems; i++)
	{
		tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
		ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
		ptr = (char *) att_align(ptr, typalign);
	}

	/*
	 * Prep work is finally done. Go get the SQL string.
	 */
	sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);

	/*
	 * Make it into TEXT for return to the client
	 */
	sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));

	/*
	 * And send it
	 */
	PG_RETURN_TEXT_P(sql_text);
}


/*
 * dblink_build_sql_update
 *
 * Used to generate an SQL update statement
 * based on an existing tuple in a local relation.
 * This is useful for selectively replicating data
 * to another server via dblink.
 *
 * API:
 * <relname> - name of local table of interest
 * <pkattnums> - an int2vector of attnums which will be used
 * to identify the local tuple of interest
 * <pknumatts> - number of attnums in pkattnums
 * <src_pkattvals_arry> - text array of key values which will be used
 * to identify the local tuple of interest
 * <tgt_pkattvals_arry> - text array of key values which will be used
 * to build the string for execution remotely. These are substituted
 * for their counterparts in src_pkattvals_arry
 */
PG_FUNCTION_INFO_V1(dblink_build_sql_update);
Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)
{
	Oid			relid;
	text	   *relname_text;
	int16	   *pkattnums;
	int16		pknumatts;
	char	  **src_pkattvals;
	char	  **tgt_pkattvals;
	ArrayType  *src_pkattvals_arry;
	ArrayType  *tgt_pkattvals_arry;
	int			src_ndim;
	int		   *src_dim;
	int			src_nitems;
	int			tgt_ndim;
	int		   *tgt_dim;
	int			tgt_nitems;
	int			i;
	char	   *ptr;
	char	   *sql;
	text	   *sql_text;
	int16		typlen;
	bool		typbyval;
	char		typalign;

	relname_text = PG_GETARG_TEXT_P(0);

	/*
	 * Convert relname to rel OID.
	 */
	relid = get_relid_from_relname(relname_text);
	if (!OidIsValid(relid))
		elog(ERROR, "dblink_build_sql_update: relation does not exist");

	pkattnums = (int16 *) PG_GETARG_POINTER(1);
	pknumatts = PG_GETARG_INT16(2);

	/*
	 * There should be one source array key values for each key attnum
	 */
	if (pknumatts == 0)
		elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");

	src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
	tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);

	/*
	 * Source array is made up of key values that will be used to locate
	 * the tuple of interest from the local system.
	 */
	src_ndim = ARR_NDIM(src_pkattvals_arry);
	src_dim = ARR_DIMS(src_pkattvals_arry);
	src_nitems = ArrayGetNItems(src_ndim, src_dim);

	/*
	 * There should be one source array key value for each key attnum
	 */
	if (src_nitems != pknumatts)
		elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");

	/*
	 * get array of pointers to c-strings from the input source array
	 */
	Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
	get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
						 &typlen, &typbyval, &typalign);

	src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
	ptr = ARR_DATA_PTR(src_pkattvals_arry);
	for (i = 0; i < src_nitems; i++)
	{
		src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
		ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
		ptr = (char *) att_align(ptr, typalign);
	}

	/*
	 * Target array is made up of key values that will be used to build
	 * the SQL string for use on the remote system.
	 */
	tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
	tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
	tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);

	/*
	 * There should be one target array key value for each key attnum
	 */
	if (tgt_nitems != pknumatts)
		elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");

	/*
	 * get array of pointers to c-strings from the input target array
	 */
	Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
	get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
						 &typlen, &typbyval, &typalign);

	tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
	ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
	for (i = 0; i < tgt_nitems; i++)
	{
		tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
		ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
		ptr = (char *) att_align(ptr, typalign);
	}

	/*
	 * Prep work is finally done. Go get the SQL string.
	 */
	sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);

	/*
	 * Make it into TEXT for return to the client
	 */
	sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));

	/*
	 * And send it
	 */
	PG_RETURN_TEXT_P(sql_text);
}

/*
 * dblink_current_query
 * return the current query string
 * to allow its use in (among other things)
 * rewrite rules
 */
PG_FUNCTION_INFO_V1(dblink_current_query);
Datum
dblink_current_query(PG_FUNCTION_ARGS)
{
	text	   *result_text;

	result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
	PG_RETURN_TEXT_P(result_text);
}


/*************************************************************
 * internal functions
 */


/*
 * init_dblink_results
 *	 - create an empty dblink_results data structure
 */
static dblink_results *
init_dblink_results(MemoryContext fn_mcxt)
{
	MemoryContext oldcontext;
	dblink_results *retval;

	oldcontext = MemoryContextSwitchTo(fn_mcxt);

	retval = (dblink_results *) palloc(sizeof(dblink_results));
	MemSet(retval, 0, sizeof(dblink_results));

	retval->tup_num = -1;
	retval->res_id_index = -1;
	retval->res = NULL;

	MemoryContextSwitchTo(oldcontext);

	return retval;
}

/*
 * get_pkey_attnames
 *
 * Get the primary key attnames for the given relation.
 * Return NULL, and set numatts = 0, if no primary key exists.
 */
static char **
get_pkey_attnames(Oid relid, int16 *numatts)
{
	Relation	indexRelation;
	ScanKeyData entry;
	HeapScanDesc scan;
	HeapTuple	indexTuple;
	int			i;
	char	  **result = NULL;
	Relation	rel;
	TupleDesc	tupdesc;

	/* open relation using relid, get tupdesc */
	rel = relation_open(relid, AccessShareLock);
	tupdesc = rel->rd_att;

	/* initialize numatts to 0 in case no primary key exists */
	*numatts = 0;

	/* use relid to get all related indexes */
	indexRelation = heap_openr(IndexRelationName, AccessShareLock);
	ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
						   F_OIDEQ, ObjectIdGetDatum(relid));
	scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);

	while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);

		/* we're only interested if it is the primary key */
		if (index->indisprimary == TRUE)
		{
			i = 0;
			while (index->indkey[i++] != 0)
				(*numatts)++;

			if (*numatts > 0)
			{
				result = (char **) palloc(*numatts * sizeof(char *));

				for (i = 0; i < *numatts; i++)
					result[i] = SPI_fname(tupdesc, index->indkey[i]);
			}
			break;
		}
	}
	heap_endscan(scan);
	heap_close(indexRelation, AccessShareLock);
	relation_close(rel, AccessShareLock);

	return result;
}

static char *
get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
	Relation	rel;
	char	   *relname;
	HeapTuple	tuple;
	TupleDesc	tupdesc;
	int			natts;
	StringInfo	str = makeStringInfo();
	char	   *sql;
	char	   *val;
	int16		key;
	int			i;
	bool		needComma;

	/*
	 * Open relation using relid
	 */
	rel = relation_open(relid, AccessShareLock);
	relname = RelationGetRelationName(rel);
	tupdesc = rel->rd_att;
	natts = tupdesc->natts;

	tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
	if (!tuple)
		elog(ERROR, "dblink_build_sql_insert: row not found");

	appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));

	needComma = false;
	for (i = 0; i < natts; i++)
	{
		if (tupdesc->attrs[i]->attisdropped)
			continue;

		if (needComma)
			appendStringInfo(str, ",");

		appendStringInfo(str, "%s",
				  quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
		needComma = true;
	}

	appendStringInfo(str, ") VALUES(");

	/*
	 * remember attvals are 1 based
	 */
	needComma = false;
	for (i = 0; i < natts; i++)
	{
		if (tupdesc->attrs[i]->attisdropped)
			continue;

		if (needComma)
			appendStringInfo(str, ",");

		if (tgt_pkattvals != NULL)
			key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
		else
			key = -1;

		if (key > -1)
			val = pstrdup(tgt_pkattvals[key]);
		else
			val = SPI_getvalue(tuple, tupdesc, i + 1);

		if (val != NULL)
		{
			appendStringInfo(str, "%s", quote_literal_cstr(val));
			pfree(val);
		}
		else
			appendStringInfo(str, "NULL");
		needComma = true;
	}
	appendStringInfo(str, ")");

	sql = pstrdup(str->data);
	pfree(str->data);
	pfree(str);
	relation_close(rel, AccessShareLock);

	return (sql);
}

static char *
get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
{
	Relation	rel;
	char	   *relname;
	TupleDesc	tupdesc;
	int			natts;
	StringInfo	str = makeStringInfo();
	char	   *sql;
	char	   *val;
	int			i;

	/*
	 * Open relation using relid
	 */
	rel = relation_open(relid, AccessShareLock);
	relname = RelationGetRelationName(rel);
	tupdesc = rel->rd_att;
	natts = tupdesc->natts;

	appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
	for (i = 0; i < pknumatts; i++)
	{
		int16		pkattnum = pkattnums[i];

		if (i > 0)
			appendStringInfo(str, " AND ");

		appendStringInfo(str, "%s",
		quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));

		if (tgt_pkattvals != NULL)
			val = pstrdup(tgt_pkattvals[i]);
		else
		{
			elog(ERROR, "Target key array must not be NULL");
			val = NULL;			/* keep compiler quiet */
		}

		if (val != NULL)
		{
			appendStringInfo(str, " = %s", quote_literal_cstr(val));
			pfree(val);
		}
		else
			appendStringInfo(str, " IS NULL");
	}

	sql = pstrdup(str->data);
	pfree(str->data);
	pfree(str);
	relation_close(rel, AccessShareLock);

	return (sql);
}

static char *
get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
	Relation	rel;
	char	   *relname;
	HeapTuple	tuple;
	TupleDesc	tupdesc;
	int			natts;
	StringInfo	str = makeStringInfo();
	char	   *sql;
	char	   *val;
	int16		key;
	int			i;
	bool		needComma;

	/*
	 * Open relation using relid
	 */
	rel = relation_open(relid, AccessShareLock);
	relname = RelationGetRelationName(rel);
	tupdesc = rel->rd_att;
	natts = tupdesc->natts;

	tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
	if (!tuple)
		elog(ERROR, "dblink_build_sql_update: row not found");

	appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));

	needComma = false;
	for (i = 0; i < natts; i++)
	{
		if (tupdesc->attrs[i]->attisdropped)
			continue;

		if (needComma)
			appendStringInfo(str, ", ");

		appendStringInfo(str, "%s = ",
				  quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));

		if (tgt_pkattvals != NULL)
			key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
		else
			key = -1;

		if (key > -1)
			val = pstrdup(tgt_pkattvals[key]);
		else
			val = SPI_getvalue(tuple, tupdesc, i + 1);

		if (val != NULL)
		{
			appendStringInfo(str, "%s", quote_literal_cstr(val));
			pfree(val);
		}
		else
			appendStringInfo(str, "NULL");
		needComma = true;
	}

	appendStringInfo(str, " WHERE ");

	for (i = 0; i < pknumatts; i++)
	{
		int16		pkattnum = pkattnums[i];

		if (i > 0)
			appendStringInfo(str, " AND ");

		appendStringInfo(str, "%s",
		quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));

		if (tgt_pkattvals != NULL)
			val = pstrdup(tgt_pkattvals[i]);
		else
			val = SPI_getvalue(tuple, tupdesc, pkattnum);

		if (val != NULL)
		{
			appendStringInfo(str, " = %s", quote_literal_cstr(val));
			pfree(val);
		}
		else
			appendStringInfo(str, " IS NULL");
	}

	sql = pstrdup(str->data);
	pfree(str->data);
	pfree(str);
	relation_close(rel, AccessShareLock);

	return (sql);
}

/*
 * Return a properly quoted literal value.
 * Uses quote_literal in quote.c
 */
static char *
quote_literal_cstr(char *rawstr)
{
	text	   *rawstr_text;
	text	   *result_text;
	char	   *result;

	rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
	result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
	result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));

	return result;
}

/*
 * Return a properly quoted identifier.
 * Uses quote_ident in quote.c
 */
static char *
quote_ident_cstr(char *rawstr)
{
	text	   *rawstr_text;
	text	   *result_text;
	char	   *result;

	rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
	result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
	result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));

	return result;
}

static int16
get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
{
	int			i;

	/*
	 * Not likely a long list anyway, so just scan for the value
	 */
	for (i = 0; i < pknumatts; i++)
		if (key == pkattnums[i])
			return i;

	return -1;
}

static HeapTuple
get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
{
	Relation	rel;
	char	   *relname;
	TupleDesc	tupdesc;
	StringInfo	str = makeStringInfo();
	char	   *sql = NULL;
	int			ret;
	HeapTuple	tuple;
	int			i;
	char	   *val = NULL;

	/*
	 * Open relation using relid
	 */
	rel = relation_open(relid, AccessShareLock);
	relname = RelationGetRelationName(rel);
	tupdesc = CreateTupleDescCopy(rel->rd_att);
	relation_close(rel, AccessShareLock);

	/*
	 * Connect to SPI manager
	 */
	if ((ret = SPI_connect()) < 0)
		elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);

	/*
	 * Build sql statement to look up tuple of interest Use src_pkattvals
	 * as the criteria.
	 */
	appendStringInfo(str, "SELECT * FROM %s WHERE ", quote_ident_cstr(relname));

	for (i = 0; i < pknumatts; i++)
	{
		int16		pkattnum = pkattnums[i];

		if (i > 0)
			appendStringInfo(str, " AND ");

		appendStringInfo(str, "%s",
		quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));

		val = pstrdup(src_pkattvals[i]);
		if (val != NULL)
		{
			appendStringInfo(str, " = %s", quote_literal_cstr(val));
			pfree(val);
		}
		else
			appendStringInfo(str, " IS NULL");
	}

	sql = pstrdup(str->data);
	pfree(str->data);
	pfree(str);

	/*
	 * Retrieve the desired tuple
	 */
	ret = SPI_exec(sql, 0);
	pfree(sql);

	/*
	 * Only allow one qualifying tuple
	 */
	if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
		elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
	else if (ret == SPI_OK_SELECT && SPI_processed == 1)
	{
		SPITupleTable *tuptable = SPI_tuptable;

		tuple = SPI_copytuple(tuptable->vals[0]);

		return tuple;
	}
	else
	{
		/*
		 * no qualifying tuples
		 */
		return NULL;
	}

	/*
	 * never reached, but keep compiler quiet
	 */
	return NULL;
}

static Oid
get_relid_from_relname(text *relname_text)
{
	RangeVar   *relvar;
	Relation	rel;
	Oid			relid;

	relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text, "get_relid_from_relname"));
	rel = heap_openrv(relvar, AccessShareLock);
	relid = RelationGetRelid(rel);
	relation_close(rel, AccessShareLock);

	return relid;
}

static dblink_results *
get_res_ptr(int32 res_id_index)
{
	List	   *ptr;

	/*
	 * short circuit empty list
	 */
	if (res_id == NIL)
		return NULL;

	/*
	 * OK, should be good to go
	 */
	foreach(ptr, res_id)
	{
		dblink_results *this_res_id = (dblink_results *) lfirst(ptr);

		if (this_res_id->res_id_index == res_id_index)
			return this_res_id;
	}
	return NULL;
}

/*
 * Add node to global List res_id
 */
static void
append_res_ptr(dblink_results * results)
{
	//res_id = lappend(res_id, results);
	res_id = lcons(results,res_id);
}

/*
 * Remove node from global List
 * using res_id_index
 */
static void
remove_res_ptr(dblink_results * results)
{
	res_id = lremove(results, res_id);

	if (res_id == NIL)
		res_id_index = 0;
}

static TupleDesc
pgresultGetTupleDesc(PGresult *res)
{
	int			natts;
	AttrNumber	attnum;
	TupleDesc	desc;
	char	   *attname;
	int32		atttypmod;
	int			attdim;
	bool		attisset;
	Oid			atttypid;
	int			i;

	/*
	 * allocate a new tuple descriptor
	 */
	natts = PQnfields(res);
	if (natts < 1)
		elog(ERROR, "cannot create a description for empty results");

	desc = CreateTemplateTupleDesc(natts, false);

	attnum = 0;

	for (i = 0; i < natts; i++)
	{
		/*
		 * for each field, get the name and type information from the
		 * query result and have TupleDescInitEntry fill in the attribute
		 * information we need.
		 */
		attnum++;

		attname = PQfname(res, i);
		atttypid = PQftype(res, i);
		atttypmod = PQfmod(res, i);

		if (PQfsize(res, i) != get_typlen(atttypid))
			elog(ERROR, "Size of remote field \"%s\" does not match size "
				 "of local type \"%s\"",
				 attname,
				 format_type_with_typemod(atttypid, atttypmod));

		attdim = 0;
		attisset = false;

		TupleDescInitEntry(desc, attnum, attname, atttypid,
						   atttypmod, attdim, attisset);
	}

	return desc;
}


/*
 * init_public_connections
 */
static public_pq_connections *
init_public_connections()
{
	MemoryContext oldcontext;
	public_pq_connections *retval;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	retval = (public_pq_connections *) palloc(sizeof(public_pq_connections));
	MemSet(retval, 0, sizeof(public_pq_connections));

	retval->conn_id_index = -1;
	retval->conn = NULL;

	MemoryContextSwitchTo(oldcontext);

	return retval;
}

/*
 * init_dblink_results
 */
static public_pq_results *
init_public_results()
{
	MemoryContext oldcontext;
	public_pq_results *retval;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	retval = (public_pq_results *) palloc(sizeof(public_pq_results));
	MemSet(retval, 0, sizeof(public_pq_results));

	retval->res_id_index = -1;
	retval->res = NULL;

	MemoryContextSwitchTo(oldcontext);

	return retval;
}

/*
 * get_public_conn
 */
static public_pq_connections *
get_public_conn(int32 public_conn_id_index)
{
	List	   *ptr;
	MemoryContext oldcontext;
	public_pq_connections *this_public_conn_id;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	/*
	 * short circuit empty list
	 */
	if (public_conn_id== NIL){
		MemoryContextSwitchTo(oldcontext);
		return NULL;
	}

	/*
	 * OK, should be good to go
	 */
	foreach(ptr, public_conn_id)
	{
		this_public_conn_id = (public_pq_connections *) lfirst(ptr);
		if (this_public_conn_id->conn_id_index == public_conn_id_index){
			MemoryContextSwitchTo(oldcontext);
			return this_public_conn_id;
		}
	}
	MemoryContextSwitchTo(oldcontext);
	return NULL;
}

/*
 * get_public_conn
 */
static public_pq_results *
get_public_res(int32 public_res_id_index)
{
	List	   *ptr;
	public_pq_results *this_public_res_id;
	MemoryContext oldcontext;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	/*
	 * short circuit empty list
	 */
	if (public_res_id== NIL){
		MemoryContextSwitchTo(oldcontext);
		return NULL;
	}

	/*
	 * OK, should be good to go
	 */
	foreach(ptr, public_res_id)
	{
		this_public_res_id = (public_pq_results *) lfirst(ptr);
		if (this_public_res_id->res_id_index == public_res_id_index){
			MemoryContextSwitchTo(oldcontext);
			return this_public_res_id;
		}
	}
	MemoryContextSwitchTo(oldcontext);
	return NULL;
}

/*
 * Add node to global List public_conn_id
 */
static void
append_public_conn(public_pq_connections * connections)
{
	MemoryContext oldcontext;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
	public_conn_id = lappend(public_conn_id, connections);
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Add node to global List public_res_id
 */
static void
append_public_res(public_pq_results* results)
{
	MemoryContext oldcontext;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
	public_res_id = lappend(public_res_id, results);
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Remove node from global List
 * using public_conn_id_index
 */
static void
remove_public_conn(public_pq_connections * connections)
{
	MemoryContext oldcontext;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
	public_conn_id= lremove(connections, public_conn_id);

	if (public_conn_id== NIL)
		public_conn_id_index = 0;
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Remove node from global List
 * using public_conn_id_index
 */
static void
remove_public_res(public_pq_results * results)
{
	MemoryContext oldcontext;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
	public_res_id= lremove(results, public_res_id);

	if (public_res_id== NIL)
		public_res_id_index = 0;
	MemoryContextSwitchTo(oldcontext);
}

/*
	Check connection id and return PGconn pointer, delete connection from list if removePointer = TRUE
*/
PGconn* getConnPointer(int32 nConnId,bool removePointer){
    PGconn* conn;
    public_pq_connections* connections;
	MemoryContext oldcontext;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	connections = get_public_conn(nConnId);
	if (connections == NULL){
		MemoryContextSwitchTo(oldcontext);
	   	return NULL;
	}
	conn = connections->conn;
    if (removePointer){
	    remove_public_conn(connections);
		pfree(connections);
	}
	MemoryContextSwitchTo(oldcontext);
	return conn;
}

/*
	Check result id and return PGresult pointer, delete result from list if removePointer = TRUE
*/
PGresult* getResultPointer(int32 nResId,bool removePointer){
    PGresult* res;
	public_pq_results *results;
	MemoryContext oldcontext;
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	results = get_public_res(nResId);
	if (results == NULL){
		MemoryContextSwitchTo(oldcontext);
		return NULL;
	}
	res = results->res;
    if (removePointer){
	    remove_public_res(results);
		pfree(results);
	}
	MemoryContextSwitchTo(oldcontext);
	return res;
}


/*
	PGconn Connectdb(char *ConnectionString);
	--Connect to database
*/
PG_FUNCTION_INFO_V1(dblink_pq_Connectdb);
Datum
dblink_pq_Connectdb(PG_FUNCTION_ARGS)
{
	PGconn	*conn = NULL;
   	char *connString = GET_STR(PG_GETARG_TEXT_P(0));
	public_pq_connections* connections;

	conn = PQconnectdb(connString);
	if (PQstatus(conn) != CONNECTION_BAD)
	{
		public_conn_id_index++;
		connections = init_public_connections();
		connections->conn_id_index = public_conn_id_index;
		connections->conn=conn;
		append_public_conn(connections);
		PG_RETURN_INT32(public_conn_id_index);
	}else{
		PG_RETURN_INT32(0);
	}
}

/*
	PGconn SetdbLogin(char *host,char *port,char *options,char *tty,char *dbName,char *Login,char *Password);
	--Connect to database
*/
PG_FUNCTION_INFO_V1(dblink_pq_SetdbLogin);
Datum
dblink_pq_SetdbLogin(PG_FUNCTION_ARGS)
{
	PGconn	*conn = NULL;
	char		*pghost;
	char 		*pgport;
	char 		*pgoptions;
	char 		*pgtty;
	char 		*dbName;
	char 		*login;
	char 		*pwd;
	public_pq_connections* connections;

	pghost = GET_STR(PG_GETARG_TEXT_P(0));
	pgport = GET_STR(PG_GETARG_TEXT_P(1));
	pgoptions = GET_STR(PG_GETARG_TEXT_P(2));
	pgtty = GET_STR(PG_GETARG_TEXT_P(3));
	dbName = GET_STR(PG_GETARG_TEXT_P(4));
	login = GET_STR(PG_GETARG_TEXT_P(5));
	pwd	= GET_STR(PG_GETARG_TEXT_P(6));

	if (pgport == NULL)
		pgport="5432";

	conn = PQsetdbLogin(pghost,pgport,pgoptions,pgtty,dbName,login,pwd);
	if (PQstatus(conn) != CONNECTION_BAD)
	{
		public_conn_id_index++;
		connections = init_public_connections();
		connections->conn_id_index = public_conn_id_index;
		connections->conn=conn;
		append_public_conn(connections);
		PG_RETURN_INT32(public_conn_id_index);
	}else{
		PG_RETURN_INT32(0);
	}
}

/*
	int Status(PGconn *Connection);
	--Connection status
*/
PG_FUNCTION_INFO_V1(dblink_pq_Status);
Datum
dblink_pq_Status(PG_FUNCTION_ARGS)
{
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
		elog(ERROR, "dblink_pq_Status: function called with invalid connection id");
	PG_RETURN_INT32(PQstatus(conn));
}

/*
	text StatusStr(PGconn *Connection);
	--Connection status
*/
PG_FUNCTION_INFO_V1(dblink_pq_StatusStr);
Datum
dblink_pq_StatusStr(PG_FUNCTION_ARGS)
{
	text*   ConStat = NULL;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);

    if (conn==NULL)
    	elog(ERROR, "dblink_pq_StatusStr: function called with invalid connection id");

	switch( PQstatus(conn) ){
		case 0 : {
			ConStat = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("CONNECTION_OK")));
			break;
		} case 1 : {
			ConStat = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("CONNECTION_BAD")));
			break;
		} case 2 : {
			ConStat = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("CONNECTION_STARTED")));
			break;
		} case 3 : {
			ConStat = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("CONNECTION_MADE")));
			break;
		} case 4 : {
			ConStat = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("CONNECTION_AWAITING_RESPONSE")));
			break;
		}case 5 : {
			ConStat = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("CONNECTION_AUTH_OK")));
			break;
		}case 6 : {
			ConStat = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("CONNECTION_SETENV")));
			break;
		}
	}
	PG_RETURN_TEXT_P(ConStat);
}

/*
	int Finish(PGconn *Connection);
	--Finish the connection
*/
PG_FUNCTION_INFO_V1(dblink_pq_Finish);
Datum
dblink_pq_Finish(PG_FUNCTION_ARGS)
{
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),TRUE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_Finish: function called with invalid connection id");

	PQfinish(conn);
	PG_RETURN_INT32(0);
}

/*
	int Reset(PGconn *Connection);
	--Reset the connection
*/
PG_FUNCTION_INFO_V1(dblink_pq_Reset);
Datum
dblink_pq_Reset(PG_FUNCTION_ARGS)
{
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_Reset: function called with invalid connection id");

	PQreset(conn);
	PG_RETURN_INT32(0);
}

/*
	text Db(PGconn *Connection);
	--Returns database name
*/
PG_FUNCTION_INFO_V1(dblink_pq_Db);
Datum
dblink_pq_Db(PG_FUNCTION_ARGS)
{
	text* DbName = NULL;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_Db: function called with invalid connection id");

	DbName = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQdb(conn))));
	PG_RETURN_TEXT_P(DbName);
}

/*
	text User(PGconn *Connection);
	--Returns user name
*/
PG_FUNCTION_INFO_V1(dblink_pq_User);
Datum
dblink_pq_User(PG_FUNCTION_ARGS)
{
	text* User = NULL;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_User: function called with invalid connection id");

	User = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQuser(conn))));
	PG_RETURN_TEXT_P(User);
}

/*
	text Password(PGconn *Connection);
	--Returns password
*/
PG_FUNCTION_INFO_V1(dblink_pq_Password);
Datum
dblink_pq_Password(PG_FUNCTION_ARGS)
{
	text* Password = NULL;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_Password: function called with invalid connection id");

	Password = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQpass(conn))));
	PG_RETURN_TEXT_P(Password);
}

/*
	text Host(PGconn *Connection);
	--Returns host name
*/
PG_FUNCTION_INFO_V1(dblink_pq_Host);
Datum
dblink_pq_Host(PG_FUNCTION_ARGS)
{
	text			*Host = NULL;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_Host: function called with invalid connection id");

	Host = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQhost(conn))));
	PG_RETURN_TEXT_P(Host);
}

/*
	text Port(PGconn *Connection);
	--Returns port
*/
PG_FUNCTION_INFO_V1(dblink_pq_Port);
Datum
dblink_pq_Port(PG_FUNCTION_ARGS)
{
	text* Port = NULL;
	PGconn* conn;
	conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
	if (conn==NULL)
		elog(ERROR, "dblink_pq_Port: function called with invalid connection id");

	Port = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQport(conn))));
	PG_RETURN_TEXT_P(Port);
}

/*
	text Tty(PGconn *Connection);
	--Returns tty
*/
PG_FUNCTION_INFO_V1(dblink_pq_Tty);
Datum
dblink_pq_Tty(PG_FUNCTION_ARGS)
{
	text* Tty = NULL;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_Tty: function called with invalid connection id");

	Tty = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQtty(conn))));
	PG_RETURN_TEXT_P(Tty);
}

/*
	text ErrorMessage(PGconn *Connection);
	--Returns connection error message
*/
PG_FUNCTION_INFO_V1(dblink_pq_ErrorMessage);
Datum
dblink_pq_ErrorMessage(PG_FUNCTION_ARGS)
{
	text* ErrorMsg = NULL;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_ErrorMessage: function called with invalid connection id");

	ErrorMsg = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQerrorMessage(conn))));
	PG_RETURN_TEXT_P(ErrorMsg);
}

/*
	int BackendPID(PGconn *Connection);
	--Returns backend PID of host server process
*/
PG_FUNCTION_INFO_V1(dblink_pq_BackendPID);
Datum
dblink_pq_BackendPID(PG_FUNCTION_ARGS)
{
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_BackendPID: function called with invalid connection id");

	PG_RETURN_INT32(PQbackendPID(conn));
}

/*
	PGresult Exec(PGconn *Connection, char *SqlExecString);
	--Execute query and return pointer to PGresult
*/
PG_FUNCTION_INFO_V1(dblink_pq_Exec);
Datum
dblink_pq_Exec(PG_FUNCTION_ARGS)
{
    public_pq_results* results;
	PGresult* res = NULL;
	char* SqlExecStr;
    PGconn* conn;
    conn = getConnPointer(PG_GETARG_INT32(0),FALSE);
    if (conn==NULL)
    	elog(ERROR, "dblink_pq_Exec: function called with invalid connection id");

	SqlExecStr = GET_STR(PG_GETARG_TEXT_P(1));
	res= PQexec(conn,SqlExecStr);

	public_res_id_index++;
	results = init_public_results();
	results->res_id_index = public_res_id_index;
	results->res=res;
	append_public_res(results);
	PG_RETURN_INT32(public_res_id_index);
}

/*
	int ResultStatus(PGresult *Res);
	--Status of PGresult
*/
PG_FUNCTION_INFO_V1(dblink_pq_ResultStatus);
Datum
dblink_pq_ResultStatus(PG_FUNCTION_ARGS)
{
    PGresult* res;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_ResultStatus: function called with invalid resource id");

	PG_RETURN_INT32(PQresultStatus(res));
}

/*
	text ResStatus(PGresult *Res);
	--Status of PGresult as string (PGRES_COMMAND_OK for example)
*/
PG_FUNCTION_INFO_V1(dblink_pq_ResStatus);
Datum
dblink_pq_ResStatus(PG_FUNCTION_ARGS)
{
	int		statusFlag;
	text	*StatMsg;

	statusFlag = PG_GETARG_INT32(0);
	if (statusFlag < 0)
		elog(ERROR, "dblink_pq_ResStatus: invalid status value");

	StatMsg = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQresStatus(statusFlag))));
	PG_RETURN_TEXT_P(StatMsg);
}

/*
	text ResultErrorMessage(PGresult *Res);
	--Return PGresult error message
*/
PG_FUNCTION_INFO_V1(dblink_pq_ResultErrorMessage);
Datum
dblink_pq_ResultErrorMessage(PG_FUNCTION_ARGS)
{
	text *ErrMsg;
    PGresult* res;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_ResultErrorMessage: function called with invalid resource id");

	ErrMsg=DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(PQresultErrorMessage(res))));
	PG_RETURN_TEXT_P(ErrMsg);
}

/*
	int Clear(PGresult *Res);
	--Clear the PGresult
*/
PG_FUNCTION_INFO_V1(dblink_pq_Clear);
Datum
dblink_pq_Clear(PG_FUNCTION_ARGS)
{
    PGresult* res;
    res = getResultPointer(PG_GETARG_INT32(0),TRUE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_ResultErrorMessage: function called with invalid resource id");

	PQclear(res);
	PG_RETURN_INT32(0);
}

/*
	text EscapeString(char *EscapeStr);
	--Escape the query string
*/
PG_FUNCTION_INFO_V1(dblink_pq_EscapeString);
Datum
dblink_pq_EscapeString(PG_FUNCTION_ARGS)
{
	char* InputStr;
	char* OutputStr;
	size_t InputLen;
	size_t OutputLen;

	InputStr = GET_STR(PG_GETARG_TEXT_P(0));
	InputLen=strlen(InputStr);

	if(InputStr==NULL){
		PG_RETURN_NULL();
	}
	OutputStr=(char*)palloc((2*InputLen)+1);
	OutputLen=PQescapeString (OutputStr, InputStr, InputLen);
	PG_RETURN_TEXT_P( DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(OutputStr))) );
}

/*
	int Ntuples(PGresult *Res);
	--Return number of tuples in PGresult
*/
PG_FUNCTION_INFO_V1(dblink_pq_Ntuples);
Datum
dblink_pq_Ntuples(PG_FUNCTION_ARGS)
{
    PGresult* res;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Ntuples: function called with invalid resource id");

	PG_RETURN_INT32(PQntuples(res));
}

/*
	int Nfields(PGresult *Res);
	--Return number of fields(columns) in PGresult
*/
PG_FUNCTION_INFO_V1(dblink_pq_Nfields);
Datum
dblink_pq_Nfields(PG_FUNCTION_ARGS)
{
    PGresult* res;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Nfields: function called with invalid resource id");

	PG_RETURN_INT32(PQnfields(res));
}

/*
	text Fname(PGresult *Res, int FieldNumber);
	--Return field(column) name for given column index
*/
PG_FUNCTION_INFO_V1(dblink_pq_Fname);
Datum
dblink_pq_Fname(PG_FUNCTION_ARGS)
{
	int				FieldNumber;
	char			*FieldName;
    PGresult* res;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Fname: function called with invalid resource id");

	FieldNumber = PG_GETARG_INT32(1);
	if (FieldNumber > (PQnfields(res) - 1))
		PG_RETURN_NULL();

	FieldName=PQfname(res,FieldNumber);
	PG_RETURN_TEXT_P( DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(FieldName))) );
}

/*
	text Fnumber(PGresult *Res, char *FieldName);
	--Return field(column) number for given column name
*/
PG_FUNCTION_INFO_V1(dblink_pq_Fnumber);
Datum
dblink_pq_Fnumber(PG_FUNCTION_ARGS)
{
	char			*FieldName;
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Fnumber: function called with invalid resource id");

	FieldName = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
	PG_RETURN_INT32(PQfnumber(res,FieldName));
}

/*
	oid Ftype(PGresult *Res, int FieldNumber);
	--Return oid of data type for given field(column) number
*/
PG_FUNCTION_INFO_V1(dblink_pq_Ftype);
Datum
dblink_pq_Ftype(PG_FUNCTION_ARGS)
{
	int				FieldNumber;
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Ftype: function called with invalid resource id");

	FieldNumber = PG_GETARG_INT32(1);
	if (FieldNumber > (PQnfields(res) - 1))
		PG_RETURN_NULL();
	PG_RETURN_OID(PQftype(res,FieldNumber));
}

/*
	int Fmod(PGresult *Res, int FieldNumber);
	--Return type-specific modificaton data for given field(column) number
*/
PG_FUNCTION_INFO_V1(dblink_pq_Fmod);
Datum
dblink_pq_Fmod(PG_FUNCTION_ARGS)
{
	int				FieldNumber;
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Fmod: function called with invalid resource id");

	FieldNumber = PG_GETARG_INT32(1);
	if (FieldNumber > (PQnfields(res) - 1))
		PG_RETURN_NULL();
	PG_RETURN_INT32(PQfmod(res,FieldNumber));
}

/*
	int Fsize(PGresult *Res, int FieldNumber);
	--Return the size in bytes of the field
*/
PG_FUNCTION_INFO_V1(dblink_pq_Fsize);
Datum
dblink_pq_Fsize(PG_FUNCTION_ARGS)
{
	int				FieldNumber;
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Fsize: function called with invalid resource id");

	FieldNumber = PG_GETARG_INT32(1);
	if (FieldNumber > (PQnfields(res) - 1))
		PG_RETURN_NULL();
	PG_RETURN_INT32(PQfsize(res,FieldNumber));
}

/*
	int BinaryTuples(PGresult *Res);
	--Return 1 if PGresult have binary tuples, 0 if not
*/
PG_FUNCTION_INFO_V1(dblink_pq_BinaryTuples);
Datum
dblink_pq_BinaryTuples(PG_FUNCTION_ARGS)
{
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_BinaryTuples: function called with invalid resource id");

	PG_RETURN_INT32(PQbinaryTuples(res));

}

/*
	text GetValue(PGresult *Res, int TupleNumber, int FieldNumber);
	--Return value of field
*/
PG_FUNCTION_INFO_V1(dblink_pq_GetValue);
Datum
dblink_pq_GetValue(PG_FUNCTION_ARGS)
{
	int				TupleNumber=0;
	int				FieldNumber=0;
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_GetValue: function called with invalid resource id");

	TupleNumber = PG_GETARG_INT32(1);
	if ( (TupleNumber<0) || (FieldNumber > (PQntuples(res) - 1)) )
		elog(ERROR, "PlPq: invalid tuple number: %i",TupleNumber);

	FieldNumber = PG_GETARG_INT32(2);
	if (FieldNumber > (PQnfields(res) - 1))
		elog(ERROR, "PlPq: invalid field number: %i",FieldNumber);

	if (PQgetisnull(res,TupleNumber,FieldNumber)==1){
		/*Return NULL*/
		PG_RETURN_NULL();
	}else{
		/*Return text*/
		PG_RETURN_TEXT_P( DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum( PQgetvalue(res,TupleNumber,FieldNumber) ))) );
	}
}

/*
	int Getisnull(PGresult *Res, int TupleNumber, int FieldNumber);
	--Return 1 if field has no value(NULL), 0 if not
*/
PG_FUNCTION_INFO_V1(dblink_pq_Getisnull);
Datum
dblink_pq_Getisnull(PG_FUNCTION_ARGS)
{
	int				TupleNumber=0;
	int				FieldNumber=0;
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Getisnull: function called with invalid resource id");

	TupleNumber = PG_GETARG_INT32(1);
	if ( (TupleNumber<0) || (FieldNumber > (PQntuples(res) - 1)) )
		elog(ERROR, "PlPq: invalid tuple number: %i",TupleNumber);

	FieldNumber = PG_GETARG_INT32(2);
	if (FieldNumber > (PQnfields(res) - 1))
		elog(ERROR, "PlPq: invalid field number: %i",FieldNumber);

	PG_RETURN_INT32(PQgetisnull(res,TupleNumber,FieldNumber));
}

/*
	int Getlength(PGresult *Res, int TupleNumber, int FieldNumber);
	--Return actual size of field
*/
PG_FUNCTION_INFO_V1(dblink_pq_Getlength);
Datum
dblink_pq_Getlength(PG_FUNCTION_ARGS)
{
	int				TupleNumber=0;
	int				FieldNumber=0;
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_Getlength: function called with invalid resource id");

	TupleNumber = PG_GETARG_INT32(1);
	if ( (TupleNumber<0) || (FieldNumber > (PQntuples(res) - 1)) )
		elog(ERROR, "PlPq: invalid tuple number: %i",TupleNumber);

	FieldNumber = PG_GETARG_INT32(2);
	if (FieldNumber > (PQnfields(res) - 1))
		elog(ERROR, "PlPq: invalid field number: %i",FieldNumber);

	PG_RETURN_INT32(PQgetlength(res,TupleNumber,FieldNumber));
}

/*
	text CmdStatus(PGresult *Res);
	--Return command status string from PGresult
*/
PG_FUNCTION_INFO_V1(dblink_pq_CmdStatus);
Datum
dblink_pq_CmdStatus(PG_FUNCTION_ARGS)
{
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_CmdStatus: function called with invalid resource id");

	PG_RETURN_TEXT_P( DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum( PQcmdStatus(res) ))) );
}

/*
	int CmdTuples(PGresult *Res);
	--Return number of rows affected by the SQL command
*/
PG_FUNCTION_INFO_V1(dblink_pq_CmdTuples);
Datum
dblink_pq_CmdTuples(PG_FUNCTION_ARGS)
{
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_CmdTuples: function called with invalid resource id");

	PG_RETURN_INT32(atoi(PQcmdTuples(res)));
}

/*
	oid OidValue(PGresult *Res);
	--Return last inserted OID
*/
PG_FUNCTION_INFO_V1(dblink_pq_OidValue);
Datum
dblink_pq_OidValue(PG_FUNCTION_ARGS)
{
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_OidValue: function called with invalid resource id");

	PG_RETURN_OID(PQoidValue(res));
}

/*
	text OidStatus(PGresult *Res);
	--Return OID status as string
*/
PG_FUNCTION_INFO_V1(dblink_pq_OidStatus);
Datum
dblink_pq_OidStatus(PG_FUNCTION_ARGS)
{
	PGresult	*res = NULL;
    res = getResultPointer(PG_GETARG_INT32(0),FALSE);
    if (res==NULL)
		elog(ERROR, "dblink_pq_OidStatus: function called with invalid resource id");

	PG_RETURN_TEXT_P( DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum( PQoidStatus(res) ))) );
}
pqtest.sqltext/plain; charset=us-ascii; name=pqtest.sqlDownload
#2Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Darko Prenosil (#1)
Re: plpq

Would you tell us exactly what this is.

---------------------------------------------------------------------------

Darko Prenosil wrote:

Tree weeks later than I promised, but it is finished (I hope).

In attachment are files:
dblink.c
dblink.h
dblink.sql.in
pqtest.sql

In file pqtest.sql is sample queries and results. It seem OK to me.

There are two reasons why I did not make a diff.

1. The source I started from is 7.3b1, not the latest.
2. I would like You to check the code, especially the part that touches memory
management.
I can say that it works, but I do not know exactly why, and this can be
dangerous. With my knowledge of postgres internals this is
as far I can go at the moment. And once more sorry for bad English !

Regards !

[ Attachment, skipping... ]

[ Attachment, skipping... ]

[ Attachment, skipping... ]

[ Attachment, skipping... ]

---------------------------(end of broadcast)---------------------------
TIP 3: if posting/reading through Usenet, please send an appropriate
subscribe-nomail command to majordomo@postgresql.org so that your
message can get through to the mailing list cleanly

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073
#3Darko Prenosil
darko.prenosil@finteh.hr
In reply to: Bruce Momjian (#2)
Re: plpq

It is wrapper about libpq client library functions for use in PL/PSQL.
I agreed with Joe Conway that it may fit within dblink, because dblink is much
more easy to work with than the libpq, but sometimes it is Just not enough.
So, the idea is to re-implement all important libpq functions like
PQConnectdb,
PQExec ... and make one postgres server become just a client for another (or
another's) postgres server. As You know, dblink still does not support
simultaneous connections to two different databases, but with this it can !
I agreed with Joe that when he is back from holiday he will take a look.

Regards !

Show quoted text

On Monday 28 October 2002 20:35, Bruce Momjian wrote:

Would you tell us exactly what this is.

---------------------------------------------------------------------------

Darko Prenosil wrote:

Tree weeks later than I promised, but it is finished (I hope).

In attachment are files:
dblink.c
dblink.h
dblink.sql.in
pqtest.sql

In file pqtest.sql is sample queries and results. It seem OK to me.

There are two reasons why I did not make a diff.

1. The source I started from is 7.3b1, not the latest.
2. I would like You to check the code, especially the part that touches
memory management.
I can say that it works, but I do not know exactly why, and this can be
dangerous. With my knowledge of postgres internals this is
as far I can go at the moment. And once more sorry for bad English !

Regards !

[ Attachment, skipping... ]

[ Attachment, skipping... ]

[ Attachment, skipping... ]

[ Attachment, skipping... ]

---------------------------(end of broadcast)---------------------------
TIP 3: if posting/reading through Usenet, please send an appropriate
subscribe-nomail command to majordomo@postgresql.org so that your
message can get through to the mailing list cleanly

#4Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Darko Prenosil (#3)
Re: plpq

Darko Prenosil wrote:

It is wrapper about libpq client library functions for use in PL/PSQL.
I agreed with Joe Conway that it may fit within dblink, because dblink is much
more easy to work with than the libpq, but sometimes it is Just not enough.
So, the idea is to re-implement all important libpq functions like
PQConnectdb,
PQExec ... and make one postgres server become just a client for another (or
another's) postgres server. As You know, dblink still does not support
simultaneous connections to two different databases, but with this it can !
I agreed with Joe that when he is back from holiday he will take a look.

Brilliant idea. I assume you added those functions and rewrote dblink
to use them. dblink already is great, but by adding libpq inside
plpgsql, all sorts of other things become possible.

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073