Fix for memory leak in dblink

Started by Takahiro Itagaki about 16 years ago, 3 messages
#1Takahiro Itagaki
itagaki.takahiro@oss.ntt.co.jp
1 attachment(s)

There is a memory leak in dblink when a query is cancelled while
tuples are being returned. The PGresult can be leaked because libpq
allocates it with malloc rather than palloc. I wrote a patch[1] before,
but it made poor use of global variables to track the resource.
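
To illustrate the leak outside PostgreSQL, here is a toy model in plain C.
Arena stands in for a memory context, srf_next() for a value-per-call
set-returning function, and the malloc'ed int array for a PGresult. All of
these names are made up for the sketch; none of this is dblink or libpq code.

```c
#include <assert.h>
#include <stdlib.h>

static int live_allocs = 0;     /* chunks allocated and not yet freed */

static void *tracked_malloc(size_t size)
{
    void *p = malloc(size);

    if (p == NULL)
        abort();
    live_allocs++;
    return p;
}

static void tracked_free(void *p)
{
    free(p);
    live_allocs--;
}

/* Toy memory context: remembers its chunks so a reset can free them all. */
typedef struct Arena
{
    void *chunks[16];
    int   nchunks;
} Arena;

/* palloc-like allocation: the chunk is owned by the arena. */
static void *arena_alloc(Arena *a, size_t size)
{
    assert(a->nchunks < 16);
    a->chunks[a->nchunks] = tracked_malloc(size);
    return a->chunks[a->nchunks++];
}

/* Like the context cleanup that runs when a query ends or is cancelled. */
static void arena_reset(Arena *a)
{
    while (a->nchunks > 0)
        tracked_free(a->chunks[--a->nchunks]);
}

/* Cross-call state of a value-per-call function. */
typedef struct SrfState
{
    int *result;    /* stand-in for a PGresult: plain malloc, not arena */
    int  nrows;
    int  next_row;
} SrfState;

/* Hands out one row per call; frees the result only on the final call. */
static int srf_next(SrfState *st, int *row)
{
    if (st->next_row < st->nrows)
    {
        *row = st->result[st->next_row++];
        return 1;
    }
    tracked_free(st->result);
    st->result = NULL;
    return 0;
}

/*
 * Runs one query and cancels it after cancel_after rows (negative: never).
 * Returns the number of chunks still allocated once the arena is reset.
 */
static int run_query(int nrows, int cancel_after)
{
    Arena     arena = {{0}, 0};
    int       before = live_allocs;
    int       fetched = 0;
    int       row;
    SrfState *st = arena_alloc(&arena, sizeof(SrfState));

    st->result = tracked_malloc(sizeof(int) * (nrows > 0 ? nrows : 1));
    st->nrows = nrows;
    st->next_row = 0;
    for (int i = 0; i < nrows; i++)
        st->result[i] = i;

    /* A cancel means the function is simply never called again. */
    while (fetched != cancel_after && srf_next(st, &row))
        fetched++;

    arena_reset(&arena);
    return live_allocs - before;
}
```

In this model run_query(5, -1) leaves nothing behind, while run_query(5, 2)
leaves one chunk: the arena reset frees the per-call state but cannot reach
the malloc'ed result.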

The attached is a cleaned-up patch, rewritten to return tuples through
a tuplestore (SFRM_Materialize mode) as suggested at [2]. Since the
dblink function does not return between rows in tuplestore mode, we
can reliably release the PGresult in a PG_CATCH block even on error.
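
The shape of that cleanup can be sketched in standalone C, with
setjmp/longjmp standing in for PG_TRY/PG_CATCH. FakeResult, fake_exec(),
fake_clear(), raise_error() and materialize() are hypothetical names for
this sketch only. Unlike PG_RE_THROW, the sketch reports the error by
returning -1 instead of propagating it.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdlib.h>

/* Hypothetical stand-in for a libpq PGresult: owned by malloc. */
typedef struct FakeResult
{
    int  ntuples;
    int *rows;
} FakeResult;

static int live_results = 0;    /* results created and not yet cleared */

static FakeResult *fake_exec(int ntuples)
{
    FakeResult *res = malloc(sizeof(FakeResult));

    if (res == NULL)
        abort();
    res->rows = malloc(sizeof(int) * (ntuples > 0 ? ntuples : 1));
    if (res->rows == NULL)
        abort();
    res->ntuples = ntuples;
    for (int i = 0; i < ntuples; i++)
        res->rows[i] = i * 10;
    live_results++;
    return res;
}

static void fake_clear(FakeResult *res)
{
    assert(res != NULL);
    free(res->rows);
    free(res);
    live_results--;
}

/* Innermost active handler; plays the role of the error stack. */
static jmp_buf *error_target = NULL;

static void raise_error(void)
{
    assert(error_target != NULL);
    longjmp(*error_target, 1);
}

/*
 * Copies every row of res into store in a single call and always releases
 * res. cancel_at >= 0 simulates an error raised while copying that row.
 * Returns the number of rows copied, or -1 if an error was caught.
 */
static int materialize(FakeResult *res, int *store, int cancel_at)
{
    jmp_buf  local;
    jmp_buf *saved = error_target;
    int      ntuples = res->ntuples;

    error_target = &local;
    if (setjmp(local) == 0)
    {
        /* "try" section: may jump to the handler at any row */
        for (int row = 0; row < ntuples; row++)
        {
            if (row == cancel_at)
                raise_error();
            store[row] = res->rows[row];
        }
    }
    else
    {
        /* "catch" section: release the result before reporting */
        error_target = saved;
        fake_clear(res);
        return -1;
    }

    error_target = saved;
    fake_clear(res);
    return ntuples;
}
```

Because all rows are copied within one call, both the normal path and the
error path pass through fake_clear(), so live_results is back to zero after
every call to materialize().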

Also, dblink_record_internal() and dblink_fetch() are refactored
to share the same code for returning tuples.

[1]: http://archives.postgresql.org/pgsql-hackers/2009-06/msg01358.php
[2]: http://archives.postgresql.org/pgsql-hackers/2009-10/msg00292.php

Regards,
---
Takahiro Itagaki
NTT Open Source Software Center

Attachments:

dblink-memleak_20100112.patch (application/octet-stream)
diff -cprN head/contrib/dblink/dblink.c work/contrib/dblink/dblink.c
*** head/contrib/dblink/dblink.c	2010-01-04 09:10:26.638773000 +0900
--- work/contrib/dblink/dblink.c	2010-01-12 12:26:03.747620050 +0900
*************** typedef struct remoteConn
*** 80,85 ****
--- 80,86 ----
   * Internal declarations
   */
  static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
+ static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
*************** PG_FUNCTION_INFO_V1(dblink_fetch);
*** 504,703 ****
  Datum
  dblink_fetch(PG_FUNCTION_ARGS)
  {
! 	FuncCallContext *funcctx;
! 	TupleDesc	tupdesc = NULL;
! 	int			call_cntr;
! 	int			max_calls;
! 	AttInMetadata *attinmeta;
! 	PGresult   *res = NULL;
! 	MemoryContext oldcontext;
! 	char	   *conname = NULL;
! 	remoteConn *rconn = NULL;
  
  	DBLINK_INIT;
  
! 	/* stuff done only on the first call of the function */
! 	if (SRF_IS_FIRSTCALL())
  	{
! 		PGconn	   *conn = NULL;
! 		StringInfoData buf;
! 		char	   *curname = NULL;
! 		int			howmany = 0;
! 		bool		fail = true;	/* default to backward compatible */
  
! 		if (PG_NARGS() == 4)
  		{
- 			/* text,text,int,bool */
  			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
  			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
  			howmany = PG_GETARG_INT32(2);
- 			fail = PG_GETARG_BOOL(3);
  
  			rconn = getConnectionByName(conname);
  			if (rconn)
  				conn = rconn->conn;
  		}
! 		else if (PG_NARGS() == 3)
! 		{
! 			/* text,text,int or text,int,bool */
! 			if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
! 			{
! 				curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 				howmany = PG_GETARG_INT32(1);
! 				fail = PG_GETARG_BOOL(2);
! 				conn = pconn->conn;
! 			}
! 			else
! 			{
! 				conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 				curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
! 				howmany = PG_GETARG_INT32(2);
! 
! 				rconn = getConnectionByName(conname);
! 				if (rconn)
! 					conn = rconn->conn;
! 			}
! 		}
! 		else if (PG_NARGS() == 2)
! 		{
! 			/* text,int */
! 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 			howmany = PG_GETARG_INT32(1);
! 			conn = pconn->conn;
! 		}
! 
! 		if (!conn)
! 			DBLINK_CONN_NOT_AVAIL;
! 
! 		initStringInfo(&buf);
! 		appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
! 
! 		/* create a function context for cross-call persistence */
! 		funcctx = SRF_FIRSTCALL_INIT();
! 
! 		/*
! 		 * Try to execute the query.  Note that since libpq uses malloc, the
! 		 * PGresult will be long-lived even though we are still in a
! 		 * short-lived memory context.
! 		 */
! 		res = PQexec(conn, buf.data);
! 		if (!res ||
! 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 			 PQresultStatus(res) != PGRES_TUPLES_OK))
! 		{
! 			dblink_res_error(conname, res, "could not fetch from cursor", fail);
! 			SRF_RETURN_DONE(funcctx);
! 		}
! 		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
! 		{
! 			/* cursor does not exist - closed already or bad name */
! 			PQclear(res);
! 			ereport(ERROR,
! 					(errcode(ERRCODE_INVALID_CURSOR_NAME),
! 					 errmsg("cursor \"%s\" does not exist", curname)));
! 		}
! 
! 		funcctx->max_calls = PQntuples(res);
! 
! 		/* got results, keep track of them */
! 		funcctx->user_fctx = res;
! 
! 		/* get a tuple descriptor for our result type */
! 		switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! 		{
! 			case TYPEFUNC_COMPOSITE:
! 				/* success */
! 				break;
! 			case TYPEFUNC_RECORD:
! 				/* failed to determine actual type of RECORD */
! 				ereport(ERROR,
! 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 						 errmsg("function returning record called in context "
! 								"that cannot accept type record")));
! 				break;
! 			default:
! 				/* result type isn't composite */
! 				elog(ERROR, "return type must be a row type");
! 				break;
! 		}
! 
! 		/* check result and tuple descriptor have the same number of columns */
! 		if (PQnfields(res) != tupdesc->natts)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_DATATYPE_MISMATCH),
! 					 errmsg("remote query result rowtype does not match "
! 							"the specified FROM clause rowtype")));
! 
! 		/*
! 		 * fast track when no results.	We could exit earlier, but then we'd
! 		 * not report error if the result tuple type is wrong.
! 		 */
! 		if (funcctx->max_calls < 1)
! 		{
! 			PQclear(res);
! 			SRF_RETURN_DONE(funcctx);
! 		}
! 
! 		/*
! 		 * switch to memory context appropriate for multiple function calls,
! 		 * so we can make long-lived copy of tupdesc etc
! 		 */
! 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
! 
! 		/* make sure we have a persistent copy of the tupdesc */
! 		tupdesc = CreateTupleDescCopy(tupdesc);
  
! 		/* store needed metadata for subsequent calls */
! 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 		funcctx->attinmeta = attinmeta;
  
! 		MemoryContextSwitchTo(oldcontext);
! 	}
  
! 	/* stuff done on every call of the function */
! 	funcctx = SRF_PERCALL_SETUP();
  
  	/*
! 	 * initialize per-call variables
  	 */
! 	call_cntr = funcctx->call_cntr;
! 	max_calls = funcctx->max_calls;
! 
! 	res = (PGresult *) funcctx->user_fctx;
! 	attinmeta = funcctx->attinmeta;
! 	tupdesc = attinmeta->tupdesc;
! 
! 	if (call_cntr < max_calls)	/* do when there is more left to send */
  	{
! 		char	  **values;
! 		HeapTuple	tuple;
! 		Datum		result;
! 		int			i;
! 		int			nfields = PQnfields(res);
! 
! 		values = (char **) palloc(nfields * sizeof(char *));
! 		for (i = 0; i < nfields; i++)
! 		{
! 			if (PQgetisnull(res, call_cntr, i) == 0)
! 				values[i] = PQgetvalue(res, call_cntr, i);
! 			else
! 				values[i] = NULL;
! 		}
! 
! 		/* build the tuple */
! 		tuple = BuildTupleFromCStrings(attinmeta, values);
! 
! 		/* make the tuple into a datum */
! 		result = HeapTupleGetDatum(tuple);
! 
! 		SRF_RETURN_NEXT(funcctx, result);
  	}
! 	else
  	{
! 		/* do when there is no more left */
  		PQclear(res);
! 		SRF_RETURN_DONE(funcctx);
  	}
  }
  
  /*
--- 505,598 ----
  Datum
  dblink_fetch(PG_FUNCTION_ARGS)
  {
! 	ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
! 	PGresult	   *res = NULL;
! 	char		   *conname = NULL;
! 	remoteConn	   *rconn = NULL;
! 	PGconn		   *conn = NULL;
! 	StringInfoData	buf;
! 	char		   *curname = NULL;
! 	int				howmany = 0;
! 	bool			fail = true;	/* default to backward compatible */
  
  	DBLINK_INIT;
  
! 	if (PG_NARGS() == 4)
  	{
! 		/* text,text,int,bool */
! 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 		curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
! 		howmany = PG_GETARG_INT32(2);
! 		fail = PG_GETARG_BOOL(3);
  
! 		rconn = getConnectionByName(conname);
! 		if (rconn)
! 			conn = rconn->conn;
! 	}
! 	else if (PG_NARGS() == 3)
! 	{
! 		/* text,text,int or text,int,bool */
! 		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
! 		{
! 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 			howmany = PG_GETARG_INT32(1);
! 			fail = PG_GETARG_BOOL(2);
! 			conn = pconn->conn;
! 		}
! 		else
  		{
  			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
  			curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
  			howmany = PG_GETARG_INT32(2);
  
  			rconn = getConnectionByName(conname);
  			if (rconn)
  				conn = rconn->conn;
  		}
! 	}
! 	else if (PG_NARGS() == 2)
! 	{
! 		/* text,int */
! 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 		howmany = PG_GETARG_INT32(1);
! 		conn = pconn->conn;
! 	}
  
! 	if (!conn)
! 		DBLINK_CONN_NOT_AVAIL;
  
! 	/* let the caller know we're sending back a tuplestore */
! 	rsinfo->returnMode = SFRM_Materialize;
! 	rsinfo->setResult = NULL;
! 	rsinfo->setDesc = NULL;
  
! 	initStringInfo(&buf);
! 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
  
  	/*
! 	 * Try to execute the query.  Note that since libpq uses malloc, the
! 	 * PGresult will be long-lived even though we are still in a
! 	 * short-lived memory context.
  	 */
! 	res = PQexec(conn, buf.data);
! 	if (!res ||
! 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
! 		dblink_res_error(conname, res, "could not fetch from cursor", fail);
! 		return (Datum) 0;
  	}
! 	else if (PQresultStatus(res) == PGRES_COMMAND_OK)
  	{
! 		/* cursor does not exist - closed already or bad name */
  		PQclear(res);
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INVALID_CURSOR_NAME),
! 				 errmsg("cursor \"%s\" does not exist", curname)));
  	}
+ 
+ 	materializeResult(fcinfo, res);
+ 	return (Datum) 0;
  }
  
  /*
*************** dblink_get_result(PG_FUNCTION_ARGS)
*** 749,895 ****
  static Datum
  dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  {
! 	FuncCallContext *funcctx;
! 	TupleDesc	tupdesc = NULL;
! 	int			call_cntr;
! 	int			max_calls;
! 	AttInMetadata *attinmeta;
! 	char	   *msg;
! 	PGresult   *res = NULL;
! 	bool		is_sql_cmd = false;
! 	char	   *sql_cmd_status = NULL;
! 	MemoryContext oldcontext;
! 	bool		freeconn = false;
  
  	DBLINK_INIT;
  
! 	/* stuff done only on the first call of the function */
! 	if (SRF_IS_FIRSTCALL())
  	{
! 		PGconn	   *conn = NULL;
! 		char	   *connstr = NULL;
! 		char	   *sql = NULL;
! 		char	   *conname = NULL;
! 		remoteConn *rconn = NULL;
! 		bool		fail = true;	/* default to backward compatible */
! 
! 		/* create a function context for cross-call persistence */
! 		funcctx = SRF_FIRSTCALL_INIT();
! 
! 		/*
! 		 * switch to memory context appropriate for multiple function calls
! 		 */
! 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
! 
! 		if (!is_async)
  		{
! 			if (PG_NARGS() == 3)
! 			{
! 				/* text,text,bool */
! 				DBLINK_GET_CONN;
! 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
! 				fail = PG_GETARG_BOOL(2);
! 			}
! 			else if (PG_NARGS() == 2)
! 			{
! 				/* text,text or text,bool */
! 				if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
! 				{
! 					conn = pconn->conn;
! 					sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 					fail = PG_GETARG_BOOL(1);
! 				}
! 				else
! 				{
! 					DBLINK_GET_CONN;
! 					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
! 				}
! 			}
! 			else if (PG_NARGS() == 1)
! 			{
! 				/* text */
! 				conn = pconn->conn;
! 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 			}
! 			else
! 				/* shouldn't happen */
! 				elog(ERROR, "wrong number of arguments");
  		}
! 		else	/* is_async */
  		{
! 			/* get async result */
! 			if (PG_NARGS() == 2)
  			{
! 				/* text,bool */
! 				DBLINK_GET_CONN;
  				fail = PG_GETARG_BOOL(1);
  			}
! 			else if (PG_NARGS() == 1)
  			{
- 				/* text */
  				DBLINK_GET_CONN;
  			}
- 			else
- 				/* shouldn't happen */
- 				elog(ERROR, "wrong number of arguments");
  		}
! 
! 		if (!conn)
! 			DBLINK_CONN_NOT_AVAIL;
! 
! 		/* synchronous query, or async result retrieval */
! 		if (!is_async)
! 			res = PQexec(conn, sql);
  		else
  		{
! 			res = PQgetResult(conn);
! 			/* NULL means we're all done with the async results */
! 			if (!res)
! 			{
! 				MemoryContextSwitchTo(oldcontext);
! 				SRF_RETURN_DONE(funcctx);
! 			}
  		}
! 
! 		if (!res ||
! 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 			 PQresultStatus(res) != PGRES_TUPLES_OK))
! 		{
! 			if (freeconn)
! 				PQfinish(conn);
! 			dblink_res_error(conname, res, "could not execute query", fail);
! 			MemoryContextSwitchTo(oldcontext);
! 			SRF_RETURN_DONE(funcctx);
  		}
  
  		if (PQresultStatus(res) == PGRES_COMMAND_OK)
  		{
  			is_sql_cmd = true;
  
! 			/* need a tuple descriptor representing one TEXT column */
  			tupdesc = CreateTemplateTupleDesc(1, false);
  			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
  							   TEXTOID, -1, 0);
! 
! 			/*
! 			 * and save a copy of the command status string to return as our
! 			 * result tuple
! 			 */
! 			sql_cmd_status = PQcmdStatus(res);
! 			funcctx->max_calls = 1;
  		}
  		else
! 			funcctx->max_calls = PQntuples(res);
! 
! 		/* got results, keep track of them */
! 		funcctx->user_fctx = res;
  
! 		/* if needed, close the connection to the database and cleanup */
! 		if (freeconn)
! 			PQfinish(conn);
  
- 		if (!is_sql_cmd)
- 		{
  			/* get a tuple descriptor for our result type */
  			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
  			{
--- 644,799 ----
  static Datum
  dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  {
! 	ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
! 	char		   *msg;
! 	PGresult	   *res = NULL;
! 	PGconn		   *conn = NULL;
! 	char		   *connstr = NULL;
! 	char		   *sql = NULL;
! 	char		   *conname = NULL;
! 	remoteConn	   *rconn = NULL;
! 	bool			fail = true;	/* default to backward compatible */
! 	bool			freeconn = false;
! 
! 	/* check to see if caller supports us returning a tuplestore */
! 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 				 errmsg("set-valued function called in context that cannot accept a set")));
! 	if (!(rsinfo->allowedModes & SFRM_Materialize))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 				 errmsg("materialize mode required, but it is not " \
! 						"allowed in this context")));
  
  	DBLINK_INIT;
  
! 	if (!is_async)
  	{
! 		if (PG_NARGS() == 3)
  		{
! 			/* text,text,bool */
! 			DBLINK_GET_CONN;
! 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
! 			fail = PG_GETARG_BOOL(2);
  		}
! 		else if (PG_NARGS() == 2)
  		{
! 			/* text,text or text,bool */
! 			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  			{
! 				conn = pconn->conn;
! 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
  				fail = PG_GETARG_BOOL(1);
  			}
! 			else
  			{
  				DBLINK_GET_CONN;
+ 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
  			}
  		}
! 		else if (PG_NARGS() == 1)
! 		{
! 			/* text */
! 			conn = pconn->conn;
! 			sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
! 		}
  		else
+ 			/* shouldn't happen */
+ 			elog(ERROR, "wrong number of arguments");
+ 	}
+ 	else	/* is_async */
+ 	{
+ 		/* get async result */
+ 		if (PG_NARGS() == 2)
  		{
! 			/* text,bool */
! 			DBLINK_GET_CONN;
! 			fail = PG_GETARG_BOOL(1);
  		}
! 		else if (PG_NARGS() == 1)
! 		{
! 			/* text */
! 			DBLINK_GET_CONN;
  		}
+ 		else
+ 			/* shouldn't happen */
+ 			elog(ERROR, "wrong number of arguments");
+ 	}
+ 
+ 	if (!conn)
+ 		DBLINK_CONN_NOT_AVAIL;
+ 
+ 	/* let the caller know we're sending back a tuplestore */
+ 	rsinfo->returnMode = SFRM_Materialize;
+ 	rsinfo->setResult = NULL;
+ 	rsinfo->setDesc = NULL;
+ 
+ 	/* synchronous query, or async result retrieval */
+ 	if (!is_async)
+ 		res = PQexec(conn, sql);
+ 	else
+ 	{
+ 		res = PQgetResult(conn);
+ 		/* NULL means we're all done with the async results */
+ 		if (!res)
+ 			return (Datum) 0;
+ 	}
+ 
+ 	/* if needed, close the connection to the database and cleanup */
+ 	if (freeconn)
+ 		PQfinish(conn);
+ 
+ 	if (!res ||
+ 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
+ 		 PQresultStatus(res) != PGRES_TUPLES_OK))
+ 	{
+ 		dblink_res_error(conname, res, "could not execute query", fail);
+ 		return (Datum) 0;
+ 	}
+ 
+ 	materializeResult(fcinfo, res);
+ 	return (Datum) 0;
+ }
+ 
+ /*
+  * Materialize the PGresult to return them as the function result.
+  * The res will be released in this function.
+  */
+ static void
+ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+ {
+ 	ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ 
+ 	Assert(rsinfo->returnMode == SFRM_Materialize);
+ 
+ 	PG_TRY();
+ 	{
+ 		TupleDesc	tupdesc;
+ 		bool		is_sql_cmd = false;
+ 		int			ntuples;
+ 		int			nfields;
  
  		if (PQresultStatus(res) == PGRES_COMMAND_OK)
  		{
  			is_sql_cmd = true;
  
! 			/*
! 			 * need a tuple descriptor representing one TEXT column to
! 			 * return the command status string as our result tuple
! 			 */
  			tupdesc = CreateTemplateTupleDesc(1, false);
  			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
  							   TEXTOID, -1, 0);
! 			ntuples = 1;
! 			nfields = 1;
  		}
  		else
! 		{
! 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
  
! 			is_sql_cmd = false;
  
  			/* get a tuple descriptor for our result type */
  			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
  			{
*************** dblink_record_internal(FunctionCallInfo 
*** 911,997 ****
  
  			/* make sure we have a persistent copy of the tupdesc */
  			tupdesc = CreateTupleDescCopy(tupdesc);
  		}
  
  		/*
  		 * check result and tuple descriptor have the same number of columns
  		 */
! 		if (PQnfields(res) != tupdesc->natts)
  			ereport(ERROR,
  					(errcode(ERRCODE_DATATYPE_MISMATCH),
  					 errmsg("remote query result rowtype does not match "
  							"the specified FROM clause rowtype")));
  
! 		/* fast track when no results */
! 		if (funcctx->max_calls < 1)
  		{
! 			if (res)
! 				PQclear(res);
  			MemoryContextSwitchTo(oldcontext);
- 			SRF_RETURN_DONE(funcctx);
- 		}
- 
- 		/* store needed metadata for subsequent calls */
- 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
- 		funcctx->attinmeta = attinmeta;
- 
- 		MemoryContextSwitchTo(oldcontext);
- 
- 	}
  
! 	/* stuff done on every call of the function */
! 	funcctx = SRF_PERCALL_SETUP();
! 
! 	/*
! 	 * initialize per-call variables
! 	 */
! 	call_cntr = funcctx->call_cntr;
! 	max_calls = funcctx->max_calls;
! 
! 	res = (PGresult *) funcctx->user_fctx;
! 	attinmeta = funcctx->attinmeta;
! 	tupdesc = attinmeta->tupdesc;
  
! 	if (call_cntr < max_calls)	/* do when there is more left to send */
! 	{
! 		char	  **values;
! 		HeapTuple	tuple;
! 		Datum		result;
  
! 		if (!is_sql_cmd)
! 		{
! 			int			i;
! 			int			nfields = PQnfields(res);
  
! 			values = (char **) palloc(nfields * sizeof(char *));
! 			for (i = 0; i < nfields; i++)
! 			{
! 				if (PQgetisnull(res, call_cntr, i) == 0)
! 					values[i] = PQgetvalue(res, call_cntr, i);
  				else
! 					values[i] = NULL;
! 			}
! 		}
! 		else
! 		{
! 			values = (char **) palloc(1 * sizeof(char *));
! 			values[0] = sql_cmd_status;
! 		}
  
! 		/* build the tuple */
! 		tuple = BuildTupleFromCStrings(attinmeta, values);
  
! 		/* make the tuple into a datum */
! 		result = HeapTupleGetDatum(tuple);
  
! 		SRF_RETURN_NEXT(funcctx, result);
  	}
! 	else
  	{
! 		/* do when there is no more left */
  		PQclear(res);
! 		SRF_RETURN_DONE(funcctx);
  	}
  }
  
  /*
--- 815,892 ----
  
  			/* make sure we have a persistent copy of the tupdesc */
  			tupdesc = CreateTupleDescCopy(tupdesc);
+ 			ntuples = PQntuples(res);
+ 			nfields = PQnfields(res);
  		}
  
  		/*
  		 * check result and tuple descriptor have the same number of columns
  		 */
! 		if (nfields != tupdesc->natts)
  			ereport(ERROR,
  					(errcode(ERRCODE_DATATYPE_MISMATCH),
  					 errmsg("remote query result rowtype does not match "
  							"the specified FROM clause rowtype")));
  
! 		if (ntuples > 0)
  		{
! 			AttInMetadata	   *attinmeta;
! 			Tuplestorestate	   *tupstore;
! 			MemoryContext		oldcontext;
! 			int					row;
! 			char			  **values;
! 
! 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 
! 			oldcontext = MemoryContextSwitchTo(
! 								rsinfo->econtext->ecxt_per_query_memory);
! 			tupstore = tuplestore_begin_heap(true, false, work_mem);
! 			rsinfo->setResult = tupstore;
! 			rsinfo->setDesc = tupdesc;
  			MemoryContextSwitchTo(oldcontext);
  
! 			values = (char **) palloc(nfields * sizeof(char *));
  
! 			/* put all tuples into the tuplestore */
! 			for (row = 0; row < ntuples; row++)
! 			{
! 				HeapTuple	tuple;
  
! 				if (!is_sql_cmd)
! 				{
! 					int			i;
  
! 					for (i = 0; i < nfields; i++)
! 					{
! 						if (PQgetisnull(res, row, i))
! 							values[i] = NULL;
! 						else
! 							values[i] = PQgetvalue(res, row, i);
! 					}
! 				}
  				else
! 				{
! 					values[0] = PQcmdStatus(res);
! 				}
  
! 				/* build the tuple and put it into the tuplestore. */
! 				tuple = BuildTupleFromCStrings(attinmeta, values);
! 				tuplestore_puttuple(tupstore, tuple);
! 			}
  
! 			/* clean up and return the tuplestore */
! 			tuplestore_donestoring(tupstore);
! 		}
  
! 		PQclear(res);
  	}
! 	PG_CATCH();
  	{
! 		/* be sure to release the libpq result */
  		PQclear(res);
! 		PG_RE_THROW();
  	}
+ 	PG_END_TRY();
  }
  
  /*
#2Joe Conway
mail@joeconway.com
In reply to: Takahiro Itagaki (#1)
Re: Fix for memory leak in dblink

On 01/11/2010 07:43 PM, Takahiro Itagaki wrote:

> There is a memory leak in dblink when we cancel a query during
> returning tuples. It could leak a PGresult because memory used
> by it is not palloc'ed one. I wrote a patch[1] before, but I've
> badly used global variables to track the resource.
>
> The attached is a cleaned up patch rewritten to use a tuplestore
> (SFRM_Materialize mode) to return tuples suggested at [2]. Since
> we don't return from the dblink function in tuplestore mode, we
> can surely release the PGresult with a PG_CATCH block even on error.

Thanks -- I'll review this weekend.

Joe

#3Joe Conway
mail@joeconway.com
In reply to: Takahiro Itagaki (#1)
Re: Fix for memory leak in dblink

On 01/11/2010 07:43 PM, Takahiro Itagaki wrote:

> There is a memory leak in dblink when we cancel a query during
> returning tuples. It could leak a PGresult because memory used
> by it is not palloc'ed one. I wrote a patch[1] before, but I've
> badly used global variables to track the resource.
>
> The attached is a cleaned up patch rewritten to use a tuplestore
> (SFRM_Materialize mode) to return tuples suggested at [2]. Since
> we don't return from the dblink function in tuplestore mode, we
> can surely release the PGresult with a PG_CATCH block even on error.
>
> Also, dblink_record_internal() and dblink_fetch() are rearranged
> to share the same code to return tuples for code refactoring.
>
> [1] http://archives.postgresql.org/pgsql-hackers/2009-06/msg01358.php
> [2] http://archives.postgresql.org/pgsql-hackers/2009-10/msg00292.php

This looks good to me. I'll commit in the next 24 hours if there are no
objections.

Thanks,

Joe