postgres_fdw: Provide better emulation of READ COMMITTED behavior

Started by Etsuro Fujita about 1 year ago, 7 messages
#1Etsuro Fujita
etsuro.fujita@gmail.com
1 attachment(s)

Hi,

postgres_fdw uses REPEATABLE READ isolation level for the remote
transaction when the local transaction has READ COMMITTED isolation
level, for the reason described in the comments for
begin_remote_xact() in connection.c:

/*
* Start remote transaction or subtransaction, if needed.
*
* Note that we always use at least REPEATABLE READ in the remote session.
* This is so that, if a query initiates multiple scans of the same or
* different foreign tables, we will get snapshot-consistent results from
* those scans. A disadvantage is that we can't provide sane emulation of
* READ COMMITTED behavior --- it would be nice if we had some other way to
* control which remote queries share a snapshot.
*/

But, as the comment admits, this means READ COMMITTED is not emulated sanely, which causes unexpected behavior like this:

S1: CREATE TABLE t1 (c1 TEXT);
S1: CREATE FOREIGN TABLE ft1 (c1 TEXT) SERVER loopback OPTIONS
(table_name 't1');
S1: INSERT INTO ft1 VALUES ('foo');
S2: START TRANSACTION ISOLATION LEVEL READ COMMITTED;
S2: SELECT * FROM ft1;
c1
-----
foo
(1 row)

Looks good, but:

S1: INSERT INTO ft1 VALUES ('bar');
S2: SELECT * FROM ft1;
c1
-----
foo
(1 row)

Under READ COMMITTED, the second SELECT would be expected to return not
only the row ('foo') but also the row ('bar') that session S1 inserted
just before. Instead, it returns only the row ('foo').
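To make the difference concrete, here is a toy model in C. It is not PostgreSQL's snapshot code: a real snapshot records xmin, xmax and a list of in-progress XIDs, whereas here (as a simplifying assumption) a snapshot is just the number of commits that had happened when it was taken. ToyRow, ToySnapshot, toy_take_snapshot() and toy_count_visible() are hypothetical names.

```c
#include <assert.h>

/*
 * Toy model only: a real PostgreSQL snapshot records xmin, xmax and a list
 * of in-progress XIDs; here a snapshot is just "how many commits had
 * happened when it was taken".
 */
typedef struct
{
	const char *value;
	int			commit_seq;		/* 1 = first commit, 2 = second, ... */
} ToyRow;

typedef struct
{
	int			seen_commits;	/* commits visible to this snapshot */
} ToySnapshot;

/* Take a snapshot after commits_so_far transactions have committed. */
static ToySnapshot
toy_take_snapshot(int commits_so_far)
{
	ToySnapshot snap;

	assert(commits_so_far >= 0);
	snap.seen_commits = commits_so_far;
	return snap;
}

/* Count the rows whose inserting transaction committed before snap. */
static int
toy_count_visible(const ToyRow *rows, int nrows, ToySnapshot snap)
{
	int			nvisible = 0;

	for (int i = 0; i < nrows; i++)
	{
		if (rows[i].commit_seq <= snap.seen_commits)
			nvisible++;
	}
	return nvisible;
}
```

With rows {"foo", 1} and {"bar", 2}, reusing the snapshot taken after commit 1 (what the remote REPEATABLE READ transaction does for the second SELECT) counts 1 row. Taking a fresh snapshot after commit 2 (what READ COMMITTED would do) counts 2.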

I would like to propose a simple solution for this issue. postgres_fdw
would keep using REPEATABLE READ for the remote transaction as before,
but would refresh the remote transaction's snapshot when a new
top-level query starts. As a result, all remote queries issued from
within one top-level query (or from the set of queries it is expanded
into by rewrite rules) received via the simple or extended query
protocol share the same snapshot. To do this, the changes I would like
to make to the core and postgres_fdw code are:

* Add IDs for iterations of the PostgresMain() loop for extensions
like postgres_fdw to know which top-level query is currently being
processed.
* Add a function pg_refresh_snapshot() to refresh the snapshot for a
REPEATABLE READ transaction. (This function would cause phantom read,
which is not allowed to occur at REPEATABLE READ isolation level in
Postgres, but is allowed by the SQL standard.)
* Modify postgres_fdw so that when encountering a new top-level query,
it refreshes the snapshot for the remote transaction by using
pg_refresh_snapshot() before sending the first query from within the
top-level query if it is safe to do so.
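The bookkeeping in the list above can be modelled in a few lines of standalone C. This is a sketch, not the patch itself. IterPos, ToyConnState, iterpos_cmp(), toy_refresh_ok() and toy_note_query() are hypothetical stand-ins for PostgresMainLoopIterationId/SubId, PgFdwConnState, snapshot_refresh_ok() and update_emulated_rc_mode_info(), and the old-live-portal check is reduced to a boolean argument. The sub-ID restarts at zero on every loop iteration, so the sketch compares the two counters lexicographically (ID first, then sub-ID) rather than independently.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for (PostgresMainLoopIterationId, ...SubId). */
typedef struct
{
	uint64_t	id;				/* bumped once per PostgresMain loop iteration */
	uint64_t	subid;			/* bumped per parsetree in a simple query;
								 * reset to 0 on every iteration */
} IterPos;

/* Hypothetical subset of PgFdwConnState. */
typedef struct
{
	bool		have_first_query;
	IterPos		last;			/* position of the last remote query sent */
} ToyConnState;

/* Lexicographic comparison: subid is only meaningful within the same id. */
static int
iterpos_cmp(IterPos a, IterPos b)
{
	if (a.id != b.id)
		return a.id < b.id ? -1 : 1;
	if (a.subid != b.subid)
		return a.subid < b.subid ? -1 : 1;
	return 0;
}

/* Mirrors the checks made by snapshot_refresh_ok() in the WIP patch. */
static bool
toy_refresh_ok(const ToyConnState *st, IterPos now, bool old_live_portals)
{
	if (!st->have_first_query)
		return false;			/* first remote query takes a fresh snapshot */
	assert(iterpos_cmp(now, st->last) >= 0);
	if (iterpos_cmp(now, st->last) == 0)
		return false;			/* same top-level query: share the snapshot */
	if (old_live_portals)
		return false;			/* an older cursor may need the old snapshot */
	return true;
}

/* Mirrors update_emulated_rc_mode_info(): remember where we last queried. */
static void
toy_note_query(ToyConnState *st, IterPos now)
{
	st->have_first_query = true;
	st->last = now;
}
```

For example, after a remote query at position (1, 1), another one at (1, 1) reuses the snapshot. One at (2, 1), or at (3, 0) after the sub-ID has been reset, may refresh it unless an older cursor is still live.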

Attached is a WIP patch for that. With the patch we have:

S1: INSERT INTO ft1 VALUES ('foo');
S2: START TRANSACTION ISOLATION LEVEL READ COMMITTED;
S2: SELECT * FROM ft1;
c1
-----
foo
(1 row)

S1: INSERT INTO ft1 VALUES ('bar');
S2: SELECT * FROM ft1;
c1
-----
foo
bar
(2 rows)

The SELECT query retrieves both rows!

Comments welcome! Maybe I am missing something, though.

Best regards,
Etsuro Fujita

Attachments:

postgres_fdw-emulate-RC-behavior-WIP.patch (application/octet-stream)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 2326f391d3..874cc26b9f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -27,9 +27,11 @@
 #include "pgstat.h"
 #include "postgres_fdw.h"
 #include "storage/latch.h"
+#include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/hsearch.h"
 #include "utils/inval.h"
+#include "utils/portal.h"
 #include "utils/syscache.h"
 
 /*
@@ -71,6 +73,9 @@ typedef struct ConnCacheEntry
 	PgFdwConnState state;		/* extra per-connection state */
 } ConnCacheEntry;
 
+#define PgFdwConnStateContainer(ptr) \
+	(ConnCacheEntry *) ((char *) (ptr) - offsetof(ConnCacheEntry, state))
+
 /*
  * Connection cache (initialized on first use)
  */
@@ -773,6 +778,15 @@ begin_remote_xact(ConnCacheEntry *entry)
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
 		entry->changing_xact_state = false;
+
+		if (!IsolationUsesXactSnapshot() &&
+			PQserverVersion(entry->conn) >= 180000)
+		{
+			entry->state.rcIsEmulated = true;
+			entry->state.haveFirstQuery = false;
+			entry->state.lastIterId = 0;
+			entry->state.lastIterSubId = 0;
+		}
 	}
 
 	/*
@@ -848,13 +862,34 @@ GetPrepStmtNumber(PGconn *conn)
 PGresult *
 pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 {
+	PGresult   *res = NULL;
+
 	/* First, process a pending asynchronous request, if any. */
 	if (state && state->pendingAreq)
 		process_pending_request(state->pendingAreq);
 
+	/*
+	 * Next, if emulating READ COMMITTED behavior, refresh the snapshot for
+	 * the remote transaction if allowed.
+	 */
+	if (state && state->rcIsEmulated && snapshot_refresh_ok(state))
+		do_snapshot_refresh(state);
+
 	if (!PQsendQuery(conn, query))
 		return NULL;
-	return pgfdw_get_result(conn);
+	res = pgfdw_get_result(conn);
+
+	/*
+	 * If emulating READ COMMITTED behavior, check to see if the query has
+	 * been executed successfully, and if so, update information related to
+	 * that mode.
+	 */
+	if (state && state->rcIsEmulated &&
+		(PQresultStatus(res) == PGRES_COMMAND_OK ||
+		 PQresultStatus(res) == PGRES_TUPLES_OK))
+		update_emulated_rc_mode_info(state);
+
+	return res;
 }
 
 /*
@@ -1282,6 +1317,14 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
 
+		if (entry->state.rcIsEmulated)
+		{
+			entry->state.rcIsEmulated = false;
+			entry->state.haveFirstQuery = false;
+			entry->state.lastIterId = 0;
+			entry->state.lastIterSubId = 0;
+		}
+
 		/*
 		 * If the connection isn't in a good idle state, it is marked as
 		 * invalid or keep_connections option of its server is disabled, then
@@ -1652,7 +1695,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
 	 * fetch_more_data(); in that case reset the per-connection state here.
 	 */
 	if (entry->state.pendingAreq)
-		memset(&entry->state, 0, sizeof(entry->state));
+		entry->state.pendingAreq = NULL;
 
 	/* Disarm changing_xact_state if it all worked */
 	entry->changing_xact_state = false;
@@ -1938,7 +1981,7 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
 
 		/* Reset the per-connection state if needed */
 		if (entry->state.pendingAreq)
-			memset(&entry->state, 0, sizeof(entry->state));
+			entry->state.pendingAreq = NULL;
 
 		/* We're done with this entry; unset the changing_xact_state flag */
 		entry->changing_xact_state = false;
@@ -1983,7 +2026,7 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
 
 		/* Reset the per-connection state if needed */
 		if (entry->state.pendingAreq)
-			memset(&entry->state, 0, sizeof(entry->state));
+			entry->state.pendingAreq = NULL;
 
 		/* We're done with this entry; unset the changing_xact_state flag */
 		entry->changing_xact_state = false;
@@ -1991,6 +2034,95 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
 	}
 }
 
+/*
+ * Check if it is safe to refresh the snapshot for the remote transaction.
+ */
+bool
+snapshot_refresh_ok(PgFdwConnState *state)
+{
+	uint64		lastIterId = state->lastIterId;
+	uint64		lastIterSubId = state->lastIterSubId;
+
+	Assert(state->rcIsEmulated);
+	Assert(PostgresMainLoopIterationId >= lastIterId);
+	Assert(PostgresMainLoopIterationId > lastIterId || PostgresMainLoopIterationSubId >= lastIterSubId);
+
+	/*
+	 * If we haven't executed any query in the remote transaction, there's no
+	 * need to refresh the snapshot as the transaction will take a fresh
+	 * snapshot when executing the first query.
+	 */
+	if (!state->haveFirstQuery)
+		return false;
+
+	/*
+	 * If we have already executed any query from within the current top-level
+	 * query in the remote transaction, the transaction should reuse the
+	 * snapshot when executing the query we are about to send to the remote;
+	 * don't refresh the snapshot.
+	 */
+	if (PostgresMainLoopIterationId == lastIterId &&
+		PostgresMainLoopIterationSubId == lastIterSubId)
+		return false;
+	Assert(PostgresMainLoopIterationId > lastIterId ||
+		   PostgresMainLoopIterationSubId > lastIterSubId);
+
+	/*
+	 * If there is any live portal that was created in a previous top-level
+	 * query, the portal's query might access foreign tables by re-creating
+	 * the remote cursor, which should use the same snapshot as before; don't
+	 * refresh the snapshot.
+	 */
+	if (!ThereAreNoOldLivePortals())
+		return false;
+
+	/* Otherwise, it is safe to refresh the snapshot. */
+	return true;
+}
+
+/*
+ * Refresh the snapshot for the remote transaction.
+ */
+void
+do_snapshot_refresh(PgFdwConnState *state)
+{
+	ConnCacheEntry *entry = PgFdwConnStateContainer(state);
+	PGconn	   *conn = entry->conn;
+	const char *sql = "SELECT pg_catalog.pg_refresh_snapshot()";
+	PGresult   *res = NULL;
+
+	/* In what follows, do not risk leaking any PGresults. */
+	PG_TRY();
+	{
+		if (!PQsendQuery(conn, sql))
+			pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+		res = pgfdw_get_result(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			pgfdw_report_error(ERROR, res, conn, false, sql);
+
+		if (PQntuples(res) != 1 || PQnfields(res) != 1)
+			elog(ERROR, "unexpected result from snapshot refresh query");
+	}
+	PG_FINALLY();
+	{
+		PQclear(res);
+	}
+	PG_END_TRY();
+}
+
+/*
+ * Update information related to emulated READ COMMITTED mode.
+ */
+void
+update_emulated_rc_mode_info(PgFdwConnState *state)
+{
+	if (!state->haveFirstQuery)
+		state->haveFirstQuery = true;
+	state->lastIterId = PostgresMainLoopIterationId;
+	state->lastIterSubId = PostgresMainLoopIterationSubId;
+}
+
 /* Number of output arguments (columns) for various API versions */
 #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1	2
 #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2	5
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index c0810fbd7c..2cbffd9b3b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -431,6 +431,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 									Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
 								PGconn *conn,
+								PgFdwConnState *conn_state,
 								double *rows,
 								int *width,
 								Cost *startup_cost,
@@ -2987,9 +2988,11 @@ postgresExecForeignTruncate(List *rels,
 	Oid			serverid = InvalidOid;
 	UserMapping *user = NULL;
 	PGconn	   *conn = NULL;
+	PgFdwConnState *conn_state;
 	StringInfoData sql;
 	ListCell   *lc;
 	bool		server_truncatable = true;
+	PGresult   *res = NULL;
 
 	/*
 	 * By default, all postgres_fdw foreign tables are assumed truncatable.
@@ -3059,14 +3062,22 @@ postgresExecForeignTruncate(List *rels,
 	 * establish new connection if necessary.
 	 */
 	user = GetUserMapping(GetUserId(), serverid);
-	conn = GetConnection(user, false, NULL);
+	conn = GetConnection(user, false, &conn_state);
 
 	/* Construct the TRUNCATE command string */
 	initStringInfo(&sql);
 	deparseTruncateSql(&sql, rels, behavior, restart_seqs);
 
-	/* Issue the TRUNCATE command to remote server */
-	do_sql_command(conn, sql.data);
+	/*
+	 * Issue the TRUNCATE command to remote server
+	 *
+	 * We don't use a PG_TRY block here, so be careful not to throw error
+	 * without releasing the PGresult.
+	 */
+	res = pgfdw_exec_query(conn, sql.data, conn_state);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, conn, true, sql.data);
+	PQclear(res);
 
 	pfree(sql.data);
 }
@@ -3119,6 +3130,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_param_join_conds;
 		StringInfoData sql;
 		PGconn	   *conn;
+		PgFdwConnState *conn_state;
 		Selectivity local_sel;
 		QualCost	local_cost;
 		List	   *fdw_scan_tlist = NIL;
@@ -3162,8 +3174,8 @@ estimate_path_cost_size(PlannerInfo *root,
 								false, &retrieved_attrs, NULL);
 
 		/* Get the remote estimate */
-		conn = GetConnection(fpinfo->user, false, NULL);
-		get_remote_estimate(sql.data, conn, &rows, &width,
+		conn = GetConnection(fpinfo->user, false, &conn_state);
+		get_remote_estimate(sql.data, conn, conn_state, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
 
@@ -3594,7 +3606,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PGconn *conn, PgFdwConnState *conn_state,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -3610,7 +3622,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = pgfdw_exec_query(conn, sql, NULL);
+		res = pgfdw_exec_query(conn, sql, conn_state);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -3739,6 +3751,14 @@ create_cursor(ForeignScanState *node)
 	if (fsstate->conn_state->pendingAreq)
 		process_pending_request(fsstate->conn_state->pendingAreq);
 
+	/*
+	 * Next, if emulating READ COMMITTED behavior, refresh the snapshot for
+	 * the remote transaction if allowed.
+	 */
+	if (fsstate->conn_state->rcIsEmulated &&
+		snapshot_refresh_ok(fsstate->conn_state))
+		do_snapshot_refresh(fsstate->conn_state);
+
 	/*
 	 * Construct array of query parameter values in text format.  We do the
 	 * conversions in the short-lived per-tuple context, so as not to cause a
@@ -3795,6 +3815,13 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/*
+	 * If emulating READ COMMITTED behavior, update information related to
+	 * that mode.
+	 */
+	if (fsstate->conn_state->rcIsEmulated)
+		update_emulated_rc_mode_info(fsstate->conn_state);
 }
 
 /*
@@ -4224,6 +4251,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * if any, so no need to do it here.
 	 */
 
+	/*
+	 * Next, if emulating READ COMMITTED behavior, refresh the snapshot for
+	 * the remote transaction if allowed.
+	 */
+	if (fmstate->conn_state->rcIsEmulated &&
+		snapshot_refresh_ok(fmstate->conn_state))
+		do_snapshot_refresh(fmstate->conn_state);
+
 	/* Construct name we'll use for the prepared statement. */
 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
 			 GetPrepStmtNumber(fmstate->conn));
@@ -4256,6 +4291,13 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
 	/* This action shows that the prepare has been done. */
 	fmstate->p_name = p_name;
+
+	/*
+	 * If emulating READ COMMITTED behavior, update information related to
+	 * that mode.
+	 */
+	if (fmstate->conn_state->rcIsEmulated)
+		update_emulated_rc_mode_info(fmstate->conn_state);
 }
 
 /*
@@ -4565,6 +4607,14 @@ execute_dml_stmt(ForeignScanState *node)
 	if (dmstate->conn_state->pendingAreq)
 		process_pending_request(dmstate->conn_state->pendingAreq);
 
+	/*
+	 * Next, if emulating READ COMMITTED behavior, refresh the snapshot for
+	 * the remote transaction if allowed.
+	 */
+	if (dmstate->conn_state->rcIsEmulated &&
+		snapshot_refresh_ok(dmstate->conn_state))
+		do_snapshot_refresh(dmstate->conn_state);
+
 	/*
 	 * Construct array of query parameter values in text format.
 	 */
@@ -4602,6 +4652,13 @@ execute_dml_stmt(ForeignScanState *node)
 		dmstate->num_tuples = PQntuples(dmstate->result);
 	else
 		dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
+
+	/*
+	 * If emulating READ COMMITTED behavior, update information related to
+	 * that mode.
+	 */
+	if (dmstate->conn_state->rcIsEmulated)
+		update_emulated_rc_mode_info(dmstate->conn_state);
 }
 
 /*
@@ -4946,6 +5003,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	UserMapping *user;
 	PGconn	   *conn;
+	PgFdwConnState *conn_state;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -4965,7 +5023,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	 */
 	table = GetForeignTable(RelationGetRelid(relation));
 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
-	conn = GetConnection(user, false, NULL);
+	conn = GetConnection(user, false, &conn_state);
 
 	/*
 	 * Construct command to get page count for relation.
@@ -4976,7 +5034,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = pgfdw_exec_query(conn, sql.data, NULL);
+		res = pgfdw_exec_query(conn, sql.data, conn_state);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -5462,6 +5520,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	ForeignServer *server;
 	UserMapping *mapping;
 	PGconn	   *conn;
+	PgFdwConnState *conn_state;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -5493,7 +5552,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	 */
 	server = GetForeignServer(serverOid);
 	mapping = GetUserMapping(GetUserId(), server->serverid);
-	conn = GetConnection(mapping, false, NULL);
+	conn = GetConnection(mapping, false, &conn_state);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
 	if (PQserverVersion(conn) < 90100)
@@ -5509,7 +5568,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = pgfdw_exec_query(conn, buf.data, NULL);
+		res = pgfdw_exec_query(conn, buf.data, conn_state);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 9e501660d1..d7fa74559e 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -137,6 +137,17 @@ typedef struct PgFdwRelationInfo
 typedef struct PgFdwConnState
 {
 	AsyncRequest *pendingAreq;	/* pending async request */
+
+	bool		rcIsEmulated;	/* do we emulate READ COMMITTED behavior in
+								 * this xact? */
+	bool		haveFirstQuery;	/* have we executed first query in this
+								 * xact? */
+	uint64		lastIterId;		/* id of PostgresMain loop iteration during
+								 * which we have executed last query in this
+								 * xact */
+	uint64		lastIterSubId;	/* subid of PostgresMain loop iteration during
+								 * which we have executed last query in this
+								 * xact */
 } PgFdwConnState;
 
 /*
@@ -168,6 +179,9 @@ extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
 								  PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 							   bool clear, const char *sql);
+extern bool snapshot_refresh_ok(PgFdwConnState *state);
+extern void do_snapshot_refresh(PgFdwConnState *state);
+extern void update_emulated_rc_mode_info(PgFdwConnState *state);
 
 /* in option.c */
 extern int	ExtractConnectionOptions(List *defelems,
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 42af768045..926064b0ff 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -103,6 +103,14 @@ int			PostAuthDelay = 0;
 /* Time between checks that the client is still connected. */
 int			client_connection_check_interval = 0;
 
+/*
+ * IDs for iterations of the PostgresMain loop, for use by extensions like
+ * postgres_fdw to know which top-level query is currently being processed,
+ * for example.
+ */
+uint64		PostgresMainLoopIterationId = 0;
+uint64		PostgresMainLoopIterationSubId = 0;
+
 /* flags for non-system relation kinds to restrict use */
 int			restrict_nonsystem_relation_kind;
 
@@ -1117,6 +1125,8 @@ exec_simple_query(const char *query_string)
 		const char *cmdtagname;
 		size_t		cmdtaglen;
 
+		PostgresMainLoopIterationSubId += 1;
+
 		pgstat_report_query_id(0, true);
 
 		/*
@@ -4606,6 +4616,9 @@ PostgresMain(const char *dbname, const char *username)
 		int			firstchar;
 		StringInfoData input_message;
 
+		PostgresMainLoopIterationId += 1;
+		PostgresMainLoopIterationSubId = 0;
+
 		/*
 		 * At top of loop, reset extended-query-message flag, so that any
 		 * errors encountered in "idle" state don't provoke skip.
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c
index 93137820ac..2485147d82 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -23,6 +23,7 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "storage/ipc.h"
+#include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
@@ -217,6 +218,8 @@ CreatePortal(const char *name, bool allowDup, bool dupSilent)
 	portal->atEnd = true;		/* disallow fetches until query is set */
 	portal->visible = true;
 	portal->creation_time = GetCurrentStatementStartTimestamp();
+	portal->createIterId = PostgresMainLoopIterationId;
+	portal->createIterSubId = PostgresMainLoopIterationSubId;
 
 	/* put portal in table (sets portal->name) */
 	PortalHashTableInsert(portal, name);
@@ -1186,6 +1189,31 @@ ThereAreNoReadyPortals(void)
 	return true;
 }
 
+bool
+ThereAreNoOldLivePortals(void)
+{
+	HASH_SEQ_STATUS status;
+	PortalHashEnt *hentry;
+
+	hash_seq_init(&status, PortalHashTable);
+
+	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
+	{
+		Portal		portal = hentry->portal;
+
+		if ((portal->status == PORTAL_READY ||
+			 portal->status == PORTAL_ACTIVE) &&
+			(portal->createIterId < PostgresMainLoopIterationId ||
+			 portal->createIterSubId < PostgresMainLoopIterationSubId))
+		{
+			hash_seq_term(&status);
+			return false;
+		}
+	}
+
+	return true;
+}
+
 /*
  * Hold all pinned portals.
  *
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 7d2b34d4f2..1b000cce92 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -159,6 +159,7 @@ static Snapshot CopySnapshot(Snapshot snapshot);
 static void UnregisterSnapshotNoOwner(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
+static Snapshot RefreshTransactionSnapshot(void);
 
 /* ResourceOwner callbacks to track snapshot references */
 static void ResOwnerReleaseSnapshot(Datum res);
@@ -1842,6 +1843,70 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
 	SetTransactionSnapshot(snapshot, NULL, InvalidPid, source_pgproc);
 }
 
+/*
+ * RefreshTransactionSnapshot
+ *		Refresh the snapshot for a REPEATABLE READ transaction.
+ */
+static Snapshot
+RefreshTransactionSnapshot(void)
+{
+	bool		resetXmin = false;
+
+	Assert(FirstSnapshotSet);
+	Assert(FirstXactSnapshot->regd_count > 0);
+	Assert(!pairingheap_is_empty(&RegisteredSnapshots));
+
+	/*
+	 * Decrement the reference count of the current transaction snapshot, and
+	 * free the snapshot if no more references remain.
+	 */
+	FirstXactSnapshot->regd_count--;
+	if (FirstXactSnapshot->regd_count == 0)
+		pairingheap_remove(&RegisteredSnapshots,
+						   &FirstXactSnapshot->ph_node);
+
+	if (FirstXactSnapshot->regd_count == 0 &&
+		FirstXactSnapshot->active_count == 0)
+	{
+		FreeSnapshot(FirstXactSnapshot);
+		resetXmin = true;
+	}
+
+	/* Don't allow catalog snapshot to be older than xact snapshot. */
+	InvalidateCatalogSnapshot();
+
+	/* First, create the snapshot in CurrentSnapshotData */
+	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+
+	/* Make a saved copy */
+	CurrentSnapshot = CopySnapshot(CurrentSnapshot);
+	FirstXactSnapshot = CurrentSnapshot;
+	/* Mark it as "registered" in FirstXactSnapshot */
+	FirstXactSnapshot->regd_count++;
+	pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
+
+	if (resetXmin)
+		SnapshotResetXmin();
+
+	return CurrentSnapshot;
+}
+
+/*
+ * pg_refresh_snapshot
+ *		SQL-callable wrapper for RefreshTransactionSnapshot.
+ */
+Datum
+pg_refresh_snapshot(PG_FUNCTION_ARGS)
+{
+	if (XactIsoLevel != XACT_REPEATABLE_READ)
+		ereport(ERROR,
+				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+				 errmsg("pg_refresh_snapshot must be called in a repeatable-read transaction")));
+
+	RefreshTransactionSnapshot();
+	PG_RETURN_VOID();
+}
+
 /*
  * XidInMVCCSnapshot
  *		Is the given XID still-in-progress according to the snapshot?
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9575524007..3d770c8991 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6643,6 +6643,10 @@
   proname => 'pg_export_snapshot', provolatile => 'v', proparallel => 'u',
   prorettype => 'text', proargtypes => '', prosrc => 'pg_export_snapshot' },
 
+{ oid => '8000', descr => 'refresh transaction snapshot',
+  proname => 'pg_refresh_snapshot', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => '', prosrc => 'pg_refresh_snapshot' },
+
 { oid => '3810', descr => 'true if server is in recovery',
   proname => 'pg_is_in_recovery', provolatile => 'v', prorettype => 'bool',
   proargtypes => '', prosrc => 'pg_is_in_recovery' },
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 0c36d92742..38eaedde6d 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -29,6 +29,8 @@ extern PGDLLIMPORT const char *debug_query_string;
 extern PGDLLIMPORT int max_stack_depth;
 extern PGDLLIMPORT int PostAuthDelay;
 extern PGDLLIMPORT int client_connection_check_interval;
+extern PGDLLIMPORT uint64 PostgresMainLoopIterationId;
+extern PGDLLIMPORT uint64 PostgresMainLoopIterationSubId;
 
 /* GUC-configurable parameters */
 
diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h
index 29f49829f2..226828b20e 100644
--- a/src/include/utils/portal.h
+++ b/src/include/utils/portal.h
@@ -203,6 +203,13 @@ typedef struct PortalData
 	/* Presentation data, primarily used by the pg_cursors system view */
 	TimestampTz creation_time;	/* time at which this portal was defined */
 	bool		visible;		/* include this portal in pg_cursors? */
+
+	/*
+	 * State data for remembering which iteration of the PostgresMain loop the
+	 * portal was created at, for use by extensions like postgres_fdw.
+	 */
+	uint64		createIterId;
+	uint64		createIterSubId;
 }			PortalData;
 
 /*
@@ -246,6 +253,7 @@ extern PlannedStmt *PortalGetPrimaryStmt(Portal portal);
 extern void PortalCreateHoldStore(Portal portal);
 extern void PortalHashTableDeleteAll(void);
 extern bool ThereAreNoReadyPortals(void);
+extern bool ThereAreNoOldLivePortals(void);
 extern void HoldPinnedPortals(void);
 extern void ForgetPortalSnapshots(void);
 
#2Robert Haas
robertmhaas@gmail.com
In reply to: Etsuro Fujita (#1)
Re: postgres_fdw: Provide better emulation of READ COMMITTED behavior

On Thu, Dec 5, 2024 at 4:41 AM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:

> Comments welcome! Maybe I am missing something, though.

I have a hard time seeing how this would work if cursors are in use on
the main server. Say I do this:

DECLARE foo CURSOR FOR SELECT * FROM ft1 UNION ALL SELECT * FROM ft2;
...fetch some rows from cursor foo but few enough that we only scan ft1...
...do something that causes a snapshot refresh like issue another query...
...fetch more rows from cursor foo until we start scanning ft2...

--
Robert Haas
EDB: http://www.enterprisedb.com

#3Andy Fan
zhihuifan1213@163.com
In reply to: Robert Haas (#2)
Re: postgres_fdw: Provide better emulation of READ COMMITTED behavior

Robert Haas <robertmhaas@gmail.com> writes:

> On Thu, Dec 5, 2024 at 4:41 AM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
>
>> Comments welcome! Maybe I am missing something, though.
>
> I have a hard time seeing how this would work if cursors are in use on
> the main server. Say I do this:
>
> DECLARE foo CURSOR FOR SELECT * FROM ft1 UNION ALL SELECT * FROM ft2;
> ...fetch some rows from cursor foo but few enough that we only scan ft1...
> ...do something that causes a snapshot refresh like issue another query...
> ...fetch more rows from cursor foo until we start scanning ft2...

Apart from the above issue, what do you think about using
'SELECT pg_catalog.pg_refresh_snapshot()' to let the remote side
refresh its snapshot, versus adding a new message type for this? The
'SELECT' approach involves a lot of work, such as an extra network
round trip and a complete parser-planner-executor cycle. With a new
message type, we could send the message together with the next query.
If so, could both overheads be removed?

--
Best Regards
Andy Fan

#4Etsuro Fujita
etsuro.fujita@gmail.com
In reply to: Robert Haas (#2)
Re: postgres_fdw: Provide better emulation of READ COMMITTED behavior

On Fri, Dec 6, 2024 at 2:37 AM Robert Haas <robertmhaas@gmail.com> wrote:

> I have a hard time seeing how this would work if cursors are in use on
> the main server. Say I do this:
>
> DECLARE foo CURSOR FOR SELECT * FROM ft1 UNION ALL SELECT * FROM ft2;
> ...fetch some rows from cursor foo but few enough that we only scan ft1...
> ...do something that causes a snapshot refresh like issue another query...
> ...fetch more rows from cursor foo until we start scanning ft2...

Good point! Maybe my explanation was not enough, but the proposed
patch does not allow any query to do a snapshot refresh if such open
cursors exist on the main server, so cursor foo is guaranteed to scan
ft2 using the same snapshot that was used to scan ft1.
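A standalone sketch of that cursor check follows. It assumes the same lexicographic (ID, sub-ID) ordering of loop positions. ToyPortal, the TOY_PORTAL_* statuses (a mirror of PortalStatus) and toy_no_old_live_portals() are hypothetical stand-ins for PortalData, PortalStatus and ThereAreNoOldLivePortals(). The function walks every portal and reports whether any READY or ACTIVE one was created in an earlier top-level query.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct
{
	uint64_t	id;				/* PostgresMain loop iteration */
	uint64_t	subid;			/* statement within a simple query */
} IterPos;

/* Hypothetical mirror of PortalStatus. */
typedef enum
{
	TOY_PORTAL_NEW,
	TOY_PORTAL_DEFINED,
	TOY_PORTAL_READY,
	TOY_PORTAL_ACTIVE,
	TOY_PORTAL_DONE,
	TOY_PORTAL_FAILED
} ToyPortalStatus;

typedef struct
{
	const char *name;
	ToyPortalStatus status;
	IterPos		created;		/* loop position when the portal was created */
} ToyPortal;

/* True if position p comes strictly before position now. */
static bool
created_before(IterPos p, IterPos now)
{
	return p.id < now.id || (p.id == now.id && p.subid < now.subid);
}

/*
 * Mirrors ThereAreNoOldLivePortals(): a READY or ACTIVE portal created in
 * an earlier top-level query blocks the snapshot refresh.
 */
static bool
toy_no_old_live_portals(const ToyPortal *portals, size_t n, IterPos now)
{
	assert(portals != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
	{
		bool		live = portals[i].status == TOY_PORTAL_READY ||
			portals[i].status == TOY_PORTAL_ACTIVE;

		if (live && created_before(portals[i].created, now))
			return false;
	}
	return true;
}
```

In the scenario above, cursor foo is created at (1, 1) and stays READY, so the check fails when the later query runs at (2, 1) and no refresh happens. Once foo is finished (modelled here as TOY_PORTAL_DONE; a closed cursor's portal is simply dropped), the refresh is allowed again. The loop does not look at what a portal scans, so any old open cursor blocks the refresh. This is the trade-off raised later in this thread.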

Thank you for taking the time for this proposal!

Best regards,
Etsuro Fujita

#5Etsuro Fujita
etsuro.fujita@gmail.com
In reply to: Andy Fan (#3)
Re: postgres_fdw: Provide better emulation of READ COMMITTED behavior

On Fri, Dec 6, 2024 at 7:50 PM Andy Fan <zhihuifan1213@163.com> wrote:

> Apart from the above issue, what do you think about using
> 'SELECT pg_catalog.pg_refresh_snapshot()' to let the remote side
> refresh its snapshot, versus adding a new message type for this? The
> 'SELECT' approach involves a lot of work, such as an extra network
> round trip and a complete parser-planner-executor cycle. With a new
> message type, we could send the message together with the next query.
> If so, could both overheads be removed?

I think it might be a good idea to extend the simple/extended query
protocols that way, but even so, I would like to leave that for future
work. Even without it, I think this is still an improvement, and I do
not want to set the goal for the first cut too high.

Having said that, if the next query uses simple query protocol, we can
avoid the extra communication by sending the two queries in a single
function call. I will do that in the next version.
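A minimal sketch of the string-building side of that idea. It assumes the refresh is simply prepended to a parameterless query. build_refresh_and_query() is a hypothetical helper, not the approach the next version of the patch necessarily takes. When such a string is sent with libpq's PQsendQuery(), each statement yields its own result from PQgetResult(). The caller would therefore need to read and check the refresh's result before using the query's. This trick does not apply to the parameterized path, because PQsendQueryParams() and PQsendPrepare() accept only a single SQL command.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Same statement the WIP patch sends in do_snapshot_refresh(). */
#define REFRESH_SQL "SELECT pg_catalog.pg_refresh_snapshot();"

/*
 * Hypothetical helper: prepend the snapshot refresh to a parameterless
 * query so both travel in one simple-query message.  Returns a malloc'd
 * string that the caller must free, or NULL if allocation fails.
 */
static char *
build_refresh_and_query(const char *query)
{
	size_t		len;
	char	   *buf;

	assert(query != NULL);
	len = strlen(REFRESH_SQL) + 1 + strlen(query) + 1;	/* space + NUL */
	buf = malloc(len);
	if (buf == NULL)
		return NULL;
	snprintf(buf, len, "%s %s", REFRESH_SQL, query);
	return buf;
}
```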

Thanks for the comment!

Best regards,
Etsuro Fujita

#6Andy Fan
zhihuifan1213@163.com
In reply to: Etsuro Fujita (#5)
Re: postgres_fdw: Provide better emulation of READ COMMITTED behavior

Etsuro Fujita <etsuro.fujita@gmail.com> writes:

> On Fri, Dec 6, 2024 at 7:50 PM Andy Fan <zhihuifan1213@163.com> wrote:
>
>> Apart from the above issue, what do you think about using
>> 'SELECT pg_catalog.pg_refresh_snapshot()' to let the remote side
>> refresh its snapshot, versus adding a new message type for this? The
>> 'SELECT' approach involves a lot of work, such as an extra network
>> round trip and a complete parser-planner-executor cycle. With a new
>> message type, we could send the message together with the next query.
>> If so, could both overheads be removed?
>
> I think it might be a good idea to extend the simple/extended query
> protocols that way, but even so, I would like to leave that for future
> work. Even without it, I think this is still an improvement, and I do
> not want to set the goal for the first cut too high.

That's reasonable.

> Having said that, if the next query uses simple query protocol, we can
> avoid the extra communication by sending the two queries in a single
> function call. I will do that in the next version.

Good to know that.

After reading the patch, the changes look good to me, except for the
name ThereAre[No]OldLivePortals(): multiple negations can be somewhat
confusing. Opinions may vary, however. I point this out just in case it
was done by mistake while you were refactoring the code. If you think
the current name is better, I'm totally OK with it.

--
Best Regards
Andy Fan

#7Robert Haas
robertmhaas@gmail.com
In reply to: Etsuro Fujita (#4)
Re: postgres_fdw: Provide better emulation of READ COMMITTED behavior

On Sat, Dec 7, 2024 at 4:03 AM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:

> On Fri, Dec 6, 2024 at 2:37 AM Robert Haas <robertmhaas@gmail.com> wrote:
>
>> I have a hard time seeing how this would work if cursors are in use on
>> the main server. Say I do this:
>>
>> DECLARE foo CURSOR FOR SELECT * FROM ft1 UNION ALL SELECT * FROM ft2;
>> ...fetch some rows from cursor foo but few enough that we only scan ft1...
>> ...do something that causes a snapshot refresh like issue another query...
>> ...fetch more rows from cursor foo until we start scanning ft2...
>
> Good point! Maybe my explanation was not enough, but the proposed
> patch does not allow any query to do a snapshot refresh if such open
> cursors exist on the main server, so cursor foo is guaranteed to scan
> ft2 using the same snapshot that was used to scan ft1.

OK, I see. That does prevent the hazard I mentioned, but it also means
that the results returned by a query are dependent on whether there's
an unrelated cursor open, which seems unfortunate.

--
Robert Haas
EDB: http://www.enterprisedb.com