CustomScan under the Gather node?
Hello,
What enhancements would be necessary to implement a feature similar to
partial seq-scan using the custom-scan interface?
It seems to me that callbacks at the three points below are needed.
* ExecParallelEstimate
* ExecParallelInitializeDSM
* ExecParallelInitializeWorker
Anything else?
Does ForeignScan also need an equivalent enhancement?
The background of my motivation is the slides below:
http://www.slideshare.net/kaigai/sqlgpussd-english
(LT slides from the JPUG conference last December)
I'm investigating an SSD-to-GPU direct feature on top of
the custom-scan interface. It intends to load a bunch of data
blocks from NVMe SSD into GPU RAM using peer-to-peer DMA, before
the data is loaded into CPU RAM. (Probably, it should load only
all-visible blocks, as an index-only scan does.)
Once the data blocks are in GPU RAM, we can reduce the number of rows
that would be filtered out later anyway but consume CPU RAM until then.
The expected major bottleneck is the CPU thread that issues the
peer-to-peer DMA requests to the device, rather than the GPU tasks.
So, using parallel execution is a natural thought.
However, a CustomScan node on top of an underlying PartialSeqScan
node is not sufficient, because PartialSeqScan first loads the data
blocks into CPU RAM. At that point P2P DMA no longer makes sense.
The expected "GpuSsdScan" on CustomScan will reference a shared
block index that is incremented by multiple backends, then
enqueue a P2P DMA request (if the block is all-visible) to the device driver.
It then receives only the rows that satisfy the scan qualifiers.
It is almost equivalent to SeqScan, but wants to bypass the heap layer
to use the SSD-to-GPU direct data transfer path.
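The "shared block index incremented by multiple backends" idea above can be sketched roughly as follows. This is only a minimal illustration under stated assumptions: it uses C11 atomics rather than PostgreSQL's own atomics layer, and the names SharedScanState, shared_scan_init and shared_scan_claim are hypothetical, not part of any existing API.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical shared state. In a real backend this would live in a
 * dynamic shared memory segment visible to the leader and all workers.
 */
typedef struct SharedScanState
{
    atomic_uint_fast32_t next_block;    /* next block number nobody has claimed */
    uint32_t             nblocks;       /* total number of blocks to scan */
} SharedScanState;

/* Called once, before any worker starts claiming blocks. */
static void
shared_scan_init(SharedScanState *st, uint32_t nblocks)
{
    atomic_init(&st->next_block, 0);
    st->nblocks = nblocks;
}

/*
 * Claim the next unclaimed block. Returns false once every block has
 * been handed out. The atomic fetch-and-add guarantees that no two
 * callers ever receive the same block number.
 */
static bool
shared_scan_claim(SharedScanState *st, uint32_t *blkno)
{
    uint32_t mine;

    assert(blkno != NULL);
    mine = (uint32_t) atomic_fetch_add(&st->next_block, 1);
    if (mine >= st->nblocks)
        return false;
    *blkno = mine;
    return true;
}
```

Each backend would call shared_scan_claim() in a loop and issue one DMA request per claimed block. The sketch does not guard against counter wraparound after the scan is exhausted.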
Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Jan 26, 2016 at 12:00 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
> Hello,
> What enhancement will be necessary to implement similar feature of
> partial seq-scan using custom-scan interface?
> It seems to me callbacks on the three points below are needed.
> * ExecParallelEstimate
> * ExecParallelInitializeDSM
> * ExecParallelInitializeWorker
> Anything else?
I don't think so.
> Does ForeignScan also need equivalent enhancement?
I think this depends on the way ForeignScan is supposed to be
parallelized, basically if it needs to coordinate any information
with other set of workers, then it will require such an enhancement.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
-----Original Message-----
From: Amit Kapila [mailto:amit.kapila16@gmail.com]
Sent: Wednesday, January 27, 2016 2:30 PM
To: Kaigai Kouhei(海外 浩平)
Cc: pgsql-hackers@postgresql.org
Subject: Re: [HACKERS] CustomScan under the Gather node?
> On Tue, Jan 26, 2016 at 12:00 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
>> Hello,
>> What enhancement will be necessary to implement similar feature of
>> partial seq-scan using custom-scan interface?
>> It seems to me callbacks on the three points below are needed.
>> * ExecParallelEstimate
>> * ExecParallelInitializeDSM
>> * ExecParallelInitializeWorker
>> Anything else?
> I don't think so.
>> Does ForeignScan also need equivalent enhancement?
> I think this depends on the way ForeignScan is supposed to be
> parallelized, basically if it needs to coordinate any information
> with other set of workers, then it will require such an enhancement.
After yesterday's post, I was reminded of a possible scenario for an FDW
that manages its own private storage, like cstore_fdw.
Probably, a ForeignScan node running on a columnar store (for example)
will need coordination information, just as partial seq-scan does.
That case is very similar to an implementation on local storage.
On the other hand, if we try to parallelize postgres_fdw (or others)
with background workers, I doubt whether we need this coordination information
on the local side. The remote query would have an additional qualifier to skip
blocks already fetched for this purpose.
At least, it does not need any special enhancement.
Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>
On Tue, Jan 26, 2016 at 1:30 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
> What enhancement will be necessary to implement similar feature of
> partial seq-scan using custom-scan interface?
> It seems to me callbacks on the three points below are needed.
> * ExecParallelEstimate
> * ExecParallelInitializeDSM
> * ExecParallelInitializeWorker
> Anything else?
> Does ForeignScan also need equivalent enhancement?
For postgres_fdw, running the query from a parallel worker would
change the transaction semantics. Suppose you begin a transaction,
UPDATE data on the foreign server, and then run a parallel query. If
the leader performs the ForeignScan it will see the uncommitted
UPDATE, but a worker would have to make its own connection, which would
not be part of the same transaction and which would therefore not see
the update. That's a problem.
Also, for postgres_fdw, and many other FDWs I suspect, the assumption
is that most of the work is being done on the remote side, so doing
the work in a parallel worker doesn't seem super interesting. Instead
of incurring transfer costs to move the data from remote to local, we
incur two sets of transfer costs: first remote to local, then worker
to leader. Ouch. I think a more promising line of inquiry is to try
to provide asynchronous execution when we have something like:
Append
  -> Foreign Scan
  -> Foreign Scan
...so that we can return a row from whichever Foreign Scan receives
data back from the remote server first.
So it's not impossible that an FDW author could want this, but mostly
probably not. I think.
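The asynchronous-execution idea above (return a row from whichever Foreign Scan has data first) can be illustrated with a small sketch. It is not executor code: it only assumes that each child scan exposes a readable file descriptor, such as a socket to the remote server, and it uses POSIX poll(). The names wait_for_any_source and MAX_SUBPLANS are hypothetical.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

#define MAX_SUBPLANS 16     /* hypothetical upper bound on child scans */

/*
 * Wait until at least one of the given descriptors has data to read.
 * Returns the index of the first ready descriptor, or -1 on timeout
 * or error. An Append-like node could use the returned index to decide
 * which child to pull the next row from.
 */
static int
wait_for_any_source(const int *fds, int nfds, int timeout_ms)
{
    struct pollfd pfds[MAX_SUBPLANS];
    int           i;

    assert(nfds > 0 && nfds <= MAX_SUBPLANS);

    for (i = 0; i < nfds; i++)
    {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }

    if (poll(pfds, (nfds_t) nfds, timeout_ms) <= 0)
        return -1;          /* nothing ready in time, or poll() failed */

    for (i = 0; i < nfds; i++)
    {
        if (pfds[i].revents & POLLIN)
            return i;
    }
    return -1;
}
```

With two children, a caller would pass both descriptors and read from whichever index comes back, so a slow remote server does not block rows from a fast one.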
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
> On Tue, Jan 26, 2016 at 1:30 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
>> What enhancement will be necessary to implement similar feature of
>> partial seq-scan using custom-scan interface?
>> It seems to me callbacks on the three points below are needed.
>> * ExecParallelEstimate
>> * ExecParallelInitializeDSM
>> * ExecParallelInitializeWorker
>> Anything else?
>> Does ForeignScan also need equivalent enhancement?
> For postgres_fdw, running the query from a parallel worker would
> change the transaction semantics. Suppose you begin a transaction,
> UPDATE data on the foreign server, and then run a parallel query. If
> the leader performs the ForeignScan it will see the uncommitted
> UPDATE, but a worker would have to make its own connection which not
> be part of the same transaction and which would therefore not see the
> update. That's a problem.
Ah, yes. As long as the FDW driver ensures the remote session has no
uncommitted data, pg_export_snapshot() might give us an opportunity;
however, once a session writes something, the FDW driver has to prohibit it.
> Also, for postgres_fdw, and many other FDWs I suspect, the assumption
> is that most of the work is being done on the remote side, so doing
> the work in a parallel worker doesn't seem super interesting. Instead
> of incurring transfer costs to move the data from remote to local, we
> incur two sets of transfer costs: first remote to local, then worker
> to leader. Ouch. I think a more promising line of inquiry is to try
> to provide asynchronous execution when we have something like:
> Append
>   -> Foreign Scan
>   -> Foreign Scan
> ...so that we can return a row from whichever Foreign Scan receives
> data back from the remote server first.
> So it's not impossible that an FDW author could want this, but mostly
> probably not. I think.
Yes, I have the same opinion. Local parallelism is likely not
valuable for the class of FDWs that obtain data from a remote
server (e.g., postgres_fdw), except when the cost of packing
and unpacking data over the network is the major bottleneck.
On the other hand, it will be valuable for the class of FDWs that
act as a wrapper around a local data structure (e.g., file_fdw),
much as the current partial seq-scan does.
Their data source is not under transaction control, and the 'remote
execution' of these FDWs eventually runs on local
computing resources.
If I were to make a proof-of-concept patch with the interface itself, it
seems to me file_fdw would be a good candidate for this enhancement.
postgres_fdw is not the right target for it.
Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>
> If I would make a proof-of-concept patch with interface itself, it
> seems to me file_fdw may be a good candidate for this enhancement.
> It is not a field for postgres_fdw.
The attached patch enhances the FDW/CSP interface and adds a PoC feature
to file_fdw for scanning the source file partially. It turned out to be a
smaller enhancement than I expected.
It works as follows. This query reads 20M rows from a CSV file,
using 3 background worker processes.
postgres=# set max_parallel_degree = 3;
SET
postgres=# explain analyze select * from test_csv where id % 20 = 6;
                                  QUERY PLAN
--------------------------------------------------------------------------------
 Gather  (cost=1000.00..194108.60 rows=94056 width=52)
         (actual time=0.570..19268.010 rows=2000000 loops=1)
   Number of Workers: 3
   ->  Parallel Foreign Scan on test_csv  (cost=0.00..183703.00 rows=94056 width=52)
                                  (actual time=0.180..12744.655 rows=500000 loops=4)
         Filter: ((id % 20) = 6)
         Rows Removed by Filter: 9500000
         Foreign File: /tmp/testdata.csv
         Foreign File Size: 1504892535
 Planning time: 0.147 ms
 Execution time: 19330.201 ms
(9 rows)
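For readers wondering where "Number of Workers: 3" comes from: the fileGetForeignPaths() hunk in the attached patch raises the parallel degree by one each time the page count exceeds three times the current threshold, up to max_parallel_degree. The following is a standalone restatement of that loop for illustration only. The function name choose_parallel_degree is hypothetical, and the page count of 183703 used in the note below is an assumption derived from the reported file size and the default 8 kB block size.

```c
#include <assert.h>
#include <limits.h>

/*
 * Standalone restatement of the degree heuristic in the patch.
 * Returns 0 when the file is too small for a parallel path at all,
 * otherwise a degree between 1 and max_parallel_degree.
 */
static int
choose_parallel_degree(long pages, int max_parallel_degree)
{
    int parallel_threshold = 1000;  /* minimum pages to consider parallelism */
    int parallel_degree = 1;

    if (pages < parallel_threshold)
        return 0;

    /* One more worker each time the file is 3x larger than the last step. */
    while (pages > (long) parallel_threshold * 3 &&
           parallel_degree < max_parallel_degree)
    {
        parallel_degree++;
        parallel_threshold *= 3;
        if (parallel_threshold >= INT_MAX / 3)
            break;                  /* avoid integer overflow */
    }
    return parallel_degree;
}
```

Under the assumption above, choose_parallel_degree(183703, 3) stops at 3 because of the max_parallel_degree cap, while a cap of 8 would yield 5.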
I'm not 100% certain whether this implementation of file_fdw is a reasonable
approach to partial reads; however, the callbacks placed in the following
functions made it possible to implement parallel-aware custom logic based on
the coordination information.
* ExecParallelEstimate
* ExecParallelInitializeDSM
* ExecParallelInitializeWorker
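How the three callback points fit together can be shown with a toy model. This is a sketch under loud assumptions: plain malloc'ed memory stands in for the dynamic shared memory segment, C11 atomics stand in for pg_atomic_uint32, and every toy_* name is hypothetical. The line-claiming step loosely mirrors what the PoC does in NextCopyFromRawFields().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdlib.h>

/* Per-process scan state (toy counterpart of ForeignScanState). */
typedef struct ToyScanState
{
    atomic_uint *shared_lineno;     /* points into the shared area */
    unsigned     cur_lineno;        /* last line this process has consumed */
} ToyScanState;

/* Step 1 (Estimate): report how many bytes of shared memory are needed. */
static size_t
toy_estimate_dsm(void)
{
    return sizeof(atomic_uint);
}

/* Step 2 (InitializeDSM): the leader fills in the shared area once. */
static void
toy_initialize_dsm(void *coordinate)
{
    atomic_init((atomic_uint *) coordinate, 0);
}

/* Step 3 (InitializeWorker): each worker attaches to the shared area. */
static void
toy_initialize_worker(ToyScanState *node, void *coordinate)
{
    node->shared_lineno = (atomic_uint *) coordinate;
    node->cur_lineno = 0;
}

/*
 * During the scan: claim the next 1-based line number. The caller is
 * expected to skip input lines until it reaches the claimed one.
 */
static unsigned
toy_claim_line(ToyScanState *node)
{
    unsigned mine = atomic_fetch_add(node->shared_lineno, 1) + 1;

    assert(node->cur_lineno < mine);
    node->cur_lineno = mine;        /* lines in between belong to others */
    return mine;
}
```

Two workers attached to the same area never receive the same line number, which is the property the coordination information is meant to provide.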
Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>
Attachments:
pgsql-v9.6-parallel-cspfdw.v1.patch (application/octet-stream)
contrib/file_fdw/file_fdw.c | 86 ++++++++++++++++++++++++++++++++++
doc/src/sgml/custom-scan.sgml | 44 +++++++++++++++++
doc/src/sgml/fdwhandler.sgml | 62 ++++++++++++++++++++++++
src/backend/commands/copy.c | 62 ++++++++++++++++++++----
src/backend/executor/execParallel.c | 26 ++++++++++
src/backend/executor/nodeCustom.c | 45 ++++++++++++++++++
src/backend/executor/nodeForeignscan.c | 62 ++++++++++++++++++++++++
src/include/commands/copy.h | 4 ++
src/include/executor/nodeCustom.h | 11 +++++
src/include/executor/nodeForeignscan.h | 8 ++++
src/include/foreign/fdwapi.h | 14 ++++++
src/include/nodes/execnodes.h | 14 +++++-
12 files changed, 427 insertions(+), 11 deletions(-)
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index f13316b..788b9e0 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -29,6 +29,7 @@
#include "nodes/makefuncs.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
+#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
@@ -131,6 +132,14 @@ static void fileEndForeignScan(ForeignScanState *node);
static bool fileAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages);
+static Size fileEstimateDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt);
+static void fileInitializeDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+static void fileInitializeWorkerForeignScan(ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate);
/*
* Helper functions
@@ -170,6 +179,9 @@ file_fdw_handler(PG_FUNCTION_ARGS)
fdwroutine->ReScanForeignScan = fileReScanForeignScan;
fdwroutine->EndForeignScan = fileEndForeignScan;
fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
+ fdwroutine->EstimateDSMForeignScan = fileEstimateDSMForeignScan;
+ fdwroutine->InitializeDSMForeignScan = fileInitializeDSMForeignScan;
+ fdwroutine->InitializeWorkerForeignScan = fileInitializeWorkerForeignScan;
PG_RETURN_POINTER(fdwroutine);
}
@@ -502,6 +514,8 @@ fileGetForeignPaths(PlannerInfo *root,
Cost total_cost;
List *columns;
List *coptions = NIL;
+ int parallel_threshold = 1000;
+ int parallel_degree = 1;
/* Decide whether to selectively perform binary conversion */
if (check_selective_binary_conversion(baserel,
@@ -530,6 +544,53 @@ fileGetForeignPaths(PlannerInfo *root,
coptions));
/*
+ * Also add a pair of GatherPath and ForeignPath if it is capable to
+ * run in parallel. See the logic in create_parallel_paths().
+ */
+ if (baserel->consider_parallel &&
+ fdw_private->pages >= parallel_threshold)
+ {
+ Path *partial_path;
+ Cost cpu_per_tuple;
+
+ while (fdw_private->pages > parallel_threshold * 3 &&
+ parallel_degree < max_parallel_degree)
+ {
+ parallel_degree++;
+ parallel_threshold *= 3;
+ if (parallel_threshold >= PG_INT32_MAX / 3)
+ break;
+ }
+
+ /*
+ * adjust the cost - we assume a part of the total_cost consumed
+ * by the cpu_per_tuple can be reduced, but inter-processes
+ * communication cost (added by create_gather_path) will be added.
+ */
+ cpu_per_tuple = (10 * cpu_tuple_cost +
+ baserel->baserestrictcost.per_tuple);
+ total_cost -= cpu_per_tuple * fdw_private->ntuples;
+
+ /* make a parallel aware pathnode */
+ partial_path = (Path *)
+ create_foreignscan_path(root, baserel,
+ baserel->rows,
+ startup_cost,
+ total_cost,
+ NIL, /* no pathkeys */
+ NULL, /* no outer rel either */
+ NULL, /* no extra plan */
+ coptions);
+ partial_path->parallel_aware = parallel_degree > 0 ? true : false;
+ partial_path->parallel_safe = baserel->consider_parallel;
+ partial_path->parallel_degree = parallel_degree;
+
+ /* add a partial path and makes gather path */
+ add_partial_path(baserel, partial_path);
+ generate_gather_paths(root, baserel);
+ }
+
+ /*
* If data file was sorted, and we knew it somehow, we could insert
* appropriate pathkeys into the ForeignPath node to tell the planner
* that.
@@ -761,6 +822,31 @@ fileAnalyzeForeignTable(Relation relation,
return true;
}
+static Size
+fileEstimateDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt)
+{
+ return sizeof(pg_atomic_uint32);
+}
+
+static void
+fileInitializeDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate)
+{
+ pg_atomic_write_u32((pg_atomic_uint32 *)coordinate, 0);
+}
+
+static void
+fileInitializeWorkerForeignScan(ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate)
+{
+ FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
+
+ SetCopyFromPartial(festate->cstate, (pg_atomic_uint32 *)coordinate);
+}
+
/*
* check_selective_binary_conversion
*
diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml
index 5bba125..efe6930 100644
--- a/doc/src/sgml/custom-scan.sgml
+++ b/doc/src/sgml/custom-scan.sgml
@@ -316,6 +316,50 @@ void (*RestrPosCustomScan) (CustomScanState *node);
<para>
<programlisting>
+Size (*EstimateDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt);
+</programlisting>
+
+ This callback is called when the <structname>Gather</> node is about to
+ execute its underlying sub-plan and this custom scan provider intends
+ to coordinate with other background worker processes.
+ The coordination structure is allocated in the shared memory
+ segment, so the custom scan provider can use this area for
+ inter-process communication. This callback informs the core backend
+ of the required length of the coordination information. It is optional,
+ and needs only be supplied if the scan is parallel aware; otherwise,
+ the pointer can be set to <literal>NULL</>.
+ </para>
+
+ <para>
+<programlisting>
+void (*InitializeDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+</programlisting>
+
+ It sets up the coordination information, whose length was reported
+ by <functionname>EstimateDSMCustomScan</>, to be referenced by the
+ background worker processes under the <structname>Gather</> node.
+ This callback is optional, and needs only be supplied if the relevant
+ <structname>CustomPath</> is added as a parallel executable path.
+ </para>
+
+ <para>
+<programlisting>
+void (*InitializeWorkerCustomScan) (CustomScanState *node,
+ shm_toc *toc,
+ void *coordinate);
+</programlisting>
+ It allows the custom scan provider to perform extra initialization of
+ the <structname>CustomScanState</> in the context of a background
+ worker process, according to the coordination information.
+ This callback is optional, and needs only be supplied if the relevant
+ <structname>CustomPath</> is added as a parallel executable path.
+ </para>
+
+ <para>
+<programlisting>
void (*ExplainCustomScan) (CustomScanState *node,
List *ancestors,
ExplainState *es);
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index dc2d890..bf041af 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -955,6 +955,68 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
</sect2>
+ <sect2 id="fdw-callbacks-parallel">
+ <title>FDW Routines for parallelism support</title>
+ <para>
+ A <structname>ForeignScan</> node can, optionally, coordinate
+ with multiple concurrent background worker processes under the
+ <structname>Gather</> node. Each <structname>ForeignScan</>
+ node scans part of its data source (which may be local or remote,
+ depending on the characteristics of the FDW driver), and the rows
+ produced are eventually merged. So, the FDW driver has to take care
+ not to read the same rows more than once.
+ The key to meeting this requirement is a chunk of the
+ dynamic shared memory segment, called the coordination information.
+ It is mapped into each background worker process's
+ address space prior to execution, so the <structname>ForeignScan</>
+ node can reference this shared state while it scans the data
+ source of this FDW.
+ </para>
+
+ <para>
+<programlisting>
+Size
+EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+</programlisting>
+ This function is called when the <structname>Gather</> node is about
+ to execute its underlying sub-plan and this FDW driver intends to
+ coordinate with other background worker processes.
+ The coordination structure is allocated in the shared memory
+ segment, so the FDW driver can use this area for inter-process
+ communication. This function informs the core backend of the required
+ length of the coordination information. It needs only be supplied if
+ the scan is parallel aware; otherwise, the pointer can be set to <literal>NULL</>.
+ </para>
+
+ <para>
+<programlisting>
+void
+InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+ void *coordinate);
+</programlisting>
+ This function is called when the <structname>Gather</> node allocates
+ a dynamic shared memory segment for coordinated parallel execution.
+ It allows the FDW driver to set up the supplied coordination information
+ area (<literal>coordinate</>; its length is the result of
+ <functionname>EstimateDSMForeignScan</>) prior to the launch of
+ background worker processes.
+ This function needs only be supplied if the scan is parallel aware;
+ otherwise, the pointer can be set to <literal>NULL</>.
+ </para>
+
+ <para>
+<programlisting>
+void
+InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
+ void *coordinate);
+</programlisting>
+ This function is called at the beginning of execution in each
+ background worker process. It allows FDW drivers to perform extra
+ initialization of <structname>ForeignScanState</> according to the
+ supplied coordination information, which has already been initialized by
+ <functionname>InitializeDSMForeignScan</> in the master process.
+ </para>
+ </sect2>
</sect1>
<sect1 id="fdw-helpers">
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3201476..eae7418 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -137,6 +137,9 @@ typedef struct CopyStateData
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
+ /* shared state if COPY FROM performs in partial scan mode */
+ pg_atomic_uint32 *coordinate_lineno;
+
/*
* Working state for COPY TO/FROM
*/
@@ -2891,32 +2894,46 @@ BeginCopyFrom(Relation rel,
bool
NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
{
+ uint32 next_lineno = 0;
int fldct;
bool done;
/* only available for text or csv input */
Assert(!cstate->binary);
+ /* get my responsible lineno if partial COPY FROM mode */
+retry:
+ if (cstate->coordinate_lineno)
+ {
+ next_lineno = pg_atomic_add_fetch_u32(cstate->coordinate_lineno, 1);
+ Assert(cstate->cur_lineno < next_lineno);
+ }
+
/* on input just throw the header line away */
if (cstate->cur_lineno == 0 && cstate->header_line)
{
cstate->cur_lineno++;
if (CopyReadLine(cstate))
return false; /* done */
+
+ if (next_lineno > 0 && next_lineno == cstate->cur_lineno)
+ goto retry;
}
- cstate->cur_lineno++;
+ do {
+ cstate->cur_lineno++;
- /* Actually read the line into memory here */
- done = CopyReadLine(cstate);
+ /* Actually read the line into memory here */
+ done = CopyReadLine(cstate);
- /*
- * EOF at start of line means we're done. If we see EOF after some
- * characters, we act as though it was newline followed by EOF, ie,
- * process the line and then exit loop on next iteration.
- */
- if (done && cstate->line_buf.len == 0)
- return false;
+ /*
+ * EOF at start of line means we're done. If we see EOF after some
+ * characters, we act as though it was newline followed by EOF, ie,
+ * process the line and then exit loop on next iteration.
+ */
+ if (done && cstate->line_buf.len == 0)
+ return false;
+ } while (next_lineno > 0 && next_lineno != cstate->cur_lineno);
/* Parse the line into de-escaped field values */
if (cstate->csv_mode)
@@ -3190,6 +3207,31 @@ EndCopyFrom(CopyState cstate)
}
/*
+ * Turn on/off partial scan mode on the COPY FROM
+ */
+void
+SetCopyFromPartial(CopyState cstate, pg_atomic_uint32 *coordinate_lineno)
+{
+ /*
+ * Only a regular file in text format can support partial COPY FROM.
+ * (The binary format calls type receive functions during the row scan,
+ * so it makes little sense to run it in parallel.)
+ */
+ if (!cstate->filename || cstate->is_program)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("Partial COPY FROM is only supported on regular files")));
+
+ if (cstate->binary)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("Partial COPY is not supported on binary format")));
+
+ /* assign coordinate information */
+ cstate->coordinate_lineno = coordinate_lineno;
+}
+
+/*
* Read the next input line and stash it in line_buf, with conversion to
* server encoding.
*
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c30b348..3ea8a95 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,8 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeCustom.h"
+#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
@@ -175,6 +177,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanEstimate((ForeignScanState *) planstate,
+ e->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanEstimate((CustomScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
@@ -219,6 +229,14 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
+ d->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeDSM((CustomScanState *) planstate,
+ d->pcxt);
+ break;
default:
break;
}
@@ -641,6 +659,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
case T_SeqScanState:
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
+ toc);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeWorker((CustomScanState *) planstate,
+ toc);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 640289e..7416f1e 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -10,6 +10,7 @@
*/
#include "postgres.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "nodes/execnodes.h"
@@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node)
node->methods->CustomName)));
node->methods->RestrPosCustomScan(node);
}
+
+void
+ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->EstimateDSMCustomScan)
+ {
+ node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+void
+ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeDSMCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ methods->InitializeDSMCustomScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+void
+ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeWorkerCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ methods->InitializeWorkerCustomScan(node, toc, coordinate);
+ }
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 64a07bc..cdaa4a7 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node)
ExecScanReScan(&node->ss);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanEstimate
+ *
+ * Informs size of the parallel coordination information, if any
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->EstimateDSMForeignScan)
+ {
+ node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialize the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeDSMForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeWorker
+ *
+ * Initialization according to the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeWorkerForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
+ }
+}
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 314d1f7..d5f89e1 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -16,6 +16,7 @@
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
+#include "port/atomics.h"
#include "tcop/dest.h"
/* CopyStateData is private in commands/copy.c */
@@ -36,4 +37,7 @@ extern void CopyFromErrorCallback(void *arg);
extern DestReceiver *CreateCopyDestReceiver(void);
+extern void SetCopyFromPartial(CopyState cstate,
+ pg_atomic_uint32 *coordinate_lineno);
+
#endif /* COPY_H */
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index e244942..410a3ad 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -12,6 +12,7 @@
#ifndef NODECUSTOM_H
#define NODECUSTOM_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
/*
@@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node);
extern void ExecCustomMarkPos(CustomScanState *node);
extern void ExecCustomRestrPos(CustomScanState *node);
+/*
+ * Parallel execution support
+ */
+extern void ExecCustomScanEstimate(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeDSM(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeWorker(CustomScanState *node,
+ shm_toc *toc);
+
#endif /* NODECUSTOM_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index a92ce5c..c255329 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -14,6 +14,7 @@
#ifndef NODEFOREIGNSCAN_H
#define NODEFOREIGNSCAN_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags);
@@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node);
extern void ExecEndForeignScan(ForeignScanState *node);
extern void ExecReScanForeignScan(ForeignScanState *node);
+extern void ExecForeignScanEstimate(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
+ shm_toc *toc);
+
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index db73233..e16fbf3 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -12,6 +12,7 @@
#ifndef FDWAPI_H
#define FDWAPI_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
@@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
Oid serverOid);
+typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt);
+typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -177,6 +186,11 @@ typedef struct FdwRoutine
/* Support functions for IMPORT FOREIGN SCHEMA */
ImportForeignSchema_function ImportForeignSchema;
+
+ /* Support functions for parallelism under Gather node */
+ EstimateDSMForeignScan_function EstimateDSMForeignScan;
+ InitializeDSMForeignScan_function InitializeDSMForeignScan;
+ InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..ab9f80f 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1585,6 +1585,7 @@ typedef struct ForeignScanState
{
ScanState ss; /* its first field is NodeTag */
List *fdw_recheck_quals; /* original quals not in ss.ps.qual */
+ Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
void *fdw_state; /* foreign-data wrapper can keep state here */
@@ -1603,6 +1604,8 @@ typedef struct ForeignScanState
* the BeginCustomScan method.
* ----------------
*/
+struct ParallelContext; /* avoid including parallel.h here */
+struct shm_toc; /* avoid including shm_toc.h here */
struct ExplainState; /* avoid including explain.h here */
struct CustomScanState;
@@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods
void (*ReScanCustomScan) (struct CustomScanState *node);
void (*MarkPosCustomScan) (struct CustomScanState *node);
void (*RestrPosCustomScan) (struct CustomScanState *node);
-
+ /* Optional: parallel execution support */
+ Size (*EstimateDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt);
+ void (*InitializeDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt,
+ void *coordinate);
+ void (*InitializeWorkerCustomScan) (struct CustomScanState *node,
+ struct shm_toc *toc,
+ void *coordinate);
/* Optional: print additional information in EXPLAIN */
void (*ExplainCustomScan) (struct CustomScanState *node,
List *ancestors,
@@ -1631,6 +1642,7 @@ typedef struct CustomScanState
ScanState ss;
uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */
List *custom_ps; /* list of child PlanState nodes, if any */
+ Size pscan_len; /* size of parallel coordination information */
const CustomExecMethods *methods;
} CustomScanState;
On Thu, Jan 28, 2016 at 10:50 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
If I were to make a proof-of-concept patch for the interface itself, it
seems to me file_fdw would be a good candidate for this enhancement.
It is not a fit for postgres_fdw.

The attached patch is an enhancement of the FDW/CSP interface plus a PoC
feature of file_fdw to scan the source file partially. It was a smaller
enhancement than I expected.

It works as follows. This query tried to read 20M rows from a CSV file,
using 3 background worker processes.

postgres=# set max_parallel_degree = 3;
SET
postgres=# explain analyze select * from test_csv where id % 20 = 6;
QUERY PLAN
--------------------------------------------------------------------------------
Gather (cost=1000.00..194108.60 rows=94056 width=52)
(actual time=0.570..19268.010 rows=2000000 loops=1)
Number of Workers: 3
-> Parallel Foreign Scan on test_csv (cost=0.00..183703.00 rows=94056 width=52)
(actual time=0.180..12744.655 rows=500000 loops=4)
Filter: ((id % 20) = 6)
Rows Removed by Filter: 9500000
Foreign File: /tmp/testdata.csv
Foreign File Size: 1504892535
Planning time: 0.147 ms
Execution time: 19330.201 ms
(9 rows)
Could you try it not in parallel and then with 1, 2, 3, and 4 workers
and post the times for all?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
The above query has 5% selectivity on the entire CSV file.
Its execution times (total, and the ForeignScan portion only) are below.

             total          ForeignScan    diff
0 workers:   17584.319 ms   17555.904 ms     28.415 ms
1 worker:    18464.476 ms   18110.968 ms    353.508 ms
2 workers:   19042.755 ms   14580.335 ms   4462.420 ms
3 workers:   19318.254 ms   12668.912 ms   6649.342 ms
4 workers:   21732.910 ms   13596.788 ms   8136.122 ms
5 workers:   23486.846 ms   14533.409 ms   8953.437 ms

This workstation has 4 CPU cores, so it is natural that nworkers=3 gives the
peak performance on the ForeignScan portion. On the other hand, nworkers>1 also
shows a non-negligible amount of extra time (probably spent in the Gather node?).

An interesting observation is that lower selectivity (1% and 0%) did not change
the result much. Something other than file_fdw is consuming CPU time.

* selectivity 1%
             total          ForeignScan    diff
0 workers:   17573.572 ms   17566.875 ms      6.697 ms
1 worker:    18098.070 ms   18020.790 ms     77.280 ms
2 workers:   18676.078 ms   14600.749 ms   4075.329 ms
3 workers:   18830.597 ms   12731.459 ms   6099.138 ms
4 workers:   21015.842 ms   13590.657 ms   7425.185 ms
5 workers:   22865.496 ms   14634.342 ms   8231.154 ms

* selectivity 0% (...so Gather didn't work hard actually)
             total          ForeignScan    diff
0 workers:   17551.011 ms   17550.811 ms      0.200 ms
1 worker:    18055.185 ms   18048.975 ms      6.210 ms
2 workers:   18567.660 ms   14593.974 ms   3973.686 ms
3 workers:   18649.819 ms   12671.429 ms   5978.390 ms
4 workers:   20619.184 ms   13606.715 ms   7012.469 ms
5 workers:   22557.575 ms   14594.420 ms   7963.155 ms

Further investigation is needed.
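
The "diff" column above is simply total minus ForeignScan. As a rough sketch of that arithmetic, the snippet below recomputes it for the 3-worker rows of the three selectivity tables and measures how far apart the values are. The names `Timing`, `overhead_ms` and `overhead_spread_ms` are made up for this illustration and are not part of the patch.

```c
#include <stddef.h>

/* One measurement: total query time and time spent in the ForeignScan node. */
typedef struct
{
    const char *selectivity;
    double      total_ms;
    double      scan_ms;
} Timing;

/* Figures copied from the 3-worker rows of the three tables above. */
const Timing three_workers[] = {
    {"5%", 19318.254, 12668.912},
    {"1%", 18830.597, 12731.459},
    {"0%", 18649.819, 12671.429},
};
const size_t n_three_workers = sizeof three_workers / sizeof three_workers[0];

/* The "diff" column: time not accounted for by the ForeignScan node. */
double
overhead_ms(const Timing *t)
{
    return t->total_ms - t->scan_ms;
}

/* Difference between the largest and the smallest overhead in rows[]. */
double
overhead_spread_ms(const Timing *rows, size_t n)
{
    double lo, hi;
    size_t i;

    if (n == 0)
        return 0.0;
    lo = hi = overhead_ms(&rows[0]);
    for (i = 1; i < n; i++)
    {
        double d = overhead_ms(&rows[i]);

        if (d < lo)
            lo = d;
        if (d > hi)
            hi = d;
    }
    return hi - lo;
}
```

Calling `overhead_spread_ms(three_workers, n_three_workers)` gives the gap between the 5% and 0% cases. It is small next to the overhead itself, which is what suggests the extra time does not depend on how many rows pass through Gather.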
Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>
             total          ForeignScan    diff
0 workers:   17584.319 ms   17555.904 ms     28.415 ms
1 worker:    18464.476 ms   18110.968 ms    353.508 ms
2 workers:   19042.755 ms   14580.335 ms   4462.420 ms
3 workers:   19318.254 ms   12668.912 ms   6649.342 ms
4 workers:   21732.910 ms   13596.788 ms   8136.122 ms
5 workers:   23486.846 ms   14533.409 ms   8953.437 ms

This workstation has 4 CPU cores, so it is natural that nworkers=3 gives the
peak performance on the ForeignScan portion. On the other hand, nworkers>1 also
shows a non-negligible amount of extra time (probably spent in the Gather node?).
:
Further investigation is needed.

It was a bug in my file_fdw patch. The ForeignScan node in the master process
is also executed by the Gather node; however, it had no coordination information
because I overlooked its initialization in the InitializeDSMForeignScan callback.
As a result, the local ForeignScan node kept running after the coordinated
background worker processes had completed, and twice the number of rows was
returned.

With the revised patch, the results look reasonable to me.
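
To illustrate the failure mode, here is a small single-process simulation, not the patch code itself. It uses C11 `<stdatomic.h>` in place of PostgreSQL's `pg_atomic_*` API, and `Coordinate`, `Scan`, `next_line`, `rows_emitted` and `TOTAL_LINES` are hypothetical names. Each participant that is attached to the shared counter claims one line number at a time; a participant with no coordination information simply reads every line.

```c
#include <stdatomic.h>
#include <stddef.h>

#define TOTAL_LINES 12u   /* hypothetical number of data lines in the file */
#define MAX_SCANS   8u    /* master plus up to 7 simulated workers */

/* Stand-in for the shared next_lineno counter kept in the DSM segment. */
typedef struct
{
    atomic_uint next_lineno;
} Coordinate;

/* Per-participant state; coord == NULL means "no coordination information". */
typedef struct
{
    Coordinate *coord;
    unsigned    cur_lineno;
} Scan;

/* Return the line number this participant emits next, or 0 at end of file. */
static unsigned
next_line(Scan *scan)
{
    unsigned target;

    if (scan->coord != NULL)
        target = atomic_fetch_add(&scan->coord->next_lineno, 1u) + 1u;
    else
        target = scan->cur_lineno + 1u;   /* uncoordinated: take every line */

    if (target > TOTAL_LINES)
        return 0;
    scan->cur_lineno = target;            /* skip forward to the claimed line */
    return target;
}

/*
 * Run the master plus nworkers participants round-robin until all of them
 * hit end of file, and return the total number of rows emitted.
 */
unsigned
rows_emitted(unsigned nworkers, int master_attached)
{
    Coordinate coord;
    Scan       scans[MAX_SCANS];
    unsigned   nscans = nworkers + 1u;
    unsigned   rows = 0;
    unsigned   i;
    int        progressed = 1;

    if (nscans > MAX_SCANS)
        return 0;

    atomic_init(&coord.next_lineno, 0u);
    for (i = 0; i < nscans; i++)
    {
        scans[i].coord = &coord;
        scans[i].cur_lineno = 0;
    }
    if (!master_attached)
        scans[0].coord = NULL;            /* the oversight described above */

    while (progressed)
    {
        progressed = 0;
        for (i = 0; i < nscans; i++)
        {
            if (next_line(&scans[i]) != 0)
            {
                rows++;
                progressed = 1;
            }
        }
    }
    return rows;
}
```

In this model, `rows_emitted(3, 0)` counts every line twice (once by the workers, once by the unattached master), while `rows_emitted(3, 1)` counts each line once.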

             total          ForeignScan    diff
0 workers:   17592.498 ms   17564.457 ms    28.041 ms
1 worker:    12152.998 ms   11983.485 ms   169.513 ms
2 workers:   10647.858 ms   10502.100 ms   145.758 ms
3 workers:    9635.445 ms    9509.899 ms   125.546 ms
4 workers:   11175.456 ms   10863.293 ms   312.163 ms
5 workers:   12586.457 ms   12279.323 ms   307.134 ms

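
For reference, the speedup relative to the non-parallel run can be derived from the "total" column of the revised results. The sketch below does only that arithmetic; `revised_total_ms`, `speedup` and `fastest_worker_count` are names invented for this example.

```c
#include <stddef.h>

/* "total" column of the revised results, indexed by number of workers. */
const double revised_total_ms[] = {
    17592.498, 12152.998, 10647.858, 9635.445, 11175.456, 12586.457
};
const size_t n_revised = sizeof revised_total_ms / sizeof revised_total_ms[0];

/* Speedup of the run with nworkers workers over the 0-worker run. */
double
speedup(const double *total_ms, size_t nworkers)
{
    return total_ms[0] / total_ms[nworkers];
}

/* Index (= number of workers) of the smallest total time; n must be > 0. */
size_t
fastest_worker_count(const double *total_ms, size_t n)
{
    size_t best = 0;
    size_t i;

    for (i = 1; i < n; i++)
    {
        if (total_ms[i] < total_ms[best])
            best = i;
    }
    return best;
}
```

With these numbers the minimum falls at 3 workers, which matches the expectation for a 4-core workstation where the master process also takes part in the scan.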
Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>
Attachments:
pgsql-v9.6-parallel-cspfdw.v2.patch (application/octet-stream)
contrib/file_fdw/file_fdw.c | 130 +++++++++++++++++++++++++++++++--
doc/src/sgml/custom-scan.sgml | 44 +++++++++++
doc/src/sgml/fdwhandler.sgml | 62 ++++++++++++++++
src/backend/commands/copy.c | 62 +++++++++++++---
src/backend/executor/execParallel.c | 26 +++++++
src/backend/executor/nodeCustom.c | 45 ++++++++++++
src/backend/executor/nodeForeignscan.c | 62 ++++++++++++++++
src/include/commands/copy.h | 4 +
src/include/executor/nodeCustom.h | 11 +++
src/include/executor/nodeForeignscan.h | 8 ++
src/include/foreign/fdwapi.h | 14 ++++
src/include/nodes/execnodes.h | 14 +++-
12 files changed, 466 insertions(+), 16 deletions(-)
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index f13316b..d7d310a 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -29,6 +29,7 @@
#include "nodes/makefuncs.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
+#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
@@ -99,9 +100,19 @@ typedef struct FileFdwExecutionState
char *filename; /* file to read */
List *options; /* merged COPY options, excluding filename */
CopyState cstate; /* state of reading file */
+ pg_atomic_flag *eof_reach; /* coordinate information if parallel mode */
} FileFdwExecutionState;
/*
+ * FDW-specific information for parallel coordination
+ */
+typedef struct FileFdwCoordinate
+{
+ pg_atomic_uint32 next_lineno; /* needed for NextCopyFrom */
+ pg_atomic_flag eof_reach; /* no need to scan any more, if true */
+} FileFdwCoordinate;
+
+/*
* SQL functions
*/
PG_FUNCTION_INFO_V1(file_fdw_handler);
@@ -131,6 +142,14 @@ static void fileEndForeignScan(ForeignScanState *node);
static bool fileAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages);
+static Size fileEstimateDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt);
+static void fileInitializeDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+static void fileInitializeWorkerForeignScan(ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate);
/*
* Helper functions
@@ -170,6 +189,9 @@ file_fdw_handler(PG_FUNCTION_ARGS)
fdwroutine->ReScanForeignScan = fileReScanForeignScan;
fdwroutine->EndForeignScan = fileEndForeignScan;
fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
+ fdwroutine->EstimateDSMForeignScan = fileEstimateDSMForeignScan;
+ fdwroutine->InitializeDSMForeignScan = fileInitializeDSMForeignScan;
+ fdwroutine->InitializeWorkerForeignScan = fileInitializeWorkerForeignScan;
PG_RETURN_POINTER(fdwroutine);
}
@@ -502,6 +524,8 @@ fileGetForeignPaths(PlannerInfo *root,
Cost total_cost;
List *columns;
List *coptions = NIL;
+ int parallel_threshold = 1000;
+ int parallel_degree = 1;
/* Decide whether to selectively perform binary conversion */
if (check_selective_binary_conversion(baserel,
@@ -530,6 +554,53 @@ fileGetForeignPaths(PlannerInfo *root,
coptions));
/*
+ * Also add a pair of GatherPath and ForeignPath if it is capable to
+ * run in parallel. See the logic in create_parallel_paths().
+ */
+ if (baserel->consider_parallel &&
+ fdw_private->pages >= parallel_threshold)
+ {
+ Path *partial_path;
+ Cost cpu_per_tuple;
+
+ while (fdw_private->pages > parallel_threshold * 3 &&
+ parallel_degree < max_parallel_degree)
+ {
+ parallel_degree++;
+ parallel_threshold *= 3;
+ if (parallel_threshold >= PG_INT32_MAX / 3)
+ break;
+ }
+
+ /*
+ * adjust the cost - we assume a part of the total_cost consumed
+ * by the cpu_per_tuple can be reduced, but inter-processes
+ * communication cost (added by create_gather_path) will be added.
+ */
+ cpu_per_tuple = (10 * cpu_tuple_cost +
+ baserel->baserestrictcost.per_tuple);
+ total_cost -= cpu_per_tuple * fdw_private->ntuples;
+
+ /* make a parallel aware pathnode */
+ partial_path = (Path *)
+ create_foreignscan_path(root, baserel,
+ baserel->rows,
+ startup_cost,
+ total_cost,
+ NIL, /* no pathkeys */
+ NULL, /* no outer rel either */
+ NULL, /* no extra plan */
+ coptions);
+ partial_path->parallel_aware = parallel_degree > 0 ? true : false;
+ partial_path->parallel_safe = baserel->consider_parallel;
+ partial_path->parallel_degree = parallel_degree;
+
+ /* add a partial path and makes gather path */
+ add_partial_path(baserel, partial_path);
+ generate_gather_paths(root, baserel);
+ }
+
+ /*
* If data file was sorted, and we knew it somehow, we could insert
* appropriate pathkeys into the ForeignPath node to tell the planner
* that.
@@ -678,12 +749,26 @@ fileIterateForeignScan(ForeignScanState *node)
* foreign tables.
*/
ExecClearTuple(slot);
- found = NextCopyFrom(festate->cstate, NULL,
- slot->tts_values, slot->tts_isnull,
- NULL);
- if (found)
- ExecStoreVirtualTuple(slot);
+ /*
+ * NOTE: When file_fdw runs in coordination under the Gather node,
+ * we don't need to read any more lines once another background worker
+ * has already reached the end of the file. Note that the Gather node also
+ * runs the underlying sub-plan in the master process. Unlike the SeqScan
+ * case, we cannot determine the current line number unless we actually
+ * read the source file. So, we check eof_reach on the DSM segment first.
+ */
+ if (!festate->eof_reach ||
+ pg_atomic_unlocked_test_flag(festate->eof_reach))
+ {
+ found = NextCopyFrom(festate->cstate, NULL,
+ slot->tts_values, slot->tts_isnull,
+ NULL);
+ if (found)
+ ExecStoreVirtualTuple(slot);
+ else if (festate->eof_reach)
+ pg_atomic_test_set_flag(festate->eof_reach);
+ }
/* Remove error callback. */
error_context_stack = errcallback.previous;
@@ -761,6 +846,41 @@ fileAnalyzeForeignTable(Relation relation,
return true;
}
+static Size
+fileEstimateDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt)
+{
+ return sizeof(FileFdwCoordinate);
+}
+
+static void
+fileInitializeDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *__coordinate)
+{
+ FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
+ FileFdwCoordinate *coordinate = __coordinate;
+
+ pg_atomic_init_u32(&coordinate->next_lineno, 0);
+ pg_atomic_init_flag(&coordinate->eof_reach);
+
+ /* initialization for master process */
+ SetCopyFromPartial(festate->cstate, &coordinate->next_lineno);
+ festate->eof_reach = &coordinate->eof_reach;
+}
+
+static void
+fileInitializeWorkerForeignScan(ForeignScanState *node,
+ shm_toc *toc,
+ void *__coordinate)
+{
+ FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
+ FileFdwCoordinate *coordinate = __coordinate;
+
+ SetCopyFromPartial(festate->cstate, &coordinate->next_lineno);
+ festate->eof_reach = &coordinate->eof_reach;
+}
+
/*
* check_selective_binary_conversion
*
diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml
index 5bba125..efe6930 100644
--- a/doc/src/sgml/custom-scan.sgml
+++ b/doc/src/sgml/custom-scan.sgml
@@ -316,6 +316,50 @@ void (*RestrPosCustomScan) (CustomScanState *node);
<para>
<programlisting>
+Size (*EstimateDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt);
+</programlisting>
+ This callback is called when the <structname>Gather</> node is about
+ to execute its underlying sub-plan and this custom scan provider
+ intends to run in coordination with other background worker processes.
+ The coordination structure is allocated on the dynamic shared memory
+ segment, and the custom scan provider can use this area for
+ inter-process communication. This callback tells the core backend the
+ required length of the coordination information.
+ This callback is optional, and needs only be supplied if the relevant
+ <structname>CustomPath</> is added as a parallel executable path;
+ otherwise, the pointer can be set to <literal>NULL</>.
+ </para>
+
+ <para>
+<programlisting>
+void (*InitializeDSMCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+</programlisting>
+
+ It sets up the coordination information, whose length was reported
+ by <functionname>EstimateDSMCustomScan</>, to be referenced by the
+ background worker processes under the <structname>Gather</> node.
+ This callback is optional, and needs only be supplied if the relevant
+ <structname>CustomPath</> is added as a parallel executable path.
+ </para>
+
+ <para>
+<programlisting>
+void (*InitializeWorkerCustomScan) (CustomScanState *node,
+ shm_toc *toc,
+ void *coordinate);
+</programlisting>
+ It allows the custom scan provider to perform extra initialization of
+ the <structname>CustomScanState</> in the context of each background
+ worker process, according to the coordination information.
+ This callback is optional, and needs only be supplied if the relevant
+ <structname>CustomPath</> is added as a parallel executable path.
+ </para>
+
+ <para>
+<programlisting>
void (*ExplainCustomScan) (CustomScanState *node,
List *ancestors,
ExplainState *es);
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index dc2d890..bf041af 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -955,6 +955,68 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
</sect2>
+ <sect2 id="fdw-callbacks-parallel">
+ <title>FDW Routines for parallelism support</title>
+ <para>
+ A <structname>ForeignScan</> node can, optionally, run in coordination
+ with multiple concurrent background worker processes under the
+ <structname>Gather</> node. Each <structname>ForeignScan</> node
+ scans part of its data source (which may be local or remote, depending
+ on the characteristics of the FDW driver), and the rows produced
+ are eventually merged. So, the FDW driver has to take care not
+ to read the same rows more than once.
+ The key mechanism for meeting this requirement is a chunk of the
+ dynamic shared memory segment, called the coordination information.
+ It is mapped into each background worker process's
+ address space prior to execution, so the <structname>ForeignScan</>
+ node can reference this shared state while scanning the data
+ source of this FDW.
+ </para>
+
+ <para>
+<programlisting>
+Size
+EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+</programlisting>
+ This function is called when the <structname>Gather</> node is about to
+ execute its underlying sub-plan and this FDW driver intends to run
+ in coordination with other background worker processes.
+ The coordination structure is allocated on the dynamic shared memory
+ segment, and the FDW driver can use this area for inter-process
+ communication. This function tells the core backend the required
+ length of the coordination information. It needs to be supplied only if the
+ scan is parallel aware; otherwise, the pointer can be set to <literal>NULL</>.
+ </para>
+
+ <para>
+<programlisting>
+void
+InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+ void *coordinate);
+</programlisting>
+ This function is called when the <structname>Gather</> node allocates
+ a dynamic shared memory segment for coordinated parallel execution.
+ It allows the FDW driver to set up the supplied coordination information
+ area (<literal>coordinate</>; its length is the result of
+ <functionname>EstimateDSMForeignScan</>) prior to the launch of
+ background worker processes.
+ This function needs to be supplied only if the scan is parallel aware;
+ otherwise, the pointer can be set to <literal>NULL</>.
+ </para>
+
+ <para>
+<programlisting>
+void
+InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
+ void *coordinate);
+</programlisting>
+ This function is called at the beginning of execution in each background
+ worker process. It allows the FDW driver to perform extra initialization
+ of <structname>ForeignScanState</> according to the supplied
+ coordination information, which has already been initialized by
+ <functionname>InitializeDSMForeignScan</> in the master process.
+ </para>
+ </sect2>
</sect1>
<sect1 id="fdw-helpers">
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3201476..eae7418 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -137,6 +137,9 @@ typedef struct CopyStateData
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
+ /* shared state if COPY FROM performs in partial scan mode */
+ pg_atomic_uint32 *coordinate_lineno;
+
/*
* Working state for COPY TO/FROM
*/
@@ -2891,32 +2894,46 @@ BeginCopyFrom(Relation rel,
bool
NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
{
+ uint32 next_lineno = 0;
int fldct;
bool done;
/* only available for text or csv input */
Assert(!cstate->binary);
+ /* get my responsible lineno if partial COPY FROM mode */
+retry:
+ if (cstate->coordinate_lineno)
+ {
+ next_lineno = pg_atomic_add_fetch_u32(cstate->coordinate_lineno, 1);
+ Assert(cstate->cur_lineno < next_lineno);
+ }
+
/* on input just throw the header line away */
if (cstate->cur_lineno == 0 && cstate->header_line)
{
cstate->cur_lineno++;
if (CopyReadLine(cstate))
return false; /* done */
+
+ if (next_lineno > 0 && next_lineno == cstate->cur_lineno)
+ goto retry;
}
- cstate->cur_lineno++;
+ do {
+ cstate->cur_lineno++;
- /* Actually read the line into memory here */
- done = CopyReadLine(cstate);
+ /* Actually read the line into memory here */
+ done = CopyReadLine(cstate);
- /*
- * EOF at start of line means we're done. If we see EOF after some
- * characters, we act as though it was newline followed by EOF, ie,
- * process the line and then exit loop on next iteration.
- */
- if (done && cstate->line_buf.len == 0)
- return false;
+ /*
+ * EOF at start of line means we're done. If we see EOF after some
+ * characters, we act as though it was newline followed by EOF, ie,
+ * process the line and then exit loop on next iteration.
+ */
+ if (done && cstate->line_buf.len == 0)
+ return false;
+ } while (next_lineno > 0 && next_lineno != cstate->cur_lineno);
/* Parse the line into de-escaped field values */
if (cstate->csv_mode)
@@ -3190,6 +3207,31 @@ EndCopyFrom(CopyState cstate)
}
/*
+ * Turn on/off partial scan mode on the COPY FROM
+ */
+void
+SetCopyFromPartial(CopyState cstate, pg_atomic_uint32 *coordinate_lineno)
+{
+ /*
+ * Only a regular file in text format can support partial COPY FROM.
+ * (The binary format calls type receive functions during the row scan,
+ * so it makes little sense to run it in parallel.)
+ */
+ if (!cstate->filename || cstate->is_program)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("Partial COPY FROM is only supported on regular files")));
+
+ if (cstate->binary)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("Partial COPY is not supported on binary format")));
+
+ /* assign coordinate information */
+ cstate->coordinate_lineno = coordinate_lineno;
+}
+
+/*
* Read the next input line and stash it in line_buf, with conversion to
* server encoding.
*
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c30b348..3ea8a95 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,8 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeCustom.h"
+#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
@@ -175,6 +177,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanEstimate((ForeignScanState *) planstate,
+ e->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanEstimate((CustomScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
@@ -219,6 +229,14 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
+ d->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeDSM((CustomScanState *) planstate,
+ d->pcxt);
+ break;
default:
break;
}
@@ -641,6 +659,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
case T_SeqScanState:
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
+ toc);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeWorker((CustomScanState *) planstate,
+ toc);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 640289e..7416f1e 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -10,6 +10,7 @@
*/
#include "postgres.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "nodes/execnodes.h"
@@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node)
node->methods->CustomName)));
node->methods->RestrPosCustomScan(node);
}
+
+void
+ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->EstimateDSMCustomScan)
+ {
+ node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+void
+ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeDSMCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ methods->InitializeDSMCustomScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+void
+ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeWorkerCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ methods->InitializeWorkerCustomScan(node, toc, coordinate);
+ }
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 64a07bc..cdaa4a7 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node)
ExecScanReScan(&node->ss);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanEstimate
+ *
+ * Reports the size of the parallel coordination information, if any
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->EstimateDSMForeignScan)
+ {
+ node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialize the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeDSMForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeWorker
+ *
+ * Initialization according to the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeWorkerForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
+ }
+}
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 314d1f7..d5f89e1 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -16,6 +16,7 @@
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
+#include "port/atomics.h"
#include "tcop/dest.h"
/* CopyStateData is private in commands/copy.c */
@@ -36,4 +37,7 @@ extern void CopyFromErrorCallback(void *arg);
extern DestReceiver *CreateCopyDestReceiver(void);
+extern void SetCopyFromPartial(CopyState cstate,
+ pg_atomic_uint32 *coordinate_lineno);
+
#endif /* COPY_H */
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index e244942..410a3ad 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -12,6 +12,7 @@
#ifndef NODECUSTOM_H
#define NODECUSTOM_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
/*
@@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node);
extern void ExecCustomMarkPos(CustomScanState *node);
extern void ExecCustomRestrPos(CustomScanState *node);
+/*
+ * Parallel execution support
+ */
+extern void ExecCustomScanEstimate(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeDSM(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeWorker(CustomScanState *node,
+ shm_toc *toc);
+
#endif /* NODECUSTOM_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index a92ce5c..c255329 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -14,6 +14,7 @@
#ifndef NODEFOREIGNSCAN_H
#define NODEFOREIGNSCAN_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags);
@@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node);
extern void ExecEndForeignScan(ForeignScanState *node);
extern void ExecReScanForeignScan(ForeignScanState *node);
+extern void ExecForeignScanEstimate(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
+ shm_toc *toc);
+
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index db73233..e16fbf3 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -12,6 +12,7 @@
#ifndef FDWAPI_H
#define FDWAPI_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
@@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
Oid serverOid);
+typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt);
+typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -177,6 +186,11 @@ typedef struct FdwRoutine
/* Support functions for IMPORT FOREIGN SCHEMA */
ImportForeignSchema_function ImportForeignSchema;
+
+ /* Support functions for parallelism under Gather node */
+ EstimateDSMForeignScan_function EstimateDSMForeignScan;
+ InitializeDSMForeignScan_function InitializeDSMForeignScan;
+ InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..ab9f80f 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1585,6 +1585,7 @@ typedef struct ForeignScanState
{
ScanState ss; /* its first field is NodeTag */
List *fdw_recheck_quals; /* original quals not in ss.ps.qual */
+ Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
void *fdw_state; /* foreign-data wrapper can keep state here */
@@ -1603,6 +1604,8 @@ typedef struct ForeignScanState
* the BeginCustomScan method.
* ----------------
*/
+struct ParallelContext; /* avoid including parallel.h here */
+struct shm_toc; /* avoid including shm_toc.h here */
struct ExplainState; /* avoid including explain.h here */
struct CustomScanState;
@@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods
void (*ReScanCustomScan) (struct CustomScanState *node);
void (*MarkPosCustomScan) (struct CustomScanState *node);
void (*RestrPosCustomScan) (struct CustomScanState *node);
-
+ /* Optional: parallel execution support */
+ Size (*EstimateDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt);
+ void (*InitializeDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt,
+ void *coordinate);
+ void (*InitializeWorkerCustomScan) (struct CustomScanState *node,
+ struct shm_toc *toc,
+ void *coordinate);
/* Optional: print additional information in EXPLAIN */
void (*ExplainCustomScan) (struct CustomScanState *node,
List *ancestors,
@@ -1631,6 +1642,7 @@ typedef struct CustomScanState
ScanState ss;
uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */
List *custom_ps; /* list of child PlanState nodes, if any */
+ Size pscan_len; /* size of parallel coordination information */
const CustomExecMethods *methods;
} CustomScanState;
On Thu, Jan 28, 2016 at 8:14 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
            total          ForeignScan    diff
0 workers:  17584.319 ms   17555.904 ms     28.415 ms
1 workers:  18464.476 ms   18110.968 ms    353.508 ms
2 workers:  19042.755 ms   14580.335 ms   4462.420 ms
3 workers:  19318.254 ms   12668.912 ms   6649.342 ms
4 workers:  21732.910 ms   13596.788 ms   8136.122 ms
5 workers:  23486.846 ms   14533.409 ms   8953.437 ms
This workstation has 4 CPU cores, so it is natural that nworkers=3 records the
peak performance on the ForeignScan portion. On the other hand, nworkers>1 also
recorded non-negligible time consumption (probably by the Gather node?).
Further investigation will be needed...
It was a bug in my file_fdw patch. The ForeignScan node in the master process was
also kicked by the Gather node; however, it had no coordinate information
because I overlooked its initialization in the InitializeDSMForeignScan callback.
As a result, the local ForeignScan node was still executed after the
coordinated background worker processes had completed, and it returned twice as many rows.
In the revised patch, the results seem reasonable to me.
            total          ForeignScan    diff
0 workers:  17592.498 ms   17564.457 ms     28.041 ms
1 workers:  12152.998 ms   11983.485 ms    169.513 ms
2 workers:  10647.858 ms   10502.100 ms    145.758 ms
3 workers:   9635.445 ms    9509.899 ms    125.546 ms
4 workers:  11175.456 ms   10863.293 ms    312.163 ms
5 workers:  12586.457 ms   12279.323 ms    307.134 ms
Hmm. Is the file_fdw part of this just a demo, or do you want to try
to get that committed? If so, maybe start a new thread with a more
appropriate subject line to just talk about that. I haven't
scrutinized that part of the patch in any detail, but the general
infrastructure for FDWs and custom scans to use parallelism seems to
be in good shape, so I rewrote the documentation and committed that
part.
Do you have any idea why this isn't scaling beyond, uh, 1 worker?
That seems like a good thing to try to figure out.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
-----Original Message-----
From: Robert Haas [mailto:robertmhaas@gmail.com]
Sent: Thursday, February 04, 2016 2:54 AM
To: Kaigai Kouhei(海外 浩平)
Cc: pgsql-hackers@postgresql.org
Subject: Re: [HACKERS] CustomScan under the Gather node?
On Thu, Jan 28, 2016 at 8:14 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
            total          ForeignScan    diff
0 workers:  17584.319 ms   17555.904 ms     28.415 ms
1 workers:  18464.476 ms   18110.968 ms    353.508 ms
2 workers:  19042.755 ms   14580.335 ms   4462.420 ms
3 workers:  19318.254 ms   12668.912 ms   6649.342 ms
4 workers:  21732.910 ms   13596.788 ms   8136.122 ms
5 workers:  23486.846 ms   14533.409 ms   8953.437 ms
This workstation has 4 CPU cores, so it is natural that nworkers=3 records the
peak performance on the ForeignScan portion. On the other hand, nworkers>1 also
recorded non-negligible time consumption (probably by the Gather node?).
Further investigation will be needed...
It was a bug in my file_fdw patch. The ForeignScan node in the master process was
also kicked by the Gather node; however, it had no coordinate information
because I overlooked its initialization in the InitializeDSMForeignScan callback.
As a result, the local ForeignScan node was still executed after the
coordinated background worker processes had completed, and it returned twice as many rows.
In the revised patch, the results seem reasonable to me.
            total          ForeignScan    diff
0 workers:  17592.498 ms   17564.457 ms     28.041 ms
1 workers:  12152.998 ms   11983.485 ms    169.513 ms
2 workers:  10647.858 ms   10502.100 ms    145.758 ms
3 workers:   9635.445 ms    9509.899 ms    125.546 ms
4 workers:  11175.456 ms   10863.293 ms    312.163 ms
5 workers:  12586.457 ms   12279.323 ms    307.134 ms
Hmm. Is the file_fdw part of this just a demo, or do you want to try
to get that committed? If so, maybe start a new thread with a more
appropriate subject line to just talk about that. I haven't
scrutinized that part of the patch in any detail, but the general
infrastructure for FDWs and custom scans to use parallelism seems to
be in good shape, so I rewrote the documentation and committed that
part.
Thanks. I intend the file_fdw part just as a demonstration.
Unlike GpuScan of PG-Strom, it does not require any special hardware
to reproduce this parallel execution.
Do you have any idea why this isn't scaling beyond, uh, 1 worker?
That seems like a good thing to try to figure out.
The hardware on which I ran the above query has 4 CPU cores, so it is not
surprising that 3 workers (+ 1 master) recorded the peak performance.
In addition, the file_fdw enhancement is a corner-cutting piece of work.
Each worker picks up the next line number to be fetched from the shared memory
segment using pg_atomic_add_fetch_u32(), then reads the input file
until it reaches the target line. Unrelated lines are skipped.
Each worker parses only the lines it is responsible for, so parallel
execution makes sense for this part. On the other hand, the total amount
of CPU cycles spent on the file scan increases, because every worker
still has to read through all the lines.
If we simply split the time consumption in the 0-worker case
into two factors:
(time to scan file; TSF) + (time to parse lines; TPL)
then the total workload when we distribute file_fdw across N workers is:
N * (TSF) + (TPL)
Thus, each individual worker has to process the following amount of work:
(TSF) + (TPL)/N
This is the typical form of Amdahl's law when the sequential part is not
small. The above results suggest that the TSF part is about 7.4s and the TPL part is
about 10.1s.
Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>