diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4aff315b7c..03223afd53 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,6 +58,7 @@ typedef struct ConnCacheEntry bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ + bool parallel_commit; /* do we commit (sub)xacts in parallel? */ bool invalidated; /* true if reconnect is pending */ bool keep_connections; /* setting value of keep_connections * server option */ @@ -92,6 +93,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void configure_remote_session(PGconn *conn); +static void do_sql_command_begin(PGconn *conn, const char *sql); +static void do_sql_command_end(PGconn *conn, const char *sql, bool ignore_errors); static void begin_remote_xact(ConnCacheEntry *entry); static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_subxact_callback(SubXactEvent event, @@ -100,6 +103,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, void *arg); static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); +static void pgfdw_reset_xact_nesting_depth(ConnCacheEntry *entry); static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); @@ -318,12 +322,15 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) * By default, all the connections to any foreign servers are kept open. 
*/ entry->keep_connections = true; + entry->parallel_commit = false; foreach(lc, server->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "keep_connections") == 0) entry->keep_connections = defGetBoolean(def); + if (strcmp(def->defname, "parallel_commit") == 0) + entry->parallel_commit = defGetBoolean(def); } /* Now try to make the connection */ @@ -589,11 +596,36 @@ do_sql_command(PGconn *conn, const char *sql) { PGresult *res; + do_sql_command_begin(conn, sql); + res = pgfdw_get_result(conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, true, sql); + PQclear(res); +} + +static void +do_sql_command_begin(PGconn *conn, const char *sql) +{ if (!PQsendQuery(conn, sql)) pgfdw_report_error(ERROR, NULL, conn, false, sql); +} + +static void +do_sql_command_end(PGconn *conn, const char *sql, bool ignore_errors) +{ + PGresult *res; + + /* Consume whatever data is available from the socket */ + if (!PQconsumeInput(conn)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); res = pgfdw_get_result(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, sql); + { + if (ignore_errors) + pgfdw_report_error(WARNING, res, conn, false, sql); /* res is freed by PQclear below */ + else + pgfdw_report_error(ERROR, res, conn, true, sql); + } PQclear(res); } @@ -851,6 +883,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) { HASH_SEQ_STATUS scan; ConnCacheEntry *entry; + List *pending_xacts = NIL; /* Quick exit if no connections were touched in this transaction. 
*/ if (!xact_got_connection) @@ -888,6 +921,13 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Commit all remote transactions during pre-commit */ entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, + "COMMIT TRANSACTION"); + pending_xacts = lappend(pending_xacts, entry); + continue; + } do_sql_command(entry->conn, "COMMIT TRANSACTION"); entry->changing_xact_state = false; @@ -943,23 +983,68 @@ pgfdw_xact_callback(XactEvent event, void *arg) } } - /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; - /* - * If the connection isn't in a good idle state, it is marked as - * invalid or keep_connections option of its server is disabled, then - * discard it to recover. Next GetConnection will open a new - * connection. + * Reset state to show we're out of a transaction, and, if necessary, + * discard the connection */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state || - entry->invalidated || - !entry->keep_connections) + pgfdw_reset_xact_nesting_depth(entry); + } + + if (pending_xacts) + { + List *pending_deallocs = NIL; + ListCell *lc; + + Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT || + event == XACT_EVENT_PRE_COMMIT); + + foreach(lc, pending_xacts) + { + entry = (ConnCacheEntry *) lfirst(lc); + + Assert(entry->changing_xact_state); + do_sql_command_end(entry->conn, "COMMIT TRANSACTION", false); + entry->changing_xact_state = false; + + /* Do a DEALLOCATE ALL if needed */ + if (entry->have_prep_stmt && entry->have_error) + { + entry->changing_xact_state = true; + do_sql_command_begin(entry->conn, "DEALLOCATE ALL"); + pending_deallocs = lappend(pending_deallocs, entry); + continue; + } + + entry->have_prep_stmt = false; + entry->have_error = false; + + /* + * Reset state to show we're out of a transaction, and, if + * necessary, discard the connection + */ + pgfdw_reset_xact_nesting_depth(entry); + } 
+ + if (pending_deallocs) { - elog(DEBUG3, "discarding connection %p", entry->conn); - disconnect_pg_server(entry); + foreach(lc, pending_deallocs) + { + entry = (ConnCacheEntry *) lfirst(lc); + + Assert(entry->changing_xact_state); + /* Ignore errors in the DEALLOCATE (see note above) */ + do_sql_command_end(entry->conn, "DEALLOCATE ALL", true); + entry->changing_xact_state = false; + + entry->have_prep_stmt = false; + entry->have_error = false; + + /* + * Reset state to show we're out of a transaction, and, if + * necessary, discard the connection + */ + pgfdw_reset_xact_nesting_depth(entry); + } } } @@ -984,6 +1069,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, HASH_SEQ_STATUS scan; ConnCacheEntry *entry; int curlevel; + List *pending_subxacts = NIL; /* Nothing to do at subxact start, nor after commit. */ if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || @@ -1026,6 +1112,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, /* Commit all remote subtransactions during pre-commit */ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, sql); + pending_subxacts = lappend(pending_subxacts, entry); + continue; + } do_sql_command(entry->conn, sql); entry->changing_xact_state = false; } @@ -1041,6 +1133,27 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, /* OK, we're outta that level of subtransaction */ entry->xact_depth--; } + + if (pending_subxacts) + { + char sql[100]; + ListCell *lc; + + Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB); + + snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + foreach(lc, pending_subxacts) + { + entry = (ConnCacheEntry *) lfirst(lc); + + Assert(entry->changing_xact_state); + do_sql_command_end(entry->conn, sql, false); + entry->changing_xact_state = false; + + /* OK, we're outta that level of subtransaction */ + entry->xact_depth--; + } + } } 
/* @@ -1132,6 +1245,28 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) server->servername))); } +static void +pgfdw_reset_xact_nesting_depth(ConnCacheEntry *entry) +{ + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + + /* + * If the connection isn't in a good idle state, it is marked as invalid + * or keep_connections option of its server is disabled, then discard it + * to recover. Next GetConnection will open a new connection. + */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state || + entry->invalidated || + !entry->keep_connections) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + } +} + /* * Cancel the currently-in-progress query (whose query text we do not have) * and ignore the result. Returns true if we successfully cancel the query diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index fd141a0fa5..54b473de3d 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9452,7 +9452,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, 
keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, parallel_commit, keep_connections CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different @@ -10768,3 +10768,76 @@ ERROR: invalid value for integer option "batch_size": 100$%$#$# ALTER FOREIGN DATA WRAPPER postgres_fdw OPTIONS (nonexistent 'fdw'); ERROR: invalid option "nonexistent" HINT: There are no valid options in this context. +-- =================================================================== +-- test parallel commit +-- =================================================================== +ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); +CREATE TABLE ploc1 (f1 int, f2 text); +CREATE FOREIGN TABLE prem1 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc1'); +CREATE TABLE ploc2 (f1 int, f2 text); +CREATE FOREIGN TABLE prem2 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc2'); +BEGIN; +INSERT INTO prem1 VALUES (101, 'foo'); +INSERT INTO prem2 VALUES (201, 'bar'); +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+----- + 101 | foo +(1 row) + +SELECT * FROM prem2; + f1 | f2 +-----+----- + 201 | bar +(1 row) + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (102, 'foofoo'); +INSERT INTO prem2 VALUES (202, 'barbar'); +RELEASE SAVEPOINT s; +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo +(2 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar +(2 rows) + +-- This tests executing DEALLOCATE ALL against foreign servers 
in parallel during pre-commit +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (103, 'foofoo'); +INSERT INTO prem2 VALUES (203, 'barbar'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (104, 'test1'); +INSERT INTO prem2 VALUES (204, 'test2'); +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo + 104 | test1 +(3 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar + 204 | test2 +(3 rows) + +ALTER SERVER loopback OPTIONS (DROP parallel_commit); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 48c7417e6e..1be5830c53 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -120,6 +120,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) strcmp(def->defname, "updatable") == 0 || strcmp(def->defname, "truncatable") == 0 || strcmp(def->defname, "async_capable") == 0 || + strcmp(def->defname, "parallel_commit") == 0 || strcmp(def->defname, "keep_connections") == 0) { /* these accept only boolean values */ @@ -248,6 +249,7 @@ InitPgFdwOptions(void) /* async_capable is available on both server and table */ {"async_capable", ForeignServerRelationId, false}, {"async_capable", ForeignTableRelationId, false}, + {"parallel_commit", ForeignServerRelationId, false}, {"keep_connections", ForeignServerRelationId, false}, {"password_required", UserMappingRelationId, false}, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 43c30d492d..6bb8937ced 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3426,3 +3426,46 @@ CREATE FOREIGN TABLE inv_bsz (c1 int ) -- No option is allowed to be specified at foreign data wrapper level ALTER FOREIGN DATA WRAPPER postgres_fdw OPTIONS (nonexistent 'fdw'); + +-- =================================================================== +-- test parallel commit +-- 
=================================================================== +ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); + +CREATE TABLE ploc1 (f1 int, f2 text); +CREATE FOREIGN TABLE prem1 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc1'); +CREATE TABLE ploc2 (f1 int, f2 text); +CREATE FOREIGN TABLE prem2 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc2'); + +BEGIN; +INSERT INTO prem1 VALUES (101, 'foo'); +INSERT INTO prem2 VALUES (201, 'bar'); +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (102, 'foofoo'); +INSERT INTO prem2 VALUES (202, 'barbar'); +RELEASE SAVEPOINT s; +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +-- This tests executing DEALLOCATE ALL against foreign servers in parallel during pre-commit +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (103, 'foofoo'); +INSERT INTO prem2 VALUES (203, 'barbar'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (104, 'test1'); +INSERT INTO prem2 VALUES (204, 'test2'); +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +ALTER SERVER loopback OPTIONS (DROP parallel_commit);