Append with naive multiplexing of FDWs
Hello,
A few years back[1]/messages/by-id/CAEepm=1CuAWfxDk==jZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA@mail.gmail.com I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding. I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.
Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better. It's a very localised feature that works only between Append
and its immediate children. The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.
Here's a quick rebase of that old POC patch, along with a demo. Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.
=== demo ===
create table t (a text, b text);
create or replace function slow_data(name text) returns setof t as
$$
begin
perform pg_sleep(random());
return query select name, generate_series(1, 100)::text as i;
end;
$$
language plpgsql;
create view t1 as select * from slow_data('t1');
create view t2 as select * from slow_data('t2');
create view t3 as select * from slow_data('t3');
create extension postgres_fdw;
create server server1 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create server server2 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create server server3 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create user mapping for current_user server server1;
create user mapping for current_user server server2;
create user mapping for current_user server server3;
create foreign table ft1 (a text, b text) server server1 options
(table_name 't1');
create foreign table ft2 (a text, b text) server server2 options
(table_name 't2');
create foreign table ft3 (a text, b text) server server3 options
(table_name 't3');
-- create three remote shards
create table pt (a text, b text) partition by list (a);
alter table pt attach partition ft1 for values in ('ft1');
alter table pt attach partition ft2 for values in ('ft2');
alter table pt attach partition ft3 for values in ('ft3');
-- see that tuples come back in the order that they're ready
select * from pt where b like '42';
[1]: /messages/by-id/CAEepm=1CuAWfxDk==jZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA@mail.gmail.com
--
Thomas Munro
https://enterprisedb.com
Attachments:
0001-Multiplexing-Append-POC.patchapplication/octet-stream; name=0001-Multiplexing-Append-POC.patchDownload
From 7240714031e06df64da118f25237381e5250fc2a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 4 Sep 2019 17:49:39 +1200
Subject: [PATCH] Multiplexing Append POC.
---
contrib/postgres_fdw/connection.c | 8 +-
contrib/postgres_fdw/postgres_fdw.c | 132 +++++++++++++--
contrib/postgres_fdw/postgres_fdw.h | 12 +-
doc/src/sgml/monitoring.sgml | 6 +-
src/backend/executor/execProcnode.c | 25 +++
src/backend/executor/nodeAppend.c | 214 +++++++++++++++++++++++++
src/backend/executor/nodeForeignscan.c | 19 +++
src/backend/postmaster/pgstat.c | 3 +
src/include/executor/execReady.h | 31 ++++
src/include/executor/executor.h | 1 +
src/include/executor/nodeForeignscan.h | 1 +
src/include/foreign/fdwapi.h | 5 +
src/include/nodes/execnodes.h | 5 +
src/include/pgstat.h | 1 +
14 files changed, 447 insertions(+), 16 deletions(-)
create mode 100644 src/include/executor/execReady.h
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 57ed5f4b905..494b67126a2 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ PgFdwConnState state; /* extra per-connection state */
} ConnCacheEntry;
/*
@@ -104,7 +105,7 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
* (not even on error), we need this flag to cue manual cleanup.
*/
PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
bool found;
ConnCacheEntry *entry;
@@ -196,6 +197,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
+ memset(&entry->state, 0, sizeof(entry->state));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
@@ -212,6 +214,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
+ /* If caller needs access to the per-connection state, return it. */
+ if (state)
+ *state = &entry->state;
+
return entry->conn;
}
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 82d8140ba25..0bc0a1f6960 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execReady.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -158,6 +159,9 @@ typedef struct PgFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */
int fetch_size; /* number of tuples per fetch */
+
+ /* per-connection state */
+ PgFdwConnState *conn_state;
} PgFdwScanState;
/*
@@ -391,6 +395,8 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *output_rel,
void *extra);
+static int postgresReady(ForeignScanState *node);
+
/*
* Helper functions
*/
@@ -418,6 +424,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
+static void fetch_more_data_begin(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -557,6 +564,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support for asynchrony */
+ routine->Ready = postgresReady;
+
PG_RETURN_POINTER(routine);
}
@@ -1446,7 +1456,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1497,6 +1507,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+ fsstate->conn_state->async_query_sent = false;
}
/*
@@ -1609,6 +1620,13 @@ postgresEndForeignScan(ForeignScanState *node)
if (fsstate == NULL)
return;
+ /*
+ * If we're ending before we've collected a response from an asynchronous
+ * query, we have to consume the response.
+ */
+ if (fsstate->conn_state->async_query_sent)
+ fetch_more_data(node);
+
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2384,7 +2402,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, NULL);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2684,7 +2702,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3351,13 +3369,28 @@ fetch_more_data(ForeignScanState *node)
int numrows;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ if (!fsstate->conn_state->async_query_sent)
+ {
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ res = pgfdw_exec_query(conn, sql);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
+ else
+ {
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = PQgetResult(conn);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
@@ -3386,6 +3419,15 @@ fetch_more_data(ForeignScanState *node)
fsstate->eof_reached = (numrows < fsstate->fetch_size);
PQclear(res);
+
+ /* If this was the second part of an async request, we must fetch until NULL. */
+ if (fsstate->conn_state->async_query_sent)
+ {
+ /* call once and raise error if not NULL as expected? */
+ while (PQgetResult(conn) != NULL)
+ ;
+ fsstate->conn_state->async_query_sent = false;
+ }
res = NULL;
}
PG_CATCH();
@@ -3399,6 +3441,35 @@ fetch_more_data(ForeignScanState *node)
MemoryContextSwitchTo(oldcontext);
}
+/*
+ * Begin an asynchronous data fetch.
+ * fetch_more_data must be called to fetch the results..
+ */
+static void
+fetch_more_data_begin(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PGconn *conn = fsstate->conn;
+ char sql[64];
+
+ Assert(!fsstate->conn_state->async_query_sent);
+
+ /*
+ * Create the cursor synchronously. (With more state machine stuff we
+ * could do this asynchronously too).
+ */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ /* We will send this query, but not wait for the response. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (PQsendQuery(conn, sql) < 0)
+ pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+ fsstate->conn_state->async_query_sent = true;
+}
+
/*
* Force assorted GUC parameters to settings that ensure that we'll output
* data values in a form that is unambiguous to the remote server.
@@ -3512,7 +3583,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, NULL);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -4387,7 +4458,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
@@ -4477,7 +4548,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -4705,7 +4776,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
@@ -5514,6 +5585,41 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
/* XXX Consider parameterized paths for the join relation */
}
+static int
+postgresReady(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+ if (fsstate->conn_state->async_query_sent)
+ {
+ /*
+ * We have already started a query, for some other executor node. We
+ * currently can't handle two at the same time (we'd have to create
+ * more connections for that).
+ */
+ return EXEC_READY_BUSY;
+ }
+ else if (fsstate->next_tuple < fsstate->num_tuples)
+ {
+ /* We already have buffered tuples. */
+ return EXEC_READY_MORE;
+ }
+ else if (fsstate->eof_reached)
+ {
+ /* We have already hit the end of the scan. */
+ return EXEC_READY_EOF;
+ }
+ else
+ {
+ /*
+ * We will start a query now, and tell the caller to wait until the
+ * file descriptor says we're ready and then call ExecProcNode.
+ */
+ fetch_more_data_begin(node);
+ return PQsocket(fsstate->conn);
+ }
+}
+
/*
* Assess whether the aggregation, grouping and having operations can be pushed
* down to the foreign server. As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 6acb7dcf6cd..b4a25bbe105 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -20,6 +20,15 @@
#include "libpq-fe.h"
+/*
+ * Extra control information relating to a connection.
+ */
+typedef struct PgFdwConnState
+{
+ /* Has an asynchronous query been sent? */
+ bool async_query_sent;
+} PgFdwConnState;
+
/*
* FDW-specific planner information kept in RelOptInfo.fdw_private for a
* postgres_fdw foreign table. For a baserel, this struct is created by
@@ -127,7 +136,8 @@ extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+ PgFdwConnState **state);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bf72d0c3031..8e54d1957e4 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1310,7 +1310,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting in an extension.</entry>
</row>
<row>
- <entry morerows="36"><literal>IPC</literal></entry>
+ <entry morerows="37"><literal>IPC</literal></entry>
+ <entry><literal>AppendReady</literal></entry>
+ <entry>Waiting for a subplan of Append to be ready.</entry>
+ </row>
+ <row>
<entry><literal>BgWorkerShutdown</literal></entry>
<entry>Waiting for background worker to shut down.</entry>
</row>
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index c227282975a..6f946d859f5 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -73,6 +73,7 @@
#include "postgres.h"
#include "executor/executor.h"
+#include "executor/execReady.h"
#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapAnd.h"
@@ -732,6 +733,30 @@ ExecEndNode(PlanState *node)
}
}
+/*
+ * ExecReady
+ *
+ * Check whether the node would be able to produce a new tuple without
+ * blocking. EXEC_READY_MORE means a tuple can be returned by ExecProcNode
+ * immediately without waiting. EXEC_READY_EOF means there are no further
+ * tuples to consume. EXEC_READY_UNSUPPORTED means that this node doesn't
+ * support asynchronous interaction. EXEC_READY_BUSY means that this node
+ * currently can't provide asynchronous service. Any other value is a file
+ * descriptor which can be used to wait until the node is ready to produce a
+ * tuple.
+ */
+int
+ExecReady(PlanState *node)
+{
+ switch (nodeTag(node))
+ {
+ case T_ForeignScanState:
+ return ExecForeignScanReady((ForeignScanState *) node);
+ default:
+ return EXEC_READY_UNSUPPORTED;
+ }
+}
+
/*
* ExecShutdownNode
*
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 5ff986ac7d3..5b88f8e7e43 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -59,8 +59,11 @@
#include "executor/execdebug.h"
#include "executor/execPartition.h"
+#include "executor/execReady.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@@ -239,9 +242,207 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
+ /*
+ * Initially we consider all subplans to be potentially asynchronous.
+ */
+ appendstate->asyncplans = (PlanState **) palloc(nplans * sizeof(PlanState *));
+ appendstate->asyncfds = (int *) palloc0(nplans * sizeof(int));
+ appendstate->nasyncplans = nplans;
+ memcpy(appendstate->asyncplans, appendstate->appendplans, nplans * sizeof(PlanState *));
+ appendstate->lastreadyplan = 0;
+
return appendstate;
}
+/*
+ * Forget about an asynchronous subplan, given an async subplan index. Return
+ * the index of the next subplan.
+ */
+static int
+forget_async_subplan(AppendState *node, int i)
+{
+ int last = node->nasyncplans - 1;
+
+ if (i == last)
+ {
+ /* This was the last subplan, forget it and move to first. */
+ i = 0;
+ if (node->lastreadyplan == last)
+ node->lastreadyplan = 0;
+ }
+ else
+ {
+ /*
+ * Move the last one here (cheaper than memmov'ing the whole array
+ * down and we don't care about the order).
+ */
+ node->asyncplans[i] = node->asyncplans[last];
+ node->asyncfds[i] = node->asyncfds[last];
+ }
+ --node->nasyncplans;
+
+ return i;
+}
+
+/*
+ * Wait for the first asynchronous subplan's file descriptor to be ready to
+ * read or error, and then ask it for a tuple.
+ *
+ * This is called by append_next_async when every async subplan has provided a
+ * file descriptor to wait on, so we must begin waiting.
+ */
+static TupleTableSlot *
+append_next_async_wait(AppendState *node)
+{
+ while (node->nasyncplans > 0)
+ {
+ WaitEventSet *set;
+ WaitEvent event;
+ int i;
+
+ /*
+ * For now there is no facility to remove fds from WaitEventSets when
+ * they are no longer interesting, so we allocate, populate, free
+ * every time, a la select(). If we had RemoveWaitEventFromSet, we
+ * could use the same WaitEventSet object for the life of the append
+ * node, and add/remove as we go, a la epoll/kqueue.
+ *
+ * Note: We could make a single call to WaitEventSetWait and have a
+ * big enough output event buffer to learn about readiness on all
+ * interesting sockets and loop over those, but one implementation can
+ * only tell us about a single socket at a time, so we need to be
+ * prepared to call WaitEventSetWait repeatedly.
+ */
+ set = CreateWaitEventSet(CurrentMemoryContext, node->nasyncplans + 1);
+ AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL,
+ NULL);
+ for (i = 0; i < node->nasyncplans; ++i)
+ {
+ Assert(node->asyncfds[i] > 0);
+ AddWaitEventToSet(set, WL_SOCKET_READABLE, node->asyncfds[i], NULL,
+ NULL);
+ }
+ i = WaitEventSetWait(set, -1, &event, 1, WAIT_EVENT_APPEND_READY);
+ Assert(i > 0);
+ FreeWaitEventSet(set);
+
+ if (event.events & WL_SOCKET_READABLE)
+ {
+ /* Linear search for the node that told us to wait for this fd. */
+ for (i = 0; i < node->nasyncplans; ++i)
+ {
+ if (event.fd == node->asyncfds[i])
+ {
+ TupleTableSlot *result;
+
+ /*
+ * We assume that because the fd is ready, it can produce
+ * a tuple now, which is not perfect. An improvement
+ * would be if it could say 'not yet, I'm still not
+ * ready', so eg postgres_fdw could PQconsumeInput and
+ * then say 'I need more input'.
+ */
+ result = ExecProcNode(node->asyncplans[i]);
+ if (!TupIsNull(result))
+ {
+ /*
+ * Remember this plan so that append_next_async will
+ * keep trying this subplan first until it stops
+ * feeding us buffered tuples.
+ */
+ node->lastreadyplan = i;
+ /* We can stop waiting for this fd. */
+ node->asyncfds[i] = 0;
+ return result;
+ }
+ else
+ {
+ /*
+ * This subplan has reached EOF. We'll go back and
+ * wait for another one.
+ */
+ forget_async_subplan(node, i);
+ break;
+ }
+ }
+ }
+ }
+ }
+ /*
+ * We visited every ready subplan, tried to pull a tuple, and they all
+ * reported EOF. There is no more async data available.
+ */
+ return NULL;
+}
+
+/*
+ * Fetch the next tuple available from any asynchronous subplan. If none can
+ * provide a tuple immediately, wait for the first one that is ready to
+ * provide a tuple. Return NULL when there are no more tuples available.
+ */
+static TupleTableSlot *
+append_next_async(AppendState *node)
+{
+ int count;
+ int i;
+
+ /*
+ * We'll start our scan of subplans at the last one that was able to give
+ * us a tuple, if there was one. It may be able to give us a new tuple
+ * straight away so we can leave early.
+ */
+ i = node->lastreadyplan;
+
+ /* Loop until we've visited each potentially async subplan. */
+ for (count = node->nasyncplans; count > 0; --count)
+ {
+ /*
+ * If we don't already have a file descriptor to wait on for this
+ * subplan, see if it is ready.
+ */
+ if (node->asyncfds[i] == 0)
+ {
+ int ready = ExecReady(node->asyncplans[i]);
+
+ switch (ready)
+ {
+ case EXEC_READY_MORE:
+ /* The node has a buffered tuple for us. */
+ return ExecProcNode(node->asyncplans[i]);
+
+ case EXEC_READY_UNSUPPORTED:
+ case EXEC_READY_EOF:
+ case EXEC_READY_BUSY:
+ /* This subplan can't give us anything asynchronously. */
+ i = forget_async_subplan(node, i);
+ continue;
+
+ default:
+ /* We have a new file descriptor to wait for. */
+ Assert(ready > 0);
+ node->asyncfds[i] = ready;
+ node->lastreadyplan = 0;
+ break;
+ }
+ }
+
+ /* Move on to the next plan (circular). */
+ i = (i + 1) % node->nasyncplans;
+ }
+
+ /* We might have removed all subplans; if so we can leave now. */
+ if (node->nasyncplans == 0)
+ return NULL;
+
+ /*
+ * If we reached here, then all remaining async subplans have given us a
+ * file descriptor to wait for. So do that, and pull a tuple as soon as
+ * one is ready.
+ */
+ return append_next_async_wait(node);
+}
+
+
/* ----------------------------------------------------------------
* ExecAppend
*
@@ -253,6 +454,16 @@ ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ /* First, drain all asynchronous subplans as they become ready. */
+ if (node->nasyncplans > 0)
+ {
+ TupleTableSlot *result = append_next_async(node);
+
+ if (!TupIsNull(result))
+ return result;
+ }
+ Assert(node->nasyncplans == 0);
+
if (node->as_whichplan < 0)
{
/*
@@ -415,6 +626,9 @@ ExecAppendInitializeDSM(AppendState *node,
node->as_pstate = pstate;
node->choose_next_subplan = choose_next_subplan_for_leader;
+
+ /* TODO: for now disable async when running in parallel */
+ node->nasyncplans = 0;
}
/* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 52af1dac5c4..95e7f66cb76 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -23,6 +23,7 @@
#include "postgres.h"
#include "executor/executor.h"
+#include "executor/execReady.h"
#include "executor/nodeForeignscan.h"
#include "foreign/fdwapi.h"
#include "utils/memutils.h"
@@ -384,3 +385,21 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanReady
+ *
+ * Checks if the foreign scan can emit data asynchronously
+ * using socket readiness as an indicator.
+ * ----------------------------------------------------------------
+ */
+int
+ExecForeignScanReady(ForeignScanState *node)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->Ready)
+ return fdwroutine->Ready(node);
+ else
+ return EXEC_READY_UNSUPPORTED;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index d362e7f7d7d..cae90b2f7d8 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3744,6 +3744,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
switch (w)
{
+ case WAIT_EVENT_APPEND_READY:
+ event_name = "AppendReady";
+ break;
case WAIT_EVENT_BGWORKER_SHUTDOWN:
event_name = "BgWorkerShutdown";
break;
diff --git a/src/include/executor/execReady.h b/src/include/executor/execReady.h
new file mode 100644
index 00000000000..01410ea7bcb
--- /dev/null
+++ b/src/include/executor/execReady.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * execReady.h
+ * Values used by FDW and the executor for async tuple iteration.
+ *
+ * Portions Copyright (c) 2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/executor/execReady.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef EXECREADY_H
+#define EXECREADY_H
+
+/*
+ * Asynchronous processing is not currently available (because an asynchronous
+ * request is already in progress).
+ */
+#define EXEC_READY_BUSY -3
+
+/* There are no more tuples. */
+#define EXEC_READY_EOF -2
+
+/* This FDW or executor node does not support asynchronous processing. */
+#define EXEC_READY_UNSUPPORTED -1
+
+/* More tuples are available immediately without waiting. */
+#define EXEC_READY_MORE 0
+
+#endif
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index affe6ad6982..2d84c6d01b4 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -219,6 +219,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
extern void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function);
extern Node *MultiExecProcNode(PlanState *node);
+extern int ExecReady(PlanState *node);
extern void ExecEndNode(PlanState *node);
extern bool ExecShutdownNode(PlanState *node);
extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index ca7723c8997..32bc7215478 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,6 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern int ExecForeignScanReady(ForeignScanState *node);
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 822686033e4..7a4c07478a9 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -170,6 +170,8 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef int (*Ready_function) (ForeignScanState *node);
+
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -246,6 +248,9 @@ typedef struct FdwRoutine
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+ /* Support functions for asynchronous processing */
+ Ready_function Ready;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f42189d2bf6..da4aead8722 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1174,6 +1174,11 @@ struct AppendState
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
bool (*choose_next_subplan) (AppendState *);
+
+ PlanState **asyncplans;
+ int *asyncfds;
+ int nasyncplans;
+ int lastreadyplan;
};
/* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823db..298aebe3ddc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -817,6 +817,7 @@ typedef enum
*/
typedef enum
{
+ WAIT_EVENT_APPEND_READY,
WAIT_EVENT_BGWORKER_SHUTDOWN = PG_WAIT_IPC,
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
--
2.22.0
On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
Hello,
A few years back[1] I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding. I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better. It's a very localised feature that works only between Append
and its immediate children. The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.Here's a quick rebase of that old POC patch, along with a demo. Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.
Yes, sharding has been waiting on parallel FDW scans. Would this work
for parallel partition scans if the partitions were FDWs?
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
A few years back[1] I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding. I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better. It's a very localised feature that works only between Append
and its immediate children. The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.Here's a quick rebase of that old POC patch, along with a demo. Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.Yes, sharding has been waiting on parallel FDW scans. Would this work
for parallel partition scans if the partitions were FDWs?
Yeah, this works for partitions that are FDWs (as shown), but only for
Append, not for Parallel Append. So you'd have parallelism in the
sense that your N remote shard servers are all doing stuff at the same
time, but it couldn't be in a parallel query on your 'home' server,
which is probably good for things that push down aggregation and bring
back just a few tuples from each shard, but bad for anything wanting
to ship back millions of tuples to chew on locally. Do you think
that'd be useful enough on its own?
The problem is that parallel safe non-partial plans (like postgres_fdw
scans) are exclusively 'claimed' by one process under Parallel Append,
so with the patch as posted, if you modify it to allow parallelism
then it'll probably give correct answers but nothing prevents a single
process from claiming and starting all the scans and then waiting for
them to be ready, while the other processes miss out on doing any work
at all. There's probably some kludgy solution involving not letting
any one worker start more than X, and some space cadet solution
involving passing sockets around and teaching libpq to hand over
connections at certain controlled phases of the protocol (due to lack
of threads), but nothing like that has jumped out as the right path so
far.
One idea that seems promising but requires a bunch more infrastructure
is to offload the libpq multiplexing to a background worker that owns
all the sockets, and have it push tuples into a multi-consumer shared
memory queue that regular executor processes could read from. I have
been wondering if that would be best done by each FDW implementation,
or if there is a way to make a generic infrastructure for converting
parallel-safe executor nodes into partial plans by the use of a
'Scatter' (opposite of Gather) node that can spread the output of any
node over many workers.
If you had that, you'd still want a way for Parallel Append to be
readiness-based, but it would probably look a bit different to this
patch because it'd need to use (vapourware) multiconsumer shm queue
readiness, not fd readiness. And another kind of fd-readiness
multiplexing would be going on inside the new (vapourware) worker that
handles all the libpq connections (and maybe other kinds of work for
other FDWs that are able to expose a socket).
On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:
On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
A few years back[1] I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding. I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better. It's a very localised feature that works only between Append
and its immediate children. The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.Here's a quick rebase of that old POC patch, along with a demo. Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.Yes, sharding has been waiting on parallel FDW scans. Would this work
for parallel partition scans if the partitions were FDWs?Yeah, this works for partitions that are FDWs (as shown), but only for
Append, not for Parallel Append. So you'd have parallelism in the
sense that your N remote shard servers are all doing stuff at the same
time, but it couldn't be in a parallel query on your 'home' server,
which is probably good for things that push down aggregation and bring
back just a few tuples from each shard, but bad for anything wanting
to ship back millions of tuples to chew on locally. Do you think
that'd be useful enough on its own?
Yes, I think so. There are many data warehouse queries that want to
return only aggregate values, or filter for a small number of rows.
Even OLTP queries might return only a few rows from multiple partitions.
This would allow for a proof-of-concept implementation so we can see how
realistic this approach is.
The problem is that parallel safe non-partial plans (like postgres_fdw
scans) are exclusively 'claimed' by one process under Parallel Append,
so with the patch as posted, if you modify it to allow parallelism
then it'll probably give correct answers but nothing prevents a single
process from claiming and starting all the scans and then waiting for
them to be ready, while the other processes miss out on doing any work
at all. There's probably some kludgy solution involving not letting
any one worker start more than X, and some space cadet solution
involving passing sockets around and teaching libpq to hand over
connections at certain controlled phases of the protocol (due to lack
of threads), but nothing like that has jumped out as the right path so
far.
I am unclear how many queries can do any meaningful work until all
shards have giving their full results.
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
Hello.
At Sat, 30 Nov 2019 14:26:11 -0500, Bruce Momjian <bruce@momjian.us> wrote in
On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:
On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
A few years back[1] I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding. I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better. It's a very localised feature that works only between Append
and its immediate children. The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.Here's a quick rebase of that old POC patch, along with a demo. Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.Yes, sharding has been waiting on parallel FDW scans. Would this work
for parallel partition scans if the partitions were FDWs?Yeah, this works for partitions that are FDWs (as shown), but only for
Append, not for Parallel Append. So you'd have parallelism in the
sense that your N remote shard servers are all doing stuff at the same
time, but it couldn't be in a parallel query on your 'home' server,
which is probably good for things that push down aggregation and bring
back just a few tuples from each shard, but bad for anything wanting
to ship back millions of tuples to chew on locally. Do you think
that'd be useful enough on its own?Yes, I think so. There are many data warehouse queries that want to
return only aggregate values, or filter for a small number of rows.
Even OLTP queries might return only a few rows from multiple partitions.
This would allow for a proof-of-concept implementation so we can see how
realistic this approach is.The problem is that parallel safe non-partial plans (like postgres_fdw
scans) are exclusively 'claimed' by one process under Parallel Append,
so with the patch as posted, if you modify it to allow parallelism
then it'll probably give correct answers but nothing prevents a single
process from claiming and starting all the scans and then waiting for
them to be ready, while the other processes miss out on doing any work
at all. There's probably some kludgy solution involving not letting
any one worker start more than X, and some space cadet solution
involving passing sockets around and teaching libpq to hand over
connections at certain controlled phases of the protocol (due to lack
of threads), but nothing like that has jumped out as the right path so
far.I am unclear how many queries can do any meaningful work until all
shards have giving their full results.
There's my pending (somewhat stale) patch, which allows to run local
scans while waiting for remote servers.
/messages/by-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp
I (or we) wanted to introduce the asynchronous node mechanism as the
basis of async-capable postgres_fdw. The reason why it is stopping is
that we are seeing and I am waiting the executor change that makes
executor push-up style, on which the async-node mechanism will be
constructed. If that won't happen shortly, I'd like to continue that
work..
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Thu, Dec 5, 2019 at 4:26 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
There's my pending (somewhat stale) patch, which allows to run local
scans while waiting for remote servers./messages/by-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp
I (or we) wanted to introduce the asynchronous node mechanism as the
basis of async-capable postgres_fdw. The reason why it is stopping is
that we are seeing and I am waiting the executor change that makes
executor push-up style, on which the async-node mechanism will be
constructed. If that won't happen shortly, I'd like to continue that
work..
After rereading some threads to remind myself what happened here...
right, my little patch began life in March 2016[1]/messages/by-id/CAEepm=1CuAWfxDk==jZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA@mail.gmail.com when I wanted a
test case to test Andres's work on WaitEventSets, and your patch set
started a couple of months later and is vastly more ambitious[2]/messages/by-id/CA+Tgmobx8su_bYtAa3DgrqB+R7xZG6kHRj0ccMUUshKAQVftww@mail.gmail.com[3]/messages/by-id/CA+TgmoaXQEt4tZ03FtQhnzeDEMzBck+Lrni0UWHVVgOTnA6C1w@mail.gmail.com.
It wants to escape from the volcano give-me-one-tuple-or-give-me-EOF
model. And I totally agree that there are lots of reason to want to
do that (including yielding to other parts of the plan instead of
waiting for I/O, locks and some parallelism primitives enabling new
kinds of parallelism), and I'm hoping to help with some small pieces
of that if I can.
My patch set (rebased upthread) was extremely primitive, with no new
planner concepts, and added only a very simple new executor node
method: ExecReady(). Append used that to try to ask its children if
they'd like some time to warm up. By default, ExecReady() says "I
don't know what you're talking about, go away", but FDWs can provide
an implementation that says "yes, please call me again when this fd is
ready" or "yes, I am ready, please call ExecProc() now". It doesn't
deal with anything more complicated than that, and in particular it
doesn't work if there are extra planner nodes in between Append and
the foreign scan. (It also doesn't mix particularly well with
parallelism, as mentioned.)
The reason I reposted this unambitious work is because Stephen keeps
asking me why we don't consider the stupidly simple thing that would
help with simple foreign partition-based queries today, instead of
waiting for someone to redesign the entire executor, because that's
... really hard.
[1]: /messages/by-id/CAEepm=1CuAWfxDk==jZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA@mail.gmail.com
[2]: /messages/by-id/CA+Tgmobx8su_bYtAa3DgrqB+R7xZG6kHRj0ccMUUshKAQVftww@mail.gmail.com
[3]: /messages/by-id/CA+TgmoaXQEt4tZ03FtQhnzeDEMzBck+Lrni0UWHVVgOTnA6C1w@mail.gmail.com
On Thu, Dec 5, 2019 at 05:45:24PM +1300, Thomas Munro wrote:
My patch set (rebased upthread) was extremely primitive, with no new
planner concepts, and added only a very simple new executor node
method: ExecReady(). Append used that to try to ask its children if
they'd like some time to warm up. By default, ExecReady() says "I
don't know what you're talking about, go away", but FDWs can provide
an implementation that says "yes, please call me again when this fd is
ready" or "yes, I am ready, please call ExecProc() now". It doesn't
deal with anything more complicated than that, and in particular it
doesn't work if there are extra planner nodes in between Append and
the foreign scan. (It also doesn't mix particularly well with
parallelism, as mentioned.)The reason I reposted this unambitious work is because Stephen keeps
asking me why we don't consider the stupidly simple thing that would
help with simple foreign partition-based queries today, instead of
waiting for someone to redesign the entire executor, because that's
... really hard.
I agree with Stephen's request. We have been waiting for the executor
rewrite for a while, so let's just do something simple and see how it
performs.
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote:
I agree with Stephen's request. We have been waiting for the executor
rewrite for a while, so let's just do something simple and see how it
performs.
I'm sympathetic to the frustration here, and I think it would be great
if we could find a way forward that doesn't involve waiting for a full
rewrite of the executor. However, I seem to remember that when we
tested the various patches that various people had written for this
feature (I wrote one, too) they all had a noticeable performance
penalty in the case of a plain old Append that involved no FDWs and
nothing asynchronous. I don't think it's OK to have, say, a 2%
regression on every query that involves an Append, because especially
now that we have partitioning, that's a lot of queries.
I don't know whether this patch has that kind of problem. If it
doesn't, I would consider that a promising sign.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <robertmhaas@gmail.com> wrote:
On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote:
I agree with Stephen's request. We have been waiting for the executor
rewrite for a while, so let's just do something simple and see how it
performs.I'm sympathetic to the frustration here, and I think it would be great
if we could find a way forward that doesn't involve waiting for a full
rewrite of the executor. However, I seem to remember that when we
tested the various patches that various people had written for this
feature (I wrote one, too) they all had a noticeable performance
penalty in the case of a plain old Append that involved no FDWs and
nothing asynchronous. I don't think it's OK to have, say, a 2%
regression on every query that involves an Append, because especially
now that we have partitioning, that's a lot of queries.I don't know whether this patch has that kind of problem. If it
doesn't, I would consider that a promising sign.
I'll look into that. If there is a measurable impact, I suspect it
can be avoided by, for example, installing a different ExecProcNode
function.
At Fri, 6 Dec 2019 10:03:44 +1300, Thomas Munro <thomas.munro@gmail.com> wrote in
On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <robertmhaas@gmail.com> wrote:
I don't know whether this patch has that kind of problem. If it
doesn't, I would consider that a promising sign.I'll look into that. If there is a measurable impact, I suspect it
can be avoided by, for example, installing a different ExecProcNode
function.
Replacing ExecProcNode perfectly isolates additional process in
ExecAppendAsync. Thus, for pure local appends, the patch can impact
performance through only planner and execinit. But I don't believe it
cannot be as large as observable in a large scan.
As the mail pointed upthread, the patch acceleartes all remote cases
when fetch_size is >= 200. The problem was that local scans seemed
slightly slowed down. I dusted off the old patch (FWIW I attached it)
and.. will re-run on the current development environment. (And
re-check the code.).
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments:
v1-0001-Allow-wait-event-set-to-be-registered-to-resource.patchtext/x-patch; charset=us-asciiDownload
From 9f5dc3720ddade94cd66713f4aa79da575b09e31 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH v1 1/3] Allow wait event set to be registered to resource
owner
WaitEventSet needs to be released using resource owner for a certain
case. This change adds WaitEventSet reowner and allow the creator of a
WaitEventSet to specify a resource owner.
---
src/backend/libpq/pqcomm.c | 2 +-
src/backend/storage/ipc/latch.c | 18 ++++-
src/backend/storage/lmgr/condition_variable.c | 2 +-
src/backend/utils/resowner/resowner.c | 67 +++++++++++++++++++
src/include/storage/latch.h | 4 +-
src/include/utils/resowner_private.h | 8 +++
6 files changed, 96 insertions(+), 5 deletions(-)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index cd517e8bb4..3912b8b3a0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -220,7 +220,7 @@ pq_init(void)
(errmsg("could not set socket to nonblocking mode: %m")));
#endif
- FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+ FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbcf8e..dc04ee5f6f 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -52,6 +52,7 @@
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
+#include "utils/resowner_private.h"
/*
* Select the fd readiness primitive to use. Normally the "most modern"
@@ -78,6 +79,8 @@ struct WaitEventSet
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ ResourceOwner resowner; /* Resource owner */
+
/*
* Array, of nevents_space length, storing the definition of events this
* set is waiting for.
@@ -372,7 +375,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
int ret = 0;
int rc;
WaitEvent event;
- WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+ WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
if (wakeEvents & WL_TIMEOUT)
Assert(timeout >= 0);
@@ -539,12 +542,15 @@ ResetLatch(Latch *latch)
* WaitEventSetWait().
*/
WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
{
WaitEventSet *set;
char *data;
Size sz = 0;
+ if (res)
+ ResourceOwnerEnlargeWESs(res);
+
/*
* Use MAXALIGN size/alignment to guarantee that later uses of memory are
* aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -614,6 +620,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
#endif
+ /* Register this wait event set if requested */
+ set->resowner = res;
+ if (res)
+ ResourceOwnerRememberWES(set->resowner, set);
+
return set;
}
@@ -655,6 +666,9 @@ FreeWaitEventSet(WaitEventSet *set)
}
#endif
+ if (set->resowner != NULL)
+ ResourceOwnerForgetWES(set->resowner, set);
+
pfree(set);
}
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index e08507f0cc..5e88c48a1c 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -70,7 +70,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
{
WaitEventSet *new_event_set;
- new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
+ new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2);
AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 7be11c48ab..829034516f 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -128,6 +128,7 @@ typedef struct ResourceOwnerData
ResourceArray filearr; /* open temporary files */
ResourceArray dsmarr; /* dynamic shmem segments */
ResourceArray jitarr; /* JIT contexts */
+ ResourceArray wesarr; /* wait event sets */
/* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
int nlocks; /* number of owned locks */
@@ -175,6 +176,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
static void PrintSnapshotLeakWarning(Snapshot snapshot);
static void PrintFileLeakWarning(File file);
static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
/*****************************************************************************
@@ -444,6 +446,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
+ ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
return owner;
}
@@ -553,6 +556,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
jit_release_context(context);
}
+
+ /* Ditto for wait event sets */
+ while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+ {
+ WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+ if (isCommit)
+ PrintWESLeakWarning(event);
+ FreeWaitEventSet(event);
+ }
}
else if (phase == RESOURCE_RELEASE_LOCKS)
{
@@ -701,6 +714,7 @@ ResourceOwnerDelete(ResourceOwner owner)
Assert(owner->filearr.nitems == 0);
Assert(owner->dsmarr.nitems == 0);
Assert(owner->jitarr.nitems == 0);
+ Assert(owner->wesarr.nitems == 0);
Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
/*
@@ -728,6 +742,7 @@ ResourceOwnerDelete(ResourceOwner owner)
ResourceArrayFree(&(owner->filearr));
ResourceArrayFree(&(owner->dsmarr));
ResourceArrayFree(&(owner->jitarr));
+ ResourceArrayFree(&(owner->wesarr));
pfree(owner);
}
@@ -1346,3 +1361,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle)
elog(ERROR, "JIT context %p is not owned by resource owner %s",
DatumGetPointer(handle), owner->name);
}
+
+/*
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+ ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+ ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identier of a wait event set,
+ * use its pointer instead.
+ */
+ if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+ elog(ERROR, "wait event set %p is not owned by resource owner %s",
+ events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identier of a wait event set,
+ * use its pointer instead.
+ */
+ elog(WARNING, "wait event set leak: %p still referenced",
+ events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11a8a..d136614587 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
#define LATCH_H
#include <signal.h>
+#include "utils/resowner.h"
/*
* Latch structure should be treated as opaque and only accessed through
@@ -163,7 +164,8 @@ extern void DisownLatch(Latch *latch);
extern void SetLatch(Latch *latch);
extern void ResetLatch(Latch *latch);
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+ ResourceOwner res, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index b8261ad866..9c9c845784 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
#include "storage/dsm.h"
#include "storage/fd.h"
+#include "storage/latch.h"
#include "storage/lock.h"
#include "utils/catcache.h"
#include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
extern void ResourceOwnerForgetJIT(ResourceOwner owner,
Datum handle);
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+ WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+ WaitEventSet *);
+
#endif /* RESOWNER_PRIVATE_H */
--
2.23.0
v1-0002-infrastructure-for-asynchronous-execution.patchtext/x-patch; charset=us-asciiDownload
From 65fc69bb64f4596aff0c37a2d090ddf1609120bb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v1 2/3] infrastructure for asynchronous execution
This patch add an infrastructure for asynchronous execution. As a PoC
this makes only Append capable to handle asynchronously executable
subnodes.
---
src/backend/commands/explain.c | 17 ++
src/backend/executor/Makefile | 1 +
src/backend/executor/execAsync.c | 145 ++++++++++++
src/backend/executor/nodeAppend.c | 286 +++++++++++++++++++++---
src/backend/executor/nodeForeignscan.c | 22 +-
src/backend/nodes/bitmapset.c | 72 ++++++
src/backend/nodes/copyfuncs.c | 2 +
src/backend/nodes/outfuncs.c | 2 +
src/backend/nodes/readfuncs.c | 2 +
src/backend/optimizer/plan/createplan.c | 83 +++++++
src/backend/postmaster/pgstat.c | 3 +
src/backend/postmaster/syslogger.c | 2 +-
src/backend/utils/adt/ruleutils.c | 8 +-
src/include/executor/execAsync.h | 23 ++
src/include/executor/executor.h | 1 +
src/include/executor/nodeForeignscan.h | 3 +
src/include/foreign/fdwapi.h | 11 +
src/include/nodes/bitmapset.h | 1 +
src/include/nodes/execnodes.h | 20 +-
src/include/nodes/plannodes.h | 9 +
src/include/pgstat.h | 3 +-
21 files changed, 676 insertions(+), 40 deletions(-)
create mode 100644 src/backend/executor/execAsync.c
create mode 100644 src/include/executor/execAsync.h
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 62fb3434a3..9f06e1fbdc 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -82,6 +82,7 @@ static void show_sort_keys(SortState *sortstate, List *ancestors,
ExplainState *es);
static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
ExplainState *es);
+static void show_append_info(AppendState *astate, ExplainState *es);
static void show_agg_keys(AggState *astate, List *ancestors,
ExplainState *es);
static void show_grouping_sets(PlanState *planstate, Agg *agg,
@@ -1319,6 +1320,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
+ if (plan->async_capable)
+ appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
@@ -1860,6 +1863,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
case T_Hash:
show_hash_info(castNode(HashState, planstate), es);
break;
+
+ case T_Append:
+ show_append_info(castNode(AppendState, planstate), es);
+ break;
+
default:
break;
}
@@ -2197,6 +2205,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
ancestors, es);
}
+static void
+show_append_info(AppendState *astate, ExplainState *es)
+{
+ Append *plan = (Append *) astate->ps.plan;
+
+ if (plan->nasyncplans > 0)
+ ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es);
+}
+
/*
* Show the grouping keys for an Agg node.
*/
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index a983800e4b..8a2d6e9961 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
+ execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..db477e2cf6
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,145 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+void ExecAsyncSetState(PlanState *pstate, AsyncState status)
+{
+ pstate->asyncstate = status;
+}
+
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+ void *data, bool reinit)
+{
+ switch (nodeTag(node))
+ {
+ case T_ForeignScanState:
+ return ExecForeignAsyncConfigureWait((ForeignScanState *)node,
+ wes, data, reinit);
+ break;
+ default:
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(node));
+ }
+}
+
+/*
+ * struct for memory context callback argument used in ExecAsyncEventWait
+ */
+typedef struct {
+ int **p_refind;
+ int *p_refindsize;
+} ExecAsync_mcbarg;
+
+/*
+ * callback function to reset static variables pointing to the memory in
+ * TopTransactionContext in ExecAsyncEventWait.
+ */
+static void ExecAsyncMemoryContextCallback(void *arg)
+{
+ /* arg is the address of the variable refind in ExecAsyncEventWait */
+ ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg;
+ *mcbarg->p_refind = NULL;
+ *mcbarg->p_refindsize = 0;
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+ static int *refind = NULL;
+ static int refindsize = 0;
+ WaitEventSet *wes;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred = 0;
+ Bitmapset *fired_events = NULL;
+ int i;
+ int n;
+
+ n = bms_num_members(waitnodes);
+ wes = CreateWaitEventSet(TopTransactionContext,
+ TopTransactionResourceOwner, n);
+ if (refindsize < n)
+ {
+ if (refindsize == 0)
+ refindsize = EVENT_BUFFER_SIZE; /* XXX */
+ while (refindsize < n)
+ refindsize *= 2;
+ if (refind)
+ refind = (int *) repalloc(refind, refindsize * sizeof(int));
+ else
+ {
+ static ExecAsync_mcbarg mcb_arg =
+ { &refind, &refindsize };
+ static MemoryContextCallback mcb =
+ { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL };
+ MemoryContext oldctxt =
+ MemoryContextSwitchTo(TopTransactionContext);
+
+ /*
+ * refind points to a memory block in
+ * TopTransactionContext. Register a callback to reset it.
+ */
+ MemoryContextRegisterResetCallback(TopTransactionContext, &mcb);
+ refind = (int *) palloc(refindsize * sizeof(int));
+ MemoryContextSwitchTo(oldctxt);
+ }
+ }
+
+ n = 0;
+ for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+ i = bms_next_member(waitnodes, i))
+ {
+ refind[i] = i;
+ if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+ n++;
+ }
+
+ if (n == 0)
+ {
+ FreeWaitEventSet(wes);
+ return NULL;
+ }
+
+ noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+ EVENT_BUFFER_SIZE,
+ WAIT_EVENT_ASYNC_WAIT);
+ FreeWaitEventSet(wes);
+ if (noccurred == 0)
+ return NULL;
+
+ for (i = 0 ; i < noccurred ; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+ {
+ int n = *(int*)w->user_data;
+
+ fired_events = bms_add_member(fired_events, n);
+ }
+ }
+
+ return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 5ff986ac7d..03dec4d648 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,6 +60,7 @@
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
#include "miscadmin.h"
/* Shared state for parallel-aware Append. */
@@ -81,6 +82,7 @@ struct ParallelAppendState
#define NO_MATCHING_SUBPLANS -2
static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
@@ -104,22 +106,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
PlanState **appendplanstates;
Bitmapset *validsubplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
/* check for unsupported flags */
- Assert(!(eflags & EXEC_FLAG_MARK));
+ Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
/*
* create new AppendState for our append node
*/
appendstate->ps.plan = (Plan *) node;
appendstate->ps.state = estate;
- appendstate->ps.ExecProcNode = ExecAppend;
+
+ /* choose appropriate version of Exec function */
+ if (node->nasyncplans == 0)
+ appendstate->ps.ExecProcNode = ExecAppend;
+ else
+ appendstate->ps.ExecProcNode = ExecAppendAsync;
/* Let choose_next_subplan_* function handle setting the first subplan */
- appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@@ -152,7 +160,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
*/
if (bms_is_empty(validsubplans))
{
- appendstate->as_whichplan = NO_MATCHING_SUBPLANS;
+ appendstate->as_whichsyncplan = NO_MATCHING_SUBPLANS;
/* Mark the first as valid so that it's initialized below */
validsubplans = bms_make_singleton(0);
@@ -212,10 +220,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
*/
j = 0;
firstvalid = nplans;
+ nasyncplans = 0;
+
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
+ int sub_eflags = eflags;
+
+ /* Let async-capable subplans run asynchronously */
+ if (i < node->nasyncplans)
+ {
+ sub_eflags |= EXEC_FLAG_ASYNC;
+ nasyncplans++;
+ }
/*
* Record the lowest appendplans index which is a valid partial plan.
@@ -223,13 +241,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
if (i >= node->first_partial_plan && j < firstvalid)
firstvalid = j;
- appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+ appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags);
}
appendstate->as_first_partial_plan = firstvalid;
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* fill in async stuff */
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_syncdone = (nasyncplans == nplans);
+
+ if (appendstate->as_nasyncplans)
+ {
+ appendstate->as_asyncresult = (TupleTableSlot **)
+ palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+ /* initially, all async requests need a request */
+ for (i = 0; i < appendstate->as_nasyncplans; ++i)
+ appendstate->as_needrequest =
+ bms_add_member(appendstate->as_needrequest, i);
+ }
+
/*
* Miscellaneous initialization
*/
@@ -253,21 +286,23 @@ ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
- if (node->as_whichplan < 0)
+ if (node->as_whichsyncplan < 0)
{
/*
* If no subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+ if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
!node->choose_next_subplan(node))
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
/* Nothing to do if there are no matching subplans */
- else if (node->as_whichplan == NO_MATCHING_SUBPLANS)
+ else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
+ Assert(node->as_nasyncplans == 0);
+
for (;;)
{
PlanState *subnode;
@@ -278,8 +313,9 @@ ExecAppend(PlanState *pstate)
/*
* figure out which subplan we are currently processing
*/
- Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
- subnode = node->appendplans[node->as_whichplan];
+ Assert(node->as_whichsyncplan >= 0 &&
+ node->as_whichsyncplan < node->as_nplans);
+ subnode = node->appendplans[node->as_whichsyncplan];
/*
* get a tuple from the subplan
@@ -302,6 +338,175 @@ ExecAppend(PlanState *pstate)
}
}
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+ AppendState *node = castNode(AppendState, pstate);
+ Bitmapset *needrequest;
+ int i;
+
+ Assert(node->as_nasyncplans > 0);
+
+restart:
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ while ((i = bms_first_member(needrequest)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+
+ slot = ExecProcNode(subnode);
+ if (subnode->asyncstate == AS_AVAILABLE)
+ {
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest = bms_add_member(node->as_needrequest, i);
+ }
+ }
+ else
+ node->as_pending_async = bms_add_member(node->as_pending_async, i);
+ }
+ bms_free(needrequest);
+
+ for (;;)
+ {
+ TupleTableSlot *result;
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ while (!bms_is_empty(node->as_pending_async))
+ {
+ long timeout = node->as_syncdone ? -1 : 0;
+ Bitmapset *fired;
+ int i;
+
+ fired = ExecAsyncEventWait(node->appendplans,
+ node->as_pending_async,
+ timeout);
+
+ if (bms_is_empty(fired) && node->as_syncdone)
+ {
+ /*
+ * No subplan fired. This happens when even in normal
+ * operation where the subnode already prepared results before
+ * waiting. as_pending_result is storing stale information so
+ * restart from the beginning.
+ */
+ node->as_needrequest = node->as_pending_async;
+ node->as_pending_async = NULL;
+ goto restart;
+ }
+
+ while ((i = bms_first_member(fired)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+ slot = ExecProcNode(subnode);
+ if (subnode->asyncstate == AS_AVAILABLE)
+ {
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest =
+ bms_add_member(node->as_needrequest, i);
+ }
+ node->as_pending_async =
+ bms_del_member(node->as_pending_async, i);
+ }
+ }
+ bms_free(fired);
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If there is no asynchronous activity still pending and the
+ * synchronous activity is also complete, we're totally done scanning
+ * this node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the synchronous children.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(bms_is_empty(node->as_pending_async));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+
+ /*
+ * get a tuple from the subplan
+ */
+
+ if (node->as_whichsyncplan < 0)
+ {
+ /*
+ * If no subplan has been chosen, we must choose one before
+ * proceeding.
+ */
+ if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
+ !node->choose_next_subplan(node))
+ {
+ node->as_syncdone = true;
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+
+ /* Nothing to do if there are no matching subplans */
+ else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
+ {
+ node->as_syncdone = true;
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+ }
+
+ result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+ if (!TupIsNull(result))
+ {
+ /*
+ * If the subplan gave us something then return it as-is. We do
+ * NOT make use of the result slot that was set up in
+ * ExecInitAppend; there's no need for it.
+ */
+ return result;
+ }
+
+ /*
+ * Go on to the "next" subplan. If no more subplans, return the empty
+ * slot set up for us by ExecInitAppend, unless there are async plans
+ * we have yet to finish.
+ */
+ if (!node->choose_next_subplan(node))
+ {
+ node->as_syncdone = true;
+ if (bms_is_empty(node->as_pending_async))
+ {
+ Assert(bms_is_empty(node->as_needrequest));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+ }
+
+ /* Else loop back and try to get a tuple from the new subplan */
+ }
+}
+
/* ----------------------------------------------------------------
* ExecEndAppend
*
@@ -348,6 +553,15 @@ ExecReScanAppend(AppendState *node)
node->as_valid_subplans = NULL;
}
+ /* Reset async state. */
+ for (i = 0; i < node->as_nasyncplans; ++i)
+ {
+ ExecShutdownNode(node->appendplans[i]);
+ node->as_needrequest = bms_add_member(node->as_needrequest, i);
+ }
+ node->as_nasyncresult = 0;
+ node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
for (i = 0; i < node->as_nplans; i++)
{
PlanState *subnode = node->appendplans[i];
@@ -368,7 +582,7 @@ ExecReScanAppend(AppendState *node)
}
/* Let choose_next_subplan_* function handle setting the first subplan */
- node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
}
/* ----------------------------------------------------------------
@@ -456,7 +670,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
static bool
choose_next_subplan_locally(AppendState *node)
{
- int whichplan = node->as_whichplan;
+ int whichplan = node->as_whichsyncplan;
int nextplan;
/* We should never be called when there are no subplans */
@@ -475,6 +689,10 @@ choose_next_subplan_locally(AppendState *node)
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state);
+ /* Exclude async plans */
+ if (node->as_nasyncplans > 0)
+ bms_del_range(node->as_valid_subplans, 0, node->as_nasyncplans - 1);
+
whichplan = -1;
}
@@ -489,7 +707,7 @@ choose_next_subplan_locally(AppendState *node)
if (nextplan < 0)
return false;
- node->as_whichplan = nextplan;
+ node->as_whichsyncplan = nextplan;
return true;
}
@@ -511,19 +729,19 @@ choose_next_subplan_for_leader(AppendState *node)
Assert(ScanDirectionIsForward(node->ps.state->es_direction));
/* We should never be called when there are no subplans */
- Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+ Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
{
/* Mark just-completed subplan as finished. */
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
}
else
{
/* Start with last subplan. */
- node->as_whichplan = node->as_nplans - 1;
+ node->as_whichsyncplan = node->as_nplans - 1;
/*
* If we've yet to determine the valid subplans then do so now. If
@@ -544,12 +762,12 @@ choose_next_subplan_for_leader(AppendState *node)
}
/* Loop until we find a subplan to execute. */
- while (pstate->pa_finished[node->as_whichplan])
+ while (pstate->pa_finished[node->as_whichsyncplan])
{
- if (node->as_whichplan == 0)
+ if (node->as_whichsyncplan == 0)
{
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
- node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
LWLockRelease(&pstate->pa_lock);
return false;
}
@@ -558,12 +776,12 @@ choose_next_subplan_for_leader(AppendState *node)
* We needn't pay attention to as_valid_subplans here as all invalid
* plans have been marked as finished.
*/
- node->as_whichplan--;
+ node->as_whichsyncplan--;
}
/* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < node->as_first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < node->as_first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
LWLockRelease(&pstate->pa_lock);
@@ -592,13 +810,13 @@ choose_next_subplan_for_worker(AppendState *node)
Assert(ScanDirectionIsForward(node->ps.state->es_direction));
/* We should never be called when there are no subplans */
- Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+ Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
/* Mark just-completed subplan as finished. */
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
/*
* If we've yet to determine the valid subplans then do so now. If
@@ -620,7 +838,7 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* Save the plan from which we are starting the search. */
- node->as_whichplan = pstate->pa_next_plan;
+ node->as_whichsyncplan = pstate->pa_next_plan;
/* Loop until we find a valid subplan to execute. */
while (pstate->pa_finished[pstate->pa_next_plan])
@@ -634,7 +852,7 @@ choose_next_subplan_for_worker(AppendState *node)
/* Advance to the next valid plan. */
pstate->pa_next_plan = nextplan;
}
- else if (node->as_whichplan > node->as_first_partial_plan)
+ else if (node->as_whichsyncplan > node->as_first_partial_plan)
{
/*
* Try looping back to the first valid partial plan, if there is
@@ -643,7 +861,7 @@ choose_next_subplan_for_worker(AppendState *node)
nextplan = bms_next_member(node->as_valid_subplans,
node->as_first_partial_plan - 1);
pstate->pa_next_plan =
- nextplan < 0 ? node->as_whichplan : nextplan;
+ nextplan < 0 ? node->as_whichsyncplan : nextplan;
}
else
{
@@ -651,10 +869,10 @@ choose_next_subplan_for_worker(AppendState *node)
* At last plan, and either there are no partial plans or we've
* tried them all. Arrange to bail out.
*/
- pstate->pa_next_plan = node->as_whichplan;
+ pstate->pa_next_plan = node->as_whichsyncplan;
}
- if (pstate->pa_next_plan == node->as_whichplan)
+ if (pstate->pa_next_plan == node->as_whichsyncplan)
{
/* We've tried everything! */
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -664,7 +882,7 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* Pick the plan we found, and advance pa_next_plan one more time. */
- node->as_whichplan = pstate->pa_next_plan;
+ node->as_whichsyncplan = pstate->pa_next_plan;
pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
pstate->pa_next_plan);
@@ -691,8 +909,8 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < node->as_first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < node->as_first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
LWLockRelease(&pstate->pa_lock);
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 52af1dac5c..1a54383ec8 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -117,7 +117,6 @@ ExecForeignScan(PlanState *pstate)
(ExecScanRecheckMtd) ForeignRecheck);
}
-
/* ----------------------------------------------------------------
* ExecInitForeignScan
* ----------------------------------------------------------------
@@ -141,6 +140,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+ scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+ if ((eflags & EXEC_FLAG_ASYNC) != 0)
+ scanstate->fs_async = true;
/*
* Miscellaneous initialization
@@ -384,3 +387,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+ void *caller_data, bool reinit)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+ caller_data, reinit);
+}
diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c
index 665149defe..5d4e19a052 100644
--- a/src/backend/nodes/bitmapset.c
+++ b/src/backend/nodes/bitmapset.c
@@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper)
return a;
}
+/*
+ * bms_del_range
+ * Delete members in the range of 'lower' to 'upper' from the set.
+ *
+ * Note this could also be done by calling bms_del_member in a loop, however,
+ * using this function will be faster when the range is large as we work at
+ * the bitmapword level rather than at bit level.
+ */
+Bitmapset *
+bms_del_range(Bitmapset *a, int lower, int upper)
+{
+ int lwordnum,
+ lbitnum,
+ uwordnum,
+ ushiftbits,
+ wordnum;
+
+ if (lower < 0 || upper < 0)
+ elog(ERROR, "negative bitmapset member not allowed");
+ if (lower > upper)
+ elog(ERROR, "lower range must not be above upper range");
+ uwordnum = WORDNUM(upper);
+
+ if (a == NULL)
+ {
+ a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1));
+ a->nwords = uwordnum + 1;
+ }
+
+ /* ensure we have enough words to store the upper bit */
+ else if (uwordnum >= a->nwords)
+ {
+ int oldnwords = a->nwords;
+ int i;
+
+ a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1));
+ a->nwords = uwordnum + 1;
+ /* zero out the enlarged portion */
+ for (i = oldnwords; i < a->nwords; i++)
+ a->words[i] = 0;
+ }
+
+ wordnum = lwordnum = WORDNUM(lower);
+
+ lbitnum = BITNUM(lower);
+ ushiftbits = BITNUM(upper) + 1;
+
+ /*
+ * Special case when lwordnum is the same as uwordnum we must perform the
+ * upper and lower masking on the word.
+ */
+ if (lwordnum == uwordnum)
+ {
+ a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1)
+ | (~(bitmapword) 0) << ushiftbits);
+ }
+ else
+ {
+ /* turn off lbitnum and all bits left of it */
+ a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1);
+
+ /* turn off all bits for any intermediate words */
+ while (wordnum < uwordnum)
+ a->words[wordnum++] = (bitmapword) 0;
+
+ /* turn off upper's bit and all bits right of it. */
+ a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits;
+ }
+
+ return a;
+}
+
/*
* bms_int_members - like bms_intersect, but left input is recycled
*/
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a74b56bb59..a266904010 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -244,6 +244,8 @@ _copyAppend(const Append *from)
COPY_NODE_FIELD(appendplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);
+ COPY_SCALAR_FIELD(nasyncplans);
+ COPY_SCALAR_FIELD(referent);
return newnode;
}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index a80eccc2c1..bf87e721a5 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -434,6 +434,8 @@ _outAppend(StringInfo str, const Append *node)
WRITE_NODE_FIELD(appendplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
+ WRITE_INT_FIELD(nasyncplans);
+ WRITE_INT_FIELD(referent);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 764e3bb90c..25b84b3f15 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1639,6 +1639,8 @@ _readAppend(void)
READ_NODE_FIELD(appendplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);
+ READ_INT_FIELD(nasyncplans);
+ READ_INT_FIELD(referent);
READ_DONE();
}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index aee81bd755..c676980a30 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -207,6 +207,9 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
Index scanrelid, char *enrname);
static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
Index scanrelid, int wtParam);
+static Append *make_append(List *appendplans, int first_partial_plan,
+ int nasyncplans, int referent,
+ List *tlist, PartitionPruneInfo *partpruneinfos);
static RecursiveUnion *make_recursive_union(List *tlist,
Plan *lefttree,
Plan *righttree,
@@ -292,6 +295,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
List *rowMarks, OnConflictExpr *onconflict, int epqParam);
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
/*
@@ -1069,6 +1073,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
bool tlist_was_changed = false;
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
+ List *asyncplans = NIL;
+ List *syncplans = NIL;
ListCell *subpaths;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
@@ -1077,6 +1083,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
+ int nasyncplans = 0;
+ bool first = true;
+ bool referent_is_sync = true;
/*
* The subpaths list could be empty, if every child was proven empty by
@@ -1206,6 +1215,23 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
subplans = lappend(subplans, subplan);
+
+ /*
+ * Classify as async-capable or not. If we have decided to run the
+ * chidlren in parallel, we cannot any one of them run asynchronously.
+ */
+ if (!best_path->path.parallel_safe && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ asyncplans = lappend(asyncplans, subplan);
+ ++nasyncplans;
+ if (first)
+ referent_is_sync = false;
+ }
+ else
+ syncplans = lappend(syncplans, subplan);
+
+ first = false;
}
/*
@@ -1244,6 +1270,18 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;
+ /*
+ * XXX ideally, if there's just one child, we'd not bother to generate an
+ * Append node but just return the single child. At the moment this does
+ * not work because the varno of the child scan plan won't match the
+ * parent-rel Vars it'll be asked to emit.
+ */
+
+ plan = make_append(list_concat(asyncplans, syncplans),
+ best_path->first_partial_path, nasyncplans,
+ referent_is_sync ? nasyncplans : 0, tlist,
+ partpruneinfo);
+
copy_generic_path_info(&plan->plan, (Path *) best_path);
/*
@@ -5462,6 +5500,27 @@ make_foreignscan(List *qptlist,
return node;
}
+static Append *
+make_append(List *appendplans, int first_partial_plan, int nasyncplans,
+ int referent, List *tlist, PartitionPruneInfo *partpruneinfo)
+{
+ Append *node = makeNode(Append);
+ Plan *plan = &node->plan;
+
+ plan->targetlist = tlist;
+ plan->qual = NIL;
+ plan->lefttree = NULL;
+ plan->righttree = NULL;
+
+ node->appendplans = appendplans;
+ node->first_partial_plan = first_partial_plan;
+ node->part_prune_info = partpruneinfo;
+ node->nasyncplans = nasyncplans;
+ node->referent = referent;
+
+ return node;
+}
+
static RecursiveUnion *
make_recursive_union(List *tlist,
Plan *lefttree,
@@ -6836,3 +6895,27 @@ is_projection_capable_plan(Plan *plan)
}
return true;
}
+
+/*
+ * is_projection_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ default:
+ break;
+ }
+ return false;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index fabcf31de8..575ccd5def 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3853,6 +3853,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_SYNC_REP:
event_name = "SyncRep";
break;
+ case WAIT_EVENT_ASYNC_WAIT:
+ event_name = "AsyncExecWait";
+ break;
/* no default case, so that compiler will warn */
}
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index bb2baff763..7669e6ff53 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -306,7 +306,7 @@ SysLoggerMain(int argc, char *argv[])
* syslog pipe, which implies that all other backends have exited
* (including the postmaster).
*/
- wes = CreateWaitEventSet(CurrentMemoryContext, 2);
+ wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
#ifndef WIN32
AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 13685a0a0e..60b749f062 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4640,7 +4640,7 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
dpns->planstate = ps;
/*
- * We special-case Append and MergeAppend to pretend that the first child
+ * We special-case Append and MergeAppend to pretend that a specific child
* plan is the OUTER referent; we have to interpret OUTER Vars in their
* tlists according to one of the children, and the first one is the most
* natural choice. Likewise special-case ModifyTable to pretend that the
@@ -4648,7 +4648,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
* lists containing references to non-target relations.
*/
if (IsA(ps, AppendState))
- dpns->outer_planstate = ((AppendState *) ps)->appendplans[0];
+ {
+ AppendState *aps = (AppendState *) ps;
+ Append *app = (Append *) ps->plan;
+ dpns->outer_planstate = aps->appendplans[app->referent];
+ }
else if (IsA(ps, MergeAppendState))
dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0];
else if (IsA(ps, ModifyTableState))
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..5fd67d9004
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.c
+ * Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern void ExecAsyncSetState(PlanState *pstate, AsyncState status);
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+ void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+ long timeout);
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 6298c7c8ca..4adb2efe76 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -59,6 +59,7 @@
#define EXEC_FLAG_MARK 0x0008 /* need mark/restore */
#define EXEC_FLAG_SKIP_TRIGGERS 0x0010 /* skip AfterTrigger calls */
#define EXEC_FLAG_WITH_NO_DATA 0x0020 /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC 0x0040 /* request async execution */
/* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index ca7723c899..81791033af 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data, bool reinit);
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 822686033e..851cd15e65 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data,
+ bool reinit);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -190,6 +195,7 @@ typedef struct FdwRoutine
GetForeignPlan_function GetForeignPlan;
BeginForeignScan_function BeginForeignScan;
IterateForeignScan_function IterateForeignScan;
+ IterateForeignScan_function IterateForeignScanAsync;
ReScanForeignScan_function ReScanForeignScan;
EndForeignScan_function EndForeignScan;
@@ -242,6 +248,11 @@ typedef struct FdwRoutine
InitializeDSMForeignScan_function InitializeDSMForeignScan;
ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
ShutdownForeignScan_function ShutdownForeignScan;
/* Support functions for path reparameterization. */
diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h
index 0c645628e5..d97a4c2235 100644
--- a/src/include/nodes/bitmapset.h
+++ b/src/include/nodes/bitmapset.h
@@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b);
extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper);
extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b);
extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b);
+extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper);
extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b);
/* support for iterating through the integer elements of a set: */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6eb647290b..46d7fbab3a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -932,6 +932,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
* abstract superclass for all PlanState-type nodes.
* ----------------
*/
+typedef enum AsyncState
+{
+ AS_AVAILABLE,
+ AS_WAITING
+} AsyncState;
+
typedef struct PlanState
{
NodeTag type;
@@ -1020,6 +1026,11 @@ typedef struct PlanState
bool outeropsset;
bool inneropsset;
bool resultopsset;
+
+ /* Async subnode execution sutff */
+ AsyncState asyncstate;
+
+ int32 padding; /* to keep alignment of derived types */
} PlanState;
/* ----------------
@@ -1216,14 +1227,20 @@ struct AppendState
PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
- int as_whichplan;
+ int as_whichsyncplan; /* which sync plan is being executed */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
+ int as_nasyncplans; /* # of async-capable children */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
bool (*choose_next_subplan) (AppendState *);
+ bool as_syncdone; /* all synchronous plans done? */
+ Bitmapset *as_needrequest; /* async plans needing a new request */
+ Bitmapset *as_pending_async; /* pending async plans */
+ TupleTableSlot **as_asyncresult; /* unreturned results of async plans */
+ int as_nasyncresult; /* # of valid entries in as_asyncresult */
};
/* ----------------
@@ -1786,6 +1803,7 @@ typedef struct ForeignScanState
Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
+ bool fs_async;
void *fdw_state; /* foreign-data wrapper can keep state here */
} ForeignScanState;
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 8e6594e355..26810915e5 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -133,6 +133,11 @@ typedef struct Plan
bool parallel_aware; /* engage parallel-aware logic? */
bool parallel_safe; /* OK to use as part of parallel plan? */
+ /*
+ * information needed for asynchronous execution
+ */
+ bool async_capable; /* engage asyncronous execution logic? */
+
/*
* Common structural data for all Plan types.
*/
@@ -259,6 +264,10 @@ typedef struct Append
/* Info for run-time subplan pruning; NULL if we're not doing that */
struct PartitionPruneInfo *part_prune_info;
+
+ /* Async child node execution stuff */
+ int nasyncplans; /* # async subplans, always at start of list */
+ int referent; /* index of inheritance tree referent */
} Append;
/* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823d..d57ef809fc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -853,7 +853,8 @@ typedef enum
WAIT_EVENT_REPLICATION_ORIGIN_DROP,
WAIT_EVENT_REPLICATION_SLOT_DROP,
WAIT_EVENT_SAFE_SNAPSHOT,
- WAIT_EVENT_SYNC_REP
+ WAIT_EVENT_SYNC_REP,
+ WAIT_EVENT_ASYNC_WAIT
} WaitEventIPC;
/* ----------
--
2.23.0
v1-0003-async-postgres_fdw.patchtext/x-patch; charset=us-asciiDownload
From 8984d4e3360eaf79fc0f23b3e6817770be13fcfd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH v1 3/3] async postgres_fdw
---
contrib/postgres_fdw/connection.c | 26 +
.../postgres_fdw/expected/postgres_fdw.out | 222 ++++---
contrib/postgres_fdw/postgres_fdw.c | 615 ++++++++++++++++--
contrib/postgres_fdw/postgres_fdw.h | 2 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 20 +-
5 files changed, 711 insertions(+), 174 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 27b86a03f8..1afd99cad8 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -56,6 +56,7 @@ typedef struct ConnCacheEntry
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ void *storage; /* connection specific storage */
} ConnCacheEntry;
/*
@@ -200,6 +201,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
entry->conn, server->servername, user->umid, user->userid);
+ entry->storage = NULL;
}
/*
@@ -213,6 +215,30 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
return entry->conn;
}
+/*
+ * Rerturns the connection specific storage for this user. Allocate with
+ * initsize if not exists.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+ bool found;
+ ConnCacheEntry *entry;
+ ConnCacheKey key;
+
+ key = user->umid;
+ entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+ Assert(found);
+
+ if (entry->storage == NULL)
+ {
+ entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+ memset(entry->storage, 0, initsize);
+ }
+
+ return entry->storage;
+}
+
/*
* Connect to remote server using specified server and user mapping properties.
*/
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 48282ab151..bd2e835d2d 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6900,7 +6900,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+-------
a | aaa
@@ -6928,7 +6928,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -6956,7 +6956,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -6984,7 +6984,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | newtoo
@@ -7054,35 +7054,41 @@ insert into bar2 values(3,33,33);
insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+ Sort Key: bar.f1
+ -> Seq Scan on public.bar
+ Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
-> Foreign Scan on public.bar2 bar_1
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.f1, foo.*, foo.tableoid
- -> Foreign Scan on public.foo2 foo_1
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(29 rows)
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
f1 | f2
----+----
1 | 11
@@ -7092,35 +7098,41 @@ select * from bar where f1 in (select f1 from foo) for update;
(4 rows)
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+ Sort Key: bar.f1
+ -> Seq Scan on public.bar
+ Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
-> Foreign Scan on public.bar2 bar_1
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.f1, foo.*, foo.tableoid
- -> Foreign Scan on public.foo2 foo_1
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(29 rows)
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
f1 | f2
----+----
1 | 11
@@ -7150,11 +7162,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.f1, foo.*, foo.tableoid
- -> Foreign Scan on public.foo2 foo_1
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-> Hash Join
Output: bar_1.f1, (bar_1.f2 + 100), bar_1.f3, bar_1.ctid, foo.ctid, foo.*, foo.tableoid
Inner Unique: true
@@ -7168,12 +7181,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.f1, foo.*, foo.tableoid
- -> Foreign Scan on public.foo2 foo_1
+ Async subplans: 1
+ -> Async Foreign Scan on public.foo2 foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(39 rows)
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(41 rows)
update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
select tableoid::regclass, * from bar order by 1,2;
@@ -7203,16 +7217,17 @@ where bar.f1 = ss.f1;
Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
Hash Cond: (foo.f1 = bar.f1)
-> Append
- -> Seq Scan on public.foo
- Output: ROW(foo.f1), foo.f1
- -> Foreign Scan on public.foo2 foo_1
+ Async subplans: 2
+ -> Async Foreign Scan on public.foo2 foo_1
Output: ROW(foo_1.f1), foo_1.f1
Remote SQL: SELECT f1 FROM public.loct1
- -> Seq Scan on public.foo foo_2
- Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
- -> Foreign Scan on public.foo2 foo_3
+ -> Async Foreign Scan on public.foo2 foo_3
Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
Remote SQL: SELECT f1 FROM public.loct1
+ -> Seq Scan on public.foo
+ Output: ROW(foo.f1), foo.f1
+ -> Seq Scan on public.foo foo_2
+ Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-> Hash
Output: bar.f1, bar.f2, bar.ctid
-> Seq Scan on public.bar
@@ -7230,17 +7245,18 @@ where bar.f1 = ss.f1;
Output: (ROW(foo.f1)), foo.f1
Sort Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: ROW(foo.f1), foo.f1
- -> Foreign Scan on public.foo2 foo_1
+ Async subplans: 2
+ -> Async Foreign Scan on public.foo2 foo_1
Output: ROW(foo_1.f1), foo_1.f1
Remote SQL: SELECT f1 FROM public.loct1
- -> Seq Scan on public.foo foo_2
- Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
- -> Foreign Scan on public.foo2 foo_3
+ -> Async Foreign Scan on public.foo2 foo_3
Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
Remote SQL: SELECT f1 FROM public.loct1
-(45 rows)
+ -> Seq Scan on public.foo
+ Output: ROW(foo.f1), foo.f1
+ -> Seq Scan on public.foo foo_2
+ Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
+(47 rows)
update bar set f2 = f2 + 100
from
@@ -7390,27 +7406,33 @@ delete from foo where f1 < 5 returning *;
(5 rows)
explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
- QUERY PLAN
-------------------------------------------------------------------------------
- Update on public.bar
- Output: bar.f1, bar.f2
- Update on public.bar
- Foreign Update on public.bar2 bar_1
- -> Seq Scan on public.bar
- Output: bar.f1, (bar.f2 + 100), bar.ctid
- -> Foreign Update on public.bar2 bar_1
- Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+ QUERY PLAN
+--------------------------------------------------------------------------------------
+ Sort
+ Output: u.f1, u.f2
+ Sort Key: u.f1
+ CTE u
+ -> Update on public.bar
+ Output: bar.f1, bar.f2
+ Update on public.bar
+ Foreign Update on public.bar2 bar_1
+ -> Seq Scan on public.bar
+ Output: bar.f1, (bar.f2 + 100), bar.ctid
+ -> Foreign Update on public.bar2 bar_1
+ Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+ -> CTE Scan on u
+ Output: u.f1, u.f2
+(14 rows)
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
f1 | f2
----+-----
1 | 311
2 | 322
- 6 | 266
3 | 333
4 | 344
+ 6 | 266
7 | 277
(6 rows)
@@ -8485,11 +8507,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
Sort
Sort Key: t1.a, t3.c
-> Append
- -> Foreign Scan
+ Async subplans: 2
+ -> Async Foreign Scan
Relations: ((ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2)) INNER JOIN (ftprt1_p1 t3)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: ((ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1)) INNER JOIN (ftprt1_p2 t3_1)
-(7 rows)
+(8 rows)
SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE t1.a % 25 =0 ORDER BY 1,2,3;
a | b | c
@@ -8524,20 +8547,22 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
-- with whole-row reference; partitionwise join does not apply
EXPLAIN (COSTS OFF)
SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
- QUERY PLAN
---------------------------------------------------------
+ QUERY PLAN
+--------------------------------------------------------------
Sort
Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
-> Hash Full Join
Hash Cond: (t1.a = t2.b)
-> Append
- -> Foreign Scan on ftprt1_p1 t1
- -> Foreign Scan on ftprt1_p2 t1_1
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt1_p1 t1
+ -> Async Foreign Scan on ftprt1_p2 t1_1
-> Hash
-> Append
- -> Foreign Scan on ftprt2_p1 t2
- -> Foreign Scan on ftprt2_p2 t2_1
-(11 rows)
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt2_p1 t2
+ -> Async Foreign Scan on ftprt2_p2 t2_1
+(13 rows)
SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
wr | wr
@@ -8566,11 +8591,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
Sort
Sort Key: t1.a, t1.b
-> Append
- -> Foreign Scan
+ Async subplans: 2
+ -> Async Foreign Scan
Relations: (ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: (ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1)
-(7 rows)
+(8 rows)
SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a%25 = 0 ORDER BY 1,2;
a | b
@@ -8623,21 +8649,23 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
-- test FOR UPDATE; partitionwise join does not apply
EXPLAIN (COSTS OFF)
SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
- QUERY PLAN
---------------------------------------------------------------
+ QUERY PLAN
+--------------------------------------------------------------------
LockRows
-> Sort
Sort Key: t1.a
-> Hash Join
Hash Cond: (t2.b = t1.a)
-> Append
- -> Foreign Scan on ftprt2_p1 t2
- -> Foreign Scan on ftprt2_p2 t2_1
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt2_p1 t2
+ -> Async Foreign Scan on ftprt2_p2 t2_1
-> Hash
-> Append
- -> Foreign Scan on ftprt1_p1 t1
- -> Foreign Scan on ftprt1_p2 t1_1
-(12 rows)
+ Async subplans: 2
+ -> Async Foreign Scan on ftprt1_p1 t1
+ -> Async Foreign Scan on ftprt1_p2 t1_1
+(14 rows)
SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
a | b
@@ -8672,18 +8700,19 @@ ANALYZE fpagg_tab_p3;
SET enable_partitionwise_aggregate TO false;
EXPLAIN (COSTS OFF)
SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
- QUERY PLAN
------------------------------------------------------------
+ QUERY PLAN
+-----------------------------------------------------------------
Sort
Sort Key: pagg_tab.a
-> HashAggregate
Group Key: pagg_tab.a
Filter: (avg(pagg_tab.b) < '22'::numeric)
-> Append
- -> Foreign Scan on fpagg_tab_p1 pagg_tab
- -> Foreign Scan on fpagg_tab_p2 pagg_tab_1
- -> Foreign Scan on fpagg_tab_p3 pagg_tab_2
-(9 rows)
+ Async subplans: 3
+ -> Async Foreign Scan on fpagg_tab_p1 pagg_tab
+ -> Async Foreign Scan on fpagg_tab_p2 pagg_tab_1
+ -> Async Foreign Scan on fpagg_tab_p3 pagg_tab_2
+(10 rows)
-- Plan with partitionwise aggregates is enabled
SET enable_partitionwise_aggregate TO true;
@@ -8694,13 +8723,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
Sort
Sort Key: pagg_tab.a
-> Append
- -> Foreign Scan
+ Async subplans: 3
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p1 pagg_tab)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2)
-(9 rows)
+(10 rows)
SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
a | sum | min | count
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index bdc21b36d1..f3212aac90 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,8 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -35,6 +37,7 @@
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
+#include "pgstat.h"
#include "postgres_fdw.h"
#include "utils/builtins.h"
#include "utils/float.h"
@@ -56,6 +59,9 @@ PG_MODULE_MAGIC;
/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
+/* Retrive PgFdwScanState struct from ForeginScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
/*
* Indexes of FDW-private information stored in fdw_private lists.
*
@@ -122,11 +128,28 @@ enum FdwDirectModifyPrivateIndex
FdwDirectModifyPrivateSetProcessed
};
+/*
+ * Connection private area structure.
+ */
+typedef struct PgFdwConnpriv
+{
+ ForeignScanState *leader; /* leader node of this connection */
+ bool busy; /* true if this connection is busy */
+} PgFdwConnpriv;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+ PGconn *conn; /* connection for the scan */
+ PgFdwConnpriv *connpriv; /* connection private memory */
+} PgFdwState;
+
/*
* Execution state of a foreign scan using postgres_fdw.
*/
typedef struct PgFdwScanState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table. NULL
* for a foreign join scan. */
TupleDesc tupdesc; /* tuple descriptor of scan */
@@ -137,7 +160,7 @@ typedef struct PgFdwScanState
List *retrieved_attrs; /* list of retrieved attribute numbers */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
+ bool result_ready;
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
@@ -153,6 +176,12 @@ typedef struct PgFdwScanState
/* batch-level state, for optimizing rewinds and avoiding useless fetch */
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ bool run_async; /* true if run asynchronously */
+ bool inqueue; /* true if this node is in waiter queue */
+ ForeignScanState *waiter; /* Next node to run a query among nodes
+ * sharing the same connection */
+ ForeignScanState *last_waiter; /* last waiting node in waiting queue.
+ * valid only on the leader node */
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
@@ -166,11 +195,11 @@ typedef struct PgFdwScanState
*/
typedef struct PgFdwModifyState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
@@ -197,6 +226,7 @@ typedef struct PgFdwModifyState
*/
typedef struct PgFdwDirectModifyState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
@@ -326,6 +356,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
static void postgresAddForeignUpdateTargets(Query *parsetree,
RangeTblEntry *target_rte,
Relation target_relation);
@@ -391,6 +422,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data, bool reinit);
/*
* Helper functions
@@ -419,7 +454,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
RangeTblEntry *rte,
@@ -522,6 +559,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->IterateForeignScan = postgresIterateForeignScan;
routine->ReScanForeignScan = postgresReScanForeignScan;
routine->EndForeignScan = postgresEndForeignScan;
+ routine->ShutdownForeignScan = postgresShutdownForeignScan;
/* Functions for updating foreign tables */
routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -558,6 +596,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for async execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
PG_RETURN_POINTER(routine);
}
@@ -1434,12 +1476,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->s.conn = GetConnection(user, false);
+ fsstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
+ fsstate->s.connpriv->leader = NULL;
+ fsstate->s.connpriv->busy = false;
+ fsstate->waiter = NULL;
+ fsstate->last_waiter = node;
/* Assign a unique ID for my cursor */
- fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+ fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
fsstate->cursor_exists = false;
+ /* Initialize async execution status */
+ fsstate->run_async = false;
+ fsstate->inqueue = false;
+
/* Get private info created by planner functions. */
fsstate->query = strVal(list_nth(fsplan->fdw_private,
FdwScanPrivateSelectSql));
@@ -1487,40 +1539,259 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_values);
}
+/*
+ * Async queue manipuration functions
+ */
+
+/*
+ * add_async_waiter:
+ *
+ * adds the node to the end of waiter queue. Immediately starts the node if no
+ * node is running
+ */
+static inline void
+add_async_waiter(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+
+ /* do nothing if the node is already in the queue or already eof'ed */
+ if (leader == node || fsstate->inqueue || fsstate->eof_reached)
+ return;
+
+ if (leader == NULL)
+ {
+ /* immediately send request if not busy */
+ request_more_data(node);
+ }
+ else
+ {
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+ PgFdwScanState *last_waiter_state
+ = GetPgFdwScanState(leader_state->last_waiter);
+
+ last_waiter_state->waiter = node;
+ leader_state->last_waiter = node;
+ fsstate->inqueue = true;
+ }
+}
+
+/*
+ * move_to_next_waiter:
+ *
+ * Makes the first waiter be next leader
+ * Returns the new leader or NULL if there's no waiter.
+ */
+static inline ForeignScanState *
+move_to_next_waiter(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *ret = fsstate->waiter;
+
+ Assert(fsstate->s.connpriv->leader = node);
+
+ if (ret)
+ {
+ PgFdwScanState *retstate = GetPgFdwScanState(ret);
+ fsstate->waiter = NULL;
+ retstate->last_waiter = fsstate->last_waiter;
+ retstate->inqueue = false;
+ }
+
+ fsstate->s.connpriv->leader = ret;
+
+ return ret;
+}
+
+/*
+ * remove the node from waiter queue
+ *
+ * This is a bit different from the two above in the sense that this can
+ * operate on connection leader. The result is absorbed when this is called on
+ * active leader.
+ *
+ * Returns true if the node was found.
+ */
+static inline bool
+remove_async_node(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+ PgFdwScanState *leader_state;
+ ForeignScanState *prev;
+ PgFdwScanState *prev_state;
+ ForeignScanState *cur;
+
+ /* no need to remove me */
+ if (!leader || !fsstate->inqueue)
+ return false;
+
+ leader_state = GetPgFdwScanState(leader);
+
+ /* Remove the leader node */
+ if (leader == node)
+ {
+ ForeignScanState *next_leader;
+
+ if (leader_state->s.connpriv->busy)
+ {
+ /*
+ * this node is waiting for result, absorb the result first so
+ * that the following commands can be sent on the connection.
+ */
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+ PGconn *conn = leader_state->s.conn;
+
+ while(PQisBusy(conn))
+ PQclear(PQgetResult(conn));
+
+ leader_state->s.connpriv->busy = false;
+ }
+
+ /* Make the first waiter the leader */
+ if (leader_state->waiter)
+ {
+ PgFdwScanState *next_leader_state;
+
+ next_leader = leader_state->waiter;
+ next_leader_state = GetPgFdwScanState(next_leader);
+
+ leader_state->s.connpriv->leader = next_leader;
+ next_leader_state->last_waiter = leader_state->last_waiter;
+ }
+ leader_state->waiter = NULL;
+
+ return true;
+ }
+
+ /*
+ * Just remove the node in queue
+ *
+ * This function is called on the shutdown path. We don't bother
+ * considering faster way to do this.
+ */
+ prev = leader;
+ prev_state = leader_state;
+ cur = GetPgFdwScanState(prev)->waiter;
+ while (cur)
+ {
+ PgFdwScanState *curstate = GetPgFdwScanState(cur);
+
+ if (cur == node)
+ {
+ prev_state->waiter = curstate->waiter;
+ if (leader_state->last_waiter == cur)
+ leader_state->last_waiter = prev;
+ else
+ leader_state->last_waiter = cur;
+
+ fsstate->inqueue = false;
+
+ return true;
+ }
+ prev = cur;
+ prev_state = curstate;
+ cur = curstate->waiter;
+ }
+
+ return false;
+}
+
/*
* postgresIterateForeignScan
- * Retrieve next row from the result set, or clear tuple slot to indicate
- * EOF.
+ * Retrieve next row from the result set.
+ *
+ * For synchronous nodes, returns clear tuples slot to indicte EOF.
+ *
+ * If the node is asynchronous one, clear tuple slot has two meanings.
+ * If the caller receives clear tuple slot, asyncstate indicates wheter
+ * the node is EOF (AS_AVAILABLE) or waiting for data to
+ * come(AS_WAITING).
*/
static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
- /*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
- */
- if (!fsstate->cursor_exists)
- create_cursor(node);
+ if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached)
+ {
+ /* we've run out, get some more tuples */
+ if (!node->fs_async)
+ {
+ /* finish running query to send my command */
+ if (!fsstate->s.connpriv->busy)
+ vacate_connection((PgFdwState *)fsstate, false);
+
+ request_more_data(node);
+
+ /*
+ * Fetch the result immediately. This executes the next waiter if
+ * any.
+ */
+ fetch_received_data(node);
+ }
+ else if (!fsstate->s.connpriv->busy)
+ {
+ /* If the connection is not busy, just send the request. */
+ request_more_data(node);
+ }
+ else
+ {
+ /* This connection is busy */
+ bool available = true;
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+
+ /* Check if the result is immediately available */
+ if (PQisBusy(leader_state->s.conn))
+ {
+ int rc = WaitLatchOrSocket(NULL,
+ WL_SOCKET_READABLE | WL_TIMEOUT |
+ WL_EXIT_ON_PM_DEATH,
+ PQsocket(leader_state->s.conn), 0,
+ WAIT_EVENT_ASYNC_WAIT);
+ if (!(rc & WL_SOCKET_READABLE))
+ available = false;
+ }
+
+ /* The next waiter is executed automatcically */
+ if (available)
+ fetch_received_data(leader);
+
+ /* add the requested node */
+ add_async_waiter(node);
+
+ /* add the previous leader */
+ add_async_waiter(leader);
+ }
+ }
/*
- * Get some more tuples, if we've run out.
+ * If we haven't received a result for the given node this time,
+ * return with no tuple to give way to another node.
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
- /* No point in another fetch if we already detected EOF, though. */
- if (!fsstate->eof_reached)
- fetch_more_data(node);
- /* If we didn't get any tuples, must be end of data. */
- if (fsstate->next_tuple >= fsstate->num_tuples)
- return ExecClearTuple(slot);
+ if (fsstate->eof_reached)
+ {
+ fsstate->result_ready = true;
+ node->ss.ps.asyncstate = AS_AVAILABLE;
+ }
+ else
+ {
+ fsstate->result_ready = false;
+ node->ss.ps.asyncstate = AS_WAITING;
+ }
+
+ return ExecClearTuple(slot);
}
/*
* Return the next tuple.
*/
+ fsstate->result_ready = true;
+ node->ss.ps.asyncstate = AS_AVAILABLE;
ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
slot,
false);
@@ -1535,7 +1806,7 @@ postgresIterateForeignScan(ForeignScanState *node)
static void
postgresReScanForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
char sql[64];
PGresult *res;
@@ -1543,6 +1814,8 @@ postgresReScanForeignScan(ForeignScanState *node)
if (!fsstate->cursor_exists)
return;
+ vacate_connection((PgFdwState *)fsstate, true);
+
/*
* If any internal parameters affecting this node have changed, we'd
* better destroy and recreate the cursor. Otherwise, rewinding it should
@@ -1571,9 +1844,9 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->s.conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
PQclear(res);
/* Now force a fresh FETCH. */
@@ -1591,7 +1864,7 @@ postgresReScanForeignScan(ForeignScanState *node)
static void
postgresEndForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
if (fsstate == NULL)
@@ -1599,15 +1872,31 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
- close_cursor(fsstate->conn, fsstate->cursor_number);
+ close_cursor(fsstate->s.conn, fsstate->cursor_number);
/* Release remote connection */
- ReleaseConnection(fsstate->conn);
- fsstate->conn = NULL;
+ ReleaseConnection(fsstate->s.conn);
+ fsstate->s.conn = NULL;
/* MemoryContexts will be deleted automatically. */
}
+/*
+ * postgresShutdownForeignScan
+ * Remove asynchrony stuff and cleanup garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+ ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+ if (plan->operation != CMD_SELECT)
+ return;
+
+ /* remove the node from waiting queue */
+ remove_async_node(node);
+}
+
/*
* postgresAddForeignUpdateTargets
* Add resjunk column(s) needed for update/delete on a foreign table
@@ -2372,7 +2661,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->s.conn = GetConnection(user, false);
+ dmstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2457,7 +2748,11 @@ postgresIterateDirectModify(ForeignScanState *node)
* If this is the first call after Begin, execute the statement.
*/
if (dmstate->num_tuples == -1)
+ {
+ /* finish running query to send my command */
+ vacate_connection((PgFdwState *)dmstate, true);
execute_dml_stmt(node);
+ }
/*
* If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2504,8 +2799,8 @@ postgresEndDirectModify(ForeignScanState *node)
PQclear(dmstate->result);
/* Release remote connection */
- ReleaseConnection(dmstate->conn);
- dmstate->conn = NULL;
+ ReleaseConnection(dmstate->s.conn);
+ dmstate->s.conn = NULL;
/* MemoryContext will be deleted automatically. */
}
@@ -2703,6 +2998,7 @@ estimate_path_cost_size(PlannerInfo *root,
List *local_param_join_conds;
StringInfoData sql;
PGconn *conn;
+ PgFdwConnpriv *connpriv;
Selectivity local_sel;
QualCost local_cost;
List *fdw_scan_tlist = NIL;
@@ -2747,6 +3043,18 @@ estimate_path_cost_size(PlannerInfo *root,
/* Get the remote estimate */
conn = GetConnection(fpinfo->user, false);
+ connpriv = GetConnectionSpecificStorage(fpinfo->user,
+ sizeof(PgFdwConnpriv));
+ if (connpriv)
+ {
+ PgFdwState tmpstate;
+ tmpstate.conn = conn;
+ tmpstate.connpriv = connpriv;
+
+ /* finish running query to send my command */
+ vacate_connection(&tmpstate, true);
+ }
+
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3317,11 +3625,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
static void
create_cursor(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int numParams = fsstate->numParams;
const char **values = fsstate->param_values;
- PGconn *conn = fsstate->conn;
+ PGconn *conn = fsstate->s.conn;
StringInfoData buf;
PGresult *res;
@@ -3384,50 +3692,127 @@ create_cursor(ForeignScanState *node)
}
/*
- * Fetch some more rows from the node's cursor.
+ * Sends the next request of the node. If the given node is different from the
+ * current connection leader, pushes it back to waiter queue and let the given
+ * node be the leader.
*/
static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+ PGconn *conn = fsstate->s.conn;
+ char sql[64];
+
+ /* must be non-busy */
+ Assert(!fsstate->s.connpriv->busy);
+ /* must be not-eof */
+ Assert(!fsstate->eof_reached);
+
+ /*
+ * If this is the first call after Begin or ReScan, we need to create the
+ * cursor on the remote side.
+ */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (!PQsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+ fsstate->s.connpriv->busy = true;
+
+ /* Let the node be the leader if it is different from current one */
+ if (leader != node)
+ {
+ /*
+ * If the connection leader exists, insert the node as the connection
+ * leader making the current leader be the first waiter.
+ */
+ if (leader != NULL)
+ {
+ remove_async_node(node);
+ fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter;
+ fsstate->waiter = leader;
+ }
+ else
+ {
+ fsstate->last_waiter = node;
+ fsstate->waiter = NULL;
+ }
+
+ fsstate->s.connpriv->leader = node;
+ }
+}
+
+/*
+ * Fetches received data and automatically send requests of the next waiter.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
PGresult *volatile res = NULL;
MemoryContext oldcontext;
+ ForeignScanState *waiter;
+
+ /* I should be the current connection leader */
+ Assert(fsstate->s.connpriv->leader == node);
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
- * batch.
+ * batch if no tuple is remaining
*/
- fsstate->tuples = NULL;
- MemoryContextReset(fsstate->batch_cxt);
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ MemoryContextReset(fsstate->batch_cxt);
+ }
+ else if (fsstate->next_tuple > 0)
+ {
+ /* move the remaining tuples to the beginning of the store */
+ int n = 0;
+
+ while(fsstate->next_tuple < fsstate->num_tuples)
+ fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+ fsstate->num_tuples = n;
+ }
+
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
/* PGresult must be released before leaving this function. */
PG_TRY();
{
- PGconn *conn = fsstate->conn;
+ PGconn *conn = fsstate->s.conn;
char sql[64];
- int numrows;
+ int addrows;
+ size_t newsize;
int i;
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_get_result(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
/* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
+ addrows = PQntuples(res);
+ newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+ if (fsstate->tuples)
+ fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+ else
+ fsstate->tuples = (HeapTuple *) palloc(newsize);
- for (i = 0; i < numrows; i++)
+ for (i = 0; i < addrows; i++)
{
Assert(IsA(node->ss.ps.plan, ForeignScan));
- fsstate->tuples[i] =
+ fsstate->tuples[fsstate->num_tuples + i] =
make_tuple_from_result_row(res, i,
fsstate->rel,
fsstate->attinmeta,
@@ -3437,22 +3822,75 @@ fetch_more_data(ForeignScanState *node)
}
/* Update fetch_ct_2 */
- if (fsstate->fetch_ct_2 < 2)
+ if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
fsstate->fetch_ct_2++;
+ fsstate->next_tuple = 0;
+ fsstate->num_tuples += addrows;
+
/* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
+ fsstate->eof_reached = (addrows < fsstate->fetch_size);
+
+ PQclear(res);
+ res = NULL;
}
PG_FINALLY();
{
+ fsstate->s.connpriv->busy = false;
+
if (res)
PQclear(res);
}
PG_END_TRY();
+ fsstate->s.connpriv->busy = false;
+
+ /* let the first waiter be the next leader of this connection */
+ waiter = move_to_next_waiter(node);
+
+ /* send the next request if any */
+ if (waiter)
+ request_more_data(waiter);
+
MemoryContextSwitchTo(oldcontext);
}
+/*
+ * Vacate a connection so that this node can send the next query
+ */
+static void
+vacate_connection(PgFdwState *fdwstate, bool clear_queue)
+{
+ PgFdwConnpriv *connpriv = fdwstate->connpriv;
+ ForeignScanState *leader;
+
+ /* the connection is alrady available */
+ if (connpriv == NULL || connpriv->leader == NULL || !connpriv->busy)
+ return;
+
+ /*
+ * let the current connection leader read the result for the running query
+ */
+ leader = connpriv->leader;
+ fetch_received_data(leader);
+
+ /* let the first waiter be the next leader of this connection */
+ move_to_next_waiter(leader);
+
+ if (!clear_queue)
+ return;
+
+ /* Clear the waiting list */
+ while (leader)
+ {
+ PgFdwScanState *fsstate = GetPgFdwScanState(leader);
+
+ fsstate->last_waiter = NULL;
+ leader = fsstate->waiter;
+ fsstate->waiter = NULL;
+ }
+}
+
/*
* Force assorted GUC parameters to settings that ensure that we'll output
* data values in a form that is unambiguous to the remote server.
@@ -3566,7 +4004,9 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->s.conn = GetConnection(user, true);
+ fmstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -3653,6 +4093,9 @@ execute_foreign_modify(EState *estate,
operation == CMD_UPDATE ||
operation == CMD_DELETE);
+ /* finish running query to send my command */
+ vacate_connection((PgFdwState *)fmstate, true);
+
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
@@ -3680,14 +4123,14 @@ execute_foreign_modify(EState *estate,
/*
* Execute the prepared statement.
*/
- if (!PQsendQueryPrepared(fmstate->conn,
+ if (!PQsendQueryPrepared(fmstate->s.conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -3695,10 +4138,10 @@ execute_foreign_modify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
@@ -3734,7 +4177,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
- GetPrepStmtNumber(fmstate->conn));
+ GetPrepStmtNumber(fmstate->s.conn));
p_name = pstrdup(prep_name);
/*
@@ -3744,12 +4187,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
*/
- if (!PQsendPrepare(fmstate->conn,
+ if (!PQsendPrepare(fmstate->s.conn,
p_name,
fmstate->query,
0,
NULL))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -3757,9 +4200,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
PQclear(res);
/* This action shows that the prepare has been done. */
@@ -3888,16 +4331,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->s.conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
PQclear(res);
fmstate->p_name = NULL;
}
/* Release remote connection */
- ReleaseConnection(fmstate->conn);
- fmstate->conn = NULL;
+ ReleaseConnection(fmstate->s.conn);
+ fmstate->s.conn = NULL;
}
/*
@@ -4056,9 +4499,9 @@ execute_dml_stmt(ForeignScanState *node)
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
*/
- if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
NULL, values, NULL, NULL, 0))
- pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+ pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
/*
* Get the result, and check for success.
@@ -4066,10 +4509,10 @@ execute_dml_stmt(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+ dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+ pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
dmstate->query);
/* Get the number of rows affected. */
@@ -5560,6 +6003,42 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
/* XXX Consider parameterized paths for the join relation */
}
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ return true;
+}
+
+
+/*
+ * Configure waiting event.
+ *
+ * Add an wait event only when the node is the connection leader. Elsewise
+ * another node on this connection is the leader.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+ void *caller_data, bool reinit)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+ /* If the caller didn't reinit, this event is already in event set */
+ if (!reinit)
+ return true;
+
+ if (fsstate->s.connpriv->leader == node)
+ {
+ AddWaitEventToSet(wes,
+ WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+ NULL, caller_data);
+ return true;
+ }
+
+ return false;
+}
+
+
/*
* Assess whether the aggregation, grouping and having operations can be pushed
* down to the foreign server. As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index ea052872c3..696af73408 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo
UserMapping *user; /* only set in use_remote_estimate mode */
int fetch_size; /* fetch size for this remote table */
+ bool allow_prefetch; /* true to allow overlapped fetching */
/*
* Name of the relation, for use while EXPLAINing ForeignScan. It is used
@@ -130,6 +131,7 @@ extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 1c5c37b783..69c06ac6e4 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1730,25 +1730,25 @@ INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
@@ -1790,12 +1790,12 @@ insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
-- Check UPDATE with inherited target and an inherited source table
explain (verbose, costs off)
@@ -1854,8 +1854,8 @@ explain (verbose, costs off)
delete from foo where f1 < 5 returning *;
delete from foo where f1 < 5 returning *;
explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
-- Test that UPDATE/DELETE with inherited target works with row-level triggers
CREATE TRIGGER trig_row_before
--
2.23.0
On Thu, Dec 5, 2019 at 03:19:50PM -0500, Robert Haas wrote:
On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote:
I agree with Stephen's request. We have been waiting for the executor
rewrite for a while, so let's just do something simple and see how it
performs.I'm sympathetic to the frustration here, and I think it would be great
if we could find a way forward that doesn't involve waiting for a full
rewrite of the executor. However, I seem to remember that when we
tested the various patches that various people had written for this
feature (I wrote one, too) they all had a noticeable performance
penalty in the case of a plain old Append that involved no FDWs and
nothing asynchronous. I don't think it's OK to have, say, a 2%
regression on every query that involves an Append, because especially
now that we have partitioning, that's a lot of queries.I don't know whether this patch has that kind of problem. If it
doesn't, I would consider that a promising sign.
Certainly any overhead on normal queries would be unacceptable.
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
Hello.
I think I can say that this patch doesn't slows non-AsyncAppend,
non-postgres_fdw scans.
At Mon, 9 Dec 2019 12:18:44 -0500, Bruce Momjian <bruce@momjian.us> wrote in
Certainly any overhead on normal queries would be unacceptable.
I took performance numbers on the current shape of the async execution
patch for the following scan cases.
t0 : single local table (parallel disabled)
pll : local partitioning (local Append, parallel disabled)
ft0 : single foreign table
pf0 : inheritance on 4 foreign tables, single connection
pf1 : inheritance on 4 foreign tables, 4 connections
ptf0 : partition on 4 foreign tables, single connection
ptf1 : partition on 4 foreign tables, 4 connections
The benchmarking system is configured as the follows on a single
machine.
[ benchmark client ]
| |
(localhost:5433) (localhost:5432)
| |
+----+ | +------+ |
| V V V | V
| [master server] | [async server]
| V | V
+--fdw--+ +--fdw--+
The patch works roughly in the following steps.
1. Planner decides how many children out of an append can run
asynchrnously (called as async-capable.).
2. While ExecInit if an Append doesn't have an async-capable children,
ExecAppend that is exactly the same function is set as
ExecProcNode. Otherwise ExecAppendAsync is used.
If the infrastructure part in the patch causes any degradation, the
"t0"(scan on local single table) and/or "pll" test (scan on a local
paritioned table) gets slow.
3. postgresql_fdw always runs async-capable code path.
If the postgres_fdw part causes degradation, ft0 reflects that.
The tables has two integers and the query does sum(a) on all tuples.
With the default fetch_size = 100, number is run time in ms. Each
number is the average of 14 runs.
master patched gain
t0 7325 7130 +2.7%
pll 4558 4484 +1.7%
ft0 3670 3675 -0.1%
pf0 2322 1550 +33.3%
pf1 2367 1475 +37.7%
ptf0 2517 1624 +35.5%
ptf1 2343 1497 +36.2%
With larger fetch_size (200) the gain mysteriously decreases for
sharing single connection cases (pf0, ptf0), but others don't seem
change so much.
master patched gain
t0 7212 7252 -0.6%
pll 4546 4397 +3.3%
ft0 3712 3731 -0.5%
pf0 2131 1570 +26.4%
pf1 1926 1189 +38.3%
ptf0 2001 1557 +22.2%
ptf1 1903 1193 +37.4%
FWIW, attached are the test script.
gentblr2.sql: Table creation script.
testrun.sh : Benchmarking script.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
Hi Hackers,
Sharing the email below from Movead Li, I believe he wanted to share the
benchmarking results as a response to this email thread but it started a
new thread.. Here it is...
"
Hello
I have tested the patch with a partition table with several foreign
partitions living on seperate data nodes. The initial testing was done
with a partition table having 3 foreign partitions, test was done with
variety of scale facters. The seonnd test was with fixed data per data
node but number of data nodes were increased incrementally to see
the peformance impact as more nodes are added to the cluster. The
test three is similar to the initial test but with much huge data and
4 nodes.
The results are summary is given below and test script attached:
*Test ENV*
Parent node:2Core 8G
Child Nodes:2Core 4G
*Test one:*
1.1 The partition struct as below:
[ ptf:(a int, b int, c varchar)]
(Parent node)
| | |
[ptf1] [ptf2] [ptf3]
(Node1) (Node2) (Node3)
The table data is partitioned across nodes, the test is done using a
simple select query and a count aggregate as shown below. The result
is an average of executing each query multiple times to ensure reliable
and consistent results.
①select * from ptf where b = 100;
②select count(*) from ptf;
1.2. Test Results
For ① result:
scalepernode master patched performance
2G 7s 2s 350%
5G 173s 63s 275%
10G 462s 156s 296%
20G 968s 327s 296%
30G 1472s 494s 297%
For ② result:
scalepernode master patched performance
2G 1079s 291s 370%
5G 2688s 741s 362%
10G 4473s 1493s 299%
It takes too long time to test a aggregate so the test was done with a
smaller data size.
1.3. summary
With the table partitioned over 3 nodes, the average performance gain
across variety of scale factors is almost 300%
*Test Two*
2.1 The partition struct as below:
[ ptf:(a int, b int, c varchar)]
(Parent node)
| | |
[ptf1] ... [ptfN]
(Node1) (...) (NodeN)
①select * from ptf
②select * from ptf where b = 100;
This test is done with same size of data per node but table is partitioned
across N number of nodes. Each varation (master or patches) is tested
at-least 3 times to get reliable and consistent results. The purpose of the
test is to see impact on performance as number of data nodes are increased.
2.2 The results
For ① result(scalepernode=2G):
nodenumber master patched performance
2 432s 180s 240%
3 636s 223s 285%
4 830s 283s 293%
5 1065s 361s 295%
For ② result(scalepernode=10G):
nodenumber master patched performance
2 281s 140s 201%
3 421s 140s 300%
4 562s 141s 398%
5 702s 141s 497%
6 833s 139s 599%
7 986s 141s 699%
8 1125s 140s 803%
*Test Three*
This test is similar to the [test one] but with much huge data and
4 nodes.
For ① result:
scalepernode master patched performance
100G 6592s 1649s 399%
For ② result:
scalepernode master patched performance
100G 35383 12363 286%
The result show it work well in much huge data.
*Summary*
The patch is pretty good, it works well when there were little data back to
the parent node. The patch doesn’t provide parallel FDW scan, it ensures
that child nodes can send data to parent in parallel but the parent can
only
sequennly process the data from data nodes.
Providing there is no performance degrdation for non FDW append queries,
I would recomend to consider this patch as an interim soluton while we are
waiting for parallel FDW scan.
"
On Thu, Dec 12, 2019 at 5:41 PM Kyotaro Horiguchi <horikyota.ntt@gmail.com>
wrote:
Show quoted text
Hello.
I think I can say that this patch doesn't slows non-AsyncAppend,
non-postgres_fdw scans.At Mon, 9 Dec 2019 12:18:44 -0500, Bruce Momjian <bruce@momjian.us> wrote
inCertainly any overhead on normal queries would be unacceptable.
I took performance numbers on the current shape of the async execution
patch for the following scan cases.t0 : single local table (parallel disabled)
pll : local partitioning (local Append, parallel disabled)
ft0 : single foreign table
pf0 : inheritance on 4 foreign tables, single connection
pf1 : inheritance on 4 foreign tables, 4 connections
ptf0 : partition on 4 foreign tables, single connection
ptf1 : partition on 4 foreign tables, 4 connectionsThe benchmarking system is configured as the follows on a single
machine.[ benchmark client ]
| |
(localhost:5433) (localhost:5432)
| |
+----+ | +------+ |
| V V V | V
| [master server] | [async server]
| V | V
+--fdw--+ +--fdw--+The patch works roughly in the following steps.
1. Planner decides how many children out of an append can run
asynchrnously (called as async-capable.).2. While ExecInit if an Append doesn't have an async-capable children,
ExecAppend that is exactly the same function is set as
ExecProcNode. Otherwise ExecAppendAsync is used.If the infrastructure part in the patch causes any degradation, the
"t0"(scan on local single table) and/or "pll" test (scan on a local
paritioned table) gets slow.3. postgresql_fdw always runs async-capable code path.
If the postgres_fdw part causes degradation, ft0 reflects that.
The tables has two integers and the query does sum(a) on all tuples.
With the default fetch_size = 100, number is run time in ms. Each
number is the average of 14 runs.master patched gain
t0 7325 7130 +2.7%
pll 4558 4484 +1.7%
ft0 3670 3675 -0.1%
pf0 2322 1550 +33.3%
pf1 2367 1475 +37.7%
ptf0 2517 1624 +35.5%
ptf1 2343 1497 +36.2%With larger fetch_size (200) the gain mysteriously decreases for
sharing single connection cases (pf0, ptf0), but others don't seem
change so much.master patched gain
t0 7212 7252 -0.6%
pll 4546 4397 +3.3%
ft0 3712 3731 -0.5%
pf0 2131 1570 +26.4%
pf1 1926 1189 +38.3%
ptf0 2001 1557 +22.2%
ptf1 1903 1193 +37.4%FWIW, attached are the test script.
gentblr2.sql: Table creation script.
testrun.sh : Benchmarking script.regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments:
On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
Summary
The patch is pretty good, it works well when there were little data back to
the parent node. The patch doesn’t provide parallel FDW scan, it ensures
that child nodes can send data to parent in parallel but the parent can only
sequennly process the data from data nodes.Providing there is no performance degrdation for non FDW append queries,
I would recomend to consider this patch as an interim soluton while we are
waiting for parallel FDW scan.
Wow, these are very impressive results!
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
Thank you very much for the testing of the patch, Ahsan!
At Wed, 15 Jan 2020 15:41:04 -0500, Bruce Momjian <bruce@momjian.us> wrote in
On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
Summary
The patch is pretty good, it works well when there were little data back to
the parent node. The patch doesn’t provide parallel FDW scan, it ensures
that child nodes can send data to parent in parallel but the parent can only
sequennly process the data from data nodes.
"Parallel scan" at the moment means multiple workers fetch unique
blocks from *one* table in an arbitrated manner. In this sense
"parallel FDW scan" means multiple local workers fetch unique bundles
of tuples from *one* foreign table, which means it is running on a
single session. That doesn't offer an advantage.
If parallel query processing worked in worker-per-table mode,
especially on partitioned tables, maybe the current FDW would work
without much of modification. But I believe asynchronous append on
foreign tables on a single process is far resource-effective and
moderately faster than parallel append.
Providing there is no performance degrdation for non FDW append queries,
I would recomend to consider this patch as an interim soluton while we are
waiting for parallel FDW scan.Wow, these are very impressive results!
Thanks.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Thu, Jan 16, 2020 at 9:41 AM Bruce Momjian <bruce@momjian.us> wrote:
On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
Summary
The patch is pretty good, it works well when there were little data back to
the parent node. The patch doesn’t provide parallel FDW scan, it ensures
that child nodes can send data to parent in parallel but the parent can only
sequennly process the data from data nodes.Providing there is no performance degrdation for non FDW append queries,
I would recomend to consider this patch as an interim soluton while we are
waiting for parallel FDW scan.Wow, these are very impressive results!
+1
Thanks Ahsan and Movead. Could you please confirm which patch set you tested?
Hello Kyotaro,
"Parallel scan" at the moment means multiple workers fetch unique
blocks from *one* table in an arbitrated manner. In this sense
"parallel FDW scan" means multiple local workers fetch unique bundles
of tuples from *one* foreign table, which means it is running on a
single session. That doesn't offer an advantage.
It maybe not "parallel FDW scan", it can be "parallel shards scan"
the local workers will pick every foreign partition to scan. I have ever
draw a picture about that you can see it in the link below.
https://www.highgo.ca/2019/08/22/parallel-foreign-scan-of-postgresql/
I think the "parallel shards scan" make sence in this way.
If parallel query processing worked in worker-per-table mode,
especially on partitioned tables, maybe the current FDW would work
without much of modification. But I believe asynchronous append on
foreign tables on a single process is far resource-effective and
moderately faster than parallel append.
As the test result, current patch can not gain more performance when
it returns a huge number of tuples. By "parallel shards scan" method,
it can work well, because the 'parallel' can take full use of CPUs while
'asynchronous' can't.
Highgo Software (Canada/China/Pakistan)
URL : http://www.highgo.ca/
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca
Thanks!
At Wed, 29 Jan 2020 14:41:07 +0800, Movead Li <movead.li@highgo.ca> wrote in
"Parallel scan" at the moment means multiple workers fetch unique
blocks from *one* table in an arbitrated manner. In this sense
"parallel FDW scan" means multiple local workers fetch unique bundles
of tuples from *one* foreign table, which means it is running on a
single session. That doesn't offer an advantage.It maybe not "parallel FDW scan", it can be "parallel shards scan"
the local workers will pick every foreign partition to scan. I have ever
draw a picture about that you can see it in the link below.https://www.highgo.ca/2019/08/22/parallel-foreign-scan-of-postgresql/
I think the "parallel shards scan" make sence in this way.
It is "asynchronous append on async-capable'd postgres-fdw scans". It
could be called as such in the sense that it is intended to be used
with sharding.
If parallel query processing worked in worker-per-table mode,
especially on partitioned tables, maybe the current FDW would work
without much of modification. But I believe asynchronous append on
foreign tables on a single process is far resource-effective and
moderately faster than parallel append.As the test result, current patch can not gain more performance when
it returns a huge number of tuples. By "parallel shards scan" method,
it can work well, because the 'parallel' can take full use of CPUs while
'asynchronous' can't.
Did you looked at my benchmarking result upthread? Even it gives
significant gain even when gathering large number of tuples from
multiple servers or even from a single server. It is because of its
asynchronous nature.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
Hello,
It is "asynchronous append on async-capable'd postgres-fdw scans". It
could be called as such in the sense that it is intended to be used
with sharding.
Yes that's it.
Did you looked at my benchmarking result upthread? Even it gives
significant gain even when gathering large number of tuples from
multiple servers or even from a single server. It is because of its
asynchronous nature.
I mean it gain performance at first, but it mets bottleneck while
increase the number of the nodes.
For example:
It has 2 nodes, it will gain 200% performance.
It has 3 nodes, it will gain 300% performance.
However,
It has 4 nodes, it gain 300% performance.
It has 5 nodes, it gain 300% performance.
...
----
Highgo Software (Canada/China/Pakistan)
URL : www.highgo.ca
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca
On Sun, Dec 1, 2019 at 4:26 AM Bruce Momjian <bruce@momjian.us> wrote:
On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:
On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
A few years back[1] I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding. I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better. It's a very localised feature that works only between Append
and its immediate children. The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.Here's a quick rebase of that old POC patch, along with a demo. Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.Yes, sharding has been waiting on parallel FDW scans. Would this work
for parallel partition scans if the partitions were FDWs?Yeah, this works for partitions that are FDWs (as shown), but only for
Append, not for Parallel Append. So you'd have parallelism in the
sense that your N remote shard servers are all doing stuff at the same
time, but it couldn't be in a parallel query on your 'home' server,
which is probably good for things that push down aggregation and bring
back just a few tuples from each shard, but bad for anything wanting
to ship back millions of tuples to chew on locally. Do you think
that'd be useful enough on its own?Yes, I think so. There are many data warehouse queries that want to
return only aggregate values, or filter for a small number of rows.
Even OLTP queries might return only a few rows from multiple partitions.
This would allow for a proof-of-concept implementation so we can see how
realistic this approach is.
+1
Best regards,
Etsuro Fujita
On Thu, Dec 5, 2019 at 1:46 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Thu, Dec 5, 2019 at 4:26 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:There's my pending (somewhat stale) patch, which allows to run local
scans while waiting for remote servers./messages/by-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp
I think it’s great to execute local scans while waiting for the
results of remote scans, but looking at your patch (the 0002 patch of
your patch set in [1]/messages/by-id/20200820.163608.1893015081639298019.horikyota.ntt@gmail.com), I’m not sure that the 0002 patch does it much
efficiently, because it modifies nodeAppend.c so that all the work is
done by a single process. Rather than doing so, I’m wondering if it
would be better to modify Parallel Append so that some processes
execute remote scans and others execute local scans. I’m not sure
that we need to have this improvement as well in the first cut of this
feature, though.
After rereading some threads to remind myself what happened here...
right, my little patch began life in March 2016[1] when I wanted a
test case to test Andres's work on WaitEventSets, and your patch set
started a couple of months later and is vastly more ambitious[2][3].
It wants to escape from the volcano give-me-one-tuple-or-give-me-EOF
model. And I totally agree that there are lots of reason to want to
do that (including yielding to other parts of the plan instead of
waiting for I/O, locks and some parallelism primitives enabling new
kinds of parallelism), and I'm hoping to help with some small pieces
of that if I can.My patch set (rebased upthread) was extremely primitive, with no new
planner concepts, and added only a very simple new executor node
method: ExecReady(). Append used that to try to ask its children if
they'd like some time to warm up. By default, ExecReady() says "I
don't know what you're talking about, go away", but FDWs can provide
an implementation that says "yes, please call me again when this fd is
ready" or "yes, I am ready, please call ExecProc() now". It doesn't
deal with anything more complicated than that, and in particular it
doesn't work if there are extra planner nodes in between Append and
the foreign scan. (It also doesn't mix particularly well with
parallelism, as mentioned.)The reason I reposted this unambitious work is because Stephen keeps
asking me why we don't consider the stupidly simple thing that would
help with simple foreign partition-based queries today, instead of
waiting for someone to redesign the entire executor, because that's
... really hard.
Yeah, I think your patch is much simpler, compared to Horiguchi-san’s
patch set, which I think is a good thing, considering this would be
rather an interim solution until executor rewrite is done. Here are a
few comments that I have for now:
* I know your patch is a POC one, but one concern about it (and
Horiguchi-san's patch set) is concurrent data fetches by multiple
foreign scan nodes using the same connection in the case of
postgres_fdw. Here is an example causing an error:
create or replace function slow_data_ext(name text, secs float)
returns setof t as
$$
begin
perform pg_sleep(secs);
return query select name, generate_series(1, 100)::text as i;
end;
$$
language plpgsql;
create view t11 as select * from slow_data_ext('t11', 1.0);
create view t12 as select * from slow_data_ext('t12', 2.0);
create view t13 as select * from slow_data_ext('t13', 3.0);
create foreign table ft11 (a text, b text) server server1 options
(table_name 't11');
create foreign table ft12 (a text, b text) server server2 options
(table_name 't12');
create foreign table ft13 (a text, b text) server server3 options
(table_name 't13');
create table pt1 (a text, b text) partition by list (a);
alter table pt1 attach partition ft11 for values in ('t11');
alter table pt1 attach partition ft12 for values in ('t12');
alter table pt1 attach partition ft13 for values in ('t13');
create view t21 as select * from slow_data_ext('t21', 1.0);
create view t22 as select * from slow_data_ext('t22', 2.0);
create view t23 as select * from slow_data_ext('t23', 3.0);
create foreign table ft21 (a text, b text) server server1 options
(table_name 't21');
create foreign table ft22 (a text, b text) server server2 options
(table_name 't22');
create foreign table ft23 (a text, b text) server server3 options
(table_name 't23');
create table pt2 (a text, b text) partition by list (a);
alter table pt2 attach partition ft21 for values in ('t21');
alter table pt2 attach partition ft22 for values in ('t22');
alter table pt2 attach partition ft23 for values in ('t23');
explain verbose select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
QUERY PLAN
--------------------------------------------------------------------------------------------------------------
Nested Loop (cost=200.00..1303.80 rows=50220 width=128)
Output: pt1.a, pt1.b, pt2.a, pt2.b
-> Append (cost=100.00..427.65 rows=2790 width=64)
-> Foreign Scan on public.ft11 pt1_1 (cost=100.00..137.90
rows=930 width=64)
Output: pt1_1.a, pt1_1.b
Remote SQL: SELECT a, b FROM public.t11
-> Foreign Scan on public.ft12 pt1_2 (cost=100.00..137.90
rows=930 width=64)
Output: pt1_2.a, pt1_2.b
Remote SQL: SELECT a, b FROM public.t12
-> Foreign Scan on public.ft13 pt1_3 (cost=100.00..137.90
rows=930 width=64)
Output: pt1_3.a, pt1_3.b
Remote SQL: SELECT a, b FROM public.t13
-> Materialize (cost=100.00..248.44 rows=18 width=64)
Output: pt2.a, pt2.b
-> Append (cost=100.00..248.35 rows=18 width=64)
-> Foreign Scan on public.ft22 pt2_1
(cost=100.00..124.13 rows=9 width=64)
Output: pt2_1.a, pt2_1.b
Remote SQL: SELECT a, b FROM public.t22 WHERE
(((a = 't22'::text) OR (a = 't23'::text)))
-> Foreign Scan on public.ft23 pt2_2
(cost=100.00..124.13 rows=9 width=64)
Output: pt2_2.a, pt2_2.b
Remote SQL: SELECT a, b FROM public.t23 WHERE
(((a = 't22'::text) OR (a = 't23'::text)))
(21 rows)
select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
ERROR: another command is already in progress
CONTEXT: remote SQL command: DECLARE c4 CURSOR FOR
SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
I think the cause of this error is that an asynchronous data fetch for
ft22 is blocked by that for ft12 that is in progress.
(Horiguchi-san’s patch set doesn't work for this query either, causing
the same error. Though, it looks like he intended to handle cases
like this by a queuing system added to postgres_fdw to process such
concurrent data fetches.) I think a simple solution for this issue
would be to just disable asynchrony optimization for such cases. I
think we could do so by tracking foreign scan nodes across the entire
final plan tree that use the same connection at plan or execution
time.
* Another one is “no new planner concepts”. I think it leads to this:
@@ -239,9 +242,207 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
+ /*
+ * Initially we consider all subplans to be potentially
asynchronous.
+ */
+ appendstate->asyncplans = (PlanState **) palloc(nplans *
sizeof(PlanState *));
+ appendstate->asyncfds = (int *) palloc0(nplans * sizeof(int));
+ appendstate->nasyncplans = nplans;
+ memcpy(appendstate->asyncplans, appendstate->appendplans, nplans *
sizeof(PlanState *));
+ appendstate->lastreadyplan = 0;
I’m not sure that this would cause performance degradation in the case
of an plain Append that involves no FDWs supporting asynchrony
optimization, but I think it would be better to determine subplans
with that optimization at plan time and save cycles at execution time
as done in Horiguchi-san’s patch set. (I’m not sure that we need a
new ExecAppend function proposed there, though.) Also, consider this
ordered Append example:
create table t31 (a int check (a >= 10 and a < 20), b text);
create table t32 (a int check (a >= 20 and a < 30), b text);
create table t33 (a int check (a >= 30 and a < 40), b text);
create foreign table ft31 (a int check (a >= 10 and a < 20), b text)
server server1 options (table_name 't31');
create foreign table ft32 (a int check (a >= 20 and a < 30), b text)
server server2 options (table_name 't32');
create foreign table ft33 (a int check (a >= 30 and a < 40), b text)
server server3 options (table_name 't33');
create table pt3 (a int, b text) partition by range (a);
alter table pt3 attach partition ft31 for values from (10) to (20);
alter table pt3 attach partition ft32 for values from (20) to (30);
alter table pt3 attach partition ft33 for values from (30) to (40);
explain verbose select * from pt3 order by a;
QUERY PLAN
-----------------------------------------------------------------------------------
Append (cost=300.00..487.52 rows=4095 width=36)
-> Foreign Scan on public.ft31 pt3_1 (cost=100.00..155.68
rows=1365 width=36)
Output: pt3_1.a, pt3_1.b
Remote SQL: SELECT a, b FROM public.t31 ORDER BY a ASC NULLS LAST
-> Foreign Scan on public.ft32 pt3_2 (cost=100.00..155.68
rows=1365 width=36)
Output: pt3_2.a, pt3_2.b
Remote SQL: SELECT a, b FROM public.t32 ORDER BY a ASC NULLS LAST
-> Foreign Scan on public.ft33 pt3_3 (cost=100.00..155.68
rows=1365 width=36)
Output: pt3_3.a, pt3_3.b
Remote SQL: SELECT a, b FROM public.t33 ORDER BY a ASC NULLS LAST
(10 rows)
For this query, we can’t apply asynchrony optimization. To disable it
for such cases I think it would be better to do something at plan time
as well as done in his patch set.
I haven’t finished reviewing your patch, but before doing so, I’ll
review Horiguchi-san's patch set in more detail for further
comparison. Attached is a rebased version of your patch, in which I
added the same changes to the postgres_fdw regression tests as
Horiguchi-san so that the tests run successfully.
Thank you for working on this, Thomas and Horiguchi-san! Sorry for the delay.
Best regards,
Etsuro Fujita
[1]: /messages/by-id/20200820.163608.1893015081639298019.horikyota.ntt@gmail.com
Attachments:
0001-Multiplexing-Append-POC-v2.patchapplication/octet-stream; name=0001-Multiplexing-Append-POC-v2.patchDownload
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 08daf26fdf..413d603a03 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -59,6 +59,7 @@ typedef struct ConnCacheEntry
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ PgFdwConnState state; /* extra per-connection state */
} ConnCacheEntry;
/*
@@ -105,7 +106,7 @@ static bool UserMappingPasswordRequired(UserMapping *user);
* (not even on error), we need this flag to cue manual cleanup.
*/
PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
bool found;
ConnCacheEntry *entry;
@@ -197,6 +198,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
+ memset(&entry->state, 0, sizeof(entry->state));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
@@ -213,6 +215,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
+ /* If caller needs access to the per-connection state, return it. */
+ if (state)
+ *state = &entry->state;
+
return entry->conn;
}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 90db550b92..540ed3e711 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6973,7 +6973,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+-------
a | aaa
@@ -7001,7 +7001,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7029,7 +7029,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7057,7 +7057,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | newtoo
@@ -7085,7 +7085,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
DELETE FROM a;
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+----
(0 rows)
@@ -7127,23 +7127,28 @@ insert into bar2 values(3,33,33);
insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
@@ -7153,9 +7158,9 @@ select * from bar where f1 in (select f1 from foo) for update;
-> Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
f1 | f2
----+----
1 | 11
@@ -7165,23 +7170,28 @@ select * from bar where f1 in (select f1 from foo) for update;
(4 rows)
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
@@ -7191,9 +7201,9 @@ select * from bar where f1 in (select f1 from foo) for share;
-> Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
f1 | f2
----+----
1 | 11
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index a31abce7c9..d5f5a7d4e8 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execReady.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -159,6 +160,9 @@ typedef struct PgFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */
int fetch_size; /* number of tuples per fetch */
+
+ /* per-connection state */
+ PgFdwConnState *conn_state;
} PgFdwScanState;
/*
@@ -392,6 +396,8 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *output_rel,
void *extra);
+static int postgresReady(ForeignScanState *node);
+
/*
* Helper functions
*/
@@ -419,6 +425,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
+static void fetch_more_data_begin(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -558,6 +565,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support for asynchrony */
+ routine->Ready = postgresReady;
+
PG_RETURN_POINTER(routine);
}
@@ -1433,7 +1443,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1484,6 +1494,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+ fsstate->conn_state->async_query_sent = false;
}
/*
@@ -1596,6 +1607,13 @@ postgresEndForeignScan(ForeignScanState *node)
if (fsstate == NULL)
return;
+ /*
+ * If we're ending before we've collected a response from an asynchronous
+ * query, we have to consume the response.
+ */
+ if (fsstate->conn_state->async_query_sent)
+ fetch_more_data(node);
+
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2371,7 +2389,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, NULL);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2745,7 +2763,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3382,6 +3400,35 @@ create_cursor(ForeignScanState *node)
pfree(buf.data);
}
+/*
+ * Begin an asynchronous data fetch.
+ * fetch_more_data must be called to fetch the results..
+ */
+static void
+fetch_more_data_begin(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PGconn *conn = fsstate->conn;
+ char sql[64];
+
+ Assert(!fsstate->conn_state->async_query_sent);
+
+ /*
+ * Create the cursor synchronously. (With more state machine stuff we
+ * could do this asynchronously too).
+ */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ /* We will send this query, but not wait for the response. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (PQsendQuery(conn, sql) < 0)
+ pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+ fsstate->conn_state->async_query_sent = true;
+}
+
/*
* Fetch some more rows from the node's cursor.
*/
@@ -3408,13 +3455,28 @@ fetch_more_data(ForeignScanState *node)
int numrows;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ if (!fsstate->conn_state->async_query_sent)
+ {
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ res = pgfdw_exec_query(conn, sql);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
+ else
+ {
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = PQgetResult(conn);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
@@ -3441,6 +3503,15 @@ fetch_more_data(ForeignScanState *node)
/* Must be EOF if we didn't get as many tuples as we asked for. */
fsstate->eof_reached = (numrows < fsstate->fetch_size);
+
+ /* If this was the second part of an async request, we must fetch until NULL. */
+ if (fsstate->conn_state->async_query_sent)
+ {
+ /* call once and raise error if not NULL as expected? */
+ while (PQgetResult(conn) != NULL)
+ ;
+ fsstate->conn_state->async_query_sent = false;
+ }
}
PG_FINALLY();
{
@@ -3565,7 +3636,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, NULL);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -4440,7 +4511,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
@@ -4526,7 +4597,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -4754,7 +4825,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
@@ -5559,6 +5630,41 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
/* XXX Consider parameterized paths for the join relation */
}
+static int
+postgresReady(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+ if (fsstate->conn_state->async_query_sent)
+ {
+ /*
+ * We have already started a query, for some other executor node. We
+ * currently can't handle two at the same time (we'd have to create
+ * more connections for that).
+ */
+ return EXEC_READY_BUSY;
+ }
+ else if (fsstate->next_tuple < fsstate->num_tuples)
+ {
+ /* We already have buffered tuples. */
+ return EXEC_READY_MORE;
+ }
+ else if (fsstate->eof_reached)
+ {
+ /* We have already hit the end of the scan. */
+ return EXEC_READY_EOF;
+ }
+ else
+ {
+ /*
+ * We will start a query now, and tell the caller to wait until the
+ * file descriptor says we're ready and then call ExecProcNode.
+ */
+ fetch_more_data_begin(node);
+ return PQsocket(fsstate->conn);
+ }
+}
+
/*
* Assess whether the aggregation, grouping and having operations can be pushed
* down to the foreign server. As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..a3c11e8d96 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -19,6 +19,15 @@
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
+/*
+ * Extra control information relating to a connection.
+ */
+typedef struct PgFdwConnState
+{
+ /* Has an asynchronous query been sent? */
+ bool async_query_sent;
+} PgFdwConnState;
+
/*
* FDW-specific planner information kept in RelOptInfo.fdw_private for a
* postgres_fdw foreign table. For a baserel, this struct is created by
@@ -129,7 +138,8 @@ extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+ PgFdwConnState **state);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 83971665e3..4ac211f250 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1780,31 +1780,31 @@ INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
DELETE FROM a;
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
@@ -1840,12 +1840,12 @@ insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
-- Check UPDATE with inherited target and an inherited source table
explain (verbose, costs off)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 17a0df6978..7548deae4d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1524,6 +1524,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</thead>
<tbody>
+ <row>
+ <entry><literal>AppendReady</literal></entry>
+ <entry>Waiting for a subplan of Append to be ready.</entry>
+ </row>
<row>
<entry><literal>BackupWaitWalArchive</literal></entry>
<entry>Waiting for WAL files required for a backup to be successfully
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 01b7b926bf..25cf988606 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -73,6 +73,7 @@
#include "postgres.h"
#include "executor/executor.h"
+#include "executor/execReady.h"
#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapAnd.h"
@@ -741,6 +742,30 @@ ExecEndNode(PlanState *node)
}
}
+/*
+ * ExecReady
+ *
+ * Check whether the node would be able to produce a new tuple without
+ * blocking. EXEC_READY_MORE means a tuple can be returned by ExecProcNode
+ * immediately without waiting. EXEC_READY_EOF means there are no further
+ * tuples to consume. EXEC_READY_UNSUPPORTED means that this node doesn't
+ * support asynchronous interaction. EXEC_READY_BUSY means that this node
+ * currently can't provide asynchronous service. Any other value is a file
+ * descriptor which can be used to wait until the node is ready to produce a
+ * tuple.
+ */
+int
+ExecReady(PlanState *node)
+{
+ switch (nodeTag(node))
+ {
+ case T_ForeignScanState:
+ return ExecForeignScanReady((ForeignScanState *) node);
+ default:
+ return EXEC_READY_UNSUPPORTED;
+ }
+}
+
/*
* ExecShutdownNode
*
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 88919e62fa..1c0935a079 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -59,8 +59,11 @@
#include "executor/execdebug.h"
#include "executor/execPartition.h"
+#include "executor/execReady.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@@ -219,9 +222,207 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
+ /*
+ * Initially we consider all subplans to be potentially asynchronous.
+ */
+ appendstate->asyncplans = (PlanState **) palloc(nplans * sizeof(PlanState *));
+ appendstate->asyncfds = (int *) palloc0(nplans * sizeof(int));
+ appendstate->nasyncplans = nplans;
+ memcpy(appendstate->asyncplans, appendstate->appendplans, nplans * sizeof(PlanState *));
+ appendstate->lastreadyplan = 0;
+
return appendstate;
}
+/*
+ * Forget about an asynchronous subplan, given an async subplan index. Return
+ * the index of the next subplan.
+ */
+static int
+forget_async_subplan(AppendState *node, int i)
+{
+ int last = node->nasyncplans - 1;
+
+ if (i == last)
+ {
+ /* This was the last subplan, forget it and move to first. */
+ i = 0;
+ if (node->lastreadyplan == last)
+ node->lastreadyplan = 0;
+ }
+ else
+ {
+ /*
+ * Move the last one here (cheaper than memmov'ing the whole array
+ * down and we don't care about the order).
+ */
+ node->asyncplans[i] = node->asyncplans[last];
+ node->asyncfds[i] = node->asyncfds[last];
+ }
+ --node->nasyncplans;
+
+ return i;
+}
+
+/*
+ * Wait for the first asynchronous subplan's file descriptor to be ready to
+ * read or error, and then ask it for a tuple.
+ *
+ * This is called by append_next_async when every async subplan has provided a
+ * file descriptor to wait on, so we must begin waiting.
+ */
+static TupleTableSlot *
+append_next_async_wait(AppendState *node)
+{
+ while (node->nasyncplans > 0)
+ {
+ WaitEventSet *set;
+ WaitEvent event;
+ int i;
+
+ /*
+ * For now there is no facility to remove fds from WaitEventSets when
+ * they are no longer interesting, so we allocate, populate, free
+ * every time, a la select(). If we had RemoveWaitEventFromSet, we
+ * could use the same WaitEventSet object for the life of the append
+ * node, and add/remove as we go, a la epoll/kqueue.
+ *
+ * Note: We could make a single call to WaitEventSetWait and have a
+ * big enough output event buffer to learn about readiness on all
+ * interesting sockets and loop over those, but one implementation can
+ * only tell us about a single socket at a time, so we need to be
+ * prepared to call WaitEventSetWait repeatedly.
+ */
+ set = CreateWaitEventSet(CurrentMemoryContext, node->nasyncplans + 1);
+ AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL,
+ NULL);
+ for (i = 0; i < node->nasyncplans; ++i)
+ {
+ Assert(node->asyncfds[i] > 0);
+ AddWaitEventToSet(set, WL_SOCKET_READABLE, node->asyncfds[i], NULL,
+ NULL);
+ }
+ i = WaitEventSetWait(set, -1, &event, 1, WAIT_EVENT_APPEND_READY);
+ Assert(i > 0);
+ FreeWaitEventSet(set);
+
+ if (event.events & WL_SOCKET_READABLE)
+ {
+ /* Linear search for the node that told us to wait for this fd. */
+ for (i = 0; i < node->nasyncplans; ++i)
+ {
+ if (event.fd == node->asyncfds[i])
+ {
+ TupleTableSlot *result;
+
+ /*
+ * We assume that because the fd is ready, it can produce
+ * a tuple now, which is not perfect. An improvement
+ * would be if it could say 'not yet, I'm still not
+ * ready', so eg postgres_fdw could PQconsumeInput and
+ * then say 'I need more input'.
+ */
+ result = ExecProcNode(node->asyncplans[i]);
+ if (!TupIsNull(result))
+ {
+ /*
+ * Remember this plan so that append_next_async will
+ * keep trying this subplan first until it stops
+ * feeding us buffered tuples.
+ */
+ node->lastreadyplan = i;
+ /* We can stop waiting for this fd. */
+ node->asyncfds[i] = 0;
+ return result;
+ }
+ else
+ {
+ /*
+ * This subplan has reached EOF. We'll go back and
+ * wait for another one.
+ */
+ forget_async_subplan(node, i);
+ break;
+ }
+ }
+ }
+ }
+ }
+ /*
+ * We visited every ready subplan, tried to pull a tuple, and they all
+ * reported EOF. There is no more async data available.
+ */
+ return NULL;
+}
+
+/*
+ * Fetch the next tuple available from any asynchronous subplan. If none can
+ * provide a tuple immediately, wait for the first one that is ready to
+ * provide a tuple. Return NULL when there are no more tuples available.
+ */
+static TupleTableSlot *
+append_next_async(AppendState *node)
+{
+ int count;
+ int i;
+
+ /*
+ * We'll start our scan of subplans at the last one that was able to give
+ * us a tuple, if there was one. It may be able to give us a new tuple
+ * straight away so we can leave early.
+ */
+ i = node->lastreadyplan;
+
+ /* Loop until we've visited each potentially async subplan. */
+ for (count = node->nasyncplans; count > 0; --count)
+ {
+ /*
+ * If we don't already have a file descriptor to wait on for this
+ * subplan, see if it is ready.
+ */
+ if (node->asyncfds[i] == 0)
+ {
+ int ready = ExecReady(node->asyncplans[i]);
+
+ switch (ready)
+ {
+ case EXEC_READY_MORE:
+ /* The node has a buffered tuple for us. */
+ return ExecProcNode(node->asyncplans[i]);
+
+ case EXEC_READY_UNSUPPORTED:
+ case EXEC_READY_EOF:
+ case EXEC_READY_BUSY:
+ /* This subplan can't give us anything asynchronously. */
+ i = forget_async_subplan(node, i);
+ continue;
+
+ default:
+ /* We have a new file descriptor to wait for. */
+ Assert(ready > 0);
+ node->asyncfds[i] = ready;
+ node->lastreadyplan = 0;
+ break;
+ }
+ }
+
+ /* Move on to the next plan (circular). */
+ i = (i + 1) % node->nasyncplans;
+ }
+
+ /* We might have removed all subplans; if so we can leave now. */
+ if (node->nasyncplans == 0)
+ return NULL;
+
+ /*
+ * If we reached here, then all remaining async subplans have given us a
+ * file descriptor to wait for. So do that, and pull a tuple as soon as
+ * one is ready.
+ */
+ return append_next_async_wait(node);
+}
+
+
/* ----------------------------------------------------------------
* ExecAppend
*
@@ -233,6 +434,16 @@ ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ /* First, drain all asynchronous subplans as they become ready. */
+ if (node->nasyncplans > 0)
+ {
+ TupleTableSlot *result = append_next_async(node);
+
+ if (!TupIsNull(result))
+ return result;
+ }
+ Assert(node->nasyncplans == 0);
+
if (node->as_whichplan < 0)
{
/* Nothing to do if there are no subplans */
@@ -395,6 +606,9 @@ ExecAppendInitializeDSM(AppendState *node,
node->as_pstate = pstate;
node->choose_next_subplan = choose_next_subplan_for_leader;
+
+ /* TODO: for now disable async when running in parallel */
+ node->nasyncplans = 0;
}
/* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 513471ab9b..fe32cbc123 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -23,6 +23,7 @@
#include "postgres.h"
#include "executor/executor.h"
+#include "executor/execReady.h"
#include "executor/nodeForeignscan.h"
#include "foreign/fdwapi.h"
#include "utils/memutils.h"
@@ -384,3 +385,21 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanReady
+ *
+ * Checks if the foreign scan can emit data asynchronously
+ * using socket readiness as an indicator.
+ * ----------------------------------------------------------------
+ */
+int
+ExecForeignScanReady(ForeignScanState *node)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->Ready)
+ return fdwroutine->Ready(node);
+ else
+ return EXEC_READY_UNSUPPORTED;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 8116b23614..88f0d376c3 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3752,6 +3752,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
switch (w)
{
+ case WAIT_EVENT_APPEND_READY:
+ event_name = "AppendReady";
+ break;
case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
event_name = "BackupWaitWalArchive";
break;
diff --git a/src/include/executor/execReady.h b/src/include/executor/execReady.h
index e69de29bb2..01410ea7bc 100644
--- a/src/include/executor/execReady.h
+++ b/src/include/executor/execReady.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * execReady.h
+ * Values used by FDW and the executor for async tuple iteration.
+ *
+ * Portions Copyright (c) 2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/executor/execReady.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef EXECREADY_H
+#define EXECREADY_H
+
+/*
+ * Asynchronous processing is not currently available (because an asynchronous
+ * request is already in progress).
+ */
+#define EXEC_READY_BUSY -3
+
+/* There are no more tuples. */
+#define EXEC_READY_EOF -2
+
+/* This FDW or executor node does not support asynchronous processing. */
+#define EXEC_READY_UNSUPPORTED -1
+
+/* More tuples are available immediately without waiting. */
+#define EXEC_READY_MORE 0
+
+#endif
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 415e117407..243254e61c 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -224,6 +224,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
extern void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function);
extern Node *MultiExecProcNode(PlanState *node);
+extern int ExecReady(PlanState *node);
extern void ExecEndNode(PlanState *node);
extern bool ExecShutdownNode(PlanState *node);
extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 326d713ebf..151490572d 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,6 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern int ExecForeignScanReady(ForeignScanState *node);
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..253288e3c0 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -170,6 +170,8 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef int (*Ready_function) (ForeignScanState *node);
+
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -246,6 +248,9 @@ typedef struct FdwRoutine
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+ /* Support functions for asynchronous processing */
+ Ready_function Ready;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 0b42dd6f94..70d112cced 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1231,6 +1231,11 @@ struct AppendState
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
bool (*choose_next_subplan) (AppendState *);
+
+ PlanState **asyncplans;
+ int *asyncfds;
+ int nasyncplans;
+ int lastreadyplan;
};
/* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 807a9c1edf..a560797b82 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -846,6 +846,7 @@ typedef enum
*/
typedef enum
{
+ WAIT_EVENT_APPEND_READY,
WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
WAIT_EVENT_BGWORKER_SHUTDOWN,
WAIT_EVENT_BGWORKER_STARTUP,
On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
* I know your patch is a POC one, but one concern about it (and
Horiguchi-san's patch set) is concurrent data fetches by multiple
foreign scan nodes using the same connection in the case of
postgres_fdw. Here is an example causing an error:
select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
ERROR: another command is already in progress
CONTEXT: remote SQL command: DECLARE c4 CURSOR FOR
SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
(Horiguchi-san’s patch set doesn't work for this query either, causing
the same error. Though, it looks like he intended to handle cases
like this by a queuing system added to postgres_fdw to process such
concurrent data fetches.)
I was wrong here; Horiguchi-san's patch set works well for this query.
Maybe I did something wrong when testing his patch set. Sorry for
that.
Best regards,
Etsuro Fujita
Fujita-san, thank you for taking time!
At Mon, 31 Aug 2020 19:10:39 +0900, Etsuro Fujita <etsuro.fujita@gmail.com> wrote in
On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
* I know your patch is a POC one, but one concern about it (and
Horiguchi-san's patch set) is concurrent data fetches by multiple
foreign scan nodes using the same connection in the case of
postgres_fdw. Here is an example causing an error:select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
ERROR: another command is already in progress
CONTEXT: remote SQL command: DECLARE c4 CURSOR FOR
SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))(Horiguchi-san’s patch set doesn't work for this query either, causing
the same error. Though, it looks like he intended to handle cases
like this by a queuing system added to postgres_fdw to process such
concurrent data fetches.)I was wrong here; Horiguchi-san's patch set works well for this query.
Maybe I did something wrong when testing his patch set. Sorry for
that.
Yeah. postgresIterateForeignScan calls vacate_connection() to make the
underlying connection available if a server connection is busy with
another remote query. The mechanism is backed by a waiting queue
(add_async_waiter, move_to_next_waiter, remove_async_node).
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Tue, Sep 1, 2020 at 9:45 AM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
At Mon, 31 Aug 2020 19:10:39 +0900, Etsuro Fujita <etsuro.fujita@gmail.com> wrote in
On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
* I know your patch is a POC one, but one concern about it (and
Horiguchi-san's patch set) is concurrent data fetches by multiple
foreign scan nodes using the same connection in the case of
postgres_fdw. Here is an example causing an error:select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
ERROR: another command is already in progress
CONTEXT: remote SQL command: DECLARE c4 CURSOR FOR
SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))(Horiguchi-san’s patch set doesn't work for this query either, causing
the same error. Though, it looks like he intended to handle cases
like this by a queuing system added to postgres_fdw to process such
concurrent data fetches.)I was wrong here; Horiguchi-san's patch set works well for this query.
Maybe I did something wrong when testing his patch set. Sorry for
that.Yeah. postgresIterateForeignScan calls vacate_connection() to make the
underlying connection available if a server connection is busy with
another remote query. The mechanism is backed by a waiting queue
(add_async_waiter, move_to_next_waiter, remove_async_node).
Thanks for the explanation, Horiguchi-san!
So your version of the patch processes the query successfully, because
1) before performing an asynchronous data fetch of ft22, it waits for
the in-progress data fetch of ft12 using the same connection to
complete so that the data fetch of ft22 can be done, and 2) before
performing an asynchronous data fetch of ft23, it waits for the
in-progress data fetch of ft13 using the same connection to complete
so that the data fetch of ft23 can be done. Right? If so, I think in
some cases such handling would impact performance negatively.
Consider the same query with LIMIT processed by your version:
explain verbose select * from pt1, pt2 where pt2.a = 't22' or pt2.a =
't23' limit 1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Limit (cost=200.00..200.01 rows=1 width=128)
Output: pt1.a, pt1.b, pt2.a, pt2.b
-> Nested Loop (cost=200.00..903.87 rows=50220 width=128)
Output: pt1.a, pt1.b, pt2.a, pt2.b
-> Append (cost=100.00..151.85 rows=2790 width=64)
Async subplans: 3
-> Async Foreign Scan on public.ft11 pt1_1
(cost=100.00..137.90 rows=930 width=64)
Output: pt1_1.a, pt1_1.b
Remote SQL: SELECT a, b FROM public.t11
-> Async Foreign Scan on public.ft12 pt1_2
(cost=100.00..137.90 rows=930 width=64)
Output: pt1_2.a, pt1_2.b
Remote SQL: SELECT a, b FROM public.t12
-> Async Foreign Scan on public.ft13 pt1_3
(cost=100.00..137.90 rows=930 width=64)
Output: pt1_3.a, pt1_3.b
Remote SQL: SELECT a, b FROM public.t13
-> Materialize (cost=100.00..124.31 rows=18 width=64)
Output: pt2.a, pt2.b
-> Append (cost=100.00..124.22 rows=18 width=64)
Async subplans: 2
-> Async Foreign Scan on public.ft22 pt2_1
(cost=100.00..124.13 rows=9 width=64)
Output: pt2_1.a, pt2_1.b
Remote SQL: SELECT a, b FROM public.t22
WHERE (((a = 't22'::text) OR (a = 't23'::text)))
-> Async Foreign Scan on public.ft23 pt2_2
(cost=100.00..124.13 rows=9 width=64)
Output: pt2_2.a, pt2_2.b
Remote SQL: SELECT a, b FROM public.t23
WHERE (((a = 't22'::text) OR (a = 't23'::text)))
(25 rows)
I think your version would require extra time to process this query
compared to HEAD due to such handling. This query throws an error
with your version, though:
select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23' limit 1;
ERROR: another command is already in progress
CONTEXT: remote SQL command: CLOSE c1
Best regards,
Etsuro Fujita