query cancel issues in contrib/dblink
Hi,
contrib/dblink does not seem to handle query cancellation.
This causes the following issues:
(1) Users have to wait for the remote query to complete.
Query cancel requests are not delivered to the remote server.
(2) PGresult objects are leaked. The result is not released
when the query is cancelled; it is released only after the dblink function
has been called max_calls times.
These are long-standing issues (not only in 8.4),
but I hope we will fix them to make dblink more robust.
For (1), asynchronous libpq functions should be used instead of blocking
ones, and we should wait for the remote query in a loop that calls CHECK_FOR_INTERRUPTS().
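The loop proposed for (1) can be sketched in plain C. This is a minimal, hypothetical sketch, not dblink code. It assumes POSIX select(), uses a pipe in place of the libpq socket that PQsocket() would return, and uses a hypothetical global flag `cancel_requested` as a stand-in for the backend's CHECK_FOR_INTERRUPTS(). The key point is the finite select() timeout, which ensures the flag is re-checked periodically even if the remote server never replies.

```c
#define _POSIX_C_SOURCE 200112L

#include <errno.h>
#include <signal.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/* Hypothetical stand-in for the backend's "query cancel pending" flag. */
volatile sig_atomic_t cancel_requested = 0;

typedef enum
{
    WAIT_READY,         /* fd is readable: go consume input */
    WAIT_CANCELLED,     /* a cancel was requested while waiting */
    WAIT_ERROR          /* select() failed for a reason other than EINTR */
} wait_status;

/*
 * Wait until fd becomes readable, waking at least every timeout_ms
 * milliseconds to re-check cancel_requested.
 */
wait_status
wait_readable_or_cancel(int fd, long timeout_ms)
{
    for (;;)
    {
        fd_set      rset;
        struct timeval tv;
        int         rc;

        if (cancel_requested)       /* CHECK_FOR_INTERRUPTS() analogue */
            return WAIT_CANCELLED;

        FD_ZERO(&rset);
        FD_SET(fd, &rset);
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;

        rc = select(fd + 1, &rset, NULL, NULL, &tv);
        if (rc > 0)
            return WAIT_READY;
        if (rc < 0 && errno != EINTR)
            return WAIT_ERROR;
        /* timeout (rc == 0) or EINTR: loop and re-check the flag */
    }
}
```

In the backend, the WAIT_READY case would be followed by PQconsumeInput() and a PQisBusy() check, and a pending cancel would raise an error through CHECK_FOR_INTERRUPTS() instead of returning a status code.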
For (2), we might need to store each PGresult not only in funcctx->user_fctx
but also in a global list, and free any remaining results in an XactCallback.
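The bookkeeping proposed for (2) can be sketched as a registry of live results that an end-of-transaction hook empties. This is a hypothetical illustration with invented names: `FakeResult` stands in for PGresult, free() for PQclear(), and free_tracked_results() for a callback registered with RegisterXactCallback(). In the backend the list itself would also have to live in a long-lived memory context such as TopMemoryContext.

```c
#include <stdlib.h>

/* Hypothetical stand-in for a PGresult that must not outlive the xact. */
typedef struct FakeResult
{
    int         id;
} FakeResult;

/* Singly linked list of results that are still alive. */
typedef struct TrackNode
{
    FakeResult *res;
    struct TrackNode *next;
} TrackNode;

static TrackNode *tracked = NULL;

/* Remember res so it can be freed even if the caller never finishes. */
int
track_result(FakeResult *res)
{
    TrackNode  *node = malloc(sizeof(TrackNode));

    if (node == NULL)
        return -1;
    node->res = res;
    node->next = tracked;
    tracked = node;
    return 0;
}

/* Normal completion: forget res and free it (PQclear() in dblink). */
void
untrack_result(FakeResult *res)
{
    TrackNode **link = &tracked;

    while (*link != NULL)
    {
        if ((*link)->res == res)
        {
            TrackNode  *dead = *link;

            *link = dead->next;
            free(dead);
            break;
        }
        link = &(*link)->next;
    }
    free(res);
}

/* End-of-transaction hook: free everything a cancelled query left behind. */
void
free_tracked_results(void)
{
    while (tracked != NULL)
    {
        TrackNode  *node = tracked;

        tracked = node->next;
        free(node->res);
        free(node);
    }
}

/* Number of results currently tracked (for inspection). */
int
tracked_count(void)
{
    int         count = 0;
    const TrackNode *node;

    for (node = tracked; node != NULL; node = node->next)
        count++;
    return count;
}
```

If a set-returning function is cancelled between calls, untrack_result() is never reached, so the end-of-transaction hook is what reclaims the memory.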
Comments welcome.
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
On Thu, Jun 25, 2009 at 10:41 PM, Itagaki Takahiro <itagaki.takahiro@oss.ntt.co.jp> wrote:
Hi,
contrib/dblink does not seem to handle query cancellation.
This causes the following issues:
(1) Users have to wait for the remote query to complete.
Query cancel requests are not delivered to the remote server.
(2) PGresult objects are leaked. The result is not released
when the query is cancelled; it is released only after the dblink function
has been called max_calls times.
These are long-standing issues (not only in 8.4),
but I hope we will fix them to make dblink more robust.
For (1), asynchronous libpq functions should be used instead of blocking
ones, and we should wait for the remote query in a loop that calls CHECK_FOR_INTERRUPTS().
How would you structure this loop exactly?
merlin
Merlin Moncure <mmoncure@gmail.com> wrote:
Itagaki Takahiro <itagaki.takahiro@oss.ntt.co.jp> wrote:
contrib/dblink does not seem to handle query cancellation.
(1) Users have to wait for the remote query to complete.
(2) PGresult objects are leaked.
Here is a patch to fix the issues. I hope the fixes can be back-ported
to older versions if possible.
(1) is fixed by using the non-blocking APIs in libpq. I think we should
always use non-blocking APIs even if the dblink function itself is
a blocking function.
(2) is fixed by RegisterXactCallback(AtEOXact_dblink). However, there
might be better solutions -- for example, the ResourceOwner framework.
For (1), asynchronous libpq functions should be used instead of blocking
ones, and we should wait for the remote query in a loop that calls CHECK_FOR_INTERRUPTS().
How would you structure this loop exactly?
Please check execute_query() and wait_for_result() in the patch.
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
Attachments:
dblink.patch (application/octet-stream)
*** dblink.c Mon Jun 29 12:31:46 2009
--- dblink.fixed.c Mon Jun 29 12:28:49 2009
***************
*** 40,45 ****
--- 40,46 ----
#include "access/genam.h"
#include "access/heapam.h"
#include "access/tupdesc.h"
+ #include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_index.h"
***************
*** 54,59 ****
--- 55,61 ----
#include "nodes/nodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
+ #include "storage/ipc.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
*************** typedef struct remoteConn
*** 76,81 ****
--- 78,86 ----
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+ extern void _PG_init(void);
+ extern void _PG_fini(void);
+
/*
* Internal declarations
*/
*************** static void dblink_security_check(PGconn
*** 100,109 ****
--- 105,132 ----
static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
static char *get_connect_string(const char *servername);
static char *escape_param_str(const char *from);
+ static PGresult *execute_query(PGconn *conn, const char *sql);
+ static PGresult *wait_for_result(PGconn *conn);
+ static void register_result(PGresult *res);
+ static void unregister_result(PGresult *res);
+ static void AtEOXact_dblink(XactEvent event, void *arg);
/* Global */
static remoteConn *pconn = NULL;
static HTAB *remoteConnHash = NULL;
+ static List *results = NIL;
+
+ void
+ _PG_init(void)
+ {
+ RegisterXactCallback(AtEOXact_dblink, 0);
+ }
+
+ void
+ _PG_fini(void)
+ {
+ UnregisterXactCallback(AtEOXact_dblink, 0);
+ }
/*
* Following is list that holds multiple remote connections.
*************** dblink_fetch(PG_FUNCTION_ARGS)
*** 580,586 ****
* PGresult will be long-lived even though we are still in a
* short-lived memory context.
*/
! res = PQexec(conn, buf.data);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
--- 603,609 ----
* PGresult will be long-lived even though we are still in a
* short-lived memory context.
*/
! res = execute_query(conn, buf.data);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
*************** dblink_fetch(PG_FUNCTION_ARGS)
*** 637,642 ****
--- 660,666 ----
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
+ register_result(res);
/*
* switch to memory context appropriate for multiple function calls,
*************** dblink_fetch(PG_FUNCTION_ARGS)
*** 695,701 ****
else
{
/* do when there is no more left */
! PQclear(res);
SRF_RETURN_DONE(funcctx);
}
}
--- 719,725 ----
else
{
/* do when there is no more left */
! unregister_result(res);
SRF_RETURN_DONE(funcctx);
}
}
*************** dblink_record_internal(FunctionCallInfo
*** 839,848 ****
/* synchronous query, or async result retrieval */
if (!is_async)
! res = PQexec(conn, sql);
else
{
! res = PQgetResult(conn);
/* NULL means we're all done with the async results */
if (!res)
{
--- 863,872 ----
/* synchronous query, or async result retrieval */
if (!is_async)
! res = execute_query(conn, sql);
else
{
! res = wait_for_result(conn);
/* NULL means we're all done with the async results */
if (!res)
{
*************** dblink_record_internal(FunctionCallInfo
*** 930,935 ****
--- 954,960 ----
MemoryContextSwitchTo(oldcontext);
SRF_RETURN_DONE(funcctx);
}
+ register_result(res);
/* store needed metadata for subsequent calls */
attinmeta = TupleDescGetAttInMetadata(tupdesc);
*************** dblink_record_internal(FunctionCallInfo
*** 989,995 ****
else
{
/* do when there is no more left */
! PQclear(res);
SRF_RETURN_DONE(funcctx);
}
}
--- 1014,1020 ----
else
{
/* do when there is no more left */
! unregister_result(res);
SRF_RETURN_DONE(funcctx);
}
}
*************** dblink_exec(PG_FUNCTION_ARGS)
*** 1167,1173 ****
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! res = PQexec(conn, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
--- 1192,1198 ----
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! res = execute_query(conn, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
*************** escape_param_str(const char *str)
*** 2466,2469 ****
--- 2491,2584 ----
}
return buf->data;
+ }
+
+ static PGresult *
+ execute_query(PGconn *conn, const char *sql)
+ {
+ /* async query send */
+ if (PQsendQuery(conn, sql) != 1)
+ elog(NOTICE, "%s", PQerrorMessage(conn));
+
+ return wait_for_result(conn);
+ }
+
+ static void
+ cancel_query(int code, Datum arg)
+ {
+ PGconn *conn = (PGconn *) DatumGetPointer(arg);
+ PGcancel *cancel;
+ char errbuf[256];
+
+ cancel = PQgetCancel(conn);
+ PQcancel(cancel, errbuf, 256);
+ PQfreeCancel(cancel);
+ }
+
+ static PGresult *
+ wait_for_result(PGconn *conn)
+ {
+ PGresult *res = NULL;
+
+ PG_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(conn));
+ for (;;)
+ {
+ fd_set rset;
+ int sock;
+ struct timeval tv;
+
+ CHECK_FOR_INTERRUPTS();
+
+ PQconsumeInput(conn);
+ if (!PQisBusy(conn))
+ {
+ res = PQgetResult(conn);
+ break;
+ }
+
+ sock = PQsocket(conn);
+ FD_ZERO(&rset);
+ FD_SET(sock, &rset);
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ if (select(sock + 1, &rset, NULL, NULL, &tv) < 0 && errno != EINTR)
+ break;
+ }
+ PG_END_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(conn));
+
+ return res;
+ }
+
+ static void
+ register_result(PGresult *res)
+ {
+ MemoryContext oldcontext;
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ results = lappend(results, res);
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* unregister and free result */
+ static void
+ unregister_result(PGresult *res)
+ {
+ results = list_delete_ptr(results, res);
+ PQclear(res);
+ }
+
+ static void
+ AtEOXact_dblink(XactEvent event, void *arg)
+ {
+ if (results != NIL)
+ {
+ ListCell *cell;
+
+ foreach(cell, results)
+ {
+ PQclear((PGresult *) lfirst(cell));
+ }
+
+ list_free(results);
+ results = NIL;
+ }
}
Joe, Itagaki,
Can you provide an update on this patch? Joe, you were going to
review and possibly commit it. Itagaki, did you have a new version?
Are there any outstanding issues?
Thanks,
Stephen
Stephen Frost <sfrost@snowman.net> wrote:
Can you provide an update on this patch? Joe, you were going to
review and possibly commit it. Itagaki, did you have a new version?
Are there any outstanding issues?
Thanks for your review.
Here is an updated version. I fixed some bugs:
* Fix connection leak on cancel. In the previous patch, running
transactions were canceled, but the temporary connection was leaked.
* Discard all pending results in the cancel handler. I implemented
PQexec-compatible behavior with the async APIs.
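The PQexec-compatible behavior can be illustrated with simulated results. This hypothetical sketch mirrors the logic of getLastResult() in the patch below (which the patch says was copied from PQexecFinish in libpq): keep only the last result, merge the messages of consecutive fatal errors, and stop at a COPY result. `SimResult` and the `R_*` status codes are invented stand-ins for PGresult and ExecStatusType, and the connection-status check (CONNECTION_BAD) is omitted.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified stand-ins for libpq's ExecStatusType values. */
typedef enum
{
    R_COMMAND_OK,
    R_TUPLES_OK,
    R_COPY_OUT,
    R_COPY_IN,
    R_FATAL_ERROR
} RStatus;

/* Hypothetical stand-in for PGresult. */
typedef struct
{
    RStatus     status;
    char       *errmsg;         /* heap-allocated, or NULL */
} SimResult;

/* Allocate a result; msg is copied if non-NULL. Aborts on OOM. */
SimResult *
sim_make(RStatus status, const char *msg)
{
    SimResult  *r = malloc(sizeof(SimResult));

    if (r == NULL)
        abort();
    r->status = status;
    r->errmsg = NULL;
    if (msg != NULL)
    {
        size_t      len = strlen(msg);

        r->errmsg = malloc(len + 1);
        if (r->errmsg == NULL)
            abort();
        memcpy(r->errmsg, msg, len + 1);
    }
    return r;
}

/* Counterpart of PQclear(). */
void
sim_clear(SimResult *r)
{
    if (r != NULL)
    {
        free(r->errmsg);
        free(r);
    }
}

/*
 * Consume results[0..n-1] in order and return the one a PQexec()-style
 * caller should see: the last result, except that consecutive fatal
 * errors are merged into one, and the scan stops at a COPY result.
 * Every consumed result other than the returned one is freed; results
 * after a COPY stop are left to the caller.
 */
SimResult *
last_result(SimResult **results, size_t n)
{
    SimResult  *last = NULL;
    size_t      i;

    for (i = 0; i < n; i++)
    {
        SimResult  *cur = results[i];

        if (last != NULL)
        {
            if (last->status == R_FATAL_ERROR && cur->status == R_FATAL_ERROR)
            {
                size_t      len1 = last->errmsg ? strlen(last->errmsg) : 0;
                size_t      len2 = cur->errmsg ? strlen(cur->errmsg) : 0;
                char       *merged = malloc(len1 + len2 + 1);

                if (merged == NULL)
                    abort();
                memcpy(merged, last->errmsg ? last->errmsg : "", len1);
                memcpy(merged + len1, cur->errmsg ? cur->errmsg : "", len2);
                merged[len1 + len2] = '\0';
                free(last->errmsg);
                last->errmsg = merged;
                sim_clear(cur);
                cur = last;     /* keep accumulating into the first error */
            }
            else
                sim_clear(last);
        }
        last = cur;
        if (cur->status == R_COPY_IN || cur->status == R_COPY_OUT)
            break;
    }
    return last;
}
```

Draining every result this way also leaves the connection idle, so the next query can be sent on it.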
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
Attachments:
dblink_20090916.diff (application/octet-stream)
*** dblink.c Wed Sep 16 13:01:21 2009
--- dblink-fix.c Wed Sep 16 12:59:23 2009
***************
*** 35,45 ****
--- 35,47 ----
#include <limits.h>
#include "libpq-fe.h"
+ #include "libpq-int.h"
#include "fmgr.h"
#include "funcapi.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/tupdesc.h"
+ #include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_index.h"
***************
*** 54,59 ****
--- 56,62 ----
#include "nodes/nodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
+ #include "storage/ipc.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
***************
*** 76,81 ****
--- 79,87 ----
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+ extern void _PG_init(void);
+ extern void _PG_fini(void);
+
/*
* Internal declarations
*/
***************
*** 100,109 ****
--- 106,133 ----
static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
static char *get_connect_string(const char *servername);
static char *escape_param_str(const char *from);
+ static PGresult *execute_query(PGconn *conn, const char *sql, bool freeconn);
+ static PGresult *wait_for_result(PGconn *conn, bool freeconn);
+ static void register_result(PGresult *res);
+ static void unregister_result(PGresult *res);
+ static void AtEOXact_dblink(XactEvent event, void *arg);
/* Global */
static remoteConn *pconn = NULL;
static HTAB *remoteConnHash = NULL;
+ static List *managedResults = NIL;
+
+ void
+ _PG_init(void)
+ {
+ RegisterXactCallback(AtEOXact_dblink, 0);
+ }
+
+ void
+ _PG_fini(void)
+ {
+ UnregisterXactCallback(AtEOXact_dblink, 0);
+ }
/*
* Following is list that holds multiple remote connections.
***************
*** 580,586 ****
* PGresult will be long-lived even though we are still in a
* short-lived memory context.
*/
! res = PQexec(conn, buf.data);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
--- 604,610 ----
* PGresult will be long-lived even though we are still in a
* short-lived memory context.
*/
! res = execute_query(conn, buf.data, false);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 637,642 ****
--- 661,667 ----
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
+ register_result(res);
/*
* switch to memory context appropriate for multiple function calls,
***************
*** 695,701 ****
else
{
/* do when there is no more left */
! PQclear(res);
SRF_RETURN_DONE(funcctx);
}
}
--- 720,726 ----
else
{
/* do when there is no more left */
! unregister_result(res);
SRF_RETURN_DONE(funcctx);
}
}
***************
*** 839,848 ****
/* synchronous query, or async result retrieval */
if (!is_async)
! res = PQexec(conn, sql);
else
{
! res = PQgetResult(conn);
/* NULL means we're all done with the async results */
if (!res)
{
--- 864,873 ----
/* synchronous query, or async result retrieval */
if (!is_async)
! res = execute_query(conn, sql, freeconn);
else
{
! res = wait_for_result(conn, freeconn);
/* NULL means we're all done with the async results */
if (!res)
{
***************
*** 930,935 ****
--- 955,961 ----
MemoryContextSwitchTo(oldcontext);
SRF_RETURN_DONE(funcctx);
}
+ register_result(res);
/* store needed metadata for subsequent calls */
attinmeta = TupleDescGetAttInMetadata(tupdesc);
***************
*** 989,995 ****
else
{
/* do when there is no more left */
! PQclear(res);
SRF_RETURN_DONE(funcctx);
}
}
--- 1015,1021 ----
else
{
/* do when there is no more left */
! unregister_result(res);
SRF_RETURN_DONE(funcctx);
}
}
***************
*** 1167,1173 ****
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! res = PQexec(conn, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
--- 1193,1199 ----
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! res = execute_query(conn, sql, freeconn);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 2550,2552 ****
--- 2576,2748 ----
return buf->data;
}
+
+ static PGresult *
+ execute_query(PGconn *conn, const char *sql, bool freeconn)
+ {
+ /* async query send */
+ if (PQsendQuery(conn, sql) != 1)
+ elog(NOTICE, "%s", PQerrorMessage(conn));
+
+ return wait_for_result(conn, freeconn);
+ }
+
+ typedef struct
+ {
+ PGconn *conn;
+ bool freeconn;
+ } cancel_query_args;
+
+ static void
+ cancel_query(int code, Datum arg)
+ {
+ cancel_query_args *args = (cancel_query_args *) DatumGetPointer(arg);
+ PGconn *conn = args->conn;
+ PGresult *res;
+
+ /* cancel current query */
+ if (PQisBusy(conn))
+ {
+ PGcancel *cancel;
+ char errbuf[256];
+
+ cancel = PQgetCancel(conn);
+ if (cancel != NULL)
+ {
+ PQcancel(cancel, errbuf, 256);
+ PQfreeCancel(cancel);
+ }
+ }
+
+ /* discard all results */
+ while ((res = PQgetResult(conn)) != NULL)
+ PQclear(res);
+
+ /* disconnect if a temporary connection */
+ if (args->freeconn)
+ PQfinish(conn);
+ }
+
+ /*
+ * copied from PQexecFinish in libpq.
+ */
+ static PGresult *
+ getLastResult(PGconn *conn)
+ {
+ PGresult *result;
+ PGresult *lastResult;
+
+ lastResult = NULL;
+ while ((result = PQgetResult(conn)) != NULL)
+ {
+ if (lastResult)
+ {
+ if (lastResult->resultStatus == PGRES_FATAL_ERROR &&
+ result->resultStatus == PGRES_FATAL_ERROR)
+ {
+ PQExpBufferData errorBuf;
+
+ initPQExpBuffer(&errorBuf);
+ if (lastResult->errMsg)
+ {
+ appendPQExpBufferStr(&errorBuf, lastResult->errMsg);
+ free(lastResult->errMsg);
+ }
+ appendPQExpBufferStr(&errorBuf, result->errMsg);
+ lastResult->errMsg = errorBuf.data;
+
+ PQclear(result);
+ result = lastResult;
+
+ /* Make sure PQerrorMessage agrees with concatenated result */
+ resetPQExpBuffer(&conn->errorMessage);
+ appendPQExpBufferStr(&conn->errorMessage, result->errMsg);
+ }
+ else
+ PQclear(lastResult);
+ }
+ lastResult = result;
+ if (result->resultStatus == PGRES_COPY_IN ||
+ result->resultStatus == PGRES_COPY_OUT ||
+ conn->status == CONNECTION_BAD)
+ break;
+ }
+
+ return lastResult;
+ }
+
+ static PGresult *
+ wait_for_result(PGconn *conn, bool freeconn)
+ {
+ PGresult *res = NULL;
+ cancel_query_args args;
+
+ args.conn = conn;
+ args.freeconn = freeconn;
+
+ PG_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(&args));
+ {
+ for (;;)
+ {
+ fd_set rset;
+ int sock;
+ struct timeval tv;
+
+ CHECK_FOR_INTERRUPTS();
+
+ PQconsumeInput(conn);
+ if (!PQisBusy(conn))
+ {
+ res = getLastResult(conn);
+ break;
+ }
+
+ sock = PQsocket(conn);
+ if (sock < 0)
+ break;
+ FD_ZERO(&rset);
+ FD_SET(sock, &rset);
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ if (select(sock + 1, &rset, NULL, NULL, &tv) < 0 && errno != EINTR)
+ break;
+ }
+ }
+ PG_END_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(&args));
+
+ return res;
+ }
+
+ static void
+ register_result(PGresult *res)
+ {
+ MemoryContext oldcontext;
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ managedResults = lappend(managedResults, res);
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* unregister and free result */
+ static void
+ unregister_result(PGresult *res)
+ {
+ managedResults = list_delete_ptr(managedResults, res);
+ PQclear(res);
+ }
+
+ static void
+ AtEOXact_dblink(XactEvent event, void *arg)
+ {
+ if (managedResults != NIL)
+ {
+ ListCell *cell;
+
+ foreach(cell, managedResults)
+ {
+ PQclear((PGresult *) lfirst(cell));
+ }
+
+ list_free(managedResults);
+ managedResults = NIL;
+ }
+ }
Itagaki Takahiro wrote:
Stephen Frost <sfrost@snowman.net> wrote:
Can you provide an update on this patch? Joe, you were going to
review and possibly commit it. Itagaki, did you have a new version?
Are there any outstanding issues?
Thanks for your review.
Here is an updated version. I fixed some bugs:
* Fix connection leak on cancel. In the previous patch, running
transactions were canceled, but the temporary connection was leaked.
* Discard all pending results in the cancel handler. I implemented
PQexec-compatible behavior with the async APIs.
Thanks for the update. I am planning to review, but my time will be
extremely limited for the next two weeks. I might find some time this
coming weekend, and will do what I can, but after that it will be 9/28
(after my son's wedding on 9/27) before I get time to look closely.
Joe
What happened to this patch?
---------------------------------------------------------------------------
Itagaki Takahiro wrote:
Merlin Moncure <mmoncure@gmail.com> wrote:
Itagaki Takahiro <itagaki.takahiro@oss.ntt.co.jp> wrote:
contrib/dblink does not seem to handle query cancellation.
(1) Users have to wait for the remote query to complete.
(2) PGresult objects are leaked.
Here is a patch to fix the issues. I hope the fixes can be back-ported
to older versions if possible.
(1) is fixed by using the non-blocking APIs in libpq. I think we should
always use non-blocking APIs even if the dblink function itself is
a blocking function.
(2) is fixed by RegisterXactCallback(AtEOXact_dblink). However, there
might be better solutions -- for example, the ResourceOwner framework.
For (1), asynchronous libpq functions should be used instead of blocking
ones, and we should wait for the remote query in a loop that calls CHECK_FOR_INTERRUPTS().
How would you structure this loop exactly?
Please check execute_query() and wait_for_result() in the patch.
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
[ Attachment, skipping... ]
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
On Wed, Feb 24, 2010 at 2:33 PM, Bruce Momjian <bruce@momjian.us> wrote:
What happened to this patch?
I'm pretty sure it's the same as this:
https://commitfest.postgresql.org/action/patch_view?id=263
...Robert
Robert Haas <robertmhaas@gmail.com> wrote:
I'm pretty sure it's the same as this:
https://commitfest.postgresql.org/action/patch_view?id=263
Yes. (2) has been resolved by that patch, with a different implementation.
(2) is fixed by RegisterXactCallback(AtEOXact_dblink). However, there
might be better solutions -- for example, the ResourceOwner framework.
(1) still exists, but the consensus was that we don't need to fix it,
because we have the async functions.
(1) is fixed by using the non-blocking APIs in libpq. I think we should
always use non-blocking APIs even if the dblink function itself is
a blocking function.
Regards,
---
Takahiro Itagaki
NTT Open Source Software Center