Using user mapping OID as hash key for connection hash
Hi All,
As discussed in the postgres_fdw join pushdown thread [1], two different
effective local users that both use the public user mapping currently get two
separate connections to the foreign server, even though both connections use
the same credentials. Robert suggested [2] that we should use user mapping OID
as the connection cache key instead of current userid and serverid.
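To make the difference between the two keys concrete, here is a minimal standalone sketch. It is not postgres_fdw code: the Toy* types and toy_* functions are hypothetical stand-ins, and a linear array replaces the real hash table. It only models which lookups count as "same connection" under each key.

```c
#include <assert.h>

typedef unsigned int Oid;       /* stand-in for PostgreSQL's Oid */

/* Hypothetical, simplified result of a user mapping lookup. */
typedef struct ToyUserMapping
{
    Oid userid;                 /* local user the lookup was done for */
    Oid serverid;               /* foreign server */
    Oid umid;                   /* OID of the mapping row that matched */
} ToyUserMapping;

#define TOY_CACHE_MAX 16

typedef struct ToyCache
{
    ToyUserMapping entries[TOY_CACHE_MAX];
    int nentries;
} ToyCache;

/* Remember a new entry; returns 1 to signal "new connection created". */
int
toy_add_entry(ToyCache *cache, const ToyUserMapping *um)
{
    assert(cache->nentries < TOY_CACHE_MAX);
    cache->entries[cache->nentries++] = *um;
    return 1;
}

/* Old key: (serverid, userid). Returns 0 when an entry is reused. */
int
toy_get_conn_by_user_and_server(ToyCache *cache, const ToyUserMapping *um)
{
    for (int i = 0; i < cache->nentries; i++)
        if (cache->entries[i].serverid == um->serverid &&
            cache->entries[i].userid == um->userid)
            return 0;
    return toy_add_entry(cache, um);
}

/* Proposed key: user mapping OID only. Returns 0 when an entry is reused. */
int
toy_get_conn_by_umid(ToyCache *cache, const ToyUserMapping *um)
{
    for (int i = 0; i < cache->nentries; i++)
        if (cache->entries[i].umid == um->umid)
            return 0;
    return toy_add_entry(cache, um);
}
```

If two local users (say OIDs 10 and 11, both made up) resolve to the same public mapping on one server, the first function creates two entries while the second creates one, which is the behaviour this patch is after.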
There are two patches attached here:
1. pg_fdw_concache.patch.short - the shorter version of the fix. Right now the
ForeignTable and ForeignServer structures store their own OIDs, but
UserMapping doesn't. The patch adds the user mapping OID as a member of
UserMapping; this member is then used as the key in GetConnection().
2. pg_fdw_concache.patch.large - most callers of GetConnection() fetch the
ForeignServer object just to pass it to GetConnection(). GetConnection() can
obtain the ForeignServer itself from the serverid in UserMapping, so it
doesn't need ForeignServer as an argument. The larger version of the patch
includes this change.
GetConnection() names its UserMapping argument just "user"; ideally it
would have been named user_mapping. But that seems too obvious to be
unintentional, so I have left it unchanged.
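The idea behind the larger patch can be sketched in isolation as follows. This is a toy model under stated assumptions, not the patch itself: the Sketch* types, the sketch_* functions, and the server names are all hypothetical, and an array lookup stands in for the catalog access. The point it illustrates is that the caller passes only the mapping, and the server is resolved from its serverid only when a new connection has to be set up.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int Oid;       /* stand-in for PostgreSQL's Oid */

typedef struct SketchServer
{
    Oid serverid;
    const char *servername;
} SketchServer;

typedef struct SketchMapping
{
    Oid userid;
    Oid serverid;
    Oid umid;
} SketchMapping;

typedef struct SketchConn
{
    Oid umid;                   /* cache key */
    const char *servername;     /* filled in when the connection is made */
} SketchConn;

/* Made-up server catalog for the sketch. */
static const SketchServer sketch_servers[] = {
    {100, "loopback"},
    {101, "reporting"}
};

#define SKETCH_MAX 8
SketchConn sketch_conns[SKETCH_MAX];
int sketch_nconns = 0;
int sketch_server_lookups = 0;  /* counts how often the server is resolved */

/* Resolve a server from its OID; returns NULL if it is unknown. */
const SketchServer *
sketch_get_server(Oid serverid)
{
    sketch_server_lookups++;
    for (size_t i = 0; i < sizeof(sketch_servers) / sizeof(sketch_servers[0]); i++)
        if (sketch_servers[i].serverid == serverid)
            return &sketch_servers[i];
    return NULL;
}

/* Callers pass only the mapping; no separate server argument is needed. */
const SketchConn *
sketch_get_connection(const SketchMapping *um)
{
    const SketchServer *server;

    for (int i = 0; i < sketch_nconns; i++)
        if (sketch_conns[i].umid == um->umid)
            return &sketch_conns[i];    /* cache hit: no server lookup */

    /* Cache miss: only now do we need to know about the server. */
    server = sketch_get_server(um->serverid);
    assert(server != NULL);
    assert(sketch_nconns < SKETCH_MAX);
    sketch_conns[sketch_nconns].umid = um->umid;
    sketch_conns[sketch_nconns].servername = server->servername;
    return &sketch_conns[sketch_nconns++];
}
```

Calling sketch_get_connection() twice with the same mapping returns the same entry and resolves the server only once, which is why callers no longer need to fetch the server up front.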
The patch also adds the userid and user mapping OID to a DEBUG3 message in
GetConnection(). The message is displayed when a new connection to the foreign
server is created. With only that change applied, running the script
multi_conn.sql (attached) shows that two connections are created even when the
same user mapping is used. Also attached is the output of the same script run
on my setup. Since min_messages is set to DEBUG3 there are many unrelated
messages, so search for "new postgres_fdw connection .." to find the messages
for new connections.
I have included the changes to the DEBUG3 message in GetConnection(), since
they may be worth committing. If reviewers/committers disagree, those changes
can be removed.
[1]: /messages/by-id/CAFjFpRf-LiD5bai4D6cSUseJh=xxJqipo_vN8mTnZG16TMWJ-w@mail.gmail.com
[2]: /messages/by-id/CA+TgmoYMMv_Du-VPpQ1d7UfSjaOPBQ+LgpxTChnuQfOBjg2phg@mail.gmail.com
--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
Attachments:
pg_fdw_concache.patch.large
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 0c6c3ce..5d582f8 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,36 +17,36 @@
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
/*
* Connection cache hash table entry
*
- * The lookup key in this hash table is the foreign server OID plus the user
- * mapping OID. (We use just one connection per user per foreign server,
- * so that we can ensure all scans use the same snapshot during a query.)
+ * The lookup key in this hash table is the user mapping OID. We use just one
+ * connection per foreign user per foreign server, so that we can ensure that
+ * all the scans use the same snapshot during a query. Multiple local users may
+ * use public user mapping, thus connecting to foreign server as a single
+ * foreign user specified in the public user mapping. Having user mapping OID
+ * as the key avoids creating multiple connections for different local users,
+ * all using public user mapping.
*
* The "conn" pointer can be NULL if we don't currently have a live connection.
* When we do have a connection, xact_depth tracks the current depth of
* transactions and subtransactions open on the remote side. We need to issue
* commands at the same nesting depth on the remote as we're executing at
* ourselves, so that rolling back a subtransaction will kill the right
* queries and not the wrong ones.
*/
-typedef struct ConnCacheKey
-{
- Oid serverid; /* OID of foreign server */
- Oid userid; /* OID of local user whose mapping we use */
-} ConnCacheKey;
+typedef Oid ConnCacheKey;
typedef struct ConnCacheEntry
{
ConnCacheKey key; /* hash key (must be first) */
PGconn *conn; /* connection to foreign server, or NULL */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
} ConnCacheEntry;
@@ -87,22 +87,21 @@ static void pgfdw_subxact_callback(SubXactEvent event,
* (not even on error), we need this flag to cue manual cleanup.
*
* XXX Note that caching connections theoretically requires a mechanism to
* detect change of FDW objects to invalidate already established connections.
* We could manage that by watching for invalidation events on the relevant
* syscaches. For the moment, though, it's not clear that this would really
* be useful and not mere pedantry. We could not flush any active connections
* mid-transaction anyway.
*/
PGconn *
-GetConnection(ForeignServer *server, UserMapping *user,
- bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt)
{
bool found;
ConnCacheEntry *entry;
ConnCacheKey key;
/* First time through, initialize connection cache hashtable */
if (ConnectionHash == NULL)
{
HASHCTL ctl;
@@ -120,22 +119,21 @@ GetConnection(ForeignServer *server, UserMapping *user,
* This should be done just once in each backend.
*/
RegisterXactCallback(pgfdw_xact_callback, NULL);
RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
}
/* Set flag that we did GetConnection during the current transaction */
xact_got_connection = true;
/* Create hash key for the entry. Assume no pad bytes in key struct */
- key.serverid = server->serverid;
- key.userid = user->userid;
+ key = user->umid;
/*
* Find or create cached entry for requested connection.
*/
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
{
/* initialize new hashtable entry (key is already filled in) */
entry->conn = NULL;
entry->xact_depth = 0;
@@ -149,26 +147,29 @@ GetConnection(ForeignServer *server, UserMapping *user,
* connection is actually used.
*/
/*
* If cache entry doesn't have a connection, we have to establish a new
* connection. (If connect_pg_server throws an error, the cache entry
* will be left in a valid empty state.)
*/
if (entry->conn == NULL)
{
+ ForeignServer *server = GetForeignServer(user->serverid);
+
entry->xact_depth = 0; /* just to be sure */
entry->have_prep_stmt = false;
entry->have_error = false;
entry->conn = connect_pg_server(server, user);
- elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
- entry->conn, server->servername);
+
+ elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
+ entry->conn, server->servername, user->umid, user->userid);
}
/*
* Start a new transaction or subtransaction if needed.
*/
begin_remote_xact(entry);
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 374faf5..a237e15 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1094,21 +1094,20 @@ postgresGetForeignPlan(PlannerInfo *root,
*/
static void
postgresBeginForeignScan(ForeignScanState *node, int eflags)
{
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
EState *estate = node->ss.ps.state;
PgFdwScanState *fsstate;
RangeTblEntry *rte;
Oid userid;
ForeignTable *table;
- ForeignServer *server;
UserMapping *user;
int numParams;
int i;
ListCell *lc;
/*
* Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
*/
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return;
@@ -1122,28 +1121,27 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/*
* Identify which user to do the remote access as. This should match what
* ExecCheckRTEPerms() does.
*/
rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
/* Get info about foreign table. */
fsstate->rel = node->ss.ss_currentRelation;
table = GetForeignTable(RelationGetRelid(fsstate->rel));
- server = GetForeignServer(table->serverid);
- user = GetUserMapping(userid, server->serverid);
+ user = GetUserMapping(userid, table->serverid);
/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(server, user, false);
+ fsstate->conn = GetConnection(user, false);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
fsstate->cursor_exists = false;
/* Get private info created by planner functions. */
fsstate->query = strVal(list_nth(fsplan->fdw_private,
FdwScanPrivateSelectSql));
fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
FdwScanPrivateRetrievedAttrs);
@@ -1496,21 +1494,20 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
int subplan_index,
int eflags)
{
PgFdwModifyState *fmstate;
EState *estate = mtstate->ps.state;
CmdType operation = mtstate->operation;
Relation rel = resultRelInfo->ri_RelationDesc;
RangeTblEntry *rte;
Oid userid;
ForeignTable *table;
- ForeignServer *server;
UserMapping *user;
AttrNumber n_params;
Oid typefnoid;
bool isvarlena;
ListCell *lc;
/*
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
* stays NULL.
*/
@@ -1523,25 +1520,24 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
/*
* Identify which user to do the remote access as. This should match what
* ExecCheckRTEPerms() does.
*/
rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
/* Get info about foreign table. */
table = GetForeignTable(RelationGetRelid(rel));
- server = GetForeignServer(table->serverid);
- user = GetUserMapping(userid, server->serverid);
+ user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(server, user, true);
+ fmstate->conn = GetConnection(user, true);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Deconstruct fdw_private data. */
fmstate->query = strVal(list_nth(fdw_private,
FdwModifyPrivateUpdateSql));
fmstate->target_attrs = (List *) list_nth(fdw_private,
FdwModifyPrivateTargetAttnums);
fmstate->has_returning = intVal(list_nth(fdw_private,
FdwModifyPrivateHasReturning));
fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1981,21 +1977,21 @@ estimate_path_cost_size(PlannerInfo *root,
appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
true, NULL);
if (remote_join_conds)
appendWhereClause(&sql, root, baserel, remote_join_conds,
(fpinfo->remote_conds == NIL), NULL);
if (pathkeys)
appendOrderByClause(&sql, root, baserel, pathkeys);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->server, fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
retrieved_rows = rows;
/* Factor in the selectivity of the locally-checked quals */
local_sel = clauselist_selectivity(root,
local_join_conds,
baserel->relid,
@@ -2537,44 +2533,42 @@ store_returning_result(PgFdwModifyState *fmstate,
/*
* postgresAnalyzeForeignTable
* Test whether analyzing this foreign table is supported
*/
static bool
postgresAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages)
{
ForeignTable *table;
- ForeignServer *server;
UserMapping *user;
PGconn *conn;
StringInfoData sql;
PGresult *volatile res = NULL;
/* Return the row-analysis function pointer */
*func = postgresAcquireSampleRowsFunc;
/*
* Now we have to get the number of pages. It's annoying that the ANALYZE
* API requires us to return that now, because it forces some duplication
* of effort between this routine and postgresAcquireSampleRowsFunc. But
* it's probably not worth redefining that API at this point.
*/
/*
* Get the connection to use. We do the remote access as the table's
* owner, even if the ANALYZE was started by some other user.
*/
table = GetForeignTable(RelationGetRelid(relation));
- server = GetForeignServer(table->serverid);
- user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
- conn = GetConnection(server, user, false);
+ user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
+ conn = GetConnection(user, false);
/*
* Construct command to get page count for relation.
*/
initStringInfo(&sql);
deparseAnalyzeSizeSql(&sql, relation);
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
@@ -2619,21 +2613,20 @@ postgresAnalyzeForeignTable(Relation relation,
* currently (the planner only pays attention to correlation for indexscans).
*/
static int
postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int targrows,
double *totalrows,
double *totaldeadrows)
{
PgFdwAnalyzeState astate;
ForeignTable *table;
- ForeignServer *server;
UserMapping *user;
PGconn *conn;
unsigned int cursor_number;
StringInfoData sql;
PGresult *volatile res = NULL;
/* Initialize workspace state */
astate.rel = relation;
astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
@@ -2650,23 +2643,22 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
"postgres_fdw temporary data",
ALLOCSET_SMALL_MINSIZE,
ALLOCSET_SMALL_INITSIZE,
ALLOCSET_SMALL_MAXSIZE);
/*
* Get the connection to use. We do the remote access as the table's
* owner, even if the ANALYZE was started by some other user.
*/
table = GetForeignTable(RelationGetRelid(relation));
- server = GetForeignServer(table->serverid);
- user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
- conn = GetConnection(server, user, false);
+ user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
+ conn = GetConnection(user, false);
/*
* Construct cursor that retrieves whole rows from remote.
*/
cursor_number = GetCursorNumber(conn);
initStringInfo(&sql);
appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
/* In what follows, do not risk leaking any PGresults. */
@@ -2853,21 +2845,21 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
errmsg("invalid option \"%s\"", def->defname)));
}
/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(server, mapping, false);
+ conn = GetConnection(mapping, false);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
import_collate = false;
/* Create workspace for strings */
initStringInfo(&buf);
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 8553536..59e9f60 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -53,22 +53,21 @@ typedef struct PgFdwRelationInfo
ForeignTable *table;
ForeignServer *server;
UserMapping *user; /* only set in use_remote_estimate mode */
} PgFdwRelationInfo;
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
- bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
bool clear, const char *sql);
/* in option.c */
extern int ExtractConnectionOptions(List *defelems,
const char **keywords,
const char **values);
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index 14e082b..73c40af 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -188,20 +188,21 @@ GetUserMapping(Oid userid, Oid serverid)
if (!HeapTupleIsValid(tp))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("user mapping not found for \"%s\"",
MappingUserName(userid))));
um = (UserMapping *) palloc(sizeof(UserMapping));
um->userid = userid;
um->serverid = serverid;
+ um->umid = HeapTupleGetOid(tp);
/* Extract the umoptions */
datum = SysCacheGetAttr(USERMAPPINGUSERSERVER,
tp,
Anum_pg_user_mapping_umoptions,
&isnull);
if (isnull)
um->options = NIL;
else
um->options = untransformRelOptions(datum);
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 2c1ada1..7767913 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -50,20 +50,21 @@ typedef struct ForeignServer
char *servername; /* name of the server */
char *servertype; /* server type, optional */
char *serverversion; /* server version, optional */
List *options; /* srvoptions as DefElem list */
} ForeignServer;
typedef struct UserMapping
{
Oid userid; /* local user Oid */
Oid serverid; /* server Oid */
+ Oid umid; /* Oid of user mapping */
List *options; /* useoptions as DefElem list */
} UserMapping;
typedef struct ForeignTable
{
Oid relid; /* relation Oid */
Oid serverid; /* server Oid */
List *options; /* ftoptions as DefElem list */
} ForeignTable;
pg_fdw_concache.patch.short
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 0c6c3ce..f045f2b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -24,9 +24,13 @@
/*
* Connection cache hash table entry
*
- * The lookup key in this hash table is the foreign server OID plus the user
- * mapping OID. (We use just one connection per user per foreign server,
- * so that we can ensure all scans use the same snapshot during a query.)
+ * The lookup key in this hash table is the user mapping OID. We use just one
+ * connection per foreign user per foreign server, so that we can ensure that
+ * all the scans use the same snapshot during a query. Multiple local users may
+ * use public user mapping, thus connecting to foreign server as a single
+ * foreign user specified in the public user mapping. Having user mapping OID
+ * as the key avoids creating multiple connections for different local users,
+ * all using public user mapping.
*
* The "conn" pointer can be NULL if we don't currently have a live connection.
* When we do have a connection, xact_depth tracks the current depth of
@@ -35,11 +39,7 @@
* ourselves, so that rolling back a subtransaction will kill the right
* queries and not the wrong ones.
*/
-typedef struct ConnCacheKey
-{
- Oid serverid; /* OID of foreign server */
- Oid userid; /* OID of local user whose mapping we use */
-} ConnCacheKey;
+typedef Oid ConnCacheKey;
typedef struct ConnCacheEntry
{
@@ -127,8 +127,7 @@ GetConnection(ForeignServer *server, UserMapping *user,
xact_got_connection = true;
/* Create hash key for the entry. Assume no pad bytes in key struct */
- key.serverid = server->serverid;
- key.userid = user->userid;
+ key = user->umid;
/*
* Find or create cached entry for requested connection.
@@ -160,8 +159,9 @@ GetConnection(ForeignServer *server, UserMapping *user,
entry->have_prep_stmt = false;
entry->have_error = false;
entry->conn = connect_pg_server(server, user);
- elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
- entry->conn, server->servername);
+
+ elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
+ entry->conn, server->servername, user->umid, user->userid);
}
/*
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index 14e082b..73c40af 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -195,6 +195,7 @@ GetUserMapping(Oid userid, Oid serverid)
um = (UserMapping *) palloc(sizeof(UserMapping));
um->userid = userid;
um->serverid = serverid;
+ um->umid = HeapTupleGetOid(tp);
/* Extract the umoptions */
datum = SysCacheGetAttr(USERMAPPINGUSERSERVER,
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 2c1ada1..7767913 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -57,6 +57,7 @@ typedef struct UserMapping
{
Oid userid; /* local user Oid */
Oid serverid; /* server Oid */
+ Oid umid; /* Oid of user mapping */
List *options; /* useoptions as DefElem list */
} UserMapping;
On Wed, Jan 27, 2016 at 6:32 AM, Ashutosh Bapat
<ashutosh.bapat@enterprisedb.com> wrote:
As discussed in postgres_fdw join pushdown thread [1], for two different
effective local users which use public user mapping we will be creating two
different connections to the foreign server with the same credentials.
Robert suggested [2] that we should use user mapping OID as the connection
cache key instead of current userid and serverid.
There are two patches attached here:
1. pg_fdw_concache.patch.short - shorter version of the fix. Right now
ForeignTable, ForeignServer have corresponding OIDs saved in these
structures. But UserMapping doesn't. Patch adds user mapping OID as a member
to this structure. This member is then used as key in GetConnection().
2. pg_fdw_concache.patch.large - most of the callers of GetConnection() get
ForeignServer object just to pass it to GetConnection(). GetConnection can
obtain ForeignServer by using serverid from UserMapping and doesn't need
ForeignServer to be an argument. Larger version of patch has this change.
The long form seems like a good idea, so I committed that one.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company