diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 2326f391d3..874cc26b9f 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -27,9 +27,11 @@ #include "pgstat.h" #include "postgres_fdw.h" #include "storage/latch.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/hsearch.h" #include "utils/inval.h" +#include "utils/portal.h" #include "utils/syscache.h" /* @@ -71,6 +73,9 @@ typedef struct ConnCacheEntry PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; +#define PgFdwConnStateContainer(ptr) \ + ((ConnCacheEntry *) ((char *) (ptr) - offsetof(ConnCacheEntry, state))) + /* * Connection cache (initialized on first use) */ @@ -773,6 +778,15 @@ begin_remote_xact(ConnCacheEntry *entry) do_sql_command(entry->conn, sql); entry->xact_depth = 1; entry->changing_xact_state = false; + + if (!IsolationUsesXactSnapshot() && + PQserverVersion(entry->conn) >= 180000) + { + entry->state.rcIsEmulated = true; + entry->state.haveFirstQuery = false; + entry->state.lastIterId = 0; + entry->state.lastIterSubId = 0; + } } /* @@ -848,13 +862,34 @@ GetPrepStmtNumber(PGconn *conn) PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) { + PGresult *res = NULL; + /* First, process a pending asynchronous request, if any. */ if (state && state->pendingAreq) process_pending_request(state->pendingAreq); + /* + * Next, if emulating READ COMMITTED behavior, refresh the snapshot for + * the remote transaction if allowed. + */ + if (state && state->rcIsEmulated && snapshot_refresh_ok(state)) + do_snapshot_refresh(state); + if (!PQsendQuery(conn, query)) return NULL; - return pgfdw_get_result(conn); + res = pgfdw_get_result(conn); + + /* + * If emulating READ COMMITTED behavior, check to see if the query has + * been executed successfully, and if so, update information related to + * that mode. 
+ */ + if (state && state->rcIsEmulated && + (PQresultStatus(res) == PGRES_COMMAND_OK || + PQresultStatus(res) == PGRES_TUPLES_OK)) + update_emulated_rc_mode_info(state); + + return res; } /* @@ -1282,6 +1317,14 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) /* Reset state to show we're out of a transaction */ entry->xact_depth = 0; + if (entry->state.rcIsEmulated) + { + entry->state.rcIsEmulated = false; + entry->state.haveFirstQuery = false; + entry->state.lastIterId = 0; + entry->state.lastIterSubId = 0; + } + /* * If the connection isn't in a good idle state, it is marked as * invalid or keep_connections option of its server is disabled, then @@ -1652,7 +1695,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel) * fetch_more_data(); in that case reset the per-connection state here. */ if (entry->state.pendingAreq) - memset(&entry->state, 0, sizeof(entry->state)); + entry->state.pendingAreq = NULL; /* Disarm changing_xact_state if it all worked */ entry->changing_xact_state = false; @@ -1938,7 +1981,7 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, /* Reset the per-connection state if needed */ if (entry->state.pendingAreq) - memset(&entry->state, 0, sizeof(entry->state)); + entry->state.pendingAreq = NULL; /* We're done with this entry; unset the changing_xact_state flag */ entry->changing_xact_state = false; @@ -1983,7 +2026,7 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, /* Reset the per-connection state if needed */ if (entry->state.pendingAreq) - memset(&entry->state, 0, sizeof(entry->state)); + entry->state.pendingAreq = NULL; /* We're done with this entry; unset the changing_xact_state flag */ entry->changing_xact_state = false; @@ -1991,6 +2034,111 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, } } +/* + * Check if it is safe to refresh the snapshot for the remote transaction. + * + * Returns true only if some query has already been executed in the remote + * transaction, the most recent one was not executed from within the current + * top-level query, and no live portal created by an earlier top-level query + * remains.
+ */ +bool +snapshot_refresh_ok(PgFdwConnState *state) +{ + uint64 lastIterId = state->lastIterId; + uint64 lastIterSubId = state->lastIterSubId; + + Assert(state->rcIsEmulated); + + /* + * The (id, subid) pair only moves forward in lexicographic order: the + * subid restarts from zero on every new PostgresMain loop iteration, so + * it may legitimately be smaller than lastIterSubId once the id has + * advanced. + */ + Assert(PostgresMainLoopIterationId > lastIterId || + (PostgresMainLoopIterationId == lastIterId && + PostgresMainLoopIterationSubId >= lastIterSubId)); + + /* + * If we haven't executed any query in the remote transaction, there's no + * need to refresh the snapshot as the transaction will take a fresh + * snapshot when executing the first query. + */ + if (!state->haveFirstQuery) + return false; + + /* + * If we have already executed any query from within the current top-level + * query in the remote transaction, the transaction should reuse the + * snapshot when executing the query we are about to send to the remote; + * don't refresh the snapshot. + */ + if (PostgresMainLoopIterationId == lastIterId && + PostgresMainLoopIterationSubId == lastIterSubId) + return false; + Assert(PostgresMainLoopIterationId > lastIterId || + PostgresMainLoopIterationSubId > lastIterSubId); + + /* + * If there is any live portal that was created in a previous top-level + * query, the portal's query might access foreign tables by re-creating + * the remote cursor, which should use the same snapshot as before; don't + * refresh the snapshot. + */ + if (!ThereAreNoOldLivePortals()) + return false; + + /* Otherwise, it is safe to refresh the snapshot. */ + return true; +} + +/* + * Refresh the snapshot for the remote transaction. + * + * Sends pg_refresh_snapshot() to the remote server over the connection that + * owns the given state, and throws an error if the command fails or does + * not return exactly one row with one column. + */ +void +do_snapshot_refresh(PgFdwConnState *state) +{ + ConnCacheEntry *entry = PgFdwConnStateContainer(state); + PGconn *conn = entry->conn; + const char *sql = "SELECT pg_catalog.pg_refresh_snapshot()"; + PGresult *volatile res = NULL; /* set in PG_TRY, read in PG_FINALLY */ + + /* In what follows, do not risk leaking any PGresults.
*/ + PG_TRY(); + { + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); + + res = pgfdw_get_result(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, sql); + + if (PQntuples(res) != 1 || PQnfields(res) != 1) + elog(ERROR, "unexpected result from snapshot refresh query"); + } + PG_FINALLY(); + { + PQclear(res); + } + PG_END_TRY(); +} + +/* + * Update information related to emulated READ COMMITTED mode. + */ +void +update_emulated_rc_mode_info(PgFdwConnState *state) +{ + state->haveFirstQuery = true; + state->lastIterId = PostgresMainLoopIterationId; + state->lastIterSubId = PostgresMainLoopIterationSubId; +} + /* Number of output arguments (columns) for various API versions */ #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1 2 #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 5 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index c0810fbd7c..2cbffd9b3b 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -431,6 +431,7 @@ static void estimate_path_cost_size(PlannerInfo *root, Cost *p_startup_cost, Cost *p_total_cost); static void get_remote_estimate(const char *sql, PGconn *conn, + PgFdwConnState *conn_state, double *rows, int *width, Cost *startup_cost, @@ -2987,9 +2988,11 @@ postgresExecForeignTruncate(List *rels, Oid serverid = InvalidOid; UserMapping *user = NULL; PGconn *conn = NULL; + PgFdwConnState *conn_state; StringInfoData sql; ListCell *lc; bool server_truncatable = true; + PGresult *res = NULL; /* * By default, all postgres_fdw foreign tables are assumed truncatable. @@ -3059,14 +3062,22 @@ postgresExecForeignTruncate(List *rels, * establish new connection if necessary. 
*/ user = GetUserMapping(GetUserId(), serverid); - conn = GetConnection(user, false, NULL); + conn = GetConnection(user, false, &conn_state); /* Construct the TRUNCATE command string */ initStringInfo(&sql); deparseTruncateSql(&sql, rels, behavior, restart_seqs); - /* Issue the TRUNCATE command to remote server */ - do_sql_command(conn, sql.data); + /* + * Issue the TRUNCATE command to remote server + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(conn, sql.data, conn_state); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, true, sql.data); + PQclear(res); pfree(sql.data); } @@ -3119,6 +3130,7 @@ estimate_path_cost_size(PlannerInfo *root, List *local_param_join_conds; StringInfoData sql; PGconn *conn; + PgFdwConnState *conn_state; Selectivity local_sel; QualCost local_cost; List *fdw_scan_tlist = NIL; @@ -3162,8 +3174,8 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false, NULL); - get_remote_estimate(sql.data, conn, &rows, &width, + conn = GetConnection(fpinfo->user, false, &conn_state); + get_remote_estimate(sql.data, conn, conn_state, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3594,7 +3606,7 @@ estimate_path_cost_size(PlannerInfo *root, * The given "sql" must be an EXPLAIN command. */ static void -get_remote_estimate(const char *sql, PGconn *conn, +get_remote_estimate(const char *sql, PGconn *conn, PgFdwConnState *conn_state, double *rows, int *width, Cost *startup_cost, Cost *total_cost) { @@ -3610,7 +3622,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. 
*/ - res = pgfdw_exec_query(conn, sql, NULL); + res = pgfdw_exec_query(conn, sql, conn_state); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -3739,6 +3751,14 @@ create_cursor(ForeignScanState *node) if (fsstate->conn_state->pendingAreq) process_pending_request(fsstate->conn_state->pendingAreq); + /* + * Next, if emulating READ COMMITTED behavior, refresh the snapshot for + * the remote transaction if allowed. + */ + if (fsstate->conn_state->rcIsEmulated && + snapshot_refresh_ok(fsstate->conn_state)) + do_snapshot_refresh(fsstate->conn_state); + /* * Construct array of query parameter values in text format. We do the * conversions in the short-lived per-tuple context, so as not to cause a @@ -3795,6 +3815,13 @@ create_cursor(ForeignScanState *node) /* Clean up */ pfree(buf.data); + + /* + * If emulating READ COMMITTED behavior, update information related to + * that mode. + */ + if (fsstate->conn_state->rcIsEmulated) + update_emulated_rc_mode_info(fsstate->conn_state); } /* @@ -4224,6 +4251,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * if any, so no need to do it here. */ + /* + * Next, if emulating READ COMMITTED behavior, refresh the snapshot for + * the remote transaction if allowed. + */ + if (fmstate->conn_state->rcIsEmulated && + snapshot_refresh_ok(fmstate->conn_state)) + do_snapshot_refresh(fmstate->conn_state); + /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", GetPrepStmtNumber(fmstate->conn)); @@ -4256,6 +4291,13 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) /* This action shows that the prepare has been done. */ fmstate->p_name = p_name; + + /* + * If emulating READ COMMITTED behavior, update information related to + * that mode. 
+ */ + if (fmstate->conn_state->rcIsEmulated) + update_emulated_rc_mode_info(fmstate->conn_state); } /* @@ -4565,6 +4607,14 @@ execute_dml_stmt(ForeignScanState *node) if (dmstate->conn_state->pendingAreq) process_pending_request(dmstate->conn_state->pendingAreq); + /* + * Next, if emulating READ COMMITTED behavior, refresh the snapshot for + * the remote transaction if allowed. + */ + if (dmstate->conn_state->rcIsEmulated && + snapshot_refresh_ok(dmstate->conn_state)) + do_snapshot_refresh(dmstate->conn_state); + /* * Construct array of query parameter values in text format. */ @@ -4602,6 +4652,13 @@ execute_dml_stmt(ForeignScanState *node) dmstate->num_tuples = PQntuples(dmstate->result); else dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result)); + + /* + * If emulating READ COMMITTED behavior, update information related to + * that mode. + */ + if (dmstate->conn_state->rcIsEmulated) + update_emulated_rc_mode_info(dmstate->conn_state); } /* @@ -4946,6 +5003,7 @@ postgresAnalyzeForeignTable(Relation relation, ForeignTable *table; UserMapping *user; PGconn *conn; + PgFdwConnState *conn_state; StringInfoData sql; PGresult *volatile res = NULL; @@ -4965,7 +5023,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false, NULL); + conn = GetConnection(user, false, &conn_state); /* * Construct command to get page count for relation. @@ -4976,7 +5034,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. 
*/ PG_TRY(); { - res = pgfdw_exec_query(conn, sql.data, NULL); + res = pgfdw_exec_query(conn, sql.data, conn_state); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -5462,6 +5520,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) ForeignServer *server; UserMapping *mapping; PGconn *conn; + PgFdwConnState *conn_state; StringInfoData buf; PGresult *volatile res = NULL; int numrows, @@ -5493,7 +5552,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false, NULL); + conn = GetConnection(mapping, false, &conn_state); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -5509,7 +5568,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = pgfdw_exec_query(conn, buf.data, NULL); + res = pgfdw_exec_query(conn, buf.data, conn_state); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 9e501660d1..d7fa74559e 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -137,6 +137,17 @@ typedef struct PgFdwRelationInfo typedef struct PgFdwConnState { AsyncRequest *pendingAreq; /* pending async request */ + + bool rcIsEmulated; /* do we emulate READ COMMITTED behavior in + * this xact? */ + bool haveFirstQuery; /* have we executed first query in this + * xact? 
*/ + uint64 lastIterId; /* id of PostgresMain loop iteration during + * which we have executed last query in this + * xact */ + uint64 lastIterSubId; /* subid of PostgresMain loop iteration during + * which we have executed last query in this + * xact */ } PgFdwConnState; /* @@ -168,6 +179,9 @@ extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern bool snapshot_refresh_ok(PgFdwConnState *state); +extern void do_snapshot_refresh(PgFdwConnState *state); +extern void update_emulated_rc_mode_info(PgFdwConnState *state); /* in option.c */ extern int ExtractConnectionOptions(List *defelems, diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 42af768045..926064b0ff 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -103,6 +103,14 @@ int PostAuthDelay = 0; /* Time between checks that the client is still connected. */ int client_connection_check_interval = 0; +/* + * IDs for iterations of the PostgresMain loop, for use by extensions like + * postgres_fdw to know which top-level query is currently being processed, + * for example. + */ +uint64 PostgresMainLoopIterationId = 0; +uint64 PostgresMainLoopIterationSubId = 0; + /* flags for non-system relation kinds to restrict use */ int restrict_nonsystem_relation_kind; @@ -1117,6 +1125,8 @@ exec_simple_query(const char *query_string) const char *cmdtagname; size_t cmdtaglen; + PostgresMainLoopIterationSubId += 1; + pgstat_report_query_id(0, true); /* @@ -4606,6 +4616,9 @@ PostgresMain(const char *dbname, const char *username) int firstchar; StringInfoData input_message; + PostgresMainLoopIterationId += 1; + PostgresMainLoopIterationSubId = 0; + /* * At top of loop, reset extended-query-message flag, so that any * errors encountered in "idle" state don't provoke skip. 
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c index 93137820ac..2485147d82 100644 --- a/src/backend/utils/mmgr/portalmem.c +++ b/src/backend/utils/mmgr/portalmem.c @@ -23,6 +23,7 @@ #include "funcapi.h" #include "miscadmin.h" #include "storage/ipc.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/snapmgr.h" @@ -217,6 +218,8 @@ CreatePortal(const char *name, bool allowDup, bool dupSilent) portal->atEnd = true; /* disallow fetches until query is set */ portal->visible = true; portal->creation_time = GetCurrentStatementStartTimestamp(); + portal->createIterId = PostgresMainLoopIterationId; + portal->createIterSubId = PostgresMainLoopIterationSubId; /* put portal in table (sets portal->name) */ PortalHashTableInsert(portal, name); @@ -1186,6 +1189,39 @@ ThereAreNoReadyPortals(void) return true; } +/* + * Are there no live portals left over from earlier top-level queries? + * + * Returns false if any portal in READY or ACTIVE state was created before + * the current (PostgresMainLoopIterationId, PostgresMainLoopIterationSubId) + * position, comparing the pairs lexicographically; true otherwise. + */ +bool +ThereAreNoOldLivePortals(void) +{ + HASH_SEQ_STATUS status; + PortalHashEnt *hentry; + + hash_seq_init(&status, PortalHashTable); + + while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) + { + Portal portal = hentry->portal; + + if ((portal->status == PORTAL_READY || + portal->status == PORTAL_ACTIVE) && + (portal->createIterId < PostgresMainLoopIterationId || + (portal->createIterId == PostgresMainLoopIterationId && + portal->createIterSubId < PostgresMainLoopIterationSubId))) + { + hash_seq_term(&status); + return false; + } + } + + return true; +} + /* * Hold all pinned portals. 
* diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 7d2b34d4f2..1b000cce92 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -159,6 +159,7 @@ static Snapshot CopySnapshot(Snapshot snapshot); static void UnregisterSnapshotNoOwner(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); +static Snapshot RefreshTransactionSnapshot(void); /* ResourceOwner callbacks to track snapshot references */ static void ResOwnerReleaseSnapshot(Datum res); @@ -1842,6 +1843,78 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc) SetTransactionSnapshot(snapshot, NULL, InvalidPid, source_pgproc); } +/* + * RefreshTransactionSnapshot + * Refresh the snapshot for a REPEATABLE READ transaction. + * + * Drops the transaction's own reference to the current transaction snapshot + * and installs a newly taken one in its place, returning the new snapshot. + */ +static Snapshot +RefreshTransactionSnapshot(void) +{ + Snapshot oldsnap = FirstXactSnapshot; + bool resetXmin = false; + + Assert(FirstSnapshotSet); + Assert(FirstXactSnapshot->regd_count > 0); + Assert(!pairingheap_is_empty(&RegisteredSnapshots)); + + /* + * Drop our reference to the current transaction snapshot, and free the + * snapshot if no more references remain. Clear FirstXactSnapshot first + * so that it never points at a released snapshot, even if an error is + * thrown before the replacement is installed below. + */ + FirstXactSnapshot = NULL; + oldsnap->regd_count--; + if (oldsnap->regd_count == 0) + pairingheap_remove(&RegisteredSnapshots, &oldsnap->ph_node); + + if (oldsnap->regd_count == 0 && oldsnap->active_count == 0) + { + FreeSnapshot(oldsnap); + resetXmin = true; + } + + /* Don't allow catalog snapshot to be older than xact snapshot.
*/ + InvalidateCatalogSnapshot(); + + /* First, create the snapshot in CurrentSnapshotData */ + CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData); + + /* Make a saved copy */ + CurrentSnapshot = CopySnapshot(CurrentSnapshot); + FirstXactSnapshot = CurrentSnapshot; + /* Mark it as "registered" in FirstXactSnapshot */ + FirstXactSnapshot->regd_count++; + pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node); + + if (resetXmin) + SnapshotResetXmin(); + + return CurrentSnapshot; +} + +/* + * pg_refresh_snapshot + * SQL-callable wrapper for RefreshTransactionSnapshot. + * + * Raises an error unless the current transaction's isolation level is + * REPEATABLE READ. + */ +Datum +pg_refresh_snapshot(PG_FUNCTION_ARGS) +{ + if (XactIsoLevel != XACT_REPEATABLE_READ) + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("pg_refresh_snapshot must be called in a repeatable-read transaction"))); + + RefreshTransactionSnapshot(); + PG_RETURN_VOID(); +} + /* * XidInMVCCSnapshot * Is the given XID still-in-progress according to the snapshot? diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9575524007..3d770c8991 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6643,6 +6643,10 @@ proname => 'pg_export_snapshot', provolatile => 'v', proparallel => 'u', prorettype => 'text', proargtypes => '', prosrc => 'pg_export_snapshot' }, +{ oid => '8000', descr => 'refresh transaction snapshot', + proname => 'pg_refresh_snapshot', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => '', prosrc => 'pg_refresh_snapshot' }, + { oid => '3810', descr => 'true if server is in recovery', proname => 'pg_is_in_recovery', provolatile => 'v', prorettype => 'bool', proargtypes => '', prosrc => 'pg_is_in_recovery' }, diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 0c36d92742..38eaedde6d 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -29,6 +29,8 @@ extern PGDLLIMPORT const char *debug_query_string; extern 
PGDLLIMPORT int max_stack_depth; extern PGDLLIMPORT int PostAuthDelay; extern PGDLLIMPORT int client_connection_check_interval; +extern PGDLLIMPORT uint64 PostgresMainLoopIterationId; +extern PGDLLIMPORT uint64 PostgresMainLoopIterationSubId; /* GUC-configurable parameters */ diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index 29f49829f2..226828b20e 100644 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -203,6 +203,13 @@ typedef struct PortalData /* Presentation data, primarily used by the pg_cursors system view */ TimestampTz creation_time; /* time at which this portal was defined */ bool visible; /* include this portal in pg_cursors? */ + + /* + * State data for remembering which iteration of the PostgresMain loop the + * portal was created at, for use by extensions like postgres_fdw. + */ + uint64 createIterId; + uint64 createIterSubId; } PortalData; /* @@ -246,6 +253,7 @@ extern PlannedStmt *PortalGetPrimaryStmt(Portal portal); extern void PortalCreateHoldStore(Portal portal); extern void PortalHashTableDeleteAll(void); extern bool ThereAreNoReadyPortals(void); +extern bool ThereAreNoOldLivePortals(void); extern void HoldPinnedPortals(void); extern void ForgetPortalSnapshots(void);