*** dblink.c Wed Sep 16 13:01:21 2009 --- dblink-fix.c Wed Sep 16 12:59:23 2009 *************** *** 35,45 **** --- 35,47 ---- #include #include "libpq-fe.h" + #include "libpq-int.h" #include "fmgr.h" #include "funcapi.h" #include "access/genam.h" #include "access/heapam.h" #include "access/tupdesc.h" + #include "access/xact.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_index.h" *************** *** 54,59 **** --- 56,62 ---- #include "nodes/nodes.h" #include "nodes/pg_list.h" #include "parser/parse_type.h" + #include "storage/ipc.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" *************** *** 76,81 **** --- 79,87 ---- bool newXactForCursor; /* Opened a transaction for a cursor */ } remoteConn; + extern void _PG_init(void); + extern void _PG_fini(void); + /* * Internal declarations */ *************** *** 100,109 **** --- 106,133 ---- static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail); static char *get_connect_string(const char *servername); static char *escape_param_str(const char *from); + static PGresult *execute_query(PGconn *conn, const char *sql, bool freeconn); + static PGresult *wait_for_result(PGconn *conn, bool freeconn); + static void register_result(PGresult *res); + static void unregister_result(PGresult *res); + static void AtEOXact_dblink(XactEvent event, void *arg); /* Global */ static remoteConn *pconn = NULL; static HTAB *remoteConnHash = NULL; + static List *managedResults = NIL; + + void + _PG_init(void) + { + RegisterXactCallback(AtEOXact_dblink, 0); + } + + void + _PG_fini(void) + { + UnregisterXactCallback(AtEOXact_dblink, 0); + } /* * Following is list that holds multiple remote connections. *************** *** 580,586 **** * PGresult will be long-lived even though we are still in a * short-lived memory context. */ ! res = PQexec(conn, buf.data); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) --- 604,610 ---- * PGresult will be long-lived even though we are still in a * short-lived memory context. */ ! res = execute_query(conn, buf.data, false); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) *************** *** 637,642 **** --- 661,667 ---- PQclear(res); SRF_RETURN_DONE(funcctx); } + register_result(res); /* * switch to memory context appropriate for multiple function calls, *************** *** 695,701 **** else { /* do when there is no more left */ ! PQclear(res); SRF_RETURN_DONE(funcctx); } } --- 720,726 ---- else { /* do when there is no more left */ ! unregister_result(res); SRF_RETURN_DONE(funcctx); } } *************** *** 839,848 **** /* synchronous query, or async result retrieval */ if (!is_async) ! res = PQexec(conn, sql); else { ! res = PQgetResult(conn); /* NULL means we're all done with the async results */ if (!res) { --- 864,873 ---- /* synchronous query, or async result retrieval */ if (!is_async) ! res = execute_query(conn, sql, freeconn); else { ! res = wait_for_result(conn, freeconn); /* NULL means we're all done with the async results */ if (!res) { *************** *** 930,935 **** --- 955,961 ---- MemoryContextSwitchTo(oldcontext); SRF_RETURN_DONE(funcctx); } + register_result(res); /* store needed metadata for subsequent calls */ attinmeta = TupleDescGetAttInMetadata(tupdesc); *************** *** 989,995 **** else { /* do when there is no more left */ ! PQclear(res); SRF_RETURN_DONE(funcctx); } } --- 1015,1021 ---- else { /* do when there is no more left */ ! unregister_result(res); SRF_RETURN_DONE(funcctx); } } *************** *** 1167,1173 **** if (!conn) DBLINK_CONN_NOT_AVAIL; ! res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) --- 1193,1199 ---- if (!conn) DBLINK_CONN_NOT_AVAIL; ! res = execute_query(conn, sql, freeconn); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) *************** *** 2550,2552 **** --- 2576,2748 ---- return buf->data; } + + static PGresult * + execute_query(PGconn *conn, const char *sql, bool freeconn) + { + /* async query send */ + if (PQsendQuery(conn, sql) != 1) + elog(NOTICE, "%s", PQerrorMessage(conn)); + + return wait_for_result(conn, freeconn); + } + + typedef struct + { + PGconn *conn; + bool freeconn; + } cancel_query_args; + + static void + cancel_query(int code, Datum arg) + { + cancel_query_args *args = (cancel_query_args *) DatumGetPointer(arg); + PGconn *conn = args->conn; + PGresult *res; + + /* cancel current query */ + if (PQisBusy(conn)) + { + PGcancel *cancel; + char errbuf[256]; + + cancel = PQgetCancel(conn); + if (cancel != NULL) + { + PQcancel(cancel, errbuf, 256); + PQfreeCancel(cancel); + } + } + + /* discard all results */ + while ((res = PQgetResult(conn)) != NULL) + PQclear(res); + + /* disconnect if a temporary connection */ + if (args->freeconn) + PQfinish(conn); + } + + /* + * copied from PQexecFinish in libpq. + */ + static PGresult * + getLastResult(PGconn *conn) + { + PGresult *result; + PGresult *lastResult; + + lastResult = NULL; + while ((result = PQgetResult(conn)) != NULL) + { + if (lastResult) + { + if (lastResult->resultStatus == PGRES_FATAL_ERROR && + result->resultStatus == PGRES_FATAL_ERROR) + { + PQExpBufferData errorBuf; + + initPQExpBuffer(&errorBuf); + if (lastResult->errMsg) + { + appendPQExpBufferStr(&errorBuf, lastResult->errMsg); + free(lastResult->errMsg); + } + appendPQExpBufferStr(&errorBuf, result->errMsg); + lastResult->errMsg = errorBuf.data; + + PQclear(result); + result = lastResult; + + /* Make sure PQerrorMessage agrees with concatenated result */ + resetPQExpBuffer(&conn->errorMessage); + appendPQExpBufferStr(&conn->errorMessage, result->errMsg); + } + else + PQclear(lastResult); + } + lastResult = result; + if (result->resultStatus == PGRES_COPY_IN || + result->resultStatus == PGRES_COPY_OUT || + conn->status == CONNECTION_BAD) + break; + } + + return lastResult; + } + + static PGresult * + wait_for_result(PGconn *conn, bool freeconn) + { + PGresult *res = NULL; + cancel_query_args args; + + args.conn = conn; + args.freeconn = freeconn; + + PG_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(&args)); + { + for (;;) + { + fd_set rset; + int sock; + struct timeval tv; + + CHECK_FOR_INTERRUPTS(); + + PQconsumeInput(conn); + if (!PQisBusy(conn)) + { + res = getLastResult(conn); + break; + } + + sock = PQsocket(conn); + if (sock < 0) + break; + FD_ZERO(&rset); + FD_SET(sock, &rset); + tv.tv_sec = 1; + tv.tv_usec = 0; + if (select(sock + 1, &rset, NULL, NULL, &tv) < 0 && errno != EINTR) + break; + } + } + PG_END_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(&args)); + + return res; + } + + static void + register_result(PGresult *res) + { + MemoryContext oldcontext; + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + managedResults = lappend(managedResults, res); + MemoryContextSwitchTo(oldcontext); + } + + /* unregister and free result */ + static void + unregister_result(PGresult *res) + { + managedResults = list_delete_ptr(managedResults, res); + PQclear(res); + } + + static void + AtEOXact_dblink(XactEvent event, void *arg) + { + if (managedResults != NIL) + { + ListCell *cell; + + foreach(cell, managedResults) + { + PQclear((PGresult *) lfirst(cell)); + } + + list_free(managedResults); + managedResults = NIL; + } + }