*** dblink.c Mon Jun 29 12:31:46 2009 --- dblink.fixed.c Mon Jun 29 12:28:49 2009 *************** *** 40,45 **** --- 40,46 ---- #include "access/genam.h" #include "access/heapam.h" #include "access/tupdesc.h" + #include "access/xact.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_index.h" *************** *** 54,59 **** --- 55,61 ---- #include "nodes/nodes.h" #include "nodes/pg_list.h" #include "parser/parse_type.h" + #include "storage/ipc.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" *************** typedef struct remoteConn *** 76,81 **** --- 78,86 ---- bool newXactForCursor; /* Opened a transaction for a cursor */ } remoteConn; + extern void _PG_init(void); + extern void _PG_fini(void); + /* * Internal declarations */ *************** static void dblink_security_check(PGconn *** 100,109 **** --- 105,132 ---- static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail); static char *get_connect_string(const char *servername); static char *escape_param_str(const char *from); + static PGresult *execute_query(PGconn *conn, const char *sql); + static PGresult *wait_for_result(PGconn *conn); + static void register_result(PGresult *res); + static void unregister_result(PGresult *res); + static void AtEOXact_dblink(XactEvent event, void *arg); /* Global */ static remoteConn *pconn = NULL; static HTAB *remoteConnHash = NULL; + static List *results = NIL; + /* Registers AtEOXact_dblink as a transaction callback, passing 0 as its argument. */ + void + _PG_init(void) + { + RegisterXactCallback(AtEOXact_dblink, 0); + } + /* Unregisters AtEOXact_dblink using the same arguments _PG_init registered it with. */ + void + _PG_fini(void) + { + UnregisterXactCallback(AtEOXact_dblink, 0); + } /* * Following is list that holds multiple remote connections. *************** dblink_fetch(PG_FUNCTION_ARGS) *** 580,586 **** * PGresult will be long-lived even though we are still in a * short-lived memory context. */ ! 
res = PQexec(conn, buf.data); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) --- 603,609 ---- * PGresult will be long-lived even though we are still in a * short-lived memory context. */ ! res = execute_query(conn, buf.data); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) *************** dblink_fetch(PG_FUNCTION_ARGS) *** 637,642 **** --- 660,666 ---- PQclear(res); SRF_RETURN_DONE(funcctx); } + register_result(res); /* * switch to memory context appropriate for multiple function calls, *************** dblink_fetch(PG_FUNCTION_ARGS) *** 695,701 **** else { /* do when there is no more left */ ! PQclear(res); SRF_RETURN_DONE(funcctx); } } --- 719,725 ---- else { /* do when there is no more left */ ! unregister_result(res); SRF_RETURN_DONE(funcctx); } } *************** dblink_record_internal(FunctionCallInfo *** 839,848 **** /* synchronous query, or async result retrieval */ if (!is_async) ! res = PQexec(conn, sql); else { ! res = PQgetResult(conn); /* NULL means we're all done with the async results */ if (!res) { --- 863,872 ---- /* synchronous query, or async result retrieval */ if (!is_async) ! res = execute_query(conn, sql); else { ! res = wait_for_result(conn); /* NULL means we're all done with the async results */ if (!res) { *************** dblink_record_internal(FunctionCallInfo *** 930,935 **** --- 954,960 ---- MemoryContextSwitchTo(oldcontext); SRF_RETURN_DONE(funcctx); } + register_result(res); /* store needed metadata for subsequent calls */ attinmeta = TupleDescGetAttInMetadata(tupdesc); *************** dblink_record_internal(FunctionCallInfo *** 989,995 **** else { /* do when there is no more left */ ! PQclear(res); SRF_RETURN_DONE(funcctx); } } --- 1014,1020 ---- else { /* do when there is no more left */ ! 
unregister_result(res);
  			SRF_RETURN_DONE(funcctx);
  		}
  	}
*************** dblink_exec(PG_FUNCTION_ARGS)
*** 1167,1173 ****
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(conn, sql);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 1192,1198 ----
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = execute_query(conn, sql);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
*************** escape_param_str(const char *str)
*** 2466,2469 ****
--- 2491,2648 ----
  	}
  
  	return buf->data;
+ }
+ 
+ /*
+  * execute_query
+  *		Interruptible stand-in for PQexec(): send "sql" with PQsendQuery()
+  *		and collect the results through wait_for_result().
+  *
+  * As PQexec() does, every pending result is consumed and only the last one
+  * is returned; a connection with unread results stays busy and rejects the
+  * next PQsendQuery().  Returns NULL if the query could not be sent or no
+  * result was obtained.  The caller must PQclear() the returned result.
+  */
+ static PGresult *
+ execute_query(PGconn *conn, const char *sql)
+ {
+ 	PGresult   *last = NULL;
+ 
+ 	/* async query send */
+ 	if (PQsendQuery(conn, sql) != 1)
+ 	{
+ 		elog(NOTICE, "%s", PQerrorMessage(conn));
+ 		return NULL;	/* nothing was sent, so there is nothing to wait for */
+ 	}
+ 
+ 	for (;;)
+ 	{
+ 		PGresult   *res = wait_for_result(conn);
+ 
+ 		if (res == NULL)
+ 			break;
+ 
+ 		PQclear(last);		/* PQclear(NULL) is a no-op */
+ 		last = res;
+ 
+ 		/* as in PQexec(), stop at COPY or when the connection is gone */
+ 		if (PQresultStatus(last) == PGRES_COPY_IN ||
+ 			PQresultStatus(last) == PGRES_COPY_OUT ||
+ 			PQstatus(conn) == CONNECTION_BAD)
+ 			break;
+ 	}
+ 
+ 	return last;
+ }
+ 
+ /*
+  * cancel_query
+  *		Cleanup callback used by wait_for_result(): ask the remote server to
+  *		cancel the query running on the PGconn passed in "arg".
+  *
+  * Best effort: failure to build or deliver the request is ignored.
+  * "code" is unused.
+  */
+ static void
+ cancel_query(int code, Datum arg)
+ {
+ 	PGconn	   *conn = (PGconn *) DatumGetPointer(arg);
+ 	PGcancel   *cancel;
+ 	char		errbuf[256];
+ 
+ 	cancel = PQgetCancel(conn);
+ 	if (cancel == NULL)
+ 		return;
+ 
+ 	(void) PQcancel(cancel, errbuf, sizeof(errbuf));
+ 	PQfreeCancel(cancel);
+ }
+ 
+ /*
+  * wait_for_result
+  *		Wait for the next result of the query in progress on "conn", running
+  *		CHECK_FOR_INTERRUPTS() before each wait of at most one second.
+  *
+  * cancel_query() is registered as the error cleanup for the wait.  Returns
+  * what PQgetResult() returns, or NULL if the socket is unusable or select()
+  * fails with anything other than EINTR.
+  */
+ static PGresult *
+ wait_for_result(PGconn *conn)
+ {
+ 	PGresult   *res = NULL;
+ 
+ 	PG_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(conn));
+ 	for (;;)
+ 	{
+ 		fd_set		rset;
+ 		int			sock;
+ 		struct timeval tv;
+ 
+ 		CHECK_FOR_INTERRUPTS();
+ 
+ 		/*
+ 		 * If reading fails, stop polling and hand over to PQgetResult()
+ 		 * rather than spinning on a connection that may be dead.
+ 		 */
+ 		if (PQconsumeInput(conn) == 0 || !PQisBusy(conn))
+ 		{
+ 			res = PQgetResult(conn);
+ 			break;
+ 		}
+ 
+ 		sock = PQsocket(conn);
+ 		if (sock < 0)
+ 			break;			/* no socket; FD_SET() on -1 is undefined */
+ 
+ 		FD_ZERO(&rset);
+ 		FD_SET(sock, &rset);
+ 		tv.tv_sec = 1;
+ 		tv.tv_usec = 0;
+ 		if (select(sock + 1, &rset, NULL, NULL, &tv) < 0 && errno != EINTR)
+ 			break;
+ 	}
+ 	PG_END_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(conn));
+ 
+ 	return res;
+ }
+ 
+ /*
+  * register_result
+  *		Append "res" to the global "results" list, allocating the list cell
+  *		in TopMemoryContext.
+  */
+ static void
+ register_result(PGresult *res)
+ {
+ 	MemoryContext oldcontext;
+ 
+ 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ 	results = lappend(results, res);
+ 	MemoryContextSwitchTo(oldcontext);
+ }
+ 
+ /* unregister and free result */
+ static void
+ unregister_result(PGresult *res)
+ {
+ 	results = list_delete_ptr(results, res);
+ 	PQclear(res);
+ }
+ 
+ /*
+  * Transaction callback: PQclear() every PGresult still held in "results"
+  * and reset the list to NIL.  "event" and "arg" are not consulted.
+  */
+ static void
+ AtEOXact_dblink(XactEvent event, void *arg)
+ {
+ 	ListCell   *lc;
+ 
+ 	if (results == NIL)
+ 		return;
+ 	foreach(lc, results)
+ 		PQclear((PGresult *) lfirst(lc));
+ 	list_free(results);
+ 	results = NIL;
  }