Refactoring postgres_fdw/connection.c
Hi,
While reviewing the postgres_fdw parallel-abort patch [1], I found that
there is a fair amount of duplicated code in postgres_fdw/connection.c,
which makes it harder to review patches that change connection.c.
So I'd like to remove the duplicated code and refactor the functions
in connection.c. I've attached the following three patches.
There are two functions for getting a query result, pgfdw_get_result() and
pgfdw_get_cleanup_result(). They contain almost the same code, calling PQisBusy(),
WaitLatchOrSocket(), PQconsumeInput() and PQgetResult() in a loop,
but only pgfdw_get_cleanup_result() lets its callers specify a timeout.
The 0001 patch turns pgfdw_get_cleanup_result() into the common function
for getting a query result and makes pgfdw_get_result() use it instead of
its own (duplicate) code. The patch also renames pgfdw_get_cleanup_result()
to pgfdw_get_result_timed().
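As a rough illustration of the shape of 0001 (not the patch code itself), the sketch below models a single wait loop whose timeout is optional. FakeConn, its tick counter, get_result_timed() and get_result() are hypothetical stand-ins invented for this example; the tick loop takes the place of PQisBusy() and WaitLatchOrSocket(), and a NULL timed_out means "no timeout", as in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a connection whose result arrives at a given tick. */
typedef struct FakeConn
{
	long	ready_at;	/* simulated time at which the result is available */
	long	now;		/* current simulated time */
	int		value;		/* the "query result" */
} FakeConn;

/*
 * Shared routine. If timed_out is NULL, endtime is ignored and we wait
 * until the result is ready. Otherwise we give up once now >= endtime.
 * Returns true on failure (here: only a timeout), false on success.
 */
static bool
get_result_timed(FakeConn *conn, long endtime, int *result, bool *timed_out)
{
	assert(conn != NULL && result != NULL);

	if (timed_out != NULL)
		*timed_out = false;

	while (conn->now < conn->ready_at)	/* plays the role of PQisBusy() */
	{
		if (timed_out != NULL && conn->now >= endtime)
		{
			*timed_out = true;
			return true;		/* *result is left untouched */
		}
		conn->now++;			/* plays the role of WaitLatchOrSocket() */
	}

	*result = conn->value;
	return false;
}

/* Untimed variant: just the shared routine with the timeout disabled. */
static int
get_result(FakeConn *conn)
{
	int		result = -1;

	(void) get_result_timed(conn, 0, &result, NULL);
	return result;
}
```

Passing NULL for timed_out is what lets the untimed caller reuse the timed loop; callers that need a deadline pass a real pointer together with an end time.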
pgfdw_xact_callback() and pgfdw_subxact_callback() have similar code to
issue COMMIT or RELEASE SAVEPOINT commands. The 0002 patch adds a common function,
pgfdw_exec_pre_commit(), for that purpose, and changes those callbacks
to use it.
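The control flow of such a common function can be sketched in isolation as follows. This is only a simplified model under my own assumptions: Entry, Pending and exec_pre_commit() are hypothetical names, recording last_sql stands in for actually sending the command, and the toplevel branch only clears the flags rather than running DEALLOCATE ALL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_PENDING 8

/* Hypothetical, trimmed-down model of a connection cache entry. */
typedef struct Entry
{
	bool		parallel_commit;
	bool		changing_xact_state;
	bool		have_prep_stmt;
	bool		have_error;
	const char *last_sql;		/* last command "sent" to the remote side */
} Entry;

typedef struct Pending
{
	Entry	   *items[MAX_PENDING];
	int			count;
} Pending;

/*
 * Send (parallel) or execute (serial) the given command. Returns true if
 * the entry was queued and its result must be collected later.
 */
static bool
exec_pre_commit(Entry *entry, const char *sql, Pending *pending, bool toplevel)
{
	entry->changing_xact_state = true;
	entry->last_sql = sql;

	if (entry->parallel_commit)
	{
		assert(pending->count < MAX_PENDING);
		pending->items[pending->count++] = entry;
		return true;			/* still in progress */
	}

	entry->changing_xact_state = false;

	/* Only the toplevel commit resets the prepared-statement bookkeeping. */
	if (toplevel)
	{
		entry->have_prep_stmt = false;
		entry->have_error = false;
	}
	return false;
}
```

The boolean return value lets each callback keep its own loop and simply `continue` when the entry has been deferred.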
pgfdw_finish_pre_commit_cleanup() and pgfdw_finish_pre_subcommit_cleanup()
have similar code to wait for the results of COMMIT or RELEASE SAVEPOINT commands.
The 0003 patch adds a common function, pgfdw_finish_pre_commit(), for that purpose,
and replaces both functions with it.
That is, pgfdw_finish_pre_commit_cleanup() and pgfdw_finish_pre_subcommit_cleanup()
are no longer necessary, so the 0003 patch removes them.
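A minimal standalone model of the merged "finish" step might look like the following. Entry and finish_pre_commit() are hypothetical; incrementing deallocs_sent stands in for sending DEALLOCATE ALL, and the xact_depth handling is my simplified assumption of what resetting the transaction state amounts to for toplevel versus subtransaction commit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed-down model of a connection cache entry. */
typedef struct Entry
{
	bool	changing_xact_state;
	bool	have_prep_stmt;
	bool	have_error;
	int		xact_depth;
	int		deallocs_sent;
} Entry;

/* Collect the pending results; do the extra cleanup only for toplevel. */
static void
finish_pre_commit(Entry **pending, size_t n, bool toplevel)
{
	assert(pending != NULL && n > 0);

	for (size_t i = 0; i < n; i++)
	{
		Entry  *e = pending[i];

		assert(e->changing_xact_state);
		/* real code would wait for the COMMIT or RELEASE result here */
		e->changing_xact_state = false;

		if (toplevel)
		{
			if (e->have_prep_stmt && e->have_error)
				e->deallocs_sent++;	/* stands in for DEALLOCATE ALL */
			e->have_prep_stmt = false;
			e->have_error = false;
			e->xact_depth = 0;		/* whole remote transaction is done */
		}
		else
			e->xact_depth--;		/* only one nesting level is closed */
	}
}
```

The point of the sketch is that the shared part is the result-collection loop, while everything guarded by toplevel is specific to the main transaction.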
[1]: https://commitfest.postgresql.org/38/3392/
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
Attachments:
v1-0001-Refactor-pgfdw_get_result-and-pgfdw_get_cleanup_resu.patch (text/plain; charset=UTF-8)
From aa115d03880968c2e5bab68415e06e17a337a45b Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 17:25:24 +0900
Subject: [PATCH 1/3] Refactor pgfdw_get_result() and
pgfdw_get_cleanup_result().
---
contrib/postgres_fdw/connection.c | 125 +++++++++++-------------------
1 file changed, 47 insertions(+), 78 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 939d114f02..cbee285480 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -108,8 +108,8 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
-static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
- PGresult **result, bool *timed_out);
+static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
+ PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
@@ -799,53 +799,12 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
{
- PGresult *volatile last_res = NULL;
-
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
- {
- for (;;)
- {
- PGresult *res;
-
- while (PQisBusy(conn))
- {
- int wc;
-
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
- WL_LATCH_SET | WL_SOCKET_READABLE |
- WL_EXIT_ON_PM_DEATH,
- PQsocket(conn),
- -1L, PG_WAIT_EXTENSION);
- ResetLatch(MyLatch);
-
- CHECK_FOR_INTERRUPTS();
-
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
- {
- if (!PQconsumeInput(conn))
- pgfdw_report_error(ERROR, NULL, conn, false, query);
- }
- }
-
- res = PQgetResult(conn);
- if (res == NULL)
- break; /* query is complete */
+ PGresult *result = NULL;
- PQclear(last_res);
- last_res = res;
- }
- }
- PG_CATCH();
- {
- PQclear(last_res);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ if (pgfdw_get_result_timed(conn, 0, &result, NULL))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
- return last_res;
+ return result;
}
/*
@@ -1295,7 +1254,7 @@ pgfdw_cancel_query(PGconn *conn)
}
/* Get and discard the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1351,7 +1310,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
}
/* Get the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1375,24 +1334,33 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
}
/*
- * Get, during abort cleanup, the result of a query that is in progress. This
- * might be a query that is being interrupted by transaction abort, or it might
- * be a query that was initiated as part of transaction abort to get the remote
- * side back to the appropriate state.
+ * Get the result of a query.
+ *
+ * This function offers quick responsiveness by checking for any interruptions.
+ *
+ * If timed_out is NULL, the timeout does not occur. Otherwise, the timeout is
+ * enabled and endtime is used as the time at which this function should
+ * give up and assume the remote side is dead.
+ *
+ * Return true if the timeout expired or connection trouble occurred. Otherwise
+ * return false and set *result to the last result of a query. Set timed_out to
+ * true only when the timeout expired.
+ *
+ * This function emulates PQexec()'s behavior of returning the last result
+ * when there are many.
+ *
+ * Caller is responsible for the error handling on the result.
*
- * endtime is the time at which we should give up and assume the remote
- * side is dead. Returns true if the timeout expired or connection trouble
- * occurred, false otherwise. Sets *result except in case of a timeout.
- * Sets timed_out to true only when the timeout expired.
*/
static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
- bool *timed_out)
+pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, PGresult **result,
+ bool *timed_out)
{
volatile bool failed = false;
PGresult *volatile last_res = NULL;
- *timed_out = false;
+ if (timed_out != NULL)
+ *timed_out = false;
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
@@ -1404,23 +1372,27 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
while (PQisBusy(conn))
{
int wc;
- TimestampTz now = GetCurrentTimestamp();
- long cur_timeout;
+ long cur_timeout = -1;
+ int wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE |
+ WL_EXIT_ON_PM_DEATH;
/* If timeout has expired, give up, else get sleep time. */
- cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
- if (cur_timeout <= 0)
+ if (timed_out != NULL)
{
- *timed_out = true;
- failed = true;
- goto exit;
+ TimestampTz now = GetCurrentTimestamp();
+
+ cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+ if (cur_timeout <= 0)
+ {
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
+ wakeEvents |= WL_TIMEOUT;
}
/* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
- WL_LATCH_SET | WL_SOCKET_READABLE |
- WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- PQsocket(conn),
+ wc = WaitLatchOrSocket(MyLatch, wakeEvents, PQsocket(conn),
cur_timeout, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
@@ -1458,6 +1430,7 @@ exit: ;
PQclear(last_res);
else
*result = last_res;
+
return failed;
}
@@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry = (ConnCacheEntry *) lfirst(lc);
/* Ignore errors (see notes in pgfdw_xact_callback) */
- while ((res = PQgetResult(entry->conn)) != NULL)
- {
- PQclear(res);
- /* Stop if the connection is lost (else we'll loop infinitely) */
- if (PQstatus(entry->conn) == CONNECTION_BAD)
- break;
- }
+ pgfdw_get_result_timed(entry->conn, 0, &res, NULL);
+ PQclear(res);
+
entry->have_prep_stmt = false;
entry->have_error = false;
--
2.37.1
v1-0002-Add-common-function-to-commit-xact-or-subxact-during.patch (text/plain; charset=UTF-8)
From 4205943a12d50255e7150f3b70aa75b061070e5e Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 22:45:06 +0900
Subject: [PATCH 2/3] Add common function to commit xact or subxact during
pre-commit.
---
contrib/postgres_fdw/connection.c | 122 ++++++++++++++++--------------
1 file changed, 66 insertions(+), 56 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index cbee285480..ec290459be 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -111,6 +111,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+ List **pending_entries, bool toplevel);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
@@ -894,8 +896,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- PGresult *res;
-
/* Ignore cache entry if no open connection right now */
if (entry->conn == NULL)
continue;
@@ -911,45 +911,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
- /*
- * If abort cleanup previously failed for this connection,
- * we can't issue any more commands against it.
- */
- pgfdw_reject_incomplete_xact_state_change(entry);
-
/* Commit all remote transactions during pre-commit */
- entry->changing_xact_state = true;
- if (entry->parallel_commit)
- {
- do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
- pending_entries = lappend(pending_entries, entry);
+ if (pgfdw_exec_pre_commit(entry, "COMMIT TRANSACTION",
+ &pending_entries, true))
continue;
- }
- do_sql_command(entry->conn, "COMMIT TRANSACTION");
- entry->changing_xact_state = false;
-
- /*
- * If there were any errors in subtransactions, and we
- * made prepared statements, do a DEALLOCATE ALL to make
- * sure we get rid of all prepared statements. This is
- * annoying and not terribly bulletproof, but it's
- * probably not worth trying harder.
- *
- * DEALLOCATE ALL only exists in 8.3 and later, so this
- * constrains how old a server postgres_fdw can
- * communicate with. We intentionally ignore errors in
- * the DEALLOCATE, so that we can hobble along to some
- * extent with older servers (leaking prepared statements
- * as we go; but we don't really support update operations
- * pre-8.3 anyway).
- */
- if (entry->have_prep_stmt && entry->have_error)
- {
- res = PQexec(entry->conn, "DEALLOCATE ALL");
- PQclear(res);
- }
- entry->have_prep_stmt = false;
- entry->have_error = false;
break;
case XACT_EVENT_PRE_PREPARE:
@@ -1014,6 +979,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
+ char sql[100];
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1029,11 +995,11 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
* of the current level, and close them.
*/
curlevel = GetCurrentTransactionNestLevel();
+ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- char sql[100];
-
/*
* We only care about connections with open remote subtransactions of
* the current level.
@@ -1047,23 +1013,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
{
- /*
- * If abort cleanup previously failed for this connection, we
- * can't issue any more commands against it.
- */
- pgfdw_reject_incomplete_xact_state_change(entry);
-
/* Commit all remote subtransactions during pre-commit */
- snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
- entry->changing_xact_state = true;
- if (entry->parallel_commit)
- {
- do_sql_command_begin(entry->conn, sql);
- pending_entries = lappend(pending_entries, entry);
+ if (pgfdw_exec_pre_commit(entry, sql, &pending_entries, false))
continue;
- }
- do_sql_command(entry->conn, sql);
- entry->changing_xact_state = false;
}
else
{
@@ -1512,6 +1464,64 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
entry->changing_xact_state = false;
}
+/*
+ * Commit all remote transactions or subtransactions during pre-commit.
+ *
+ * If parallel_commit is enabled at this connection cache entry and
+ * the result of "sql" needs to be gotten later, return true and append
+ * this entry to "pending_entries".
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
+ */
+static bool
+pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+ List **pending_entries, bool toplevel)
+{
+ PGresult *res;
+
+ /*
+ * If abort cleanup previously failed for this connection, we can't issue
+ * any more commands against it.
+ */
+ pgfdw_reject_incomplete_xact_state_change(entry);
+
+ entry->changing_xact_state = true;
+ if (entry->parallel_commit)
+ {
+ do_sql_command_begin(entry->conn, sql);
+ *pending_entries = lappend(*pending_entries, entry);
+ return true;
+ }
+ do_sql_command(entry->conn, sql);
+ entry->changing_xact_state = false;
+
+ if (!toplevel)
+ return false;
+
+ /*
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+ * statements. This is annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old
+ * a server postgres_fdw can communicate with. We intentionally ignore
+ * errors in the DEALLOCATE, so that we can hobble along to some extent
+ * with older servers (leaking prepared statements as we go; but we don't
+ * really support update operations pre-8.3 anyway).
+ */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ return false;
+}
+
/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
--
2.37.1
v1-0003-Merge-pgfdw_finish_pre_commit_cleanup-and-pgfdw_fini.patch (text/plain; charset=UTF-8)
From dbf44195ca0c51a172634a5d0fc16c34beb0381f Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 23:27:14 +0900
Subject: [PATCH 3/3] Merge pgfdw_finish_pre_commit_cleanup and
pgfdw_finish_pre_subcommit_cleanup into one.
---
contrib/postgres_fdw/connection.c | 78 ++++++++++---------------------
1 file changed, 25 insertions(+), 53 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ec290459be..6e23046ad6 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -113,9 +113,8 @@ static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
List **pending_entries, bool toplevel);
-static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
-static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
- int curlevel);
+static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql,
+ bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@@ -954,7 +953,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit_cleanup(pending_entries);
+ pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION", true);
}
/*
@@ -1031,7 +1030,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if (pending_entries)
{
Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ pgfdw_finish_pre_commit(pending_entries, sql, false);
}
}
@@ -1523,11 +1522,14 @@ pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
}
/*
- * Finish pre-commit cleanup of connections on each of which we've sent a
- * COMMIT command to the remote server.
+ * Wait for all remote transactions or subtransactions to be committed
+ * and finish pre-commit.
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
*/
static void
-pgfdw_finish_pre_commit_cleanup(List *pending_entries)
+pgfdw_finish_pre_commit(List *pending_entries, const char *sql, bool toplevel)
{
ConnCacheEntry *entry;
List *pending_deallocs = NIL;
@@ -1536,7 +1538,8 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Assert(pending_entries);
/*
- * Get the result of the COMMIT command for each of the pending entries
+ * Get the result of COMMIT or RELEASE command for each of the pending
+ * entries.
*/
foreach(lc, pending_entries)
{
@@ -1548,23 +1551,26 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
* We might already have received the result on the socket, so pass
* consume_input=true to try to consume it first
*/
- do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
+ do_sql_command_end(entry->conn, sql, true);
entry->changing_xact_state = false;
/* Do a DEALLOCATE ALL in parallel if needed */
- if (entry->have_prep_stmt && entry->have_error)
+ if (toplevel)
{
- /* Ignore errors (see notes in pgfdw_xact_callback) */
- if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+ if (entry->have_prep_stmt && entry->have_error)
{
- pending_deallocs = lappend(pending_deallocs, entry);
- continue;
+ /* Ignore errors (see notes in pgfdw_xact_callback) */
+ if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+ {
+ pending_deallocs = lappend(pending_deallocs, entry);
+ continue;
+ }
}
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
}
- entry->have_prep_stmt = false;
- entry->have_error = false;
- pgfdw_reset_xact_state(entry, true);
+ pgfdw_reset_xact_state(entry, toplevel);
}
/* No further work if no pending entries */
@@ -1588,41 +1594,7 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry->have_prep_stmt = false;
entry->have_error = false;
- pgfdw_reset_xact_state(entry, true);
- }
-}
-
-/*
- * Finish pre-subcommit cleanup of connections on each of which we've sent a
- * RELEASE command to the remote server.
- */
-static void
-pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
-{
- ConnCacheEntry *entry;
- char sql[100];
- ListCell *lc;
-
- Assert(pending_entries);
-
- /*
- * Get the result of the RELEASE command for each of the pending entries
- */
- snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
- foreach(lc, pending_entries)
- {
- entry = (ConnCacheEntry *) lfirst(lc);
-
- Assert(entry->changing_xact_state);
-
- /*
- * We might already have received the result on the socket, so pass
- * consume_input=true to try to consume it first
- */
- do_sql_command_end(entry->conn, sql, true);
- entry->changing_xact_state = false;
-
- pgfdw_reset_xact_state(entry, false);
+ pgfdw_reset_xact_state(entry, toplevel);
}
}
--
2.37.1
At Tue, 26 Jul 2022 00:54:47 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
Hi,
While reviewing the postgres_fdw parallel-abort patch [1], I found that
there is a fair amount of duplicated code in postgres_fdw/connection.c,
which makes it harder to review patches that change connection.c.
So I'd like to remove the duplicated code and refactor the functions
in connection.c. I've attached the following three patches.
There are two functions for getting a query result, pgfdw_get_result() and
pgfdw_get_cleanup_result(). They contain almost the same code, calling PQisBusy(),
WaitLatchOrSocket(), PQconsumeInput() and PQgetResult() in a loop,
but only pgfdw_get_cleanup_result() lets its callers specify a timeout.
The 0001 patch turns pgfdw_get_cleanup_result() into the common function
for getting a query result and makes pgfdw_get_result() use it instead of
its own (duplicate) code. The patch also renames pgfdw_get_cleanup_result()
to pgfdw_get_result_timed().
I agree with that refactoring, and it looks fine to me.
pgfdw_xact_callback() and pgfdw_subxact_callback() have similar code to
issue COMMIT or RELEASE SAVEPOINT commands. The 0002 patch adds a common function,
pgfdw_exec_pre_commit(), for that purpose, and changes those callbacks
to use it.
I'm not sure the two are similar to each other. The new function
pgfdw_exec_pre_commit() looks like a merger of two isolated code paths
intended to share a seven-line codelet. I feel the code gets a bit
harder to understand after the change. I mildly oppose this part.
pgfdw_finish_pre_commit_cleanup() and pgfdw_finish_pre_subcommit_cleanup()
have similar code to wait for the results of COMMIT or RELEASE SAVEPOINT commands.
The 0003 patch adds a common function, pgfdw_finish_pre_commit(), for that purpose,
and replaces both functions with it.
That is, pgfdw_finish_pre_commit_cleanup() and pgfdw_finish_pre_subcommit_cleanup()
are no longer necessary, so the 0003 patch removes them.
This gives me the same feeling as 0002. Considering that
pending_deallocs becomes non-NIL only when toplevel, 38 lines out of
66 lines of the function are the toplevel-dedicated stuff.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On 2022/07/26 16:25, Kyotaro Horiguchi wrote:
I agree with that refactoring, and it looks fine to me.
Thanks for reviewing the patches!
I'm not sure the two are similar to each other. The new function
pgfdw_exec_pre_commit() looks like a merger of two isolated code paths
intended to share a seven-line codelet. I feel the code gets a bit
harder to understand after the change. I mildly oppose this part.
If so, we can split pgfdw_exec_pre_commit() into two: one is the common
function that sends or executes the command (i.e., calls
do_sql_command_begin() or do_sql_command()), and the other is
a function only for toplevel. The latter calls
the common function and then does the DEALLOCATE ALL processing.
But this is not the way that other functions like pgfdw_abort_cleanup()
are implemented. Those functions contain the code for both toplevel and
!toplevel (i.e., subxact), and choose what to run depending
on the argument "toplevel". So I'm thinking that it's better for
pgfdw_exec_pre_commit() to be implemented in the same way.
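For comparison, the two-function alternative described above could look roughly like this. It is only a standalone sketch under my own assumptions: Entry, send_or_exec() and commit_toplevel() are hypothetical names, and the counters stand in for executing the command and for DEALLOCATE ALL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed-down model of a connection cache entry. */
typedef struct Entry
{
	bool	parallel_commit;
	bool	have_prep_stmt;
	bool	have_error;
	int		commands_run;	/* commands executed synchronously */
	int		deallocs_run;	/* stands in for DEALLOCATE ALL */
} Entry;

/* Common part: queue the entry (parallel) or run the command right away. */
static bool
send_or_exec(Entry *e, Entry **pending, size_t *npending, size_t cap)
{
	if (e->parallel_commit)
	{
		assert(*npending < cap);
		pending[(*npending)++] = e;
		return true;			/* result is collected later */
	}
	e->commands_run++;
	return false;
}

/* Toplevel-only wrapper: common part plus prepared-statement cleanup. */
static bool
commit_toplevel(Entry *e, Entry **pending, size_t *npending, size_t cap)
{
	if (send_or_exec(e, pending, npending, cap))
		return true;

	if (e->have_prep_stmt && e->have_error)
		e->deallocs_run++;
	e->have_prep_stmt = false;
	e->have_error = false;
	return false;
}
```

With this split the subxact callback would call send_or_exec() directly, so there is no "toplevel" argument, at the cost of diverging from how pgfdw_abort_cleanup() is structured.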
This gives me the same feeling as 0002. Considering that
pending_deallocs becomes non-NIL only when toplevel, 38 lines out of
66 lines of the function are the toplevel-dedicated stuff.
Same as above.
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
Fujii-san,
On Tue, Jul 26, 2022 at 12:55 AM Fujii Masao
<masao.fujii@oss.nttdata.com> wrote:
While reviewing the postgres_fdw parallel-abort patch [1], I found that
there is a fair amount of duplicated code in postgres_fdw/connection.c,
which makes it harder to review patches that change connection.c.
So I'd like to remove the duplicated code and refactor the functions
in connection.c. I've attached the following three patches.
There are two functions for getting a query result, pgfdw_get_result() and
pgfdw_get_cleanup_result(). They contain almost the same code, calling PQisBusy(),
WaitLatchOrSocket(), PQconsumeInput() and PQgetResult() in a loop,
but only pgfdw_get_cleanup_result() lets its callers specify a timeout.
The 0001 patch turns pgfdw_get_cleanup_result() into the common function
for getting a query result and makes pgfdw_get_result() use it instead of
its own (duplicate) code. The patch also renames pgfdw_get_cleanup_result()
to pgfdw_get_result_timed().
pgfdw_xact_callback() and pgfdw_subxact_callback() have similar code to
issue COMMIT or RELEASE SAVEPOINT commands. The 0002 patch adds a common function,
pgfdw_exec_pre_commit(), for that purpose, and changes those callbacks
to use it.
pgfdw_finish_pre_commit_cleanup() and pgfdw_finish_pre_subcommit_cleanup()
have similar code to wait for the results of COMMIT or RELEASE SAVEPOINT commands.
The 0003 patch adds a common function, pgfdw_finish_pre_commit(), for that purpose,
and replaces both functions with it.
That is, pgfdw_finish_pre_commit_cleanup() and pgfdw_finish_pre_subcommit_cleanup()
are no longer necessary, so the 0003 patch removes them.
Thanks for working on this! I'd like to review this after the end of
the current CF. Could you add this to the next CF?
Best regards,
Etsuro Fujita
On 2022/07/26 19:26, Etsuro Fujita wrote:
Thanks for working on this! I'd like to review this after the end of
the current CF.
Thanks!
Could you add this to the next CF?
Yes.
https://commitfest.postgresql.org/39/3782/
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
At Tue, 26 Jul 2022 18:33:04 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
I'm not sure the two are similar to each other. The new function
pgfdw_exec_pre_commit() looks like a merger of two isolated code paths
intended to share a seven-line codelet. I feel the code gets a bit
harder to understand after the change. I mildly oppose this part.
If so, we can split pgfdw_exec_pre_commit() into two: one is the common
function that sends or executes the command (i.e., calls
do_sql_command_begin() or do_sql_command()), and the other is
a function only for toplevel. The latter calls
the common function and then does the DEALLOCATE ALL processing.
But this is not the way that other functions like pgfdw_abort_cleanup()
are implemented. Those functions contain the code for both toplevel and
!toplevel (i.e., subxact), and choose what to run depending
on the argument "toplevel". So I'm thinking that it's better for
pgfdw_exec_pre_commit() to be implemented in the same way.
I didn't see it from that viewpoint, but I don't think that
unconditionally justifies other refactoring. If we merge
pgfdw_finish_pre_(sub)?commit_cleanup()s this way, in turn
pgfdw_subxact_callback() and pgfdw_xact_callback() are going to be
almost identical except for the event IDs they handle. But I don't think we
would want to merge them.
My concern with 0002 is that it hides the subxact-specific steps from
the subxact callback. It would look reasonable if it were called from
two or more places for each of toplevel and !toplevel, but actually it has
only one caller for each. So I think that pgfdw_exec_pre_commit()
should not do that and should be renamed to pgfdw_commit_remote() or
something. On the other hand, pgfdw_finish_pre_commit() hides
toplevel-specific steps from the caller, so the same argument holds.
Another point that concerns me about the patch is that the new
function takes an SQL statement along with the toplevel flag. I guess
the reason is that the command for subxact (RELEASE SAVEPOINT s%d)
requires the current transaction level. However, that value
can be obtained very cheaply within the cleanup functions. So I propose to
get rid of the parameter "sql" from the two functions.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Tue, Jul 26, 2022 at 7:46 PM Fujii Masao <masao.fujii@oss.nttdata.com> wrote:
On 2022/07/26 19:26, Etsuro Fujita wrote:
Could you add this to the next CF?
Thanks!
Best regards,
Etsuro Fujita
On 2022/07/27 10:36, Kyotaro Horiguchi wrote:
At Tue, 26 Jul 2022 18:33:04 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
I'm not sure the two are similar to each other. The new function
pgfdw_exec_pre_commit() looks like a merger of two isolated code paths
intended to share a seven-line codelet. I feel the code gets a bit
harder to understand after the change. I mildly oppose this part.
If so, we can split pgfdw_exec_pre_commit() into two: one is the common
function that sends or executes the command (i.e., calls
do_sql_command_begin() or do_sql_command()), and the other is
a function only for toplevel. The latter calls
the common function and then does the DEALLOCATE ALL processing.
But this is not the way that other functions like pgfdw_abort_cleanup()
are implemented. Those functions contain the code for both toplevel and
!toplevel (i.e., subxact), and choose what to run depending
on the argument "toplevel". So I'm thinking that it's better for
pgfdw_exec_pre_commit() to be implemented in the same way.
I didn't see it from that viewpoint, but I don't think that
unconditionally justifies other refactoring. If we merge
pgfdw_finish_pre_(sub)?commit_cleanup()s this way, in turn
pgfdw_subxact_callback() and pgfdw_xact_callback() are going to be
almost identical except for the event IDs they handle. But I don't think we
would want to merge them.
I don't think they are so identical because (as you say) they have to handle different event IDs. So I agree we don't want to merge them.
My concern with 0002 is that it hides the subxact-specific steps from
the subxact callback. It would look reasonable if it were called from
two or more places for each of toplevel and !toplevel, but actually it has
only one caller for each. So I think that pgfdw_exec_pre_commit()
should not do that and should be renamed to pgfdw_commit_remote() or
something. On the other hand, pgfdw_finish_pre_commit() hides
toplevel-specific steps from the caller, so the same argument holds.
So your conclusion is to rename pgfdw_exec_pre_commit() to pgfdw_commit_remote() or something?
Another point that concerns me about the patch is that the new
function takes an SQL statement along with the toplevel flag. I guess
the reason is that the command for subxact (RELEASE SAVEPOINT s%d)
requires the current transaction level. However, that value
can be obtained very cheaply within the cleanup functions. So I propose to
get rid of the parameter "sql" from the two functions.
Yes, that's possible. That is, pgfdw_exec_pre_commit() can construct the query string by executing the following code, instead of accepting the query as an argument. But one downside of this approach is that the following code is executed for every remote subtransaction entry. Constructing the query string this way is probably cheap, but I'd like to avoid any unnecessary overhead if possible. So the patch makes the caller, pgfdw_subxact_callback(), construct the query string only once and pass it to pgfdw_exec_pre_commit().
curlevel = GetCurrentTransactionNestLevel();
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
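To make the "construct once, pass to every entry" idea concrete, here is a small standalone sketch. Entry and release_entries() are hypothetical; storing the pointer in last_sql stands in for sending the command to each remote server.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for a connection cache entry. */
typedef struct Entry
{
	const char *last_sql;	/* command most recently "sent" on this entry */
} Entry;

/*
 * Format the RELEASE SAVEPOINT command once, outside the loop, and hand
 * the same string to every entry.
 */
static void
release_entries(Entry *entries, size_t n, int curlevel,
				char *sql, size_t sqllen)
{
	assert(sql != NULL && sqllen > 0);

	snprintf(sql, sqllen, "RELEASE SAVEPOINT s%d", curlevel);

	for (size_t i = 0; i < n; i++)
		entries[i].last_sql = sql;	/* stands in for sending the command */
}
```

If the helper built the string itself, the snprintf() call would move inside the loop and run once per entry, which is the overhead I'd like to avoid, small as it probably is.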
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
At Thu, 28 Jul 2022 15:26:42 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
On 2022/07/27 10:36, Kyotaro Horiguchi wrote:
At Tue, 26 Jul 2022 18:33:04 +0900, Fujii Masao
<masao.fujii@oss.nttdata.com> wrote in
I didn't see it from that viewpoint, but I don't think that
unconditionally justifies other refactoring. If we merge
pgfdw_finish_pre_(sub)?commit_cleanup()s this way, in turn
pgfdw_subxact_callback() and pgfdw_xact_callback() are going to be
almost identical except for the event IDs they handle. But I don't think we
would want to merge them.
I don't think they are so identical because (as you say) they have to
handle different event IDs. So I agree we don't want to merge them.
The xact_callback and subxact_callback handle different sets of event
IDs so they can be merged into one switch(). I don't think there's
much difference from merging the functions for xact and subxact into
one routine then calling it with a flag to choose one of the two
paths. (Even though less than half of the lines of the function are shared.)
However, I still don't think they ought to be merged.
A concern on 0002 is that it is hiding the subxact-specific steps from
the subxact callback. It would look reasonable if it were called from
two or more places for each of toplevel and !toplevel, but actually it has
only one caller for each. So I think that pgfdw_exec_pre_commit
should not do that and should be renamed to pgfdw_commit_remote() or
something. On the other hand pgfdw_finish_pre_commit() hides
toplevel-specific steps from the caller so the same argument holds.
So your conclusion is to rename pgfdw_exec_pre_commit() to
pgfdw_commit_remote() or something?
And the remote stuff is removed from the function. That being said, I
don't mean to fight this any longer since that is rather a matter of
taste.
Another point that concerns me about the patch is that the new
function takes an SQL statement along with the toplevel flag. I guess
the reason is that the command for subxact (RELEASE SAVEPOINT %d)
requires the current transaction level. However, that value
is obtainable very cheaply within the cleanup functions. So I propose to
get rid of the parameter "sql" from the two functions.
Yes, that's possible. That is, pgfdw_exec_pre_commit() can construct
the query string by executing the following code, instead of
accepting the query as an argument. But one downside of this approach
is that the following code is executed for every remote
subtransaction entry. It may be cheap to construct the query string
as follows, but I'd like to avoid any unnecessary overhead if
possible. So the patch makes the caller, pgfdw_subxact_callback(),
construct the query string only once and pass it to
pgfdw_exec_pre_commit().
curlevel = GetCurrentTransactionNestLevel();
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
That *overhead* has been there all along, and I'm not sure how much actual
impact it has on performance (compared to the surrounding code). But I
would choose leaving it open-coded as-is rather than turning it into a
function that needs two tightly-bound parameters passed and that is also
tightly bound to the caller via the parameters. ...In other words,
the original code doesn't seem to meet the requirement for a function.
However, if you prefer the functions to the open-coded
lines based on the above discussion, that's okay and I'll stop objecting.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Thu, Jul 28, 2022 at 11:56 AM Fujii Masao
<masao.fujii@oss.nttdata.com> wrote:
On 2022/07/27 10:36, Kyotaro Horiguchi wrote:
At Tue, 26 Jul 2022 18:33:04 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
I'm not sure the two are similar to each other. The new function
pgfdw_exec_pre_commit() looks like a merger of two isolated code paths
intended to share a seven-line codelet. I feel the code gets a bit
harder to understand after the change. I mildly oppose this part.
If so, we can split pgfdw_exec_pre_commit() into two: one is the common
function that sends or executes the command (i.e., calls
do_sql_command_begin() or do_sql_command()), and the other is
the function only for toplevel. The latter function calls
the common function and then executes the DEALLOCATE ALL things.
But this is not the way that other functions like
pgfdw_abort_cleanup()
are implemented. Those functions have code for both toplevel and
!toplevel (i.e., subxact), and run the processing depending
on the argument "toplevel". So I'm thinking that
pgfdw_exec_pre_commit() implemented in the same way is better.
I didn't see it from that viewpoint but I don't think that
unconditionally justifies other refactoring. If we merge
pgfdw_finish_pre_(sub)?commit_cleanup()s this way, in turn
pgfdw_subxact_callback() and pgfdw_xact_callback() are going to be
almost identical except for the event IDs they handle. But I don't think we
would want to merge them.
I don't think they are so identical because (as you say) they have to handle different event IDs. So I agree we don't want to merge them.
A concern on 0002 is that it is hiding the subxact-specific steps from
the subxact callback. It would look reasonable if it were called from
two or more places for each of toplevel and !toplevel, but actually it has
only one caller for each. So I think that pgfdw_exec_pre_commit
should not do that and should be renamed to pgfdw_commit_remote() or
something. On the other hand pgfdw_finish_pre_commit() hides
toplevel-specific steps from the caller so the same argument holds.
So your conclusion is to rename pgfdw_exec_pre_commit() to pgfdw_commit_remote() or something?
Another point that concerns me about the patch is that the new
function takes an SQL statement along with the toplevel flag. I guess
the reason is that the command for subxact (RELEASE SAVEPOINT %d)
requires the current transaction level. However, that value
is obtainable very cheaply within the cleanup functions. So I propose to
get rid of the parameter "sql" from the two functions.
Yes, that's possible. That is, pgfdw_exec_pre_commit() can construct the query string by executing the following code, instead of accepting the query as an argument. But one downside of this approach is that the following code is executed for every remote subtransaction entry. It may be cheap to construct the query string as follows, but I'd like to avoid any unnecessary overhead if possible. So the patch makes the caller, pgfdw_subxact_callback(), construct the query string only once and pass it to pgfdw_exec_pre_commit().
curlevel = GetCurrentTransactionNestLevel();
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
Another possibility I can see is that, instead of calling
pgfdw_exec_pre_commit() (and similarly pgfdw_abort_cleanup()) for every
connection entry, we call it once from the callback function;
for that we need to move the hash table loop inside that function.
The structure of the callback function looks a little fuzzy to me,
since the same event is checked for every entry of the connection hash
table. Instead, that loop should simply be moved inside those
functions (e.g. pgfdw_exec_pre_commit() and pgfdw_abort_cleanup()), so
that each function is called once per event and takes care of every
entry of the connection hash table. The benefit is that we would save
the few processing cycles needed to match the event and call the same
function for each connection entry.
I tried this refactoring in the 0004 patch, which is not complete, and
am reattaching the other patches too to keep cfbot happy.
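As a rough illustration of the shape being proposed (a standalone sketch, not the 0004 patch itself), the callback dispatches on the event exactly once and each helper walks all connection entries on its own. DemoEvent, DemoConn and the demo_* functions are hypothetical names, and a plain array stands in for the ConnectionHash table.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced set of transaction events. */
typedef enum DemoEvent
{
	DEMO_PRE_COMMIT,
	DEMO_ABORT
} DemoEvent;

/* Hypothetical stand-in for ConnCacheEntry. */
typedef struct DemoConn
{
	bool		open;			/* is there a live connection? */
	int			xact_depth;		/* 0 means no open remote transaction */
	int			committed;		/* number of commits "sent" */
	int			aborted;		/* number of aborts "sent" */
} DemoConn;

/* The per-entry loop lives inside the helper, not in the callback. */
static void
demo_pre_commit_all(DemoConn *conns, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		if (!conns[i].open || conns[i].xact_depth == 0)
			continue;
		conns[i].committed++;
		conns[i].xact_depth = 0;	/* out of the remote transaction */
	}
}

static void
demo_abort_all(DemoConn *conns, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		if (!conns[i].open || conns[i].xact_depth == 0)
			continue;
		conns[i].aborted++;
		conns[i].xact_depth = 0;
	}
}

/* The event is matched once per callback, not once per entry. */
static void
demo_xact_callback(DemoEvent event, DemoConn *conns, size_t n)
{
	assert(conns != NULL || n == 0);

	switch (event)
	{
		case DEMO_PRE_COMMIT:
			demo_pre_commit_all(conns, n);
			break;
		case DEMO_ABORT:
			demo_abort_all(conns, n);
			break;
	}
}
```

The cost of this layout is that the entry filtering (open connection, open transaction) is repeated in each helper, which is the kind of duplication the earlier patches were trying to remove.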
Thoughts? Suggestions?
Regards,
Amul
Attachments:
v2-0003-Merge-pgfdw_finish_pre_commit_cleanup-and-pgfdw_f.patch (application/x-patch)
From d6e241cb946afe3b74c2893bda6dab8d3288716b Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 23:27:14 +0900
Subject: [PATCH v2 3/4] Merge pgfdw_finish_pre_commit_cleanup and
pgfdw_finish_pre_subcommit_cleanup into one.
---
contrib/postgres_fdw/connection.c | 78 ++++++++++---------------------
1 file changed, 25 insertions(+), 53 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ec290459be3..6e23046ad69 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -113,9 +113,8 @@ static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
List **pending_entries, bool toplevel);
-static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
-static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
- int curlevel);
+static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql,
+ bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@@ -954,7 +953,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit_cleanup(pending_entries);
+ pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION", true);
}
/*
@@ -1031,7 +1030,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if (pending_entries)
{
Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ pgfdw_finish_pre_commit(pending_entries, sql, false);
}
}
@@ -1523,11 +1522,14 @@ pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
}
/*
- * Finish pre-commit cleanup of connections on each of which we've sent a
- * COMMIT command to the remote server.
+ * Wait for all remote transactions or subtransactions to be committed
+ * and finish pre-commit.
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
*/
static void
-pgfdw_finish_pre_commit_cleanup(List *pending_entries)
+pgfdw_finish_pre_commit(List *pending_entries, const char *sql, bool toplevel)
{
ConnCacheEntry *entry;
List *pending_deallocs = NIL;
@@ -1536,7 +1538,8 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Assert(pending_entries);
/*
- * Get the result of the COMMIT command for each of the pending entries
+ * Get the result of COMMIT or RELEASE command for each of the pending
+ * entries.
*/
foreach(lc, pending_entries)
{
@@ -1548,23 +1551,26 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
* We might already have received the result on the socket, so pass
* consume_input=true to try to consume it first
*/
- do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
+ do_sql_command_end(entry->conn, sql, true);
entry->changing_xact_state = false;
/* Do a DEALLOCATE ALL in parallel if needed */
- if (entry->have_prep_stmt && entry->have_error)
+ if (toplevel)
{
- /* Ignore errors (see notes in pgfdw_xact_callback) */
- if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+ if (entry->have_prep_stmt && entry->have_error)
{
- pending_deallocs = lappend(pending_deallocs, entry);
- continue;
+ /* Ignore errors (see notes in pgfdw_xact_callback) */
+ if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+ {
+ pending_deallocs = lappend(pending_deallocs, entry);
+ continue;
+ }
}
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
}
- entry->have_prep_stmt = false;
- entry->have_error = false;
- pgfdw_reset_xact_state(entry, true);
+ pgfdw_reset_xact_state(entry, toplevel);
}
/* No further work if no pending entries */
@@ -1588,41 +1594,7 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry->have_prep_stmt = false;
entry->have_error = false;
- pgfdw_reset_xact_state(entry, true);
- }
-}
-
-/*
- * Finish pre-subcommit cleanup of connections on each of which we've sent a
- * RELEASE command to the remote server.
- */
-static void
-pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
-{
- ConnCacheEntry *entry;
- char sql[100];
- ListCell *lc;
-
- Assert(pending_entries);
-
- /*
- * Get the result of the RELEASE command for each of the pending entries
- */
- snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
- foreach(lc, pending_entries)
- {
- entry = (ConnCacheEntry *) lfirst(lc);
-
- Assert(entry->changing_xact_state);
-
- /*
- * We might already have received the result on the socket, so pass
- * consume_input=true to try to consume it first
- */
- do_sql_command_end(entry->conn, sql, true);
- entry->changing_xact_state = false;
-
- pgfdw_reset_xact_state(entry, false);
+ pgfdw_reset_xact_state(entry, toplevel);
}
}
--
2.18.0
v2-0002-Add-common-function-to-commit-xact-or-subxact-dur.patch (application/x-patch)
From 087d1aa83850577ee75792e78f310fe45e3240dd Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 22:45:06 +0900
Subject: [PATCH v2 2/4] Add common function to commit xact or subxact during
pre-commit.
---
contrib/postgres_fdw/connection.c | 122 ++++++++++++++++--------------
1 file changed, 66 insertions(+), 56 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index cbee2854803..ec290459be3 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -111,6 +111,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+ List **pending_entries, bool toplevel);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
@@ -894,8 +896,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- PGresult *res;
-
/* Ignore cache entry if no open connection right now */
if (entry->conn == NULL)
continue;
@@ -911,45 +911,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
- /*
- * If abort cleanup previously failed for this connection,
- * we can't issue any more commands against it.
- */
- pgfdw_reject_incomplete_xact_state_change(entry);
-
/* Commit all remote transactions during pre-commit */
- entry->changing_xact_state = true;
- if (entry->parallel_commit)
- {
- do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
- pending_entries = lappend(pending_entries, entry);
+ if (pgfdw_exec_pre_commit(entry, "COMMIT TRANSACTION",
+ &pending_entries, true))
continue;
- }
- do_sql_command(entry->conn, "COMMIT TRANSACTION");
- entry->changing_xact_state = false;
-
- /*
- * If there were any errors in subtransactions, and we
- * made prepared statements, do a DEALLOCATE ALL to make
- * sure we get rid of all prepared statements. This is
- * annoying and not terribly bulletproof, but it's
- * probably not worth trying harder.
- *
- * DEALLOCATE ALL only exists in 8.3 and later, so this
- * constrains how old a server postgres_fdw can
- * communicate with. We intentionally ignore errors in
- * the DEALLOCATE, so that we can hobble along to some
- * extent with older servers (leaking prepared statements
- * as we go; but we don't really support update operations
- * pre-8.3 anyway).
- */
- if (entry->have_prep_stmt && entry->have_error)
- {
- res = PQexec(entry->conn, "DEALLOCATE ALL");
- PQclear(res);
- }
- entry->have_prep_stmt = false;
- entry->have_error = false;
break;
case XACT_EVENT_PRE_PREPARE:
@@ -1014,6 +979,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
+ char sql[100];
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1029,11 +995,11 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
* of the current level, and close them.
*/
curlevel = GetCurrentTransactionNestLevel();
+ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- char sql[100];
-
/*
* We only care about connections with open remote subtransactions of
* the current level.
@@ -1047,23 +1013,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
{
- /*
- * If abort cleanup previously failed for this connection, we
- * can't issue any more commands against it.
- */
- pgfdw_reject_incomplete_xact_state_change(entry);
-
/* Commit all remote subtransactions during pre-commit */
- snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
- entry->changing_xact_state = true;
- if (entry->parallel_commit)
- {
- do_sql_command_begin(entry->conn, sql);
- pending_entries = lappend(pending_entries, entry);
+ if (pgfdw_exec_pre_commit(entry, sql, &pending_entries, false))
continue;
- }
- do_sql_command(entry->conn, sql);
- entry->changing_xact_state = false;
}
else
{
@@ -1512,6 +1464,64 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
entry->changing_xact_state = false;
}
+/*
+ * Commit all remote transactions or subtransactions during pre-commit.
+ *
+ * If parallel_commit is enabled at this connection cache entry and
+ * the result of "sql" needs to be gotten later, return true and append
+ * this entry to "pending_entries".
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
+ */
+static bool
+pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+ List **pending_entries, bool toplevel)
+{
+ PGresult *res;
+
+ /*
+ * If abort cleanup previously failed for this connection, we can't issue
+ * any more commands against it.
+ */
+ pgfdw_reject_incomplete_xact_state_change(entry);
+
+ entry->changing_xact_state = true;
+ if (entry->parallel_commit)
+ {
+ do_sql_command_begin(entry->conn, sql);
+ *pending_entries = lappend(*pending_entries, entry);
+ return true;
+ }
+ do_sql_command(entry->conn, sql);
+ entry->changing_xact_state = false;
+
+ if (!toplevel)
+ return false;
+
+ /*
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+ * statements. This is annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old
+ * a server postgres_fdw can communicate with. We intentionally ignore
+ * errors in the DEALLOCATE, so that we can hobble along to some extent
+ * with older servers (leaking prepared statements as we go; but we don't
+ * really support update operations pre-8.3 anyway).
+ */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ return false;
+}
+
/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
--
2.18.0
v2-0004-TRIAL-cleanup-xact-callback-WIP.patch (application/x-patch)
From f8aaf91d0fdb88c6726c45f924d8dea19bf98cff Mon Sep 17 00:00:00 2001
From: Amul Sul <amul.sul@enterprisedb.com>
Date: Wed, 3 Aug 2022 10:17:09 -0400
Subject: [PATCH v2 4/4] TRIAL: cleanup xact callback - WIP
---
contrib/postgres_fdw/connection.c | 420 +++++++++++++++---------------
1 file changed, 213 insertions(+), 207 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 6e23046ad69..181d2f83e32 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -110,9 +110,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
-static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
-static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
- List **pending_entries, bool toplevel);
+static void pgfdw_abort_cleanup(bool toplevel);
+static void pgfdw_exec_pre_commit(bool toplevel);
static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql,
bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
@@ -880,80 +879,47 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
static void
pgfdw_xact_callback(XactEvent event, void *arg)
{
- HASH_SEQ_STATUS scan;
- ConnCacheEntry *entry;
- List *pending_entries = NIL;
-
/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
return;
- /*
- * Scan all connection cache entries to find open remote transactions, and
- * close them.
- */
- hash_seq_init(&scan, ConnectionHash);
- while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ switch (event)
{
- /* Ignore cache entry if no open connection right now */
- if (entry->conn == NULL)
- continue;
-
- /* If it has an open remote transaction, try to close it */
- if (entry->xact_depth > 0)
- {
- elog(DEBUG3, "closing remote transaction on connection %p",
- entry->conn);
+ case XACT_EVENT_PARALLEL_PRE_COMMIT:
+ case XACT_EVENT_PRE_COMMIT:
- switch (event)
- {
- case XACT_EVENT_PARALLEL_PRE_COMMIT:
- case XACT_EVENT_PRE_COMMIT:
-
- /* Commit all remote transactions during pre-commit */
- if (pgfdw_exec_pre_commit(entry, "COMMIT TRANSACTION",
- &pending_entries, true))
- continue;
- break;
- case XACT_EVENT_PRE_PREPARE:
-
- /*
- * We disallow any remote transactions, since it's not
- * very reasonable to hold them open until the prepared
- * transaction is committed. For the moment, throw error
- * unconditionally; later we might allow read-only cases.
- * Note that the error will cause us to come right back
- * here with event == XACT_EVENT_ABORT, so we'll clean up
- * the connection state at that point.
- */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
- break;
- case XACT_EVENT_PARALLEL_COMMIT:
- case XACT_EVENT_COMMIT:
- case XACT_EVENT_PREPARE:
- /* Pre-commit should have closed the open transaction */
- elog(ERROR, "missed cleaning up connection during pre-commit");
- break;
- case XACT_EVENT_PARALLEL_ABORT:
- case XACT_EVENT_ABORT:
- /* Rollback all remote transactions during abort */
- pgfdw_abort_cleanup(entry, true);
- break;
- }
- }
+ /* Commit all remote transactions during pre-commit */
+ pgfdw_exec_pre_commit(true);
+ break;
- /* Reset state to show we're out of a transaction */
- pgfdw_reset_xact_state(entry, true);
- }
+ case XACT_EVENT_PRE_PREPARE:
- /* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
- {
- Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
- event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION", true);
+ /*
+ * We disallow any remote transactions, since it's not
+ * very reasonable to hold them open until the prepared
+ * transaction is committed. For the moment, throw error
+ * unconditionally; later we might allow read-only cases.
+ * Note that the error will cause us to come right back
+ * here with event == XACT_EVENT_ABORT, so we'll clean up
+ * the connection state at that point.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
+ break;
+
+ case XACT_EVENT_PARALLEL_COMMIT:
+ case XACT_EVENT_COMMIT:
+ case XACT_EVENT_PREPARE:
+ /* Pre-commit should have closed the open transaction */
+ elog(ERROR, "missed cleaning up connection during pre-commit");
+ break;
+
+ case XACT_EVENT_PARALLEL_ABORT:
+ case XACT_EVENT_ABORT:
+ /* Rollback all remote transactions during abort */
+ pgfdw_abort_cleanup(true);
+ break;
}
/*
@@ -974,12 +940,6 @@ static void
pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg)
{
- HASH_SEQ_STATUS scan;
- ConnCacheEntry *entry;
- int curlevel;
- List *pending_entries = NIL;
- char sql[100];
-
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
event == SUBXACT_EVENT_ABORT_SUB))
@@ -989,48 +949,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if (!xact_got_connection)
return;
- /*
- * Scan all connection cache entries to find open remote subtransactions
- * of the current level, and close them.
- */
- curlevel = GetCurrentTransactionNestLevel();
- snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
-
- hash_seq_init(&scan, ConnectionHash);
- while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
{
- /*
- * We only care about connections with open remote subtransactions of
- * the current level.
- */
- if (entry->conn == NULL || entry->xact_depth < curlevel)
- continue;
-
- if (entry->xact_depth > curlevel)
- elog(ERROR, "missed cleaning up remote subtransaction at level %d",
- entry->xact_depth);
-
- if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
- {
- /* Commit all remote subtransactions during pre-commit */
- if (pgfdw_exec_pre_commit(entry, sql, &pending_entries, false))
- continue;
- }
- else
- {
- /* Rollback all remote subtransactions during abort */
- pgfdw_abort_cleanup(entry, false);
- }
-
- /* OK, we're outta that level of subtransaction */
- pgfdw_reset_xact_state(entry, false);
+ /* Commit all remote subtransactions during pre-commit */
+ pgfdw_exec_pre_commit(false);
}
-
- /* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ else
{
- Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_commit(pending_entries, sql, false);
+ /* Rollback all remote subtransactions during abort */
+ pgfdw_abort_cleanup(false);
}
}
@@ -1394,131 +1321,210 @@ exit: ;
* Set entry->changing_xact_state to false on success, true on failure.
*/
static void
-pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
+pgfdw_abort_cleanup(bool toplevel)
{
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
char sql[100];
/*
- * Don't try to clean up the connection if we're already in error
- * recursion trouble.
+ * Scan all connection cache entries to find open remote transactions, and
+ * close them.
*/
- if (in_error_recursion_trouble())
- entry->changing_xact_state = true;
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ /* Ignore cache entry if no open connection right now */
+ if (entry->conn == NULL)
+ continue;
- /*
- * If connection is already unsalvageable, don't touch it further.
- */
- if (entry->changing_xact_state)
- return;
+ /* Sanity check for subtransaction */
+ if (!toplevel)
+ {
+ int curlevel = GetCurrentTransactionNestLevel();
- /*
- * Mark this connection as in the process of changing transaction state.
- */
- entry->changing_xact_state = true;
+ if (entry->xact_depth < curlevel)
+ continue;
- /* Assume we might have lost track of prepared statements */
- entry->have_error = true;
+ if (entry->xact_depth > curlevel)
+ elog(ERROR, "missed cleaning up remote subtransaction at level %d",
+ entry->xact_depth);
+ }
- /*
- * If a command has been submitted to the remote server by using an
- * asynchronous execution function, the command might not have yet
- * completed. Check to see if a command is still being processed by the
- * remote server, and if so, request cancellation of the command.
- */
- if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
- !pgfdw_cancel_query(entry->conn))
- return; /* Unable to cancel running query */
+ /* If it has an open remote transaction, try to close it */
+ if (entry->xact_depth > 0)
+ {
+ /*
+ * Don't try to clean up the connection if we're already in error
+ * recursion trouble.
+ */
+ if (in_error_recursion_trouble())
+ entry->changing_xact_state = true;
- if (toplevel)
- snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
- else
- snprintf(sql, sizeof(sql),
- "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
- entry->xact_depth, entry->xact_depth);
- if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
- return; /* Unable to abort remote (sub)transaction */
+ /*
+ * If connection is already unsalvageable, don't touch it further.
+ */
+ if (entry->changing_xact_state)
+ goto xact_abort_end;
- if (toplevel)
- {
- if (entry->have_prep_stmt && entry->have_error &&
- !pgfdw_exec_cleanup_query(entry->conn,
- "DEALLOCATE ALL",
- true))
- return; /* Trouble clearing prepared statements */
+ /*
+ * Mark this connection as in the process of changing transaction state.
+ */
+ entry->changing_xact_state = true;
- entry->have_prep_stmt = false;
- entry->have_error = false;
- }
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
- /*
- * If pendingAreq of the per-connection state is not NULL, it means that
- * an asynchronous fetch begun by fetch_more_data_begin() was not done
- * successfully and thus the per-connection state was not reset in
- * fetch_more_data(); in that case reset the per-connection state here.
- */
- if (entry->state.pendingAreq)
- memset(&entry->state, 0, sizeof(entry->state));
+ /*
+ * If a command has been submitted to the remote server by using an
+ * asynchronous execution function, the command might not have yet
+ * completed. Check to see if a command is still being processed by the
+ * remote server, and if so, request cancellation of the command.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+ !pgfdw_cancel_query(entry->conn))
+ goto xact_abort_end; /* Unable to cancel running query */
- /* Disarm changing_xact_state if it all worked */
- entry->changing_xact_state = false;
+ if (toplevel)
+ snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
+ else
+ snprintf(sql, sizeof(sql),
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
+ entry->xact_depth, entry->xact_depth);
+ if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
+ goto xact_abort_end; /* Unable to abort remote (sub)transaction */
+
+ if (toplevel)
+ {
+ if (entry->have_prep_stmt && entry->have_error &&
+ !pgfdw_exec_cleanup_query(entry->conn,
+ "DEALLOCATE ALL",
+ true))
+ goto xact_abort_end; /* Trouble clearing prepared statements */
+
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
+
+ /*
+ * If pendingAreq of the per-connection state is not NULL, it means that
+ * an asynchronous fetch begun by fetch_more_data_begin() was not done
+ * successfully and thus the per-connection state was not reset in
+ * fetch_more_data(); in that case reset the per-connection state here.
+ */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* Disarm changing_xact_state if it all worked */
+ entry->changing_xact_state = false;
+ }
+
+xact_abort_end:
+ /* Reset state to show we're out of a transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
}
/*
* Commit all remote transactions or subtransactions during pre-commit.
*
- * If parallel_commit is enabled at this connection cache entry and
- * the result of "sql" needs to be gotten later, return true and append
- * this entry to "pending_entries".
- *
* "toplevel" should be set to true if toplevel (main) transaction is
* committed, false otherwise.
*/
-static bool
-pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
- List **pending_entries, bool toplevel)
+static void
+pgfdw_exec_pre_commit(bool toplevel)
{
- PGresult *res;
-
- /*
- * If abort cleanup previously failed for this connection, we can't issue
- * any more commands against it.
- */
- pgfdw_reject_incomplete_xact_state_change(entry);
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+ List *pending_entries = NIL;
+ int curlevel = 0;
+ char sql[100];
- entry->changing_xact_state = true;
- if (entry->parallel_commit)
+ /* Form SQL query */
+ if (toplevel)
{
- do_sql_command_begin(entry->conn, sql);
- *pending_entries = lappend(*pending_entries, entry);
- return true;
+ snprintf(sql, sizeof(sql), "COMMIT TRANSACTION");
+ }
+ else
+ {
+ curlevel = GetCurrentTransactionNestLevel();
+ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
}
- do_sql_command(entry->conn, sql);
- entry->changing_xact_state = false;
-
- if (!toplevel)
- return false;
/*
- * If there were any errors in subtransactions, and we made prepared
- * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
- * statements. This is annoying and not terribly bulletproof, but it's
- * probably not worth trying harder.
- *
- * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old
- * a server postgres_fdw can communicate with. We intentionally ignore
- * errors in the DEALLOCATE, so that we can hobble along to some extent
- * with older servers (leaking prepared statements as we go; but we don't
- * really support update operations pre-8.3 anyway).
+ * Scan all connection cache entries to find open remote transactions, and
+ * close them.
*/
- if (entry->have_prep_stmt && entry->have_error)
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- res = PQexec(entry->conn, "DEALLOCATE ALL");
- PQclear(res);
+ /* Ignore cache entry if no open connection right now */
+ if (entry->conn == NULL)
+ continue;
+
+ /* Sanity check for subtransaction */
+ if (!toplevel)
+ {
+ if (entry->xact_depth < curlevel)
+ continue;
+
+ if (entry->xact_depth > curlevel)
+ elog(ERROR, "missed cleaning up remote subtransaction at level %d",
+ entry->xact_depth);
+ }
+
+ /* If it has an open remote transaction, try to close it */
+ if (entry->xact_depth > 0)
+ {
+ /*
+ * If abort cleanup previously failed for this connection, we can't issue
+ * any more commands against it.
+ */
+ pgfdw_reject_incomplete_xact_state_change(entry);
+
+ entry->changing_xact_state = true;
+ if (entry->parallel_commit)
+ {
+ do_sql_command_begin(entry->conn, sql);
+ pending_entries = lappend(pending_entries, entry);
+ continue;
+ }
+ do_sql_command(entry->conn, sql);
+ entry->changing_xact_state = false;
+
+ if (toplevel)
+ {
+ /*
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+ * statements. This is annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old
+ * a server postgres_fdw can communicate with. We intentionally ignore
+ * errors in the DEALLOCATE, so that we can hobble along to some extent
+ * with older servers (leaking prepared statements as we go; but we don't
+ * really support update operations pre-8.3 anyway).
+ */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ PGresult *res;
+
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
+ }
+
+ /* Reset state to show we're out of a transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
}
- entry->have_prep_stmt = false;
- entry->have_error = false;
- return false;
+ /* If there are any pending connections, finish cleaning them up */
+ if (pending_entries)
+ pgfdw_finish_pre_commit(pending_entries, sql, toplevel);
}
/*
--
2.18.0
v2-0001-Refactor-pgfdw_get_result-and-pgfdw_get_cleanup_r.patch (application/x-patch)
From 79068f8f60c04612bd4a086e001a6b0aa11daa12 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 17:25:24 +0900
Subject: [PATCH v2 1/4] Refactor pgfdw_get_result() and
pgfdw_get_cleanup_result().
---
contrib/postgres_fdw/connection.c | 125 +++++++++++-------------------
1 file changed, 47 insertions(+), 78 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 939d114f02e..cbee2854803 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -108,8 +108,8 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
-static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
- PGresult **result, bool *timed_out);
+static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
+ PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
@@ -799,53 +799,12 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
{
- PGresult *volatile last_res = NULL;
-
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
- {
- for (;;)
- {
- PGresult *res;
-
- while (PQisBusy(conn))
- {
- int wc;
-
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
- WL_LATCH_SET | WL_SOCKET_READABLE |
- WL_EXIT_ON_PM_DEATH,
- PQsocket(conn),
- -1L, PG_WAIT_EXTENSION);
- ResetLatch(MyLatch);
-
- CHECK_FOR_INTERRUPTS();
-
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
- {
- if (!PQconsumeInput(conn))
- pgfdw_report_error(ERROR, NULL, conn, false, query);
- }
- }
-
- res = PQgetResult(conn);
- if (res == NULL)
- break; /* query is complete */
+ PGresult *result = NULL;
- PQclear(last_res);
- last_res = res;
- }
- }
- PG_CATCH();
- {
- PQclear(last_res);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ if (pgfdw_get_result_timed(conn, 0, &result, NULL))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
- return last_res;
+ return result;
}
/*
@@ -1295,7 +1254,7 @@ pgfdw_cancel_query(PGconn *conn)
}
/* Get and discard the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1351,7 +1310,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
}
/* Get the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1375,24 +1334,33 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
}
/*
- * Get, during abort cleanup, the result of a query that is in progress. This
- * might be a query that is being interrupted by transaction abort, or it might
- * be a query that was initiated as part of transaction abort to get the remote
- * side back to the appropriate state.
+ * Get the result of a query.
+ *
+ * This function offers quick responsiveness by checking for any interruptions.
+ *
+ * If timed_out is NULL, the timeout does not occur. Otherwise, the timeout is
+ * enabled and endtime is used as the time at which this function should
+ * give up and assume the remote side is dead.
+ *
+ * Return true if the timeout expired or connection trouble occurred. Otherwise
+ * return false and set *result to the last result of a query. Set timed_out to
+ * true only when the timeout expired.
+ *
+ * This function emulates PQexec()'s behavior of returning the last result
+ * when there are many.
+ *
+ * Caller is responsible for the error handling on the result.
*
- * endtime is the time at which we should give up and assume the remote
- * side is dead. Returns true if the timeout expired or connection trouble
- * occurred, false otherwise. Sets *result except in case of a timeout.
- * Sets timed_out to true only when the timeout expired.
*/
static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
- bool *timed_out)
+pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, PGresult **result,
+ bool *timed_out)
{
volatile bool failed = false;
PGresult *volatile last_res = NULL;
- *timed_out = false;
+ if (timed_out != NULL)
+ *timed_out = false;
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
@@ -1404,23 +1372,27 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
while (PQisBusy(conn))
{
int wc;
- TimestampTz now = GetCurrentTimestamp();
- long cur_timeout;
+ long cur_timeout = -1;
+ int wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE |
+ WL_EXIT_ON_PM_DEATH;
/* If timeout has expired, give up, else get sleep time. */
- cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
- if (cur_timeout <= 0)
+ if (timed_out != NULL)
{
- *timed_out = true;
- failed = true;
- goto exit;
+ TimestampTz now = GetCurrentTimestamp();
+
+ cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+ if (cur_timeout <= 0)
+ {
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
+ wakeEvents |= WL_TIMEOUT;
}
/* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
- WL_LATCH_SET | WL_SOCKET_READABLE |
- WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- PQsocket(conn),
+ wc = WaitLatchOrSocket(MyLatch, wakeEvents, PQsocket(conn),
cur_timeout, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
@@ -1458,6 +1430,7 @@ exit: ;
PQclear(last_res);
else
*result = last_res;
+
return failed;
}
@@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry = (ConnCacheEntry *) lfirst(lc);
/* Ignore errors (see notes in pgfdw_xact_callback) */
- while ((res = PQgetResult(entry->conn)) != NULL)
- {
- PQclear(res);
- /* Stop if the connection is lost (else we'll loop infinitely) */
- if (PQstatus(entry->conn) == CONNECTION_BAD)
- break;
- }
+ pgfdw_get_result_timed(entry->conn, 0, &res, NULL);
+ PQclear(res);
+
entry->have_prep_stmt = false;
entry->have_error = false;
--
2.18.0
On Tue, Jul 26, 2022 at 4:25 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
At Tue, 26 Jul 2022 00:54:47 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
There are two functions, pgfdw_get_result() and
pgfdw_get_cleanup_result(),
to get a query result. They have almost the same code, call
PQisBusy(),
WaitLatchOrSocket(), PQconsumeInput() and PQgetResult() in the loop,
but only pgfdw_get_cleanup_result() allows its callers to specify the
timeout.
0001 patch transforms pgfdw_get_cleanup_result() to the common
function
to get a query result and makes pgfdw_get_result() use it instead of
its own (duplicate) code. The patch also renames
pgfdw_get_cleanup_result()
to pgfdw_get_result_timed().
Agree to that refactoring.
+1 for that refactoring. Here are a few comments about the 0001 patch:
I'm not sure it's a good idea to change the function's name, because
that would make backpatching hard. To avoid that, how about
introducing a workhorse function for pgfdw_get_result and
pgfdw_get_cleanup_result, based on the latter function as you
proposed, and modifying the two functions so that they call the
workhorse function?
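The workhorse arrangement suggested here could look roughly like the following. This is a minimal, self-contained sketch and not the patch itself: FakeConn, its fields, the integer clock, and the names get_result_workhorse, get_cleanup_result and get_result are hypothetical stand-ins for PGconn, PGresult, TimestampTz and the real postgres_fdw functions. The only behavior it is meant to illustrate is that both entry points share one loop and that the timeout applies only when timed_out is non-NULL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a libpq connection with an integer clock. */
typedef struct FakeConn
{
    int now;       /* current time, in ticks */
    int ready_at;  /* tick at which the result becomes available */
    int value;     /* the "result" to hand back */
} FakeConn;

/*
 * Workhorse: the timeout applies only when timed_out is non-NULL.
 * Returns true on failure (in this sketch, only a timeout), false on success.
 */
bool
get_result_workhorse(FakeConn *conn, int endtime, int *result, bool *timed_out)
{
    assert(conn != NULL && result != NULL);

    if (timed_out != NULL)
        *timed_out = false;

    while (conn->now < conn->ready_at)
    {
        if (timed_out != NULL && conn->now >= endtime)
        {
            *timed_out = true;
            return true;
        }
        conn->now++;  /* stands in for one wait-and-consume cycle */
    }

    *result = conn->value;
    return false;
}

/* Existing name kept for abort-cleanup callers, so call sites do not change. */
bool
get_cleanup_result(FakeConn *conn, int endtime, int *result, bool *timed_out)
{
    assert(timed_out != NULL);
    return get_result_workhorse(conn, endtime, result, timed_out);
}

/* Untimed wrapper: waits as long as it takes. */
int
get_result(FakeConn *conn)
{
    int result = 0;

    (void) get_result_workhorse(conn, 0, &result, NULL);
    return result;
}
```

With this shape, neither existing function name changes, which is the back-patching concern raised here; only the shared loop moves.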
@@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry = (ConnCacheEntry *) lfirst(lc);
/* Ignore errors (see notes in pgfdw_xact_callback) */
- while ((res = PQgetResult(entry->conn)) != NULL)
- {
- PQclear(res);
- /* Stop if the connection is lost (else we'll loop infinitely) */
- if (PQstatus(entry->conn) == CONNECTION_BAD)
- break;
- }
+ pgfdw_get_result_timed(entry->conn, 0, &res, NULL);
+ PQclear(res);
The existing code prevents interruption, but this would change to
allow it. Do we need this change?
pgfdw_xact_callback() and pgfdw_subxact_callback() have similar codes
to
issue COMMIT or RELEASE SAVEPOINT commands. 0002 patch adds the common
function,
pgfdw_exec_pre_commit(), for that purpose, and changes those functions
so that they use the common one.
I'm not sure the two are similar to each other. The new function
pgfdw_exec_pre_commit() looks like a merger of two isolated code paths
intended to share a seven-line codelet. I feel the code gets a bit
harder to understand after the change. I mildly oppose this part.
I have to agree with Horiguchi-san, because as mentioned by him, 1)
there isn't enough duplicate code in the two bits to justify merging
them into a single function, and 2) the 0002 patch rather just makes
code complicated. The current implementation is easy to understand,
so I'd vote for leaving them alone for now.
(I think the introduction of pgfdw_abort_cleanup is good, because that
de-duplicated much code that existed both in pgfdw_xact_callback and
in pgfdw_subxact_callback, which would outweigh the downside of
pgfdw_abort_cleanup that it made code somewhat complicated due to the
logic to handle both the transaction and subtransaction cases within
that single function. But 0002 is not the case, I think.)
pgfdw_finish_pre_commit_cleanup() and
pgfdw_finish_pre_subcommit_cleanup()
have similar codes to wait for the results of COMMIT or RELEASE
SAVEPOINT commands.
0003 patch adds the common function, pgfdw_finish_pre_commit(), for
that purpose,
and replaces those functions with the common one.
That is, pgfdw_finish_pre_commit_cleanup() and
pgfdw_finish_pre_subcommit_cleanup()
are no longer necessary and 0003 patch removes them.
It gives me the same feeling as 0002.
I have to agree with Horiguchi-san on this as well; the existing
single-purpose functions are easy to understand, so I'd vote for
leaving them alone.
Sorry for being late to the party.
Best regards,
Etsuro Fujita
On 2022/09/05 15:17, Etsuro Fujita wrote:
+1 for that refactoring. Here are a few comments about the 0001 patch:
Thanks for reviewing the patch!
I'm not sure it's a good idea to change the function's name, because
that would make backpatching hard. To avoid that, how about
introducing a workhorse function for pgfdw_get_result and
pgfdw_get_cleanup_result, based on the latter function as you
proposed, and modifying the two functions so that they call the
workhorse function?
That's possible. We can revive pgfdw_get_cleanup_result() and
make it call pgfdw_get_result_timed(). Also, with the patch,
pgfdw_get_result() already works in that way. But I'm not sure
how much we should be concerned about back-patch "issue"
in this case. We usually rename the internal functions
if new names are better.
@@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry = (ConnCacheEntry *) lfirst(lc);
/* Ignore errors (see notes in pgfdw_xact_callback) */
- while ((res = PQgetResult(entry->conn)) != NULL)
- {
- PQclear(res);
- /* Stop if the connection is lost (else we'll loop infinitely) */
- if (PQstatus(entry->conn) == CONNECTION_BAD)
- break;
- }
+ pgfdw_get_result_timed(entry->conn, 0, &res, NULL);
+ PQclear(res);
The existing code prevents interruption, but this would change to
allow it. Do we need this change?
You imply that we intentionally avoided calling CHECK_FOR_INTERRUPTS()
there, don't you? But could you tell me why?
I have to agree with Horiguchi-san, because as mentioned by him, 1)
there isn't enough duplicate code in the two bits to justify merging
them into a single function, and 2) the 0002 patch rather just makes
code complicated. The current implementation is easy to understand,
so I'd vote for leaving them alone for now.
(I think the introduction of pgfdw_abort_cleanup is good, because that
de-duplicated much code that existed both in pgfdw_xact_callback and
in pgfdw_subxact_callback, which would outweigh the downside of
pgfdw_abort_cleanup that it made code somewhat complicated due to the
logic to handle both the transaction and subtransaction cases within
that single function. But 0002 is not the case, I think.)
The function pgfdw_exec_pre_commit() that I newly introduced consists
of two parts: issuing the transaction-end command according to the
parallel_commit setting, and issuing DEALLOCATE ALL. The first part is
duplicated between pgfdw_xact_callback() and pgfdw_subxact_callback(),
but the second is not (i.e., it's used only by pgfdw_xact_callback()).
So how about removing that non-duplicated part from
pgfdw_exec_pre_commit()?
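The part shared by both callbacks starts with choosing the transaction-end command. A minimal sketch of just that step follows, assuming a hypothetical helper named form_xact_end_command; the patch builds the string inline with snprintf(), and only the two command strings below are taken from it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical helper: writes the transaction-end command into buf and
 * returns the length reported by snprintf(). COMMIT TRANSACTION is used
 * for the top-level transaction, RELEASE SAVEPOINT s<N> for a
 * subtransaction at nesting level N.
 */
int
form_xact_end_command(char *buf, size_t buflen, bool toplevel, int curlevel)
{
    assert(buf != NULL && buflen > 0);

    if (toplevel)
        return snprintf(buf, buflen, "COMMIT TRANSACTION");

    return snprintf(buf, buflen, "RELEASE SAVEPOINT s%d", curlevel);
}
```

Keeping this step free of the DEALLOCATE ALL handling is what would let both callbacks share it without a toplevel-only branch.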
It gives me the same feeling as 0002.
I have to agree with Horiguchi-san on this as well; the existing
single-purpose functions are easy to understand, so I'd vote for
leaving them alone.
OK, I will reconsider the 0003 patch. BTW, the parallel-abort patch that
you're proposing seems to add a new function, pgfdw_finish_abort_cleanup(),
with a structure similar to that of the function added by the 0003 patch.
So it would probably be helpful for us to consider these together :)
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
Hi Fujii-san,
On Thu, Sep 15, 2022 at 12:17 AM Fujii Masao
<masao.fujii@oss.nttdata.com> wrote:
On 2022/09/05 15:17, Etsuro Fujita wrote:
I'm not sure it's a good idea to change the function's name, because
that would make backpatching hard. To avoid that, how about
introducing a workhorse function for pgfdw_get_result and
pgfdw_get_cleanup_result, based on the latter function as you
proposed, and modifying the two functions so that they call the
workhorse function?
That's possible. We can revive pgfdw_get_cleanup_result() and
make it call pgfdw_get_result_timed(). Also, with the patch,
pgfdw_get_result() already works in that way. But I'm not sure
how much we should be concerned about back-patch "issue"
in this case. We usually rename the internal functions
if new names are better.
I agree that if the name of an existing function was bad, we should
rename it, but I do not think the name pgfdw_get_cleanup_result is
bad; I think it is good in the sense that it well represents what the
function waits for.
The patch you proposed changes pgfdw_get_cleanup_result to cover the
timed_out==NULL case, but I do not think it is a good idea to rename
it for such a minor reason. That function is used in all supported
versions, so that would just make back-patching hard.
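For reference, the timed_out==NULL case amounts to a small change in how each wait is set up. The sketch below is an illustration under stated assumptions, not the patch's code: plan_wait, WaitPlan and the EV_* values are hypothetical, and plain millisecond counters replace TimestampTz. In the patch the choice is keyed on whether timed_out is NULL; here it is a boolean argument.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag values for illustration; not the real WL_* constants. */
enum
{
    EV_LATCH = 1,
    EV_SOCKET_READABLE = 2,
    EV_TIMEOUT = 4
};

typedef struct WaitPlan
{
    bool expired;     /* deadline already passed, caller should give up */
    long timeout_ms;  /* -1 means wait indefinitely */
    int  events;      /* events to wait for */
} WaitPlan;

/* Decide how long to sleep and which events to wait for. */
WaitPlan
plan_wait(long now_ms, long end_ms, bool use_timeout)
{
    WaitPlan plan = {false, -1L, EV_LATCH | EV_SOCKET_READABLE};

    if (use_timeout)
    {
        long remaining = end_ms - now_ms;

        if (remaining <= 0)
            plan.expired = true;
        else
        {
            plan.timeout_ms = remaining;
            plan.events |= EV_TIMEOUT;
        }
    }

    /* An indefinite wait never asks for the timeout event, and vice versa. */
    assert((plan.timeout_ms == -1L) == ((plan.events & EV_TIMEOUT) == 0));
    return plan;
}
```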
@@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry = (ConnCacheEntry *) lfirst(lc);
/* Ignore errors (see notes in pgfdw_xact_callback) */
- while ((res = PQgetResult(entry->conn)) != NULL)
- {
- PQclear(res);
- /* Stop if the connection is lost (else we'll loop infinitely) */
- if (PQstatus(entry->conn) == CONNECTION_BAD)
- break;
- }
+ pgfdw_get_result_timed(entry->conn, 0, &res, NULL);
+ PQclear(res);
The existing code prevents interruption, but this would change to
allow it. Do we need this change?
You imply that we intentionally avoided calling CHECK_FOR_INTERRUPTS()
there, don't you?
Yeah, this is intentional; in commit 04e706d42, I coded this to match
the behavior in the non-parallel-commit mode, which does not call
CHECK_FOR_INTERRUPTS.
But could you tell me why?
My concern about doing so is that WaitLatchOrSocket is rather
expensive, so it might lead to useless overhead in most cases.
Anyway, this changes the behavior, so you should show the evidence
that this is useful. I think this would be beyond refactoring,
though.
I have to agree with Horiguchi-san, because as mentioned by him, 1)
there isn't enough duplicate code in the two bits to justify merging
them into a single function, and 2) the 0002 patch rather just makes
code complicated. The current implementation is easy to understand,
so I'd vote for leaving them alone for now.
(I think the introduction of pgfdw_abort_cleanup is good, because that
de-duplicated much code that existed both in pgfdw_xact_callback and
in pgfdw_subxact_callback, which would outweigh the downside of
pgfdw_abort_cleanup that it made code somewhat complicated due to the
logic to handle both the transaction and subtransaction cases within
that single function. But 0002 is not the case, I think.)
The function pgfdw_exec_pre_commit() that I newly introduced consists
of two parts: issuing the transaction-end command according to the
parallel_commit setting, and issuing DEALLOCATE ALL. The first part is
duplicated between pgfdw_xact_callback() and pgfdw_subxact_callback(),
but the second is not (i.e., it's used only by pgfdw_xact_callback()).
So how about removing that non-duplicated part from
pgfdw_exec_pre_commit()?
Seems like a good idea.
I have to agree with Horiguchi-san on this as well; the existing
single-purpose functions are easy to understand, so I'd vote for
leaving them alone.
OK, I will reconsider the 0003 patch. BTW, the parallel-abort patch that
you're proposing seems to add a new function, pgfdw_finish_abort_cleanup(),
with a structure similar to that of the function added by the 0003 patch.
So it would probably be helpful for us to consider these together :)
Ok, let us discuss this after the parallel-abort patch.
Sorry for the late reply again.
Best regards,
Etsuro Fujita
On 2023/01/29 19:31, Etsuro Fujita wrote:
I agree that if the name of an existing function was bad, we should
rename it, but I do not think the name pgfdw_get_cleanup_result is
bad; I think it is good in the sense that it well represents what the
function waits for.
The patch you proposed changes pgfdw_get_cleanup_result to cover the
timed_out==NULL case, but I do not think it is a good idea to rename
it for such a minor reason. That function is used in all supported
versions, so that would just make back-patching hard.
As far as I understand, the function is named pgfdw_get_cleanup_result
because it's used to get the result during abort cleanup, as
the comment says. OTOH, the new function is also used outside abort cleanup;
e.g., pgfdw_get_result() calls it. So I don't think that
pgfdw_get_cleanup_result is a good name in some of those places.
If you want to leave pgfdw_get_cleanup_result for the existing uses,
we can leave that and redefine it so that it just calls the workhorse
function pgfdw_get_result_timed.
Yeah, this is intentional; in commit 04e706d42, I coded this to match
the behavior in the non-parallel-commit mode, which does not call
CHECK_FOR_INTERRUPTS.
But could you tell me why?
My concern about doing so is that WaitLatchOrSocket is rather
expensive, so it might lead to useless overhead in most cases.
pgfdw_get_result() and pgfdw_get_cleanup_result() already call
WaitLatchOrSocket() and CHECK_FOR_INTERRUPTS(). That is, during
commit phase, they are currently called when receiving the result
of COMMIT TRANSACTION command from remote server. Why do we need
to worry about their overhead only when executing DEALLOCATE ALL?
Anyway, this changes the behavior, so you should show the evidence
that this is useful. I think this would be beyond refactoring,
though.
Isn't it useful to react to interrupts (e.g., shutdown requests)
promptly even while waiting for the result of DEALLOCATE ALL?
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
On Tue, Jan 31, 2023 at 3:44 PM Fujii Masao <masao.fujii@oss.nttdata.com> wrote:
On 2023/01/29 19:31, Etsuro Fujita wrote:
I agree that if the name of an existing function was bad, we should
rename it, but I do not think the name pgfdw_get_cleanup_result is
bad; I think it is good in the sense that it well represents what the
function waits for.
The patch you proposed changes pgfdw_get_cleanup_result to cover the
timed_out==NULL case, but I do not think it is a good idea to rename
it for such a minor reason. That function is used in all supported
versions, so that would just make back-patching hard.
As far as I understand, the function is named pgfdw_get_cleanup_result
because it's used to get the result during abort cleanup, as
the comment says. OTOH, the new function is also used outside abort cleanup;
e.g., pgfdw_get_result() calls it. So I don't think that
pgfdw_get_cleanup_result is a good name in some of those places.
Yeah, I agree on that point.
If you want to leave pgfdw_get_cleanup_result for the existing uses,
we can leave that and redefine it so that it just calls the workhorse
function pgfdw_get_result_timed.
+1; that's actually what I proposed upthread. :-)
BTW the name "pgfdw_get_result_timed" is a bit confusing to me,
because the new function works *without* a timeout condition. We
usually append the suffix "_internal", so how about
"pgfdw_get_result_internal", to avoid that confusion?
Yeah, this is intentional; in commit 04e706d42, I coded this to match
the behavior in the non-parallel-commit mode, which does not call
CHECK_FOR_INTERRUPTS.
But could you tell me why?
My concern about doing so is that WaitLatchOrSocket is rather
expensive, so it might lead to useless overhead in most cases.
pgfdw_get_result() and pgfdw_get_cleanup_result() already call
WaitLatchOrSocket() and CHECK_FOR_INTERRUPTS(). That is, during
commit phase, they are currently called when receiving the result
of COMMIT TRANSACTION command from remote server. Why do we need
to worry about their overhead only when executing DEALLOCATE ALL?
DEALLOCATE ALL is a light operation and is issued immediately after
executing COMMIT TRANSACTION successfully, so I thought that in most
cases it too would be likely to be executed successfully and quickly;
there would be less need to do so for DEALLOCATE ALL.
Anyway, this changes the behavior, so you should show the evidence
that this is useful. I think this would be beyond refactoring,
though.
Isn't it useful to react to interrupts (e.g., shutdown requests)
promptly even while waiting for the result of DEALLOCATE ALL?
That might be useful, but another concern about this is error
handling. The existing code (both in parallel commit and non-parallel
commit) ignores any kind of error in libpq as well as interrupts
when doing DEALLOCATE ALL, and then commits the local transaction,
making the remote/local transaction states consistent, but IIUC the
patch aborts the local transaction when doing the command, e.g., if
WaitLatchOrSocket detected some kind of error in socket access, making
the transaction states *inconsistent*, which I don't think would be
great. So I'm still not sure this would be acceptable.
Best regards,
Etsuro Fujita