FDW-based dblink (WIP)

Started by Itagaki Takahiro, over 16 years ago, 12 messages
#1 Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
1 attachment(s)

Here is a WIP patch for a dblink based on foreign data wrappers.

It integrates the dblink module into core and adds one new feature,
automatic transaction management. The new dblink interface is
exported by include/foreign/dblink.h. A connector module for another
database is easy to write because it can reuse the transaction and
resource management parts in core.
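
As a rough illustration of what a connector for another database could look like, here is a self-contained sketch of the pattern that dblink_postgres.c in the attached patch follows: a connector-specific struct embeds a base struct of function pointers as its first member, and the caller only goes through the base. All names below (base_connection, mock_connection, mock_connector) are hypothetical stand-ins, not the declarations in include/foreign/dblink.h, and the "remote database" is only a counter.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the base connection type exported by core. */
typedef struct base_connection base_connection;
struct base_connection
{
	void		(*disconnect) (base_connection *conn);
	int64_t		(*exec) (base_connection *conn, const char *sql);
};

/* Connector-specific state; the base must be the first member. */
typedef struct mock_connection
{
	base_connection base;
	int			nexec;		/* statements executed so far */
	int		   *closed;		/* set to 1 on disconnect, for the demo only */
} mock_connection;

static int64_t
mock_exec(base_connection *conn, const char *sql)
{
	mock_connection *mc = (mock_connection *) conn;

	assert(sql != NULL);
	mc->nexec++;
	return 1;				/* pretend every statement affects one row */
}

static void
mock_disconnect(base_connection *conn)
{
	mock_connection *mc = (mock_connection *) conn;

	if (mc->closed)
		*mc->closed = 1;
	free(mc);
}

/* Connector entry point: allocate the object and fill in the callbacks. */
base_connection *
mock_connector(int *closed_flag)
{
	mock_connection *mc = malloc(sizeof(mock_connection));

	if (mc == NULL)
		return NULL;
	mc->base.disconnect = mock_disconnect;
	mc->base.exec = mock_exec;
	mc->nexec = 0;
	mc->closed = closed_flag;
	return &mc->base;
}
```

The sketch downcasts inside each callback instead of casting the function pointers themselves, which is a design choice of this example rather than something taken from the patch.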

The syntax to create an FDW with a connector is:
CREATE FOREIGN DATA WRAPPER postgresql
VALIDATOR postgresql_fdw_validator
CONNECTOR postgresql_fdw_connector
OPTIONS (...);

contrib/dblink2 is a sample connector for PostgreSQL. It exports one function:
CREATE FUNCTION postgresql_fdw_connector(options internal)
RETURNS internal -- returns a connection object
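
The connector has to turn the options it receives into something the remote client library understands. In the attached patch this is done by joining each option as name='value' and putting a backslash before every single quote and backslash, which is the escaping libpq connection strings expect. A standalone sketch of that step, using plain C strings instead of PostgreSQL's List and StringInfo (the kv_option type and the build_conninfo name are made up for this example):

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical option pair; the patch reads DefElem nodes from a List. */
typedef struct kv_option
{
	const char *name;
	const char *value;
} kv_option;

/*
 * Build "name='value' name='value' " with ' and \ escaped by a backslash.
 * Returns a malloc'ed string that the caller must free, or NULL if the
 * allocation fails.
 */
char *
build_conninfo(const kv_option *opts, size_t nopts)
{
	size_t		len = 1;	/* terminating NUL */
	size_t		i;
	char	   *buf;
	char	   *out;

	/* Worst case: every character of every value needs an escape. */
	for (i = 0; i < nopts; i++)
	{
		assert(opts[i].name != NULL && opts[i].value != NULL);
		len += strlen(opts[i].name) + 2 * strlen(opts[i].value) + 4;
	}

	buf = malloc(len);
	if (buf == NULL)
		return NULL;

	out = buf;
	for (i = 0; i < nopts; i++)
	{
		const char *cp;
		size_t		namelen = strlen(opts[i].name);

		memcpy(out, opts[i].name, namelen);
		out += namelen;
		*out++ = '=';
		*out++ = '\'';
		for (cp = opts[i].value; *cp; cp++)
		{
			if (*cp == '\\' || *cp == '\'')
				*out++ = '\\';
			*out++ = *cp;
		}
		*out++ = '\'';
		*out++ = ' ';
	}
	*out = '\0';

	return buf;
}
```

Option names are copied as-is here, as in the patch, so this assumes they are already valid keywords.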

Basic dblink functions are moved to postgres core:
       Name        | Result type  | Argument data types
-------------------+--------------+---------------------
 dblink            | SETOF record | text, text
 dblink_close      | boolean      | integer
 dblink_connect    | boolean      | text
 dblink_connect    | boolean      | text, text
 dblink_disconnect | boolean      | text
 dblink_exec       | bigint       | text, text
 dblink_fetch      | SETOF record | integer, integer
 dblink_open       | integer      | text, text
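
The cursor functions follow an open/fetch/close pattern: dblink_open returns an integer handle, and each dblink_fetch call returns at most the requested number of rows, continuing where the previous call stopped. The regression test in the attached patch calls dblink_fetch three times per cursor, so a batch size of 1 over four rows yields three rows, while 2 or 100 yields all four. A toy model of that behaviour, with the remote result reduced to an array of integers (row_cursor and cursor_fetch are illustrative names, not part of the patch):

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical cursor over an in-memory result set. */
typedef struct row_cursor
{
	const int  *rows;		/* remote result, reduced to one int column */
	size_t		nrows;
	size_t		pos;		/* index of the next row to return */
} row_cursor;

/* Copy up to howmany rows into out and return how many were copied. */
size_t
cursor_fetch(row_cursor *cur, size_t howmany, int *out)
{
	size_t		n = 0;

	assert(cur != NULL && out != NULL);
	while (n < howmany && cur->pos < cur->nrows)
		out[n++] = cur->rows[cur->pos++];

	return n;
}
```

A fetch on an exhausted cursor simply returns zero rows, so callers can loop until they get a short or empty batch.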

The new dblink works closely with local transactions. When a local
transaction is committed or rolled back, the remote transactions end in
the same state as the local one. Please set max_prepared_transactions
to 1 or greater if you test the patch.
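
The need for max_prepared_transactions comes from the prepared-transaction commands (PREPARE TRANSACTION, COMMIT PREPARED, ROLLBACK PREPARED) that the connector in the attached patch can issue. The sketch below shows one plausible ordering for such a two-phase commit and is not code taken from the patch: prepare every remote connection first, commit only if all of them prepared, and otherwise undo whatever was done. The remote side is simulated by a small state machine; remote_conn, send_command and commit_all are hypothetical names, and how core actually drives the connectors at commit time may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum xa_command
{
	CMD_XA_PREPARE,			/* PREPARE TRANSACTION */
	CMD_XA_COMMIT,			/* COMMIT PREPARED */
	CMD_XA_ROLLBACK,		/* ROLLBACK PREPARED */
	CMD_ROLLBACK			/* plain ROLLBACK */
} xa_command;

typedef enum remote_state
{
	REMOTE_ACTIVE,
	REMOTE_PREPARED,
	REMOTE_COMMITTED,
	REMOTE_ABORTED
} remote_state;

typedef struct remote_conn
{
	remote_state state;
	bool		fail_prepare;	/* simulate a remote that cannot prepare */
} remote_conn;

/* Stand-in for a connector's command callback; returns true on success. */
static bool
send_command(remote_conn *rc, xa_command cmd)
{
	switch (cmd)
	{
		case CMD_XA_PREPARE:
			if (rc->state != REMOTE_ACTIVE)
				return false;
			/* A failed PREPARE leaves the remote transaction aborted. */
			rc->state = rc->fail_prepare ? REMOTE_ABORTED : REMOTE_PREPARED;
			return !rc->fail_prepare;
		case CMD_XA_COMMIT:
			if (rc->state != REMOTE_PREPARED)
				return false;
			rc->state = REMOTE_COMMITTED;
			return true;
		case CMD_XA_ROLLBACK:
			if (rc->state != REMOTE_PREPARED)
				return false;
			rc->state = REMOTE_ABORTED;
			return true;
		case CMD_ROLLBACK:
			if (rc->state != REMOTE_ACTIVE)
				return false;
			rc->state = REMOTE_ABORTED;
			return true;
	}
	return false;
}

/* Phase 1: prepare all. Phase 2: commit all, or undo everything. */
bool
commit_all(remote_conn *conns, size_t n)
{
	size_t		i;
	bool		ok = true;

	assert(conns != NULL || n == 0);

	for (i = 0; i < n && ok; i++)
		ok = send_command(&conns[i], CMD_XA_PREPARE);

	for (i = 0; i < n; i++)
	{
		if (ok)
			send_command(&conns[i], CMD_XA_COMMIT);
		else if (conns[i].state == REMOTE_PREPARED)
			send_command(&conns[i], CMD_XA_ROLLBACK);
		else if (conns[i].state == REMOTE_ACTIVE)
			send_command(&conns[i], CMD_ROLLBACK);
	}

	return ok;
}
```

A real implementation also has to decide what to do when the second phase itself fails on one remote; this sketch ignores that case.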

I very much want the automatic transaction management. It is useful for
writing higher-level modules such as materialized views over a network. But
it should be possible to turn it off when it is not wanted. I'll work on
those parts next.

-- connect
CREATE SERVER server_postgres FOREIGN DATA WRAPPER postgresql;
SELECT dblink_connect('conn_postgres', 'server_postgres');
-- commit both local and remote transactions.
BEGIN;
SELECT dblink_exec('conn_postgres', 'UPDATE ...');
COMMIT;
-- rollback both local and remote transactions.
BEGIN;
SELECT dblink_exec('conn_postgres', 'UPDATE ...');
ROLLBACK;
-- disconnect
SELECT dblink_disconnect('conn_postgres');

I have not ported all features of the present dblink, but I'd like to hear
whether the goal and the concepts are reasonable. Comments welcome.

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

Attachments:

dblink2-20090819.patch (application/octet-stream)
diff -cprN head/contrib/dblink2/Makefile dblink2/contrib/dblink2/Makefile
*** head/contrib/dblink2/Makefile	1970-01-01 09:00:00.000000000 +0900
--- dblink2/contrib/dblink2/Makefile	2009-08-19 11:42:22.350167550 +0900
***************
*** 0 ****
--- 1,20 ----
+ MODULE_big = dblink2
+ PG_CPPFLAGS = -I$(libpq_srcdir)
+ OBJS	= dblink_postgres.o
+ SHLIB_LINK = $(libpq)
+ 
+ DATA_built = dblink2.sql 
+ DATA = uninstall_dblink2.sql 
+ REGRESS = dblink2
+ 
+ 
+ ifdef USE_PGXS
+ PG_CONFIG = pg_config
+ PGXS := $(shell $(PG_CONFIG) --pgxs)
+ include $(PGXS)
+ else
+ subdir = contrib/dblink2
+ top_builddir = ../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ endif
diff -cprN head/contrib/dblink2/dblink2.sql.in dblink2/contrib/dblink2/dblink2.sql.in
*** head/contrib/dblink2/dblink2.sql.in	1970-01-01 09:00:00.000000000 +0900
--- dblink2/contrib/dblink2/dblink2.sql.in	2009-08-19 12:23:37.444175945 +0900
***************
*** 0 ****
--- 1,16 ----
+ -- Adjust this setting to control where the objects get created.
+ SET search_path = public;
+ 
+ BEGIN;
+ 
+ CREATE FUNCTION postgresql_fdw_connector(internal)
+ RETURNS internal
+ AS 'MODULE_PATHNAME','postgresql_fdw_connector'
+ LANGUAGE C STRICT;
+ 
+ CREATE FOREIGN DATA WRAPPER postgresql
+ 	VALIDATOR postgresql_fdw_validator
+ 	CONNECTOR postgresql_fdw_connector
+ ;
+ 
+ COMMIT;
diff -cprN head/contrib/dblink2/dblink_postgres.c dblink2/contrib/dblink2/dblink_postgres.c
*** head/contrib/dblink2/dblink_postgres.c	1970-01-01 09:00:00.000000000 +0900
--- dblink2/contrib/dblink2/dblink_postgres.c	2009-08-19 15:42:57.884513821 +0900
***************
*** 0 ****
--- 1,424 ----
+ /*-------------------------------------------------------------------------
+  *
+  * dblink_postgres.c
+  *		  support for foreign postgres server connectors.
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "fmgr.h"
+ #include "foreign/dblink.h"
+ #include "mb/pg_wchar.h"
+ #include "lib/stringinfo.h"
+ #include "nodes/parsenodes.h"
+ 
+ #include "libpq-fe.h"
+ 
+ PG_MODULE_MAGIC;
+ 
+ typedef struct pglink_connection
+ {
+ 	dblink_connection	base;
+ 	PGconn			   *conn;
+ } pglink_connection;
+ 
+ /* PQresult wrapper cursor */
+ typedef struct pglink_cursor
+ {
+ 	dblink_cursor	base;
+ 	PGresult	   *res;
+ 	int				row;
+ } pglink_cursor;
+ 
+ /* Server-side cursor */
+ typedef struct pglink_srvcur
+ {
+ 	pglink_cursor	base;
+ 	PGconn		   *conn;
+ 	int				fetchsize;
+ 	char			name[NAMEDATALEN];	/* cursor name */
+ } pglink_srvcur;
+ 
+ PG_FUNCTION_INFO_V1(postgresql_fdw_connector);
+ extern Datum postgresql_fdw_connector(PG_FUNCTION_ARGS);
+ 
+ static pglink_connection *pglink_connection_new(PGconn *conn);
+ static void pglink_disconnect(pglink_connection *conn);
+ static int64 pglink_exec(pglink_connection *conn, const char *sql);
+ static pglink_cursor *pglink_open(pglink_connection *conn, const char *sql, int32 fetchsize);
+ static bool pglink_command(pglink_connection *conn, dblink_command type);
+ 
+ static pglink_cursor *pglink_cursor_new(void);
+ static bool pglink_fetch(pglink_cursor *cur, const char *values[]);
+ static void pglink_close(pglink_cursor *cur);
+ 
+ static pglink_srvcur *pglink_srvcur_new(void);
+ static bool pglink_fetch_srvcur(pglink_srvcur *cur, const char *values[]);
+ static void pglink_close_srvcur(pglink_srvcur *cur);
+ 
+ static void pglink_error(PGconn *conn, PGresult *res);
+ 
+ /*
+  * Escaping libpq connect parameter strings.
+  *
+  * Replaces "'" with "\'" and "\" with "\\".
+  */
+ static char *
+ escape_param_str(const char *str)
+ {
+ 	const char *cp;
+ 	StringInfo	buf = makeStringInfo();
+ 
+ 	for (cp = str; *cp; cp++)
+ 	{
+ 		if (*cp == '\\' || *cp == '\'')
+ 			appendStringInfoChar(buf, '\\');
+ 		appendStringInfoChar(buf, *cp);
+ 	}
+ 
+ 	return buf->data;
+ }
+ 
+ static char *
+ join_options(List *options)
+ {
+ 	ListCell	   *cell;
+ 	StringInfoData	buf;
+ 
+ 	initStringInfo(&buf);
+ 	foreach(cell, options)
+ 	{
+ 		DefElem    *def = lfirst(cell);
+ 
+ 		appendStringInfo(&buf, "%s='%s' ", def->defname,
+ 						 escape_param_str(strVal(def->arg)));
+ 	}
+ 
+ 	return buf.data;
+ }
+ 
+ Datum
+ postgresql_fdw_connector(PG_FUNCTION_ARGS)
+ {
+ 	List	   *options = (List *) PG_GETARG_POINTER(0);
+ 	PGconn	   *conn;
+ 
+ 	conn = PQconnectdb(join_options(options));
+ 
+ 	if (PQstatus(conn) == CONNECTION_BAD)
+ 	{
+ 		char *detail = pstrdup(PQerrorMessage(conn));
+ 		PQfinish(conn);
+ 
+ 		ereport(ERROR,
+ 			(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ 			errmsg("could not establish connection"),
+ 			errdetail("%s", detail)));
+ 	}
+ 
+ 	PQsetClientEncoding(conn, GetDatabaseEncodingName());
+ 
+ 	PG_RETURN_POINTER(pglink_connection_new(conn));
+ }
+ 
+ static pglink_connection *
+ pglink_connection_new(PGconn *conn)
+ {
+ 	pglink_connection  *p;
+ 
+ 	p = malloc(sizeof(pglink_connection));
+ 	p->base.disconnect = (dblink_disconnect_t) pglink_disconnect;
+ 	p->base.exec = (dblink_exec_t) pglink_exec;
+ 	p->base.open = (dblink_open_t) pglink_open;
+ 	p->base.command = (dblink_command_t) pglink_command;
+ 	p->conn = conn;
+ 
+ 	return p;
+ }
+ 
+ static void
+ pglink_disconnect(pglink_connection *conn)
+ {
+ 	PQfinish(conn->conn);
+ 	free(conn);
+ }
+ 
+ static int64
+ pglink_exec(pglink_connection *conn, const char *sql)
+ {
+ 	PGresult	   *res;
+ 	int64			ntuples;
+ 
+ 	res = PQexec(conn->conn, sql);
+ 
+ 	switch (PQresultStatus(res))
+ 	{
+ 		case PGRES_COMMAND_OK:
+ 			ntuples = atoi(PQcmdTuples(res));
+ 			PQclear(res);
+ 			break;
+ 		case PGRES_TUPLES_OK:
+ 			ntuples = PQntuples(res);
+ 			PQclear(res);
+ 			break;
+ 		default:
+ 			pglink_error(conn->conn, res);
+ 			return 0;	/* keep compiler quiet */
+ 	}
+ 
+ 	return ntuples;
+ }
+ 
+ static bool
+ fetch_forward(pglink_srvcur *cur)
+ {
+ 	char			sql[NAMEDATALEN + 64];
+ 	PGresult	   *res;
+ 	ExecStatusType	code;
+ 
+ 	PQclear(cur->base.res);
+ 	cur->base.res = NULL;
+ 
+ 	if (!cur->name[0])
+ 		return false;	/* already done */
+ 
+ 	snprintf(sql, lengthof(sql), "FETCH FORWARD %d FROM %s", cur->fetchsize, cur->name);
+ 	res = PQexec(cur->conn, sql);
+ 	code = PQresultStatus(res);
+ 	if (code != PGRES_COMMAND_OK && code != PGRES_TUPLES_OK)
+ 		pglink_error(cur->conn, res);
+ 
+ 	if (PQntuples(res) <= 0)
+ 	{
+ 		/* no more tuples */
+ 		PQclear(res);
+ 		snprintf(sql, lengthof(sql), "CLOSE %s", cur->name);
+ 		PQclear(PQexec(cur->conn, sql));
+ 		cur->name[0] = '\0';
+ 		return false;
+ 	}
+ 
+ 	cur->base.res = res;
+ 	cur->base.row = 0;
+ 	return true;
+ }
+ 
+ static pglink_cursor *
+ pglink_open(pglink_connection *conn, const char *sql, int32 fetchsize)
+ {
+ 	PGresult	   *res;
+ 	ExecStatusType	code;
+ 
+ 	if (fetchsize > 0)
+ 	{
+ 		char			name[NAMEDATALEN];
+ 		StringInfoData	buf;
+ 
+ 		initStringInfo(&buf);
+ 
+ 		/* TODO: Use a cursor name that is less likely to conflict. */
+ 		snprintf(name, NAMEDATALEN, "dblink2_%d", rand());
+ 		appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", name, sql);
+ 		res = PQexec(conn->conn, buf.data);
+ 		code = PQresultStatus(res);
+ 
+ 		if (code == PGRES_COMMAND_OK)
+ 		{
+ 			pglink_srvcur *cur;
+ 
+ 			PQclear(res);
+ 
+ 			cur = pglink_srvcur_new();
+ 			cur->conn = conn->conn;
+ 			cur->fetchsize = fetchsize;
+ 			strlcpy(cur->name, name, NAMEDATALEN);
+ 			if (!fetch_forward(cur))
+ 			{
+ 				pglink_close_srvcur(cur);
+ 				return NULL;
+ 			}
+ 			cur->base.base.nfields = PQnfields(cur->base.res);
+ 
+ 			return (pglink_cursor *) cur;
+ 		}
+ 	}
+ 	else
+ 	{
+ 		res = PQexec(conn->conn, sql);
+ 		code = PQresultStatus(res);
+ 
+ 		if (code == PGRES_COMMAND_OK || code == PGRES_TUPLES_OK)
+ 		{
+ 			pglink_cursor  *cur;
+ 
+ 			cur = pglink_cursor_new();
+ 			cur->base.nfields = PQnfields(res);
+ 			cur->res = res;
+ 			return cur;
+ 		}
+ 	}
+ 
+ 	pglink_error(conn->conn, res);
+ 	return NULL;	/* keep compiler quiet */
+ }
+ 
+ /* TODO: Use a more human-readable XA-ID that is less likely to conflict. */
+ static bool
+ pglink_command(pglink_connection *conn, dblink_command type)
+ {
+ 	char		sql[256];
+ 	PGresult   *res;
+ 	bool		ok;
+ 
+ 	switch (type)
+ 	{
+ 		case DBLINK_BEGIN:
+ 		case DBLINK_XA_START:
+ 			strlcpy(sql, "BEGIN", lengthof(sql));
+ 			break;
+ 		case DBLINK_COMMIT:
+ 			strlcpy(sql, "COMMIT", lengthof(sql));
+ 			break;
+ 		case DBLINK_ROLLBACK:
+ 			strlcpy(sql, "ROLLBACK", lengthof(sql));
+ 			break;
+ 		case DBLINK_XA_PREPARE:
+ 			snprintf(sql, lengthof(sql), "PREPARE TRANSACTION 'dblink2_%p'", conn);
+ 			break;
+ 		case DBLINK_XA_COMMIT:
+ 			snprintf(sql, lengthof(sql), "COMMIT PREPARED 'dblink2_%p'", conn);
+ 			break;
+ 		case DBLINK_XA_ROLLBACK:
+ 			snprintf(sql, lengthof(sql), "ROLLBACK PREPARED 'dblink2_%p'", conn);
+ 			break;
+ 		default:
+ 			return false;
+ 	}
+ 
+ 	res = PQexec(conn->conn, sql);
+ 	ok = (PQresultStatus(res) == PGRES_COMMAND_OK);
+ 	PQclear(res);
+ 
+ 	return ok;
+ }
+ 
+ static pglink_cursor *
+ pglink_cursor_new(void)
+ {
+ 	pglink_cursor  *p;
+ 
+ 	p = malloc(sizeof(pglink_cursor));
+ 	p->base.fetch = (dblink_fetch_t) pglink_fetch;
+ 	p->base.close = (dblink_close_t) pglink_close;
+ 	p->base.nfields = 0;
+ 	p->row = 0;
+ 	p->res = NULL;
+ 
+ 	return p;
+ }
+ 
+ static bool
+ pglink_fetch(pglink_cursor *cur, const char *values[])
+ {
+ 	int		c;
+ 
+ 	if (cur->row >= PQntuples(cur->res))
+ 		return false;
+ 
+ 	for (c = 0; c < cur->base.nfields; c++)
+ 	{
+ 		if (PQgetisnull(cur->res, cur->row, c))
+ 			values[c] = NULL;
+ 		else
+ 			values[c] = PQgetvalue(cur->res, cur->row, c);
+ 	}
+ 	cur->row++;
+ 	return true;
+ }
+ 
+ static void
+ pglink_close(pglink_cursor *cur)
+ {
+ 	PQclear(cur->res);
+ 	free(cur);
+ }
+ 
+ static pglink_srvcur *
+ pglink_srvcur_new(void)
+ {
+ 	pglink_srvcur  *p;
+ 
+ 	p = malloc(sizeof(pglink_srvcur));
+ 	p->base.base.fetch = (dblink_fetch_t) pglink_fetch_srvcur;
+ 	p->base.base.close = (dblink_close_t) pglink_close_srvcur;
+ 	p->base.base.nfields = 0;
+ 	p->base.row = 0;
+ 	p->base.res = NULL;
+ 	p->conn = NULL;
+ 	p->name[0] = '\0';
+ 
+ 	return p;
+ }
+ 
+ static bool
+ pglink_fetch_srvcur(pglink_srvcur *cur, const char *values[])
+ {
+ 	if (cur->base.res == NULL || cur->base.row >= PQntuples(cur->base.res))
+ 	{
+ 		if (!fetch_forward(cur))
+ 			return false;
+ 		else if (cur->base.base.nfields != PQnfields(cur->base.res))
+ 			elog(ERROR, "nfields should not be changed");
+ 	}
+ 
+ 	return pglink_fetch(&cur->base, values);
+ }
+ 
+ static void
+ pglink_close_srvcur(pglink_srvcur *cur)
+ {
+ 	PQclear(cur->base.res);
+ 	if (cur->name[0] && cur->conn)
+ 	{
+ 		char	sql[NAMEDATALEN + 64];
+ 
+ 		snprintf(sql, lengthof(sql), "CLOSE %s", cur->name);
+ 		PQclear(PQexec(cur->conn, sql));
+ 	}
+ 	free(cur);
+ }
+ 
+ static void
+ pglink_error(PGconn *conn, PGresult *res)
+ {
+ 	const char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+ 	const char *message = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+ 	const char *detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+ 	const char *hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+ 	const char *context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+ 	int			sqlstate;
+ 
+ 	if (diag_sqlstate)
+ 		sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+ 								 diag_sqlstate[1],
+ 								 diag_sqlstate[2],
+ 								 diag_sqlstate[3],
+ 								 diag_sqlstate[4]);
+ 	else
+ 		sqlstate = ERRCODE_CONNECTION_FAILURE;
+ 
+ 	message = message ? pstrdup(message) : NULL;
+ 	detail = detail ? pstrdup(detail) : NULL;
+ 	hint = hint ? pstrdup(hint) : NULL;
+ 	context = context ? pstrdup(context) : NULL;
+ 
+ 	/* The fields above point into res, so free it only after copying them. */
+ 	PQclear(res);
+ 
+ 	ereport(ERROR, (errcode(sqlstate),
+ 		message ? errmsg("%s", message) : errmsg("unknown error"),
+ 		detail ? errdetail("%s", detail) : 0,
+ 		hint ? errhint("%s", hint) : 0,
+ 		context ? errcontext("%s", context) : 0));
+ }
diff -cprN head/contrib/dblink2/expected/dblink2.out dblink2/contrib/dblink2/expected/dblink2.out
*** head/contrib/dblink2/expected/dblink2.out	1970-01-01 09:00:00.000000000 +0900
--- dblink2/contrib/dblink2/expected/dblink2.out	2009-08-19 15:58:20.810172000 +0900
***************
*** 0 ****
--- 1,235 ----
+ -- Adjust this setting to control where the objects get created.
+ SET search_path = public;
+ SET client_min_messages = warning;
+ \set ECHO none
+ RESET client_min_messages;
+ CREATE SERVER server_postgres FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR CURRENT_USER SERVER server_postgres OPTIONS (user 'postgres');
+ CREATE TABLE dblink_tbl (id integer, value text);
+ INSERT INTO dblink_tbl VALUES(1, 'X');
+ INSERT INTO dblink_tbl VALUES(2, 'BB');
+ INSERT INTO dblink_tbl VALUES(3, 'CCC');
+ INSERT INTO dblink_tbl VALUES(4, 'DDDD');
+ CREATE TABLE dblink_tmp (LIKE dblink_tbl);
+ CREATE FUNCTION dblink_cursor_test(cursor integer, howmany integer)
+ RETURNS SETOF dblink_tmp AS
+ $$
+ TRUNCATE dblink_tmp;
+ INSERT INTO dblink_tmp SELECT * FROM dblink_fetch($1, $2) AS t(id integer, value text);
+ INSERT INTO dblink_tmp SELECT * FROM dblink_fetch($1, $2) AS t(id integer, value text);
+ INSERT INTO dblink_tmp SELECT * FROM dblink_fetch($1, $2) AS t(id integer, value text);
+ SELECT * FROM dblink_tmp;
+ $$
+ LANGUAGE sql;
+ -- dblink_connect()
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+  name | srvname | status | keep 
+ ------+---------+--------+------
+ (0 rows)
+ 
+ SELECT dblink_connect('conn_postgres', 'server_postgres');
+  dblink_connect 
+ ----------------
+  t
+ (1 row)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+      name      |     srvname     | status | keep 
+ ---------------+-----------------+--------+------
+  conn_postgres | server_postgres | idle   | t
+ (1 row)
+ 
+ -- dblink_exec() with an existing connection
+ SELECT * FROM dblink_tbl ORDER BY id;
+  id | value 
+ ----+-------
+   1 | X
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ BEGIN;
+ SELECT dblink_exec('conn_postgres', 'UPDATE dblink_tbl SET value = ''A'' WHERE id = 1');
+  dblink_exec 
+ -------------
+            1
+ (1 row)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+      name      |     srvname     | status | keep 
+ ---------------+-----------------+--------+------
+  conn_postgres | server_postgres | used   | t
+ (1 row)
+ 
+ ROLLBACK;
+ SELECT * FROM dblink_tbl ORDER BY id;
+  id | value 
+ ----+-------
+   1 | X
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ BEGIN;
+ SELECT dblink_exec('conn_postgres', 'UPDATE dblink_tbl SET value = ''A'' WHERE id = 1');
+  dblink_exec 
+ -------------
+            1
+ (1 row)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+      name      |     srvname     | status | keep 
+ ---------------+-----------------+--------+------
+  conn_postgres | server_postgres | used   | t
+ (1 row)
+ 
+ COMMIT;
+ SELECT * FROM dblink_tbl ORDER BY id;
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ -- dblink() with an existing connection
+ BEGIN;
+ SELECT * FROM dblink('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id') AS t(id integer, value text) ORDER BY id;
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+      name      |     srvname     | status | keep 
+ ---------------+-----------------+--------+------
+  conn_postgres | server_postgres | used   | t
+ (1 row)
+ 
+ COMMIT;
+ -- dblink_open() with an existing connection
+ SELECT * FROM dblink_cursor_test(dblink_open('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 100);
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ SELECT * FROM dblink_cursor_test(dblink_open('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 2);
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ SELECT * FROM dblink_cursor_test(dblink_open('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 1);
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+ (3 rows)
+ 
+ -- dblink_disconnect()
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+      name      |     srvname     | status | keep 
+ ---------------+-----------------+--------+------
+  conn_postgres | server_postgres | idle   | t
+ (1 row)
+ 
+ SELECT dblink_disconnect('conn_postgres');
+  dblink_disconnect 
+ -------------------
+  t
+ (1 row)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+  name | srvname | status | keep 
+ ------+---------+--------+------
+ (0 rows)
+ 
+ -- dblink() with an anonymous connection
+ BEGIN;
+ SELECT * FROM dblink('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id') AS t(id integer, value text) ORDER BY id;;
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+       name       |     srvname     | status | keep 
+ -----------------+-----------------+--------+------
+  server_postgres | server_postgres | used   | f
+ (1 row)
+ 
+ COMMIT;
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+  name | srvname | status | keep 
+ ------+---------+--------+------
+ (0 rows)
+ 
+ -- dblink_exec() with an anonymous connection
+ BEGIN;
+ SELECT dblink_exec('server_postgres', 'UPDATE dblink_tbl SET value = value WHERE id < 3');
+  dblink_exec 
+ -------------
+            2
+ (1 row)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+       name       |     srvname     | status | keep 
+ -----------------+-----------------+--------+------
+  server_postgres | server_postgres | used   | f
+ (1 row)
+ 
+ COMMIT;
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+  name | srvname | status | keep 
+ ------+---------+--------+------
+ (0 rows)
+ 
+ -- dblink_open() with an anonymous connection
+ SELECT * FROM dblink_cursor_test(dblink_open('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 100);
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ SELECT * FROM dblink_cursor_test(dblink_open('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 2);
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+   4 | DDDD
+ (4 rows)
+ 
+ SELECT * FROM dblink_cursor_test(dblink_open('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 1);
+  id | value 
+ ----+-------
+   1 | A
+   2 | BB
+   3 | CCC
+ (3 rows)
+ 
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+  name | srvname | status | keep 
+ ------+---------+--------+------
+ (0 rows)
+ 
diff -cprN head/contrib/dblink2/sql/dblink2.sql dblink2/contrib/dblink2/sql/dblink2.sql
*** head/contrib/dblink2/sql/dblink2.sql	1970-01-01 09:00:00.000000000 +0900
--- dblink2/contrib/dblink2/sql/dblink2.sql	2009-08-19 15:58:14.178525096 +0900
***************
*** 0 ****
--- 1,88 ----
+ -- Adjust this setting to control where the objects get created.
+ SET search_path = public;
+ 
+ SET client_min_messages = warning;
+ \set ECHO none
+ \i dblink2.sql
+ \set ECHO all
+ RESET client_min_messages;
+ 
+ CREATE SERVER server_postgres FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR CURRENT_USER SERVER server_postgres OPTIONS (user 'postgres');
+ 
+ CREATE TABLE dblink_tbl (id integer, value text);
+ INSERT INTO dblink_tbl VALUES(1, 'X');
+ INSERT INTO dblink_tbl VALUES(2, 'BB');
+ INSERT INTO dblink_tbl VALUES(3, 'CCC');
+ INSERT INTO dblink_tbl VALUES(4, 'DDDD');
+ 
+ CREATE TABLE dblink_tmp (LIKE dblink_tbl);
+ 
+ CREATE FUNCTION dblink_cursor_test(cursor integer, howmany integer)
+ RETURNS SETOF dblink_tmp AS
+ $$
+ TRUNCATE dblink_tmp;
+ INSERT INTO dblink_tmp SELECT * FROM dblink_fetch($1, $2) AS t(id integer, value text);
+ INSERT INTO dblink_tmp SELECT * FROM dblink_fetch($1, $2) AS t(id integer, value text);
+ INSERT INTO dblink_tmp SELECT * FROM dblink_fetch($1, $2) AS t(id integer, value text);
+ SELECT * FROM dblink_tmp;
+ $$
+ LANGUAGE sql;
+ 
+ -- dblink_connect()
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ SELECT dblink_connect('conn_postgres', 'server_postgres');
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ 
+ -- dblink_exec() with an existing connection
+ SELECT * FROM dblink_tbl ORDER BY id;
+ 
+ BEGIN;
+ SELECT dblink_exec('conn_postgres', 'UPDATE dblink_tbl SET value = ''A'' WHERE id = 1');
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ ROLLBACK;
+ 
+ SELECT * FROM dblink_tbl ORDER BY id;
+ 
+ BEGIN;
+ SELECT dblink_exec('conn_postgres', 'UPDATE dblink_tbl SET value = ''A'' WHERE id = 1');
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ COMMIT;
+ 
+ SELECT * FROM dblink_tbl ORDER BY id;
+ 
+ -- dblink() with an existing connection
+ BEGIN;
+ SELECT * FROM dblink('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id') AS t(id integer, value text) ORDER BY id;
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ COMMIT;
+ 
+ -- dblink_open() with an existing connection
+ SELECT * FROM dblink_cursor_test(dblink_open('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 100);
+ SELECT * FROM dblink_cursor_test(dblink_open('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 2);
+ SELECT * FROM dblink_cursor_test(dblink_open('conn_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 1);
+ 
+ -- dblink_disconnect()
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ SELECT dblink_disconnect('conn_postgres');
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ 
+ -- dblink() with an anonymous connection
+ BEGIN;
+ SELECT * FROM dblink('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id') AS t(id integer, value text) ORDER BY id;;
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ COMMIT;
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ 
+ -- dblink_exec() with an anonymous connection
+ BEGIN;
+ SELECT dblink_exec('server_postgres', 'UPDATE dblink_tbl SET value = value WHERE id < 3');
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ COMMIT;
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
+ 
+ -- dblink_open() with an anonymous connection
+ SELECT * FROM dblink_cursor_test(dblink_open('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 100);
+ SELECT * FROM dblink_cursor_test(dblink_open('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 2);
+ SELECT * FROM dblink_cursor_test(dblink_open('server_postgres', 'SELECT * FROM dblink_tbl ORDER BY id'), 1);
+ SELECT name, srvname, status, keep FROM dblink_connections, pg_foreign_server WHERE server = oid;
diff -cprN head/contrib/dblink2/uninstall_dblink2.sql dblink2/contrib/dblink2/uninstall_dblink2.sql
*** head/contrib/dblink2/uninstall_dblink2.sql	1970-01-01 09:00:00.000000000 +0900
--- dblink2/contrib/dblink2/uninstall_dblink2.sql	2009-08-19 11:08:10.151175641 +0900
***************
*** 0 ****
--- 1,4 ----
+ -- Adjust this setting to control where the objects get dropped.
+ SET search_path = public;
+ 
+ DROP FUNCTION postgresql_fdw_connector(internal);
diff -cprN head/src/backend/access/transam/xact.c dblink2/src/backend/access/transam/xact.c
*** head/src/backend/access/transam/xact.c	2009-06-11 23:48:54.000000000 +0900
--- dblink2/src/backend/access/transam/xact.c	2009-08-19 11:18:21.612157870 +0900
***************
*** 33,38 ****
--- 33,39 ----
  #include "commands/tablecmds.h"
  #include "commands/trigger.h"
  #include "executor/spi.h"
+ #include "foreign/dblink.h"
  #include "libpq/be-fsstubs.h"
  #include "miscadmin.h"
  #include "pgstat.h"
*************** CommitTransaction(void)
*** 1593,1598 ****
--- 1594,1602 ----
  	/* Now we can shut down the deferred-trigger manager */
  	AfterTriggerEndXact(true);
  
+ 	/* Commit all remote transactions */
+ 	AtCommit_dblink();
+ 
  	/* Close any open regular cursors */
  	AtCommit_Portals();
  
diff -cprN head/src/backend/catalog/system_views.sql dblink2/src/backend/catalog/system_views.sql
*** head/src/backend/catalog/system_views.sql	2009-04-07 09:31:26.000000000 +0900
--- dblink2/src/backend/catalog/system_views.sql	2009-08-19 12:40:49.328205213 +0900
*************** CREATE VIEW pg_user_mappings AS
*** 403,408 ****
--- 403,411 ----
  
  REVOKE ALL on pg_user_mapping FROM public;
  
+ CREATE VIEW dblink_connections AS
+     SELECT * FROM dblink_connections();
+ 
  --
  -- We have a few function definitions in here, too.
  -- At some point there might be enough to justify breaking them out into
diff -cprN head/src/backend/commands/foreigncmds.c dblink2/src/backend/commands/foreigncmds.c
*** head/src/backend/commands/foreigncmds.c	2009-06-11 23:48:55.000000000 +0900
--- dblink2/src/backend/commands/foreigncmds.c	2009-08-19 10:58:02.412170451 +0900
*************** AlterForeignServerOwner(const char *name
*** 312,332 ****
  	heap_freetuple(tup);
  }
  
  
  /*
   * Convert a validator function name passed from the parser to an Oid.
   */
  static Oid
! lookup_fdw_validator_func(List *validator)
  {
  	Oid			funcargtypes[2];
  
  	funcargtypes[0] = TEXTARRAYOID;
  	funcargtypes[1] = OIDOID;
! 	return LookupFuncName(validator, 2, funcargtypes, false);
  	/* return value is ignored, so we don't check the type */
  }
  
  
  /*
   * Create a foreign-data wrapper
--- 312,379 ----
  	heap_freetuple(tup);
  }
  
+ static void
+ parse_fdw_options(List *options, DefElem **validator, DefElem **connector)
+ {
+ 	ListCell   *cell;
+ 
+ 	*validator = NULL;
+ 	*connector = NULL;
+ 	foreach(cell, options)
+ 	{
+ 		DefElem    *def = lfirst(cell);
+ 
+ 		if (pg_strcasecmp(def->defname, "validator") == 0)
+ 		{
+ 			if (*validator)
+ 				elog(ERROR, "duplicated VALIDATOR");
+ 			*validator = def;
+ 		}
+ 		else if (pg_strcasecmp(def->defname, "connector") == 0)
+ 		{
+ 			if (*connector)
+ 				elog(ERROR, "duplicated CONNECTOR");
+ 			*connector = def;
+ 		}
+ 		else
+ 		{
+ 			elog(ERROR, "invalid option");
+ 		}
+ 	}
+ }
  
  /*
   * Convert a validator function name passed from the parser to an Oid.
   */
  static Oid
! lookup_fdw_validator_func(DefElem *validator)
  {
  	Oid			funcargtypes[2];
  
+ 	if (validator == NULL || validator->arg == NULL)
+ 		return InvalidOid;
+ 
  	funcargtypes[0] = TEXTARRAYOID;
  	funcargtypes[1] = OIDOID;
! 	return LookupFuncName((List *) validator->arg, 2, funcargtypes, false);
  	/* return value is ignored, so we don't check the type */
  }
  
+ /*
+  * Convert a connector function name passed from the parser to an Oid.
+  */
+ static Oid
+ lookup_fdw_connector_func(DefElem *connector)
+ {
+ 	Oid			funcargtypes[1];
+ 
+ 	if (connector == NULL || connector->arg == NULL)
+ 		return InvalidOid;
+ 
+ 	funcargtypes[0] = INTERNALOID;
+ 	return LookupFuncName((List *) connector->arg, 1, funcargtypes, false);
+ }
+ 
  
  /*
   * Create a foreign-data wrapper
*************** CreateForeignDataWrapper(CreateFdwStmt *
*** 339,345 ****
--- 386,395 ----
  	bool		nulls[Natts_pg_foreign_data_wrapper];
  	HeapTuple	tuple;
  	Oid			fdwId;
+ 	DefElem	   *defvalidator;
+ 	DefElem	   *defconnector;
  	Oid			fdwvalidator;
+ 	Oid			fdwconnector;
  	Datum		fdwoptions;
  	Oid			ownerId;
  
*************** CreateForeignDataWrapper(CreateFdwStmt *
*** 375,386 ****
  		DirectFunctionCall1(namein, CStringGetDatum(stmt->fdwname));
  	values[Anum_pg_foreign_data_wrapper_fdwowner - 1] = ObjectIdGetDatum(ownerId);
  
! 	if (stmt->validator)
! 		fdwvalidator = lookup_fdw_validator_func(stmt->validator);
! 	else
! 		fdwvalidator = InvalidOid;
  
  	values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = fdwvalidator;
  
  	nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
  
--- 425,436 ----
  		DirectFunctionCall1(namein, CStringGetDatum(stmt->fdwname));
  	values[Anum_pg_foreign_data_wrapper_fdwowner - 1] = ObjectIdGetDatum(ownerId);
  
! 	parse_fdw_options(stmt->fdwoptions, &defvalidator, &defconnector);
! 	fdwvalidator = lookup_fdw_validator_func(defvalidator);
! 	fdwconnector = lookup_fdw_connector_func(defconnector);
  
  	values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = fdwvalidator;
+ 	values[Anum_pg_foreign_data_wrapper_fdwconnector - 1] = fdwconnector;
  
  	nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
  
*************** AlterForeignDataWrapper(AlterFdwStmt *st
*** 434,440 ****
--- 484,493 ----
  	Oid			fdwId;
  	bool		isnull;
  	Datum		datum;
+ 	DefElem	   *defvalidator;
+ 	DefElem	   *defconnector;
  	Oid			fdwvalidator;
+ 	Oid			fdwconnector;
  
  	/* Must be super user */
  	if (!superuser())
*************** AlterForeignDataWrapper(AlterFdwStmt *st
*** 459,467 ****
  	memset(repl_null, false, sizeof(repl_null));
  	memset(repl_repl, false, sizeof(repl_repl));
  
! 	if (stmt->change_validator)
  	{
! 		fdwvalidator = stmt->validator ? lookup_fdw_validator_func(stmt->validator) : InvalidOid;
  		repl_val[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
  		repl_repl[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = true;
  
--- 512,522 ----
  	memset(repl_null, false, sizeof(repl_null));
  	memset(repl_repl, false, sizeof(repl_repl));
  
! 	parse_fdw_options(stmt->fdwoptions, &defvalidator, &defconnector);
! 
! 	if (defvalidator)
  	{
! 		fdwvalidator = lookup_fdw_validator_func(defvalidator);
  		repl_val[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
  		repl_repl[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = true;
  
*************** AlterForeignDataWrapper(AlterFdwStmt *st
*** 469,475 ****
  		 * It could be that the options for the FDW, SERVER and USER MAPPING
  		 * are no longer valid with the new validator.	Warn about this.
  		 */
! 		if (stmt->validator)
  			ereport(WARNING,
  			 (errmsg("changing the foreign-data wrapper validator can cause "
  					 "the options for dependent objects to become invalid")));
--- 524,530 ----
  		 * It could be that the options for the FDW, SERVER and USER MAPPING
  		 * are no longer valid with the new validator.	Warn about this.
  		 */
! 		if (defvalidator->arg)
  			ereport(WARNING,
  			 (errmsg("changing the foreign-data wrapper validator can cause "
  					 "the options for dependent objects to become invalid")));
*************** AlterForeignDataWrapper(AlterFdwStmt *st
*** 487,492 ****
--- 542,566 ----
  		fdwvalidator = DatumGetObjectId(datum);
  	}
  
+ 	if (defconnector)
+ 	{
+ 		fdwconnector = lookup_fdw_connector_func(defconnector);
+ 		repl_val[Anum_pg_foreign_data_wrapper_fdwconnector - 1] = ObjectIdGetDatum(fdwconnector);
+ 		repl_repl[Anum_pg_foreign_data_wrapper_fdwconnector - 1] = true;
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * Connector is not changed, but we need it for validating options.
+ 		 */
+ 		datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
+ 								tp,
+ 								Anum_pg_foreign_data_wrapper_fdwconnector,
+ 								&isnull);
+ 		Assert(!isnull);
+ 		fdwconnector = DatumGetObjectId(datum);
+ 	}
+ 
  	/*
  	 * Options specified, validate and update.
  	 */
diff -cprN head/src/backend/foreign/Makefile dblink2/src/backend/foreign/Makefile
*** head/src/backend/foreign/Makefile	2009-02-24 19:06:32.000000000 +0900
--- dblink2/src/backend/foreign/Makefile	2009-08-19 11:08:20.718460218 +0900
*************** subdir = src/backend/foreign
*** 12,17 ****
  top_builddir = ../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS= foreign.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS= dblink.o foreign.o
  
  include $(top_srcdir)/src/backend/common.mk
diff -cprN head/src/backend/foreign/dblink.c dblink2/src/backend/foreign/dblink.c
*** head/src/backend/foreign/dblink.c	1970-01-01 09:00:00.000000000 +0900
--- dblink2/src/backend/foreign/dblink.c	2009-08-19 16:18:33.053168777 +0900
***************
*** 0 ****
--- 1,680 ----
+ /*-------------------------------------------------------------------------
+  *
+  * dblink.c
+  *		  support for foreign server connectors.
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/xact.h"
+ #include "foreign/dblink.h"
+ #include "foreign/foreign.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "utils/acl.h"
+ #include "utils/builtins.h"
+ #include "utils/memutils.h"
+ 
+ #undef open
+ 
+ #define NameGetTextDatum(name)	CStringGetTextDatum(NameStr(name))
+ 
+ /* initial size of the connection hash table */
+ #define NUMCONN		16
+ 
+ typedef enum ConnStatus
+ {
+ 	CS_UNUSED = 0,	/* unconnected */
+ 	CS_IDLE,		/* connection idle */
+ 	CS_USED,		/* in transaction */
+ 	CS_PREPARED,	/* transaction is prepared and ready to commit */
+ } ConnStatus;
+ 
+ static const char *ConnStatusName[] =
+ {
+ 	"unused",
+ 	"idle",
+ 	"used",
+ 	"prepared",
+ };
+ 
+ typedef struct Conn
+ {
+ 	NameData			name;
+ 	dblink_connection  *connection;
+ 	ConnStatus			status;
+ 	Oid					server;		/* server oid (only for display) */
+ 	bool				keep;		/* true iff connected by user */
+ } Conn;
+ 
+ typedef struct Cursor
+ {
+ 	int32				id;
+ 	dblink_cursor	   *cursor;
+ } Cursor;
+ 
+ extern Datum dblink_connect(PG_FUNCTION_ARGS);
+ extern Datum dblink_connect_name(PG_FUNCTION_ARGS);
+ extern Datum dblink_disconnect(PG_FUNCTION_ARGS);
+ extern Datum dblink_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_exec(PG_FUNCTION_ARGS);
+ extern Datum dblink_open(PG_FUNCTION_ARGS);
+ extern Datum dblink_fetch(PG_FUNCTION_ARGS);
+ extern Datum dblink_close(PG_FUNCTION_ARGS);
+ extern Datum dblink_connections(PG_FUNCTION_ARGS);
+ 
+ static Conn *searchLink(const text *name, HASHACTION action, bool *found);
+ static Conn *searchLinkByName(const NameData *name, HASHACTION action, bool *found);
+ static Conn *doConnect(const text *name, const text *servername, bool *isNew);
+ static Conn *doTransaction(const text *name);
+ static ForeignServer *getServerByName(const char *name);
+ static void AtEOXact_dblink(XactEvent event, void *arg);
+ static List *getConnectionParams(ForeignServer *server, ForeignDataWrapper *fdw);
+ static void closeCursors(void);
+ 
+ static HTAB	   *connections;	/* all connections managed by the module */
+ static List	   *cursors;		/* all cursors managed by the module */
+ static int32	next_cursor_id = 0;
+ 
+ /*
+  * dblink_connect(server text) : boolean
+  */
+ Datum
+ dblink_connect(PG_FUNCTION_ARGS)
+ {
+ 	text   *name = PG_GETARG_TEXT_PP(0);
+ 	Conn   *conn;
+ 	bool	isNew;
+ 
+ 	conn = doConnect(name, name, &isNew);
+ 	conn->keep = true;
+ 
+ 	PG_RETURN_BOOL(isNew);
+ }
+ 
+ /*
+  * dblink_connect_name(name text, server text) : boolean
+  */
+ Datum
+ dblink_connect_name(PG_FUNCTION_ARGS)
+ {
+ 	text   *name = PG_GETARG_TEXT_PP(0);
+ 	text   *server = PG_GETARG_TEXT_PP(1);
+ 	Conn   *conn;
+ 	bool	isNew;
+ 
+ 	conn = doConnect(name, server, &isNew);
+ 	conn->keep = true;
+ 
+ 	PG_RETURN_BOOL(isNew);
+ }
+ 
+ /*
+  * dblink_disconnect(name text) : boolean
+  */
+ Datum
+ dblink_disconnect(PG_FUNCTION_ARGS)
+ {
+ 	text	   *name = PG_GETARG_TEXT_PP(0);
+ 	Conn	   *conn;
+ 
+ 	conn = searchLink(name, HASH_FIND, NULL);
+ 	if (conn == NULL)
+ 		PG_RETURN_BOOL(false);	/* not found */
+ 
+ 	conn->connection->disconnect(conn->connection);
+ 	conn->status = CS_UNUSED;
+ 	searchLink(name, HASH_REMOVE, NULL);
+ 
+ 	PG_RETURN_BOOL(true);
+ }
+ 
+ typedef struct query_context
+ {
+ 	dblink_cursor	   *cursor;
+ 	const char		  **values;
+ } query_context;
+ 
+ static Datum
+ query_or_fetch(bool query, PG_FUNCTION_ARGS)
+ {
+ 	query_context	   *context;
+ 	FuncCallContext	   *fctx;
+ 	AttInMetadata	   *attinmeta;
+ 
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		Conn		   *conn;
+ 		TupleDesc		tupdesc;
+ 		MemoryContext	mctx;
+ 		dblink_cursor  *cursor;
+ 		int				nfields;
+ 
+ 		/* Build a tuple descriptor for our result type */
+ 		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ 			elog(ERROR, "return type must be a row type");
+ 
+ 		fctx = SRF_FIRSTCALL_INIT();
+ 
+ 		if (query)
+ 		{
+ 			char   *sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ 
+ 			conn = doTransaction(PG_GETARG_TEXT_PP(0));
+ 			cursor = conn->connection->open(conn->connection, sql, 0);
+ 		}
+ 		else
+ 		{
+ 			int32		id = PG_GETARG_INT32(0);
+ 			int32		howmany = PG_GETARG_INT32(1);
+ 			ListCell   *cell;
+ 
+ 			if (howmany < 1)
+ 				SRF_RETURN_DONE(fctx);	/* no rows required */
+ 
+ 			cursor = NULL;
+ 			foreach(cell, cursors)
+ 			{
+ 				Cursor *cur = (Cursor *) lfirst(cell);
+ 				if (cur->id == id)
+ 				{
+ 					cursor = cur->cursor;
+ 					break;
+ 				}
+ 			}
+ 			if (cursor == NULL)
+ 				SRF_RETURN_DONE(fctx);	/* cursor not found */
+ 
+ 			fctx->max_calls = howmany;
+ 		}
+ 
+ 		nfields = cursor->nfields;
+ 		if (nfields < 1)
+ 			SRF_RETURN_DONE(fctx);	/* no fields */
+ 
+ 		mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
+ 		context = palloc(sizeof(query_context));
+ 		context->cursor = cursor;
+ 		context->values = palloc(nfields * sizeof(char *));
+ 
+ 		/* make sure we have a persistent copy of the tupdesc */
+ 		tupdesc = CreateTupleDescCopy(tupdesc);
+ 
+ 		/* store needed metadata for subsequent calls */
+ 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ 		fctx->attinmeta = attinmeta;
+ 
+ 		fctx->user_fctx = context;
+ 		MemoryContextSwitchTo(mctx);
+ 	}
+ 	else
+ 	{
+ 		fctx = SRF_PERCALL_SETUP();
+ 		attinmeta = fctx->attinmeta;
+ 		context = fctx->user_fctx;
+ 	}
+ 
+ 	/* Exit if fetch limit exceeded. Don't close cursor in this case. */
+ 	if (!query && fctx->call_cntr >= fctx->max_calls)
+ 		SRF_RETURN_DONE(fctx);
+ 
+ 	if (context->cursor->fetch(context->cursor, context->values))
+ 	{
+ 		HeapTuple	tuple;
+ 		Datum		result;
+ 		
+ 		tuple = BuildTupleFromCStrings(attinmeta, (char **) context->values);
+ 		result = HeapTupleGetDatum(tuple);
+ 
+ 		SRF_RETURN_NEXT(fctx, result);
+ 	}
+ 	else
+ 	{
+ 		ListCell   *cell;
+ 		ListCell   *prev;
+ 
+ 		context->cursor->close(context->cursor);
+ 
+ 		/* forget cursor */
+ 		prev = NULL;
+ 		foreach(cell, cursors)
+ 		{
+ 			Cursor *cur = (Cursor *) lfirst(cell);
+ 
+ 			if (cur->cursor == context->cursor)
+ 			{
+ 				cursors = list_delete_cell(cursors, cell, prev);
+ 				break;
+ 			}
+ 
+ 			prev = cell;
+ 		}
+ 
+ 		SRF_RETURN_DONE(fctx);
+ 	}
+ }
+ 
+ /*
+  * dblink_query(name text, sql text) : SETOF record
+  */
+ Datum
+ dblink_query(PG_FUNCTION_ARGS)
+ {
+ 	return query_or_fetch(true, fcinfo);
+ }
+ 
+ /*
+  * dblink_exec(name text, sql text) : bigint
+  */
+ Datum
+ dblink_exec(PG_FUNCTION_ARGS)
+ {
+ 	Conn	*conn = doTransaction(PG_GETARG_TEXT_PP(0));
+ 	char	*sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ 	int64	ntuples;
+ 
+ 	ntuples = conn->connection->exec(conn->connection, sql);
+ 
+ 	PG_RETURN_INT64(ntuples);
+ }
+ 
+ /*
+  * dblink_open(name text, sql text) : integer (cursor id)
+  */
+ Datum
+ dblink_open(PG_FUNCTION_ARGS)
+ {
+ #define DEFAULT_FETCHSIZE		1000
+ 
+ 	Conn		   *conn;
+ 	dblink_cursor  *cursor;
+ 	char		   *sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ 	MemoryContext	ctx;
+ 	Cursor		   *cur;
+ 
+ 	conn = doTransaction(PG_GETARG_TEXT_PP(0));
+ 	cursor = conn->connection->open(conn->connection, sql, DEFAULT_FETCHSIZE);
+ 
+ 	ctx = MemoryContextSwitchTo(TopMemoryContext);
+ 	cur = (Cursor *) palloc(sizeof(Cursor));
+ 	cur->id = ++next_cursor_id;
+ 	cur->cursor = cursor;
+ 	cursors = lappend(cursors, cur);
+ 	MemoryContextSwitchTo(ctx);
+ 
+ 	PG_RETURN_INT32(cur->id);
+ }
+ 
+ /*
+  * dblink_fetch(cursor integer, howmany integer) : SETOF record
+  */
+ Datum
+ dblink_fetch(PG_FUNCTION_ARGS)
+ {
+ 	return query_or_fetch(false, fcinfo);
+ }
+ 
+ /*
+  * dblink_close(cursor integer) : boolean
+  */
+ Datum
+ dblink_close(PG_FUNCTION_ARGS)
+ {
+ 	int32		id = PG_GETARG_INT32(0);
+ 	ListCell   *cell;
+ 	ListCell   *prev;
+ 
+ 	/* forget cursor */
+ 	prev = NULL;
+ 	foreach(cell, cursors)
+ 	{
+ 		Cursor *cur = (Cursor *) lfirst(cell);
+ 
+ 		if (cur->id == id)
+ 		{
+ 			cur->cursor->close(cur->cursor);
+ 			cursors = list_delete_cell(cursors, cell, prev);
+ 			PG_RETURN_BOOL(true);
+ 		}
+ 
+ 		prev = cell;
+ 	}
+ 
+ 	PG_RETURN_BOOL(false);
+ }
+ 
+ /*
+  * dblink_connections() :
+  * SETOF (name text, server oid, status text, keep boolean)
+  */
+ #define DBLINK_COLS		4
+ Datum
+ dblink_connections(PG_FUNCTION_ARGS)
+ {
+ 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ 	TupleDesc	tupdesc;
+ 	Tuplestorestate *tupstore;
+ 	MemoryContext per_query_ctx;
+ 	MemoryContext oldcontext;
+ 
+ 	/* check to see if caller supports us returning a tuplestore */
+ 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 				 errmsg("set-valued function called in context that cannot accept a set")));
+ 	if (!(rsinfo->allowedModes & SFRM_Materialize))
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 				 errmsg("materialize mode required, but it is not " \
+ 						"allowed in this context")));
+ 
+ 	/* Build a tuple descriptor for our result type */
+ 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ 		elog(ERROR, "return type must be a row type");
+ 
+ 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+ 
+ 	tupstore = tuplestore_begin_heap(true, false, work_mem);
+ 	rsinfo->returnMode = SFRM_Materialize;
+ 	rsinfo->setResult = tupstore;
+ 	rsinfo->setDesc = tupdesc;
+ 
+ 	if (connections)
+ 	{
+ 		HASH_SEQ_STATUS seq;
+ 		Conn  *conn;
+ 
+ 		hash_seq_init(&seq, connections);
+ 		while ((conn = (Conn *) hash_seq_search(&seq)) != NULL)
+ 		{
+ 			Datum		values[DBLINK_COLS];
+ 			bool		nulls[DBLINK_COLS];
+ 			int			i = 0;
+ 
+ 			/* generate junk in short-term context */
+ 			MemoryContextSwitchTo(oldcontext);
+ 
+ 			memset(values, 0, sizeof(values));
+ 			memset(nulls, 0, sizeof(nulls));
+ 
+ 			/* one row per connection: name, server oid, status, keep */
+ 			values[i++] = CStringGetTextDatum(NameStr(conn->name));
+ 			values[i++] = ObjectIdGetDatum(conn->server);
+ 			values[i++] = CStringGetTextDatum(ConnStatusName[conn->status]);
+ 			values[i++] = BoolGetDatum(conn->keep);
+ 
+ 			Assert(i == DBLINK_COLS);
+ 
+ 			/* switch to appropriate context while storing the tuple */
+ 			MemoryContextSwitchTo(per_query_ctx);
+ 			tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ 		}
+ 	}
+ 
+ 	/* clean up and return the tuplestore */
+ 	tuplestore_donestoring(tupstore);
+ 
+ 	MemoryContextSwitchTo(oldcontext);
+ 
+ 	return (Datum) 0;
+ }
+ 
+ /*
+  * look up a connection by a text-typed name
+  */
+ static Conn *
+ searchLink(const text *name, HASHACTION action, bool *found)
+ {
+ 	NameData	key;
+ 	const char *str = VARDATA_ANY(name);
+ 	int			len = VARSIZE_ANY_EXHDR(name);
+ 
+ 	if (len >= NAMEDATALEN)
+ 		elog(ERROR, "connection name too long");
+ 
+ 	MemSet(key.data, 0, NAMEDATALEN);
+ 	strncpy(key.data, str, len);
+ 
+ 	return searchLinkByName(&key, action, found);
+ }
+ 
+ /*
+  * look up a connection by a Name-typed key; creates the hash table on first use
+  */
+ static Conn *
+ searchLinkByName(const NameData *name, HASHACTION action, bool *found)
+ {
+ 	if (!connections)
+ 	{
+ 		HASHCTL		ctl;
+ 
+ 		ctl.keysize = NAMEDATALEN;
+ 		ctl.entrysize = sizeof(Conn);
+ 		connections = hash_create("dblink", NUMCONN, &ctl, HASH_ELEM);
+ 
+ 		/* register end-of-transaction callback */
+ 		RegisterXactCallback(AtEOXact_dblink, 0);
+ 	}
+ 
+ 	return (Conn *) hash_search(connections, name, action, found);
+ }
+ 
+ static Conn *
+ doConnect(const text *name, const text *servername, bool *isNew)
+ {
+ 	Conn			   *conn;
+ 	bool				found;
+ 	ForeignServer	   *server;
+ 	ForeignDataWrapper *fdw;
+ 	List			   *params;
+ 
+ 	server = getServerByName(text_to_cstring(servername));
+ 	fdw = GetForeignDataWrapper(server->fdwid);
+ 
+ 	if (fdw->fdwconnector == InvalidOid)
+ 		elog(ERROR, "server '%s' and foreign data wrapper '%s' have no connector",
+ 			server->servername, fdw->fdwname);
+ 
+ 	params = getConnectionParams(server, fdw);
+ 
+ 	conn = searchLink(name, HASH_ENTER, &found);
+ 	if (found && conn->status != CS_UNUSED)
+ 	{
+ 		if (conn->server != server->serverid)
+ 			elog(ERROR, "connection name is already in use for a different server");
+ 	}
+ 	else
+ 	{
+ 		conn->status = CS_UNUSED;
+ 		conn->server = server->serverid;
+ 	}
+ 
+ 	if (conn->status == CS_UNUSED)
+ 	{
+ 		conn->connection = (dblink_connection *) DatumGetPointer(
+ 			OidFunctionCall1(fdw->fdwconnector, PointerGetDatum(params)));
+ 		conn->status = CS_IDLE;
+ 		conn->keep = false;
+ 	}
+ 
+ 	if (isNew)
+ 		*isNew = !found;
+ 
+ 	return conn;
+ }
+ 
+ static Conn *
+ doTransaction(const text *name)
+ {
+ 	Conn	   *conn;
+ 
+ 	conn = searchLink(name, HASH_FIND, NULL);
+ 	if (conn == NULL)
+ 		conn = doConnect(name, name, NULL);
+ 
+ 	switch (conn->status)
+ 	{
+ 	case CS_IDLE:
+ 		/* start transaction automatically */
+ 		if (!conn->connection->command(conn->connection, DBLINK_XA_START))
+ 			elog(ERROR, "DBLINK_XA_START failed for '%s'", NameStr(conn->name));
+ 		conn->status = CS_USED;
+ 		break;
+ 	case CS_USED:
+ 		/* noop */
+ 		break;
+ 	default:
+ 		elog(ERROR, "unexpected status: %d", conn->status);
+ 		break;
+ 	}
+ 
+ 	return conn;
+ }
+ 
+ /*
+  * get foreign server by name and check acl.
+  */
+ static ForeignServer *
+ getServerByName(const char *name)
+ {
+ 	ForeignServer  *server;
+ 	AclResult		aclresult;
+ 
+ 	server = GetForeignServerByName(name, false);
+ 
+ 	/* Check permissions, user must have usage on the server. */
+ 	aclresult = pg_foreign_server_aclcheck(server->serverid, GetUserId(), ACL_USAGE);
+ 	if (aclresult != ACLCHECK_OK)
+ 		aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, server->servername);
+ 
+ 	return server;
+ }
+ 
+ /*
+  * Commit all remote transactions with 2PC.
+  */
+ void
+ AtCommit_dblink(void)
+ {
+ 	closeCursors();
+ 	if (connections)
+ 	{
+ 		HASH_SEQ_STATUS seq;
+ 		Conn		   *conn;
+ 
+ 		hash_seq_init(&seq, connections);
+ 		while ((conn = (Conn *) hash_seq_search(&seq)) != NULL)
+ 		{
+ 			if (conn->status == CS_USED)
+ 			{
+ 				if (!conn->connection->command(conn->connection, DBLINK_XA_PREPARE))
+ 					elog(ERROR, "DBLINK_XA_PREPARE failed for '%s'", NameStr(conn->name));
+ 				conn->status = CS_PREPARED;
+ 			}
+ 		}
+ 
+ 		hash_seq_init(&seq, connections);
+ 		while ((conn = (Conn *) hash_seq_search(&seq)) != NULL)
+ 		{
+ 			if (conn->status == CS_PREPARED)
+ 			{
+ 				if (!conn->connection->command(conn->connection, DBLINK_XA_COMMIT))
+ 				{
+ 					/* XXX: or FATAL? */
+ 					elog(WARNING, "DBLINK_XA_COMMIT failed for '%s'", NameStr(conn->name));
+ 				}
+ 				conn->status = CS_IDLE;
+ 			}
+ 		}
+ 	}
+ }
+ 
+ /*
+  * Roll back all remote transactions on error.
+  */
+ static void
+ AtEOXact_dblink(XactEvent event, void *arg)
+ {
+ 	HASH_SEQ_STATUS seq;
+ 	Conn		   *conn;
+ 	TimestampTz		now;
+ 
+ 	closeCursors();
+ 	if (connections == NULL || hash_get_num_entries(connections) < 1)
+ 		return;
+ 
+ 	now = GetCurrentTimestamp();
+ 
+ 	hash_seq_init(&seq, connections);
+ 	while ((conn = (Conn *) hash_seq_search(&seq)) != NULL)
+ 	{
+ 		switch (conn->status)
+ 		{
+ 		case CS_USED:
+ 			conn->connection->command(conn->connection, DBLINK_ROLLBACK);
+ 			conn->status = CS_IDLE;
+ 			break;
+ 		case CS_PREPARED:
+ 			conn->connection->command(conn->connection, DBLINK_XA_ROLLBACK);
+ 			conn->status = CS_IDLE;
+ 			break;
+ 		default:
+ 			break;
+ 		}
+ 
+ 		/* disconnect automatic connections */
+ 		if (conn->status == CS_IDLE && !conn->keep)
+ 		{
+ 			conn->connection->disconnect(conn->connection);
+ 			conn->status = CS_UNUSED;
+ 		}
+ 
+ 		/* remove if unused */
+ 		if (conn->status == CS_UNUSED)
+ 			searchLinkByName(&conn->name, HASH_REMOVE, NULL);
+ 	}
+ }
+ 
+ /*
+  * Obtain connection parameters (merged FDW, server and user mapping options)
+  */
+ static List *
+ getConnectionParams(ForeignServer *server, ForeignDataWrapper *fdw)
+ {
+ 	UserMapping *user_mapping;
+ 	Oid			serverid;
+ 	Oid			fdwid;
+ 	Oid			userid;
+ 	List	   *result;
+ 
+ 	serverid = server->serverid;
+ 	fdwid = server->fdwid;
+ 	userid = GetUserId();
+ 	user_mapping = GetUserMapping(userid, serverid);
+ 	fdw = GetForeignDataWrapper(fdwid);
+ 
+ 	result = list_copy(fdw->options);
+ 	result = list_concat(result, list_copy(server->options));
+ 	result = list_concat(result, list_copy(user_mapping->options));
+ 
+ 	return result;
+ }
+ 
+ static void
+ closeCursors(void)
+ {
+ 	ListCell *cell;
+ 
+ 	if (cursors != NIL)
+ 	{
+ 		foreach (cell, cursors)
+ 		{
+ 			Cursor *cur = (Cursor *) lfirst(cell);
+ 			cur->cursor->close(cur->cursor);
+ 		}
+ 		list_free(cursors);
+ 		cursors = NIL;
+ 	}
+ }
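
Note on the end-of-transaction handling in dblink.c above: AtCommit_dblink and AtEOXact_dblink together behave like a small two-phase state machine (CS_USED -> CS_PREPARED -> CS_IDLE). The following is only a standalone sketch of that flow, not part of the patch. MockConn, Command, mock_command, rollback_all and commit_all are hypothetical names made up for illustration, and a failed prepare is reported through a return value here, whereas the patch raises elog(ERROR) and leaves the rollback to the transaction callback.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Mirrors the ConnStatus enum in the patch. */
typedef enum { CS_UNUSED = 0, CS_IDLE, CS_USED, CS_PREPARED } ConnStatus;

/* Hypothetical subset of commands, standing in for dblink_command. */
typedef enum { XA_PREPARE, XA_COMMIT, XA_ROLLBACK, PLAIN_ROLLBACK } Command;

/* Hypothetical stand-in for a remote connection; not the patch's Conn. */
typedef struct MockConn
{
	ConnStatus	status;
	bool		fail_prepare;	/* simulate a remote PREPARE failure */
	int			ncommits;		/* commits seen by the mock remote */
	int			nrollbacks;		/* rollbacks seen by the mock remote */
} MockConn;

/* Pretend to send a command to the remote side. */
static bool
mock_command(MockConn *conn, Command cmd)
{
	switch (cmd)
	{
		case XA_PREPARE:
			return !conn->fail_prepare;
		case XA_COMMIT:
			conn->ncommits++;
			return true;
		case XA_ROLLBACK:
		case PLAIN_ROLLBACK:
			conn->nrollbacks++;
			return true;
	}
	return false;
}

/* Abort path: undo open and prepared remote transactions alike. */
static void
rollback_all(MockConn *conns, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		if (conns[i].status == CS_USED)
			mock_command(&conns[i], PLAIN_ROLLBACK);
		else if (conns[i].status == CS_PREPARED)
			mock_command(&conns[i], XA_ROLLBACK);
		else
			continue;
		conns[i].status = CS_IDLE;
	}
}

/* Commit path: prepare everything first, commit only if all prepares passed. */
static bool
commit_all(MockConn *conns, size_t n)
{
	assert(conns != NULL);

	/* Phase 1: prepare every connection that has an open transaction. */
	for (size_t i = 0; i < n; i++)
	{
		if (conns[i].status != CS_USED)
			continue;
		if (!mock_command(&conns[i], XA_PREPARE))
		{
			rollback_all(conns, n);
			return false;
		}
		conns[i].status = CS_PREPARED;
	}

	/* Phase 2: commit every prepared connection and return it to idle. */
	for (size_t i = 0; i < n; i++)
	{
		if (conns[i].status != CS_PREPARED)
			continue;
		mock_command(&conns[i], XA_COMMIT);
		conns[i].status = CS_IDLE;
	}
	return true;
}
```

The point of the two separate loops is that no remote commit is sent until every participant has prepared, so a failure in phase 1 can still be rolled back everywhere. A failure in phase 2 cannot, which is presumably why the patch leaves the "XXX: or FATAL?" question open there.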
diff -cprN head/src/backend/foreign/foreign.c dblink2/src/backend/foreign/foreign.c
*** head/src/backend/foreign/foreign.c	2009-06-12 01:14:18.000000000 +0900
--- dblink2/src/backend/foreign/foreign.c	2009-08-19 12:42:48.250171886 +0900
*************** GetForeignDataWrapper(Oid fdwid)
*** 61,66 ****
--- 61,67 ----
  	fdw->owner = fdwform->fdwowner;
  	fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
  	fdw->fdwvalidator = fdwform->fdwvalidator;
+ 	fdw->fdwconnector = fdwform->fdwconnector;
  
  	/* Extract the options */
  	datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
diff -cprN head/src/backend/nodes/copyfuncs.c dblink2/src/backend/nodes/copyfuncs.c
*** head/src/backend/nodes/copyfuncs.c	2009-07-30 11:45:37.000000000 +0900
--- dblink2/src/backend/nodes/copyfuncs.c	2009-08-19 10:58:02.414273021 +0900
*************** _copyCreateFdwStmt(CreateFdwStmt *from)
*** 2957,2963 ****
  	CreateFdwStmt *newnode = makeNode(CreateFdwStmt);
  
  	COPY_STRING_FIELD(fdwname);
! 	COPY_NODE_FIELD(validator);
  	COPY_NODE_FIELD(options);
  
  	return newnode;
--- 2957,2963 ----
  	CreateFdwStmt *newnode = makeNode(CreateFdwStmt);
  
  	COPY_STRING_FIELD(fdwname);
! 	COPY_NODE_FIELD(fdwoptions);
  	COPY_NODE_FIELD(options);
  
  	return newnode;
*************** _copyAlterFdwStmt(AlterFdwStmt *from)
*** 2969,2976 ****
  	AlterFdwStmt *newnode = makeNode(AlterFdwStmt);
  
  	COPY_STRING_FIELD(fdwname);
! 	COPY_NODE_FIELD(validator);
! 	COPY_SCALAR_FIELD(change_validator);
  	COPY_NODE_FIELD(options);
  
  	return newnode;
--- 2969,2975 ----
  	AlterFdwStmt *newnode = makeNode(AlterFdwStmt);
  
  	COPY_STRING_FIELD(fdwname);
! 	COPY_NODE_FIELD(fdwoptions);
  	COPY_NODE_FIELD(options);
  
  	return newnode;
diff -cprN head/src/backend/nodes/equalfuncs.c dblink2/src/backend/nodes/equalfuncs.c
*** head/src/backend/nodes/equalfuncs.c	2009-07-30 11:45:37.000000000 +0900
--- dblink2/src/backend/nodes/equalfuncs.c	2009-08-19 10:58:02.415272908 +0900
*************** static bool
*** 1542,1548 ****
  _equalCreateFdwStmt(CreateFdwStmt *a, CreateFdwStmt *b)
  {
  	COMPARE_STRING_FIELD(fdwname);
! 	COMPARE_NODE_FIELD(validator);
  	COMPARE_NODE_FIELD(options);
  
  	return true;
--- 1542,1548 ----
  _equalCreateFdwStmt(CreateFdwStmt *a, CreateFdwStmt *b)
  {
  	COMPARE_STRING_FIELD(fdwname);
! 	COMPARE_NODE_FIELD(fdwoptions);
  	COMPARE_NODE_FIELD(options);
  
  	return true;
*************** static bool
*** 1552,1559 ****
  _equalAlterFdwStmt(AlterFdwStmt *a, AlterFdwStmt *b)
  {
  	COMPARE_STRING_FIELD(fdwname);
! 	COMPARE_NODE_FIELD(validator);
! 	COMPARE_SCALAR_FIELD(change_validator);
  	COMPARE_NODE_FIELD(options);
  
  	return true;
--- 1552,1558 ----
  _equalAlterFdwStmt(AlterFdwStmt *a, AlterFdwStmt *b)
  {
  	COMPARE_STRING_FIELD(fdwname);
! 	COMPARE_NODE_FIELD(fdwoptions);
  	COMPARE_NODE_FIELD(options);
  
  	return true;
diff -cprN head/src/backend/parser/gram.y dblink2/src/backend/parser/gram.y
*** head/src/backend/parser/gram.y	2009-08-03 07:14:52.000000000 +0900
--- dblink2/src/backend/parser/gram.y	2009-08-19 16:18:14.421178465 +0900
*************** static TypeName *TableFuncTypeName(List 
*** 228,234 ****
  %type <list>	createdb_opt_list alterdb_opt_list copy_opt_list
  				transaction_mode_list
  %type <defelt>	createdb_opt_item alterdb_opt_item copy_opt_item
! 				transaction_mode_item
  
  %type <ival>	opt_lock lock_type cast_context
  %type <boolean>	opt_force opt_or_replace
--- 228,234 ----
  %type <list>	createdb_opt_list alterdb_opt_list copy_opt_list
  				transaction_mode_list
  %type <defelt>	createdb_opt_item alterdb_opt_item copy_opt_item
! 				transaction_mode_item fdw_option
  
  %type <ival>	opt_lock lock_type cast_context
  %type <boolean>	opt_force opt_or_replace
*************** static TypeName *TableFuncTypeName(List 
*** 256,262 ****
  				index_name name file_name cluster_index_specification
  
  %type <list>	func_name handler_name qual_Op qual_all_Op subquery_Op
! 				opt_class opt_validator validator_clause
  
  %type <range>	qualified_name OptConstrFromTable
  
--- 256,262 ----
  				index_name name file_name cluster_index_specification
  
  %type <list>	func_name handler_name qual_Op qual_all_Op subquery_Op
! 				opt_class opt_validator validator_clause fdw_options
  
  %type <range>	qualified_name OptConstrFromTable
  
*************** static TypeName *TableFuncTypeName(List 
*** 455,462 ****
  	CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
  	CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
  	CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
! 	COMMITTED CONCURRENTLY CONFIGURATION CONNECTION CONSTRAINT CONSTRAINTS
! 	CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE CREATEDB
  	CREATEROLE CREATEUSER CROSS CSV CURRENT_P
  	CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA
  	CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
--- 455,462 ----
  	CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
  	CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
  	CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
! 	COMMITTED CONCURRENTLY CONFIGURATION CONNECTION CONNECTOR CONSTRAINT
! 	CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE CREATEDB
  	CREATEROLE CREATEUSER CROSS CSV CURRENT_P
  	CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA
  	CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
*************** opt_validator:
*** 2758,2763 ****
--- 2758,2775 ----
  			| /*EMPTY*/								{ $$ = NIL; }
  		;
  
+ fdw_option:
+ 			VALIDATOR handler_name		{ $$ = makeDefElem("validator", (Node *) $2); }
+ 			| NO VALIDATOR				{ $$ = makeDefElem("validator", NULL); }
+ 			| CONNECTOR handler_name	{ $$ = makeDefElem("connector", (Node *) $2); }
+ 			| NO CONNECTOR				{ $$ = makeDefElem("connector", NULL); }
+ 		;
+ 
+ fdw_options:
+ 			fdw_option								{ $$ = list_make1($1); }
+ 			| fdw_options fdw_option				{ $$ = lappend($1, $2); }
+ 		;
+ 
  opt_lancompiler:
  			LANCOMPILER Sconst						{ $$ = $2; }
  			| /*EMPTY*/								{ $$ = NULL; }
*************** DropTableSpaceStmt: DROP TABLESPACE name
*** 2837,2854 ****
  /*****************************************************************************
   *
   * 		QUERY:
!  *             CREATE FOREIGN DATA WRAPPER name [ VALIDATOR name ]
   *
   *****************************************************************************/
  
! CreateFdwStmt: CREATE FOREIGN DATA_P WRAPPER name opt_validator create_generic_options
  				{
  					CreateFdwStmt *n = makeNode(CreateFdwStmt);
  					n->fdwname = $5;
! 					n->validator = $6;
  					n->options = $7;
  					$$ = (Node *) n;
  				}
  		;
  
  /*****************************************************************************
--- 2849,2873 ----
  /*****************************************************************************
   *
   * 		QUERY:
!  *             CREATE FOREIGN DATA WRAPPER name [ VALIDATOR name ] [ CONNECTOR name ]
   *
   *****************************************************************************/
  
! CreateFdwStmt: CREATE FOREIGN DATA_P WRAPPER name fdw_options create_generic_options
  				{
  					CreateFdwStmt *n = makeNode(CreateFdwStmt);
  					n->fdwname = $5;
! 					n->fdwoptions = $6;
  					n->options = $7;
  					$$ = (Node *) n;
  				}
+ 			| CREATE FOREIGN DATA_P WRAPPER name create_generic_options
+ 				{
+ 					CreateFdwStmt *n = makeNode(CreateFdwStmt);
+ 					n->fdwname = $5;
+ 					n->options = $6;
+ 					$$ = (Node *) n;
+ 				}
  		;
  
  /*****************************************************************************
*************** DropFdwStmt: DROP FOREIGN DATA_P WRAPPER
*** 2883,2903 ****
   *
   ****************************************************************************/
  
! AlterFdwStmt: ALTER FOREIGN DATA_P WRAPPER name validator_clause alter_generic_options
  				{
  					AlterFdwStmt *n = makeNode(AlterFdwStmt);
  					n->fdwname = $5;
! 					n->validator = $6;
! 					n->change_validator = true;
  					n->options = $7;
  					$$ = (Node *) n;
  				}
! 			| ALTER FOREIGN DATA_P WRAPPER name validator_clause
  				{
  					AlterFdwStmt *n = makeNode(AlterFdwStmt);
  					n->fdwname = $5;
! 					n->validator = $6;
! 					n->change_validator = true;
  					$$ = (Node *) n;
  				}
  			| ALTER FOREIGN DATA_P WRAPPER name alter_generic_options
--- 2902,2920 ----
   *
   ****************************************************************************/
  
! AlterFdwStmt: ALTER FOREIGN DATA_P WRAPPER name fdw_options alter_generic_options
  				{
  					AlterFdwStmt *n = makeNode(AlterFdwStmt);
  					n->fdwname = $5;
! 					n->fdwoptions = $6;
  					n->options = $7;
  					$$ = (Node *) n;
  				}
! 			| ALTER FOREIGN DATA_P WRAPPER name fdw_options
  				{
  					AlterFdwStmt *n = makeNode(AlterFdwStmt);
  					n->fdwname = $5;
! 					n->fdwoptions = $6;
  					$$ = (Node *) n;
  				}
  			| ALTER FOREIGN DATA_P WRAPPER name alter_generic_options
*************** unreserved_keyword:
*** 10240,10245 ****
--- 10257,10263 ----
  			| CONCURRENTLY
  			| CONFIGURATION
  			| CONNECTION
+ 			| CONNECTOR
  			| CONSTRAINTS
  			| CONTENT_P
  			| CONTINUE_P
diff -cprN head/src/include/catalog/pg_foreign_data_wrapper.h dblink2/src/include/catalog/pg_foreign_data_wrapper.h
*** head/src/include/catalog/pg_foreign_data_wrapper.h	2009-02-24 19:06:34.000000000 +0900
--- dblink2/src/include/catalog/pg_foreign_data_wrapper.h	2009-08-19 10:58:02.419272921 +0900
*************** CATALOG(pg_foreign_data_wrapper,2328)
*** 33,38 ****
--- 33,39 ----
  	NameData	fdwname;		/* foreign-data wrapper name */
  	Oid			fdwowner;		/* FDW owner */
  	Oid			fdwvalidator;	/* optional validation function */
+ 	Oid			fdwconnector;	/* optional connector function */
  
  	/* VARIABLE LENGTH FIELDS start here. */
  
*************** typedef FormData_pg_foreign_data_wrapper
*** 52,62 ****
   * ----------------
   */
  
! #define Natts_pg_foreign_data_wrapper				5
  #define Anum_pg_foreign_data_wrapper_fdwname		1
  #define Anum_pg_foreign_data_wrapper_fdwowner		2
  #define Anum_pg_foreign_data_wrapper_fdwvalidator	3
! #define Anum_pg_foreign_data_wrapper_fdwacl			4
! #define Anum_pg_foreign_data_wrapper_fdwoptions		5
  
  #endif   /* PG_FOREIGN_DATA_WRAPPER_H */
--- 53,64 ----
   * ----------------
   */
  
! #define Natts_pg_foreign_data_wrapper				6
  #define Anum_pg_foreign_data_wrapper_fdwname		1
  #define Anum_pg_foreign_data_wrapper_fdwowner		2
  #define Anum_pg_foreign_data_wrapper_fdwvalidator	3
! #define Anum_pg_foreign_data_wrapper_fdwconnector	4
! #define Anum_pg_foreign_data_wrapper_fdwacl			5
! #define Anum_pg_foreign_data_wrapper_fdwoptions		6
  
  #endif   /* PG_FOREIGN_DATA_WRAPPER_H */
diff -cprN head/src/include/catalog/pg_proc.h dblink2/src/include/catalog/pg_proc.h
*** head/src/include/catalog/pg_proc.h	2009-08-04 13:04:12.000000000 +0900
--- dblink2/src/include/catalog/pg_proc.h	2009-08-19 16:16:11.766178182 +0900
*************** DESCR("convert a long int to a human rea
*** 3702,3707 ****
--- 3702,3717 ----
  
  DATA(insert OID = 2316 ( postgresql_fdw_validator PGNSP PGUID 12 1 0 0 f f f t f i 2 0 16 "1009 26" _null_ _null_ _null_ _null_ postgresql_fdw_validator _null_ _null_ _null_));
  
+ DATA(insert OID = 3030 ( dblink_connect		 PGNSP PGUID 12 1 0		0 f f f t f v 2 0 16	"25 25"		 _null_ _null_ _null_ _null_ dblink_connect_name _null_ _null_ _null_));
+ DATA(insert OID = 3031 ( dblink_connect		 PGNSP PGUID 12 1 0		0 f f f t f v 1 0 16	"25"		 _null_ _null_ _null_ _null_ dblink_connect _null_ _null_ _null_));
+ DATA(insert OID = 3032 ( dblink_disconnect	 PGNSP PGUID 12 1 0		0 f f f t f v 1 0 16	"25"		 _null_ _null_ _null_ _null_ dblink_disconnect _null_ _null_ _null_));
+ DATA(insert OID = 3033 ( dblink				 PGNSP PGUID 12 1 1000	0 f f f t t v 2 0 2249	"25 25"		 _null_ _null_ _null_ _null_ dblink_query _null_ _null_ _null_));
+ DATA(insert OID = 3034 ( dblink_exec		 PGNSP PGUID 12 1 0		0 f f f t f v 2 0 20	"25 25"		 _null_ _null_ _null_ _null_ dblink_exec _null_ _null_ _null_));
+ DATA(insert OID = 3035 ( dblink_open		 PGNSP PGUID 12 1 0		0 f f f t f v 2 0 23	"25 25"		 _null_ _null_ _null_ _null_ dblink_open _null_ _null_ _null_));
+ DATA(insert OID = 3036 ( dblink_fetch		 PGNSP PGUID 12 1 1000	0 f f f t t v 2 0 2249	"23 23"		 _null_ _null_ _null_ _null_ dblink_fetch _null_ _null_ _null_));
+ DATA(insert OID = 3037 ( dblink_close		 PGNSP PGUID 12 1 0		0 f f f t f v 1 0 16	"23"		 _null_ _null_ _null_ _null_ dblink_close _null_ _null_ _null_));
+ DATA(insert OID = 3038 ( dblink_connections	 PGNSP PGUID 12 1 10	0 f f f t t v 0 0 2249	"" "{25,26,25,16}" "{o,o,o,o}" "{name,server,status,keep}" _null_ dblink_connections _null_ _null_ _null_));
+ 
  DATA(insert OID = 2290 (  record_in			PGNSP PGUID 12 1 0 0 f f f t f v 3 0 2249 "2275 26 23" _null_ _null_ _null_ _null_	record_in _null_ _null_ _null_ ));
  DESCR("I/O");
  DATA(insert OID = 2291 (  record_out		PGNSP PGUID 12 1 0 0 f f f t f v 1 0 2275 "2249" _null_ _null_ _null_ _null_ record_out _null_ _null_ _null_ ));
diff -cprN head/src/include/foreign/dblink.h dblink2/src/include/foreign/dblink.h
*** head/src/include/foreign/dblink.h	1970-01-01 09:00:00.000000000 +0900
--- dblink2/src/include/foreign/dblink.h	2009-08-19 11:40:49.767168997 +0900
***************
*** 0 ****
--- 1,54 ----
+ /*-------------------------------------------------------------------------
+  *
+  * dblink.h
+  *	  support for foreign server connectors.
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef DBLINK_H
+ #define DBLINK_H
+ 
+ typedef enum dblink_command
+ {
+ 	DBLINK_BEGIN,
+ 	DBLINK_COMMIT,
+ 	DBLINK_ROLLBACK,
+ 	DBLINK_XA_START,
+ 	DBLINK_XA_PREPARE,
+ 	DBLINK_XA_COMMIT,
+ 	DBLINK_XA_ROLLBACK
+ } dblink_command;
+ 
+ typedef struct dblink_connection	dblink_connection;
+ typedef struct dblink_cursor		dblink_cursor;
+ 
+ typedef void (*dblink_disconnect_t)(dblink_connection *conn);
+ typedef int64 (*dblink_exec_t)(dblink_connection *conn, const char *sql);
+ typedef dblink_cursor *(*dblink_open_t)(dblink_connection *conn, const char *sql, int32 fetchsize);
+ typedef bool (*dblink_command_t)(dblink_connection *conn, dblink_command type);
+ 
+ struct dblink_connection
+ {
+ 	dblink_disconnect_t	disconnect;
+ 	dblink_exec_t		exec;
+ 	dblink_open_t		open;
+ 	dblink_command_t	command;
+ };
+ 
+ /*
+  * interface dblink_cursor
+  */
+ 
+ typedef bool (*dblink_fetch_t)(dblink_cursor *cur, const char *values[]);
+ typedef void (*dblink_close_t)(dblink_cursor *cur);
+ 
+ struct dblink_cursor
+ {
+ 	dblink_fetch_t		fetch;
+ 	dblink_close_t		close;
+ 	int					nfields;
+ };
+ 
+ extern void AtCommit_dblink(void);
+ 
+ #endif   /* DBLINK_H */
diff -cprN head/src/include/foreign/foreign.h dblink2/src/include/foreign/foreign.h
*** head/src/include/foreign/foreign.h	2009-06-11 23:49:11.000000000 +0900
--- dblink2/src/include/foreign/foreign.h	2009-08-19 10:58:02.419272921 +0900
*************** typedef struct ForeignDataWrapper
*** 38,43 ****
--- 38,44 ----
  	Oid			owner;			/* FDW owner user Oid */
  	char	   *fdwname;		/* Name of the FDW */
  	Oid			fdwvalidator;
+ 	Oid			fdwconnector;
  	List	   *options;		/* fdwoptions as DefElem list */
  } ForeignDataWrapper;
  
diff -cprN head/src/include/nodes/parsenodes.h dblink2/src/include/nodes/parsenodes.h
*** head/src/include/nodes/parsenodes.h	2009-08-03 07:14:53.000000000 +0900
--- dblink2/src/include/nodes/parsenodes.h	2009-08-19 10:58:02.420272955 +0900
*************** typedef struct CreateFdwStmt
*** 1455,1461 ****
  {
  	NodeTag		type;
  	char	   *fdwname;		/* foreign-data wrapper name */
! 	List	   *validator;		/* optional validator function (qual. name) */
  	List	   *options;		/* generic options to FDW */
  } CreateFdwStmt;
  
--- 1455,1461 ----
  {
  	NodeTag		type;
  	char	   *fdwname;		/* foreign-data wrapper name */
! 	List	   *fdwoptions;		/* validator and connector */
  	List	   *options;		/* generic options to FDW */
  } CreateFdwStmt;
  
*************** typedef struct AlterFdwStmt
*** 1463,1470 ****
  {
  	NodeTag		type;
  	char	   *fdwname;		/* foreign-data wrapper name */
! 	List	   *validator;		/* optional validator function (qual. name) */
! 	bool		change_validator;
  	List	   *options;		/* generic options to FDW */
  } AlterFdwStmt;
  
--- 1463,1469 ----
  {
  	NodeTag		type;
  	char	   *fdwname;		/* foreign-data wrapper name */
! 	List	   *fdwoptions;		/* validator and connector */
  	List	   *options;		/* generic options to FDW */
  } AlterFdwStmt;
  
diff -cprN head/src/include/parser/kwlist.h dblink2/src/include/parser/kwlist.h
*** head/src/include/parser/kwlist.h	2009-04-06 17:42:53.000000000 +0900
--- dblink2/src/include/parser/kwlist.h	2009-08-19 12:34:33.826159610 +0900
*************** PG_KEYWORD("committed", COMMITTED, UNRES
*** 85,90 ****
--- 85,91 ----
  PG_KEYWORD("concurrently", CONCURRENTLY, UNRESERVED_KEYWORD)
  PG_KEYWORD("configuration", CONFIGURATION, UNRESERVED_KEYWORD)
  PG_KEYWORD("connection", CONNECTION, UNRESERVED_KEYWORD)
+ PG_KEYWORD("connector", CONNECTOR, UNRESERVED_KEYWORD)
  PG_KEYWORD("constraint", CONSTRAINT, RESERVED_KEYWORD)
  PG_KEYWORD("constraints", CONSTRAINTS, UNRESERVED_KEYWORD)
  PG_KEYWORD("content", CONTENT_P, UNRESERVED_KEYWORD)
diff -cprN head/src/test/regress/expected/rules.out dblink2/src/test/regress/expected/rules.out
*** head/src/test/regress/expected/rules.out	2009-02-07 06:15:12.000000000 +0900
--- dblink2/src/test/regress/expected/rules.out	2009-08-19 16:20:40.142177000 +0900
*************** drop table cchild;
*** 1278,1283 ****
--- 1278,1284 ----
  SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schema' ORDER BY viewname;
           viewname         |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              definition                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
  --------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+  dblink_connections       | SELECT dblink_connections.name, dblink_connections.server, dblink_connections.status, dblink_connections.keep FROM dblink_connections() dblink_connections(name, server, status, keep);
   iexit                    | SELECT ih.name, ih.thepath, interpt_pp(ih.thepath, r.thepath) AS exit FROM ihighway ih, ramp r WHERE (ih.thepath ## r.thepath);
   pg_cursors               | SELECT c.name, c.statement, c.is_holdable, c.is_binary, c.is_scrollable, c.creation_time FROM pg_cursor() c(name, statement, is_holdable, is_binary, is_scrollable, creation_time);
   pg_group                 | SELECT pg_authid.rolname AS groname, pg_authid.oid AS grosysid, ARRAY(SELECT pg_auth_members.member FROM pg_auth_members WHERE (pg_auth_members.roleid = pg_authid.oid)) AS grolist FROM pg_authid WHERE (NOT pg_authid.rolcanlogin);
*************** SELECT viewname, definition FROM pg_view
*** 1329,1335 ****
   shoelace_obsolete        | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color))));
   street                   | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath);
   toyemp                   | SELECT emp.name, emp.age, emp.location, (12 * emp.salary) AS annualsal FROM emp;
! (51 rows)
  
  SELECT tablename, rulename, definition FROM pg_rules 
  	ORDER BY tablename, rulename;
--- 1330,1336 ----
   shoelace_obsolete        | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color))));
   street                   | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath);
   toyemp                   | SELECT emp.name, emp.age, emp.location, (12 * emp.salary) AS annualsal FROM emp;
! (52 rows)
  
  SELECT tablename, rulename, definition FROM pg_rules 
  	ORDER BY tablename, rulename;
#2Tom Lane
tgl@sss.pgh.pa.us
In reply to: Itagaki Takahiro (#1)
Re: FDW-based dblink (WIP)

Itagaki Takahiro <itagaki.takahiro@oss.ntt.co.jp> writes:

Here is a WIP patch for a foreign data wrapper based dblink.
It integrates dblink module into core and adds a new functionality,
automatic transaction management.

I don't believe there is any consensus for integrating dblink into core,
and I for one will resist that strongly. Keep it in contrib.

regards, tom lane

#3Pavel Stehule
pavel.stehule@gmail.com
In reply to: Tom Lane (#2)
Re: FDW-based dblink (WIP)

2009/8/19 Tom Lane <tgl@sss.pgh.pa.us>:

Itagaki Takahiro <itagaki.takahiro@oss.ntt.co.jp> writes:

Here is a WIP patch for a foreign data wrapper based dblink.
It integrates dblink module into core and adds a new functionality,
automatic transaction management.

I don't believe there is any consensus for integrating dblink into core,
and I for one will resist that strongly.  Keep it in contrib.

if integration means, so I could to write query like

SELECT * FROM otherdatabase.schema.table ....
UPDATE otherdb.table SET ...

I am for integration.

regards
Pavel Stehule

#4Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Itagaki Takahiro (#1)
Re: FDW-based dblink (WIP)

Itagaki Takahiro wrote:

It integrates dblink module into core and adds a new functionality,
automatic transaction management.

Why does it need to be integrated to core? I'd prefer to keep dblink out
of core, unless there's a reason.

I want pretty much the automatic transaction management. It is useful to
write applied modules like materialized-view-over-network. But it should
be able to be turned off if we don't want it. I'll work on those parts next.

The transaction management is very unsafe as it is, for the reasons I
mentioned earlier.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#5Tom Lane
tgl@sss.pgh.pa.us
In reply to: Pavel Stehule (#3)
Re: FDW-based dblink (WIP)

Pavel Stehule <pavel.stehule@gmail.com> writes:

2009/8/19 Tom Lane <tgl@sss.pgh.pa.us>:

I don't believe there is any consensus for integrating dblink into core,
and I for one will resist that strongly.  Keep it in contrib.

if integration means, so I could to write query like
SELECT * FROM otherdatabase.schema.table ....
UPDATE otherdb.table SET ...
I am for integration.

That is not what "integrating dblink" means --- what Itagaki-san is
talking about is moving the dblink_xxx functions into core. What
you are talking about is actual SQL/MED functionality, which we should
indeed try to get into core someday. But dblink is a dead end as far
as standards compliance goes. Between that and the potential security
issues, we should not put it in core.

regards, tom lane

#6Pavel Stehule
pavel.stehule@gmail.com
In reply to: Tom Lane (#5)
Re: FDW-based dblink (WIP)

2009/8/19 Tom Lane <tgl@sss.pgh.pa.us>:

Pavel Stehule <pavel.stehule@gmail.com> writes:

2009/8/19 Tom Lane <tgl@sss.pgh.pa.us>:

I don't believe there is any consensus for integrating dblink into core,
and I for one will resist that strongly.  Keep it in contrib.

if integration means, so I could to write query like
SELECT * FROM otherdatabase.schema.table ....
UPDATE otherdb.table SET ...
I am for integration.

That is not what "integrating dblink" means --- what Itagaki-san is
talking about is moving the dblink_xxx functions into core.  What
you are talking about is actual SQL/MED functionality, which we should
indeed try to get into core someday.  But dblink is a dead end as far
as standards compliance goes.  Between that and the potential security
issues, we should not put it in core.

aha, - ok

regards
Pavel Stehule

#7Joe Conway
mail@joeconway.com
In reply to: Tom Lane (#5)
Re: FDW-based dblink (WIP)

Tom Lane wrote:

Pavel Stehule <pavel.stehule@gmail.com> writes:

2009/8/19 Tom Lane <tgl@sss.pgh.pa.us>:

I don't believe there is any consensus for integrating dblink into core,
and I for one will resist that strongly.  Keep it in contrib.

if integration means, so I could to write query like
SELECT * FROM otherdatabase.schema.table ....
UPDATE otherdb.table SET ...
I am for integration.

That is not what "integrating dblink" means --- what Itagaki-san is
talking about is moving the dblink_xxx functions into core. What
you are talking about is actual SQL/MED functionality, which we should
indeed try to get into core someday. But dblink is a dead end as far
as standards compliance goes. Between that and the potential security
issues, we should not put it in core.

+1

Joe

#8Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
In reply to: Tom Lane (#2)
1 attachment(s)
Re: FDW-based dblink (WIP)

Tom Lane <tgl@sss.pgh.pa.us> wrote:

I don't believe there is any consensus for integrating dblink into core,
and I for one will resist that strongly. Keep it in contrib.

OK, our consensus is that dblink should be replaced with SQL/MED interface
and then we'll start to consider integrating into core.

However, automatic transaction management needs help by core. Is it
acceptable to have two-phase callbacks? Registered callbacks are
called with TWOPHASE_EVENT_PRE_COMMIT when a transaction is about
to be committed or prepared. The argument gxact is NULL if the
transaction is committed without 2PC.

typedef enum
{
TWOPHASE_EVENT_PRE_COMMIT,
TWOPHASE_EVENT_POST_COMMIT,
TWOPHASE_EVENT_POST_ABORT,
TWOPHASE_EVENT_RECOVER,
} TwoPhaseEvent;

typedef void (*TwoPhaseEventCallback) (
TwoPhaseEvent event, GlobalTransaction gxact, void *arg);
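
To make the proposal above concrete, here is a minimal, self-contained sketch of what a callback written against this interface might look like. The enum and the callback typedef are copied from the proposal; `struct GlobalTransactionData` is only a stand-in so the example compiles outside the backend, and `RemoteState`, `remote_xact_callback`, and the action strings are hypothetical names invented for illustration. The mapping from events to remote actions is an assumption, not something the patch specifies.

```c
#include <stddef.h>

/* Stand-in so the sketch compiles on its own; the real type is opaque. */
struct GlobalTransactionData
{
    int         dummy;
};
typedef struct GlobalTransactionData *GlobalTransaction;

/* Copied from the proposed interface. */
typedef enum
{
    TWOPHASE_EVENT_PRE_COMMIT,
    TWOPHASE_EVENT_POST_COMMIT,
    TWOPHASE_EVENT_POST_ABORT,
    TWOPHASE_EVENT_RECOVER
} TwoPhaseEvent;

typedef void (*TwoPhaseEventCallback) (TwoPhaseEvent event,
                                       GlobalTransaction gxact, void *arg);

/* Hypothetical per-module state passed back through "arg". */
typedef struct RemoteState
{
    const char *last_action;    /* what the module decided to do last */
} RemoteState;

/*
 * Hypothetical callback.  It only records a decision; a real module would
 * talk to the remote server here.  The event-to-action mapping is assumed.
 */
void
remote_xact_callback(TwoPhaseEvent event, GlobalTransaction gxact, void *arg)
{
    RemoteState *state = (RemoteState *) arg;

    switch (event)
    {
        case TWOPHASE_EVENT_PRE_COMMIT:
            /* gxact is NULL when the local commit does not use 2PC */
            state->last_action = gxact ? "prepare remote" : "commit remote";
            break;
        case TWOPHASE_EVENT_POST_COMMIT:
            state->last_action = "commit prepared remote";
            break;
        case TWOPHASE_EVENT_POST_ABORT:
            state->last_action = "rollback prepared remote";
            break;
        case TWOPHASE_EVENT_RECOVER:
            state->last_action = "recover remote";
            break;
    }
}
```

With the proposed API, a module would presumably install this once with RegisterTwoPhaseCallback(remote_xact_callback, &state) and remove it with UnregisterTwoPhaseCallback using the same pair of arguments.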

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

Attachments:

twophase_callbacks-20090820.patch (application/octet-stream)
diff -cprN head/src/backend/access/transam/twophase.c dblink2/src/backend/access/transam/twophase.c
*** head/src/backend/access/transam/twophase.c	2009-06-26 04:05:52.000000000 +0900
--- dblink2/src/backend/access/transam/twophase.c	2009-08-20 11:18:35.852151624 +0900
*************** typedef struct TwoPhaseStateData
*** 138,143 ****
--- 138,155 ----
  
  static TwoPhaseStateData *TwoPhaseState;
  
+ /*
+  * List of add-on for twophase commit callbacks
+  */
+ typedef struct TwoPhaseCallbackItem
+ {
+ 	struct TwoPhaseCallbackItem *next;
+ 	TwoPhaseEventCallback callback;
+ 	void	   *arg;
+ } TwoPhaseCallbackItem;
+ 
+ static TwoPhaseCallbackItem *TwoPhase_callbacks = NULL;
+ 
  
  static void RecordTransactionCommitPrepared(TransactionId xid,
  								int nchildren,
*************** EndPrepare(GlobalTransaction gxact)
*** 944,949 ****
--- 956,964 ----
  				 errmsg("could not seek in two-phase state file: %m")));
  	}
  
+ 	/* Fire pre-commit callbacks */
+ 	CallTwoPhaseCallbacks(TWOPHASE_EVENT_PRE_COMMIT, gxact);
+ 
  	/*
  	 * The state file isn't valid yet, because we haven't written the correct
  	 * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
*************** FinishPreparedTransaction(const char *gi
*** 1248,1256 ****
--- 1263,1277 ----
  
  	/* And now do the callbacks */
  	if (isCommit)
+ 	{
  		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
+ 		CallTwoPhaseCallbacks(TWOPHASE_EVENT_POST_COMMIT, gxact);
+ 	}
  	else
+ 	{
  		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
+ 		CallTwoPhaseCallbacks(TWOPHASE_EVENT_POST_ABORT, gxact);
+ 	}
  
  	/* Count the prepared xact as committed or aborted */
  	AtEOXact_PgStat(isCommit);
*************** RecoverPreparedTransactions(void)
*** 1687,1692 ****
--- 1708,1714 ----
  			 * Recover other state (notably locks) using resource managers
  			 */
  			ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+ 			CallTwoPhaseCallbacks(TWOPHASE_EVENT_RECOVER, gxact);
  
  			pfree(buf);
  		}
*************** RecordTransactionAbortPrepared(Transacti
*** 1839,1841 ****
--- 1861,1906 ----
  
  	END_CRIT_SECTION();
  }
+ 
+ void
+ RegisterTwoPhaseCallback(TwoPhaseEventCallback callback, void *arg)
+ {
+ 	TwoPhaseCallbackItem *item;
+ 
+ 	item = (TwoPhaseCallbackItem *)
+ 		MemoryContextAlloc(TopMemoryContext, sizeof(TwoPhaseCallbackItem));
+ 	item->callback = callback;
+ 	item->arg = arg;
+ 	item->next = TwoPhase_callbacks;
+ 	TwoPhase_callbacks = item;
+ }
+ 
+ void
+ UnregisterTwoPhaseCallback(TwoPhaseEventCallback callback, void *arg)
+ {
+ 	TwoPhaseCallbackItem *item;
+ 	TwoPhaseCallbackItem *prev;
+ 
+ 	prev = NULL;
+ 	for (item = TwoPhase_callbacks; item; prev = item, item = item->next)
+ 	{
+ 		if (item->callback == callback && item->arg == arg)
+ 		{
+ 			if (prev)
+ 				prev->next = item->next;
+ 			else
+ 				TwoPhase_callbacks = item->next;
+ 			pfree(item);
+ 			break;
+ 		}
+ 	}
+ }
+ 
+ void
+ CallTwoPhaseCallbacks(TwoPhaseEvent event, GlobalTransaction gxact)
+ {
+ 	TwoPhaseCallbackItem *item;
+ 
+ 	for (item = TwoPhase_callbacks; item; item = item->next)
+ 		(*item->callback) (event, gxact, item->arg);
+ }
diff -cprN head/src/backend/access/transam/xact.c dblink2/src/backend/access/transam/xact.c
*** head/src/backend/access/transam/xact.c	2009-06-11 23:48:54.000000000 +0900
--- dblink2/src/backend/access/transam/xact.c	2009-08-20 11:14:22.576185055 +0900
*************** CommitTransaction(void)
*** 1593,1598 ****
--- 1593,1601 ----
  	/* Now we can shut down the deferred-trigger manager */
  	AfterTriggerEndXact(true);
  
+ 	/* Fire pre-commit callbacks */
+ 	CallTwoPhaseCallbacks(TWOPHASE_EVENT_PRE_COMMIT, NULL);
+ 
  	/* Close any open regular cursors */
  	AtCommit_Portals();
  
diff -cprN head/src/include/access/twophase.h dblink2/src/include/access/twophase.h
*** head/src/include/access/twophase.h	2009-01-02 02:23:56.000000000 +0900
--- dblink2/src/include/access/twophase.h	2009-08-20 11:06:06.572168421 +0900
***************
*** 24,29 ****
--- 24,40 ----
   */
  typedef struct GlobalTransactionData *GlobalTransaction;
  
+ typedef enum
+ {
+ 	TWOPHASE_EVENT_PRE_COMMIT,
+ 	TWOPHASE_EVENT_POST_COMMIT,
+ 	TWOPHASE_EVENT_POST_ABORT,
+ 	TWOPHASE_EVENT_RECOVER,
+ } TwoPhaseEvent;
+ 
+ typedef void (*TwoPhaseEventCallback) (TwoPhaseEvent event,
+ 									   GlobalTransaction gxact, void *arg);
+ 
  /* GUC variable */
  extern int	max_prepared_xacts;
  
*************** extern void CheckPointTwoPhase(XLogRecPt
*** 49,52 ****
--- 60,67 ----
  
  extern void FinishPreparedTransaction(const char *gid, bool isCommit);
  
+ extern void RegisterTwoPhaseCallback(TwoPhaseEventCallback callback, void *arg);
+ extern void UnregisterTwoPhaseCallback(TwoPhaseEventCallback callback, void *arg);
+ extern void CallTwoPhaseCallbacks(TwoPhaseEvent event, GlobalTransaction gxact);
+ 
  #endif   /* TWOPHASE_H */
#9Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Itagaki Takahiro (#8)
Re: FDW-based dblink (WIP)

Itagaki Takahiro wrote:

However, automatic transaction management needs help by core. Is it
acceptable to have two-phase callbacks?

Probably, but it's far too early to decide what the interface should
look like.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#10Peter Eisentraut
peter_e@gmx.net
In reply to: Itagaki Takahiro (#1)
Re: FDW-based dblink (WIP)

On Wed, 2009-08-19 at 17:07 +0900, Itagaki Takahiro wrote:

Here is a WIP patch for a foreign data wrapper based dblink.

It integrates dblink module into core and adds a new functionality,
automatic transaction management. The new interface of dblink is
exported by include/foreign/dblink.h. We can easily write a connector
module for another database because we can reuse transaction and
resource management parts in core.

This patch is listed in the commitfest, but I think the consensus was
that it needed some rework. I think the idea is that we will have
support for syntax like

Syntax to create FDW with connector is below:
CREATE FOREIGN DATA WRAPPER postgresql
VALIDATOR postgresql_fdw_validator
CONNECTOR postgresql_fdw_connector
OPTIONS (...);

in core, but the actual implementation of postgresql_fdw_connector would
be a loadable module.

Personally, I'm undecided whether the single-function connector
implementation is the best. The other approach would be to use a
multiple-function interface based directly on the functions currently
provided by dblink.
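
For readers comparing the two approaches, the single-function style can be sketched roughly as follows: one entry point returns a connection object, and every other operation is reached through function pointers stored in that object. The struct is a trimmed stand-in for the one in the proposed foreign/dblink.h (only disconnect and exec are kept); `mock_connection`, `mock_exec`, `mock_disconnect`, `mock_fdw_connector`, and `mock_open_connections` are hypothetical names, and the connector here takes no options argument, unlike the one in the patch.

```c
#include <stdint.h>
#include <stdlib.h>

/* Trimmed stand-in for the interface in the proposed foreign/dblink.h. */
typedef struct dblink_connection dblink_connection;

typedef void (*dblink_disconnect_t) (dblink_connection *conn);
typedef int64_t (*dblink_exec_t) (dblink_connection *conn, const char *sql);

struct dblink_connection
{
    dblink_disconnect_t disconnect;
    dblink_exec_t       exec;
};

/*
 * Hypothetical connector-private state.  The base struct comes first so a
 * dblink_connection pointer can be cast back to mock_connection.
 */
typedef struct mock_connection
{
    dblink_connection base;
    int         nexec;          /* number of statements "executed" */
} mock_connection;

/* Hypothetical bookkeeping, only here to make the sketch observable. */
int         mock_open_connections = 0;

int64_t
mock_exec(dblink_connection *conn, const char *sql)
{
    mock_connection *mock = (mock_connection *) conn;

    (void) sql;                 /* a real connector would send this remotely */
    mock->nexec++;
    return 1;                   /* pretend one row was affected */
}

void
mock_disconnect(dblink_connection *conn)
{
    mock_open_connections--;
    free(conn);
}

/* The single entry point; callers only ever see the returned object. */
dblink_connection *
mock_fdw_connector(void)
{
    mock_connection *mock = calloc(1, sizeof(mock_connection));

    if (mock == NULL)
        return NULL;
    mock->base.disconnect = mock_disconnect;
    mock->base.exec = mock_exec;
    mock_open_connections++;
    return &mock->base;
}
```

A multiple-function interface would instead register each operation (connect, exec, open, fetch, close) as a separate function, which keeps each one visible at the SQL level but requires the catalog to track more than one function per wrapper.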

More generally, what does this really buy us? It doesn't advance the
SQL/MED implementation, because you are not adding, say, some kind of
CREATE FOREIGN TABLE support. You are just changing the dblink
implementation to go through the FDW. I would argue that it should be
the other way around: The FDW should go through dblink.

#11Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
In reply to: Peter Eisentraut (#10)
Re: FDW-based dblink (WIP)

Peter Eisentraut <peter_e@gmx.net> wrote:

This patch is listed in the commitfest, but I think the consensus was
that it needed some rework.

No doubt, but SQL/MED will require a lot of works. Can we split the work
into small parts? I intended FDW-based dblink to be one of the split jobs.

Here are some random considerations:

* Split dblink into connector and connection management layers.
The present dblink has its own name-based connection management and error
handling routines, but I think we should share them among connectors.

* CREATE FOREIGN TABLE supports only SELECT queries in the SQL standard.
I think we will still need a free-style SQL executor like dblink
even when we support SQL/MED. It is not a waste to include dblink in core.

* Consider the interface of an extensible connector so that it can connect
to other DBMSs. In particular, there are many differences in how 2PC is used.

* Automatic 2PC is very useful if we support non-SELECT queries in SQL/MED.
It would be better to have some infrastructure for it.
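
As a rough illustration of the last two points, the sketch below drives several remote connections through a prepare-then-commit sequence using the dblink_command values from the proposed foreign/dblink.h. Only the enum and the command function pointer are taken from the patch; `commit_all_remotes`, `fake_connection`, and `fake_command` are hypothetical, and sending DBLINK_XA_ROLLBACK to connections that were never prepared is an assumption about how a connector would interpret that command. Failures during the commit phase are ignored here, which a real implementation could not afford.

```c
#include <stdbool.h>
#include <stddef.h>

/* Copied from the proposed foreign/dblink.h. */
typedef enum dblink_command
{
    DBLINK_BEGIN,
    DBLINK_COMMIT,
    DBLINK_ROLLBACK,
    DBLINK_XA_START,
    DBLINK_XA_PREPARE,
    DBLINK_XA_COMMIT,
    DBLINK_XA_ROLLBACK
} dblink_command;

typedef struct dblink_connection dblink_connection;
typedef bool (*dblink_command_t) (dblink_connection *conn, dblink_command type);

/* Trimmed stand-in: only the member this sketch needs. */
struct dblink_connection
{
    dblink_command_t command;
};

/*
 * Hypothetical helper: phase 1 prepares every remote, phase 2 commits them.
 * If any prepare fails, all remotes are asked to roll back.
 */
bool
commit_all_remotes(dblink_connection **conns, size_t n)
{
    size_t      i;
    size_t      j;

    for (i = 0; i < n; i++)
    {
        if (!conns[i]->command(conns[i], DBLINK_XA_PREPARE))
        {
            for (j = 0; j < n; j++)
                conns[j]->command(conns[j], DBLINK_XA_ROLLBACK);
            return false;
        }
    }
    for (i = 0; i < n; i++)
        conns[i]->command(conns[i], DBLINK_XA_COMMIT);
    return true;
}

/* Hypothetical in-memory connection used to exercise the helper. */
typedef struct fake_connection
{
    dblink_connection base;
    bool        fail_prepare;   /* make DBLINK_XA_PREPARE report failure */
    dblink_command last;        /* last command received */
} fake_connection;

bool
fake_command(dblink_connection *conn, dblink_command type)
{
    fake_connection *fake = (fake_connection *) conn;

    fake->last = type;
    return !(type == DBLINK_XA_PREPARE && fake->fail_prepare);
}
```

Hiding the sequence behind a per-connector command function is what would let each DBMS translate DBLINK_XA_PREPARE and friends into its own 2PC syntax.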

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

#12Peter Eisentraut
peter_e@gmx.net
In reply to: Itagaki Takahiro (#11)
Re: FDW-based dblink (WIP)

On Wed, 2009-09-16 at 13:47 +0900, Itagaki Takahiro wrote:

Peter Eisentraut <peter_e@gmx.net> wrote:

This patch is listed in the commitfest, but I think the consensus was
that it needed some rework.

No doubt, but SQL/MED will require a lot of works. Can we split the work
into small parts? I intended FDW-based dblink to be one of the split jobs.

Sure, but I think what you are doing here isn't on anyone's agenda. I
would imagine that the next step would be to implement foreign tables
using something like dblink's interface as underlying interface. What
you are proposing doesn't really move us closer to that, or I'm
misunderstanding what you are trying to achieve. So what is the purpose
of this patch anyway?