Parallel copy
This work is to parallelize the copy command and in particular "Copy
<table_name> from 'filename' Where <condition>;" command.
Before going into how and what portion of 'copy command' processing we
can parallelize, let us see in brief what are the top-level operations
we perform while copying from the file into a table. We read the file
in 64KB chunks, then find the line endings and process that data line
by line, where each line corresponds to one tuple. We first form the
tuple (in form of value/null array) from that line, check if it
qualifies the where condition and if it qualifies, then perform
constraint check and few other checks and then finally store it in
local tuple array. Once we reach 1000 tuples or consumed 64KB
(whichever occurred first), we insert them together via
table_multi_insert API and then for each tuple insert into the
index(es) and execute after row triggers.
So if we see here we do a lot of work after reading each 64K chunk.
We can read the next chunk only after all the tuples are processed in
the previous chunk we read. This brings us an opportunity to
parallelize each 64K chunk processing. I think we can do this in more
than one way.
The first idea is that we allocate each chunk to a worker and once the
worker has finished processing the current chunk, it can start with
the next unprocessed chunk. Here, we need to see how to handle the
partial tuples at the end or beginning of each chunk. We can read the
chunks in dsa/dsm instead of in local buffer for processing.
Alternatively, if we think that accessing shared memory can be costly
we can read the entire chunk in local memory, but copy the partial
tuple at the beginning of a chunk (if any) to a dsa. We mainly need
partial tuple in the shared memory area. The worker which has found
the initial part of the partial tuple will be responsible to process
the entire tuple. Now, to detect whether there is a partial tuple at
the beginning of a chunk, we always start reading one byte, prior to
the start of the current chunk and if that byte is not a terminating
line byte, we know that it is a partial tuple. Now, while processing
the chunk, we will ignore this first line and start after the first
terminating line.
To connect the partial tuple in two consecutive chunks, we need to
have another data structure (for the ease of reference in this email,
I call it CTM (chunk-tuple-map)) in shared memory where we store some
per-chunk information like the chunk-number, dsa location of that
chunk and a variable which indicates whether we can free/reuse the
current entry. Whenever we encounter the partial tuple at the
beginning of a chunk we note down its chunk number, and dsa location
in CTM. Next, whenever we encounter any partial tuple at the end of
the chunk, we search CTM for next chunk-number and read from
corresponding dsa location till we encounter terminating line byte.
Once we have read and processed this partial tuple, we can mark the
entry as available for reuse. There are some loose ends here like how
many entries shall we allocate in this data structure. It depends on
whether we want to allow the worker to start reading the next chunk
before the partial tuple of the previous chunk is processed. To keep
it simple, we can allow the worker to process the next chunk only when
the partial tuple in the previous chunk is processed. This will allow
us to keep the entries equal to a number of workers in CTM. I think
we can easily improve this if we want but I don't think it will matter
too much as in most cases by the time we processed the tuples in that
chunk, the partial tuple would have been consumed by the other worker.
Another approach that came up during an offlist discussion with Robert
is that we have one dedicated worker for reading the chunks from file
and it copies the complete tuples of one chunk in the shared memory
and once that is done, a handover that chunks to another worker which
can process tuples in that area. We can imagine that the reader
worker is responsible to form some sort of work queue that can be
processed by the other workers. In this idea, we won't be able to get
the benefit of initial tokenization (forming tuple boundaries) via
parallel workers and might need some additional memory processing as
after reader worker has handed the initial shared memory segment, we
need to somehow identify tuple boundaries and then process them.
Another thing we need to figure out is the how many workers to use for
the copy command. I think we can use it based on the file size which
needs some experiments or may be based on user input.
I think we have two related problems to solve for this (a) relation
extension lock (required for extending the relation) which won't
conflict among workers due to group locking, we are working on a
solution for this in another thread [1]/messages/by-id/CAD21AoCmT3cFQUN4aVvzy5chw7DuzXrJCbrjTU05B+Ss=Gn1LA@mail.gmail.com, (b) Use of Page locks in Gin
indexes, we can probably disallow parallelism if the table has Gin
index which is not a great thing but not bad either.
To be clear, this work is for PG14.
Thoughts?
[1]: /messages/by-id/CAD21AoCmT3cFQUN4aVvzy5chw7DuzXrJCbrjTU05B+Ss=Gn1LA@mail.gmail.com
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
This work is to parallelize the copy command and in particular "Copy
<table_name> from 'filename' Where <condition>;" command.
Nice project, and a great stepping stone towards parallel DML.
The first idea is that we allocate each chunk to a worker and once the
worker has finished processing the current chunk, it can start with
the next unprocessed chunk. Here, we need to see how to handle the
partial tuples at the end or beginning of each chunk. We can read the
chunks in dsa/dsm instead of in local buffer for processing.
Alternatively, if we think that accessing shared memory can be costly
we can read the entire chunk in local memory, but copy the partial
tuple at the beginning of a chunk (if any) to a dsa. We mainly need
partial tuple in the shared memory area. The worker which has found
the initial part of the partial tuple will be responsible to process
the entire tuple. Now, to detect whether there is a partial tuple at
the beginning of a chunk, we always start reading one byte, prior to
the start of the current chunk and if that byte is not a terminating
line byte, we know that it is a partial tuple. Now, while processing
the chunk, we will ignore this first line and start after the first
terminating line.
That's quiet similar to the approach I took with a parallel file_fdw
patch[1]/messages/by-id/CA+hUKGKZu8fpZo0W=POmQEN46kXhLedzqqAnt5iJZy7tD0x6sw@mail.gmail.com, which mostly consisted of parallelising the reading part of
copy.c, except that...
To connect the partial tuple in two consecutive chunks, we need to
have another data structure (for the ease of reference in this email,
I call it CTM (chunk-tuple-map)) in shared memory where we store some
per-chunk information like the chunk-number, dsa location of that
chunk and a variable which indicates whether we can free/reuse the
current entry. Whenever we encounter the partial tuple at the
beginning of a chunk we note down its chunk number, and dsa location
in CTM. Next, whenever we encounter any partial tuple at the end of
the chunk, we search CTM for next chunk-number and read from
corresponding dsa location till we encounter terminating line byte.
Once we have read and processed this partial tuple, we can mark the
entry as available for reuse. There are some loose ends here like how
many entries shall we allocate in this data structure. It depends on
whether we want to allow the worker to start reading the next chunk
before the partial tuple of the previous chunk is processed. To keep
it simple, we can allow the worker to process the next chunk only when
the partial tuple in the previous chunk is processed. This will allow
us to keep the entries equal to a number of workers in CTM. I think
we can easily improve this if we want but I don't think it will matter
too much as in most cases by the time we processed the tuples in that
chunk, the partial tuple would have been consumed by the other worker.
... I didn't use a shm 'partial tuple' exchanging mechanism, I just
had each worker follow the final tuple in its chunk into the next
chunk, and have each worker ignore the first tuple in chunk after
chunk 0 because it knows someone else is looking after that. That
means that there was some double reading going on near the boundaries,
and considering how much I've been complaining about bogus extra
system calls on this mailing list lately, yeah, your idea of doing a
bit more coordination is a better idea. If you go this way, you might
at least find the copy.c part of the patch I wrote useful as stand-in
scaffolding code in the meantime while you prototype the parallel
writing side, if you don't already have something better for this?
Another approach that came up during an offlist discussion with Robert
is that we have one dedicated worker for reading the chunks from file
and it copies the complete tuples of one chunk in the shared memory
and once that is done, a handover that chunks to another worker which
can process tuples in that area. We can imagine that the reader
worker is responsible to form some sort of work queue that can be
processed by the other workers. In this idea, we won't be able to get
the benefit of initial tokenization (forming tuple boundaries) via
parallel workers and might need some additional memory processing as
after reader worker has handed the initial shared memory segment, we
need to somehow identify tuple boundaries and then process them.
Yeah, I have also wondered about something like this in a slightly
different context. For parallel query in general, I wondered if there
should be a Parallel Scatter node, that can be put on top of any
parallel-safe plan, and it runs it in a worker process that just
pushes tuples into a single-producer multi-consumer shm queue, and
then other workers read from that whenever they need a tuple. Hmm,
but for COPY, I suppose you'd want to push the raw lines with minimal
examination, not tuples, into a shm queue, so I guess that's a bit
different.
Another thing we need to figure out is the how many workers to use for
the copy command. I think we can use it based on the file size which
needs some experiments or may be based on user input.
It seems like we don't even really have a general model for that sort
of thing in the rest of the system yet, and I guess some kind of
fairly dumb explicit system would make sense in the early days...
Thoughts?
This is cool.
[1]: /messages/by-id/CA+hUKGKZu8fpZo0W=POmQEN46kXhLedzqqAnt5iJZy7tD0x6sw@mail.gmail.com
On Fri, Feb 14, 2020 at 3:36 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
This work is to parallelize the copy command and in particular "Copy
<table_name> from 'filename' Where <condition>;" command.Nice project, and a great stepping stone towards parallel DML.
Thanks.
The first idea is that we allocate each chunk to a worker and once the
worker has finished processing the current chunk, it can start with
the next unprocessed chunk. Here, we need to see how to handle the
partial tuples at the end or beginning of each chunk. We can read the
chunks in dsa/dsm instead of in local buffer for processing.
Alternatively, if we think that accessing shared memory can be costly
we can read the entire chunk in local memory, but copy the partial
tuple at the beginning of a chunk (if any) to a dsa. We mainly need
partial tuple in the shared memory area. The worker which has found
the initial part of the partial tuple will be responsible to process
the entire tuple. Now, to detect whether there is a partial tuple at
the beginning of a chunk, we always start reading one byte, prior to
the start of the current chunk and if that byte is not a terminating
line byte, we know that it is a partial tuple. Now, while processing
the chunk, we will ignore this first line and start after the first
terminating line.That's quiet similar to the approach I took with a parallel file_fdw
patch[1], which mostly consisted of parallelising the reading part of
copy.c, except that...To connect the partial tuple in two consecutive chunks, we need to
have another data structure (for the ease of reference in this email,
I call it CTM (chunk-tuple-map)) in shared memory where we store some
per-chunk information like the chunk-number, dsa location of that
chunk and a variable which indicates whether we can free/reuse the
current entry. Whenever we encounter the partial tuple at the
beginning of a chunk we note down its chunk number, and dsa location
in CTM. Next, whenever we encounter any partial tuple at the end of
the chunk, we search CTM for next chunk-number and read from
corresponding dsa location till we encounter terminating line byte.
Once we have read and processed this partial tuple, we can mark the
entry as available for reuse. There are some loose ends here like how
many entries shall we allocate in this data structure. It depends on
whether we want to allow the worker to start reading the next chunk
before the partial tuple of the previous chunk is processed. To keep
it simple, we can allow the worker to process the next chunk only when
the partial tuple in the previous chunk is processed. This will allow
us to keep the entries equal to a number of workers in CTM. I think
we can easily improve this if we want but I don't think it will matter
too much as in most cases by the time we processed the tuples in that
chunk, the partial tuple would have been consumed by the other worker.... I didn't use a shm 'partial tuple' exchanging mechanism, I just
had each worker follow the final tuple in its chunk into the next
chunk, and have each worker ignore the first tuple in chunk after
chunk 0 because it knows someone else is looking after that. That
means that there was some double reading going on near the boundaries,
Right and especially if the part in the second chunk is bigger, then
we might need to read most of the second chunk.
and considering how much I've been complaining about bogus extra
system calls on this mailing list lately, yeah, your idea of doing a
bit more coordination is a better idea. If you go this way, you might
at least find the copy.c part of the patch I wrote useful as stand-in
scaffolding code in the meantime while you prototype the parallel
writing side, if you don't already have something better for this?
No, I haven't started writing anything yet, but I have some ideas on
how to achieve this. I quickly skimmed through your patch and I think
that can be used as a starting point though if we decide to go with
accumulating the partial tuple or all the data in shm, then the things
might differ.
Another approach that came up during an offlist discussion with Robert
is that we have one dedicated worker for reading the chunks from file
and it copies the complete tuples of one chunk in the shared memory
and once that is done, a handover that chunks to another worker which
can process tuples in that area. We can imagine that the reader
worker is responsible to form some sort of work queue that can be
processed by the other workers. In this idea, we won't be able to get
the benefit of initial tokenization (forming tuple boundaries) via
parallel workers and might need some additional memory processing as
after reader worker has handed the initial shared memory segment, we
need to somehow identify tuple boundaries and then process them.Yeah, I have also wondered about something like this in a slightly
different context. For parallel query in general, I wondered if there
should be a Parallel Scatter node, that can be put on top of any
parallel-safe plan, and it runs it in a worker process that just
pushes tuples into a single-producer multi-consumer shm queue, and
then other workers read from that whenever they need a tuple.
The idea sounds great but the past experience shows that shoving all
the tuples through queue might add a significant overhead. However, I
don't know how exactly you are planning to use it?
Hmm,
but for COPY, I suppose you'd want to push the raw lines with minimal
examination, not tuples, into a shm queue, so I guess that's a bit
different.
Yeah.
Another thing we need to figure out is the how many workers to use for
the copy command. I think we can use it based on the file size which
needs some experiments or may be based on user input.It seems like we don't even really have a general model for that sort
of thing in the rest of the system yet, and I guess some kind of
fairly dumb explicit system would make sense in the early days...
makes sense.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Fri, 14 Feb 2020 at 11:57, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Feb 14, 2020 at 3:36 PM Thomas Munro <thomas.munro@gmail.com>
wrote:On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:
...
Another approach that came up during an offlist discussion with Robert
is that we have one dedicated worker for reading the chunks from file
and it copies the complete tuples of one chunk in the shared memory
and once that is done, a handover that chunks to another worker which
can process tuples in that area. We can imagine that the reader
worker is responsible to form some sort of work queue that can be
processed by the other workers. In this idea, we won't be able to get
the benefit of initial tokenization (forming tuple boundaries) via
parallel workers and might need some additional memory processing as
after reader worker has handed the initial shared memory segment, we
need to somehow identify tuple boundaries and then process them.
Parsing rows from the raw input (the work done by CopyReadLine()) in a
single process would accommodate line returns in quoted fields. I don't
think there's a way of getting parallel workers to manage the
in-quote/out-of-quote state required. A single worker could also process a
stream without having to reread/rewind so it would be able to process input
from STDIN or PROGRAM sources, making the improvements applicable to load
operations done by third party tools and scripted \copy in psql.
...
Another thing we need to figure out is the how many workers to use for
the copy command. I think we can use it based on the file size which
needs some experiments or may be based on user input.It seems like we don't even really have a general model for that sort
of thing in the rest of the system yet, and I guess some kind of
fairly dumb explicit system would make sense in the early days...makes sense.
The ratio between chunking or line parsing processes and the parallel
worker pool would vary with the width of the table, complexity of the data
or file (dates, encoding conversions), complexity of constraints and
acceptable impact of the load. Being able to control it through user input
would be great.
--
Alastair
On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <minion@decodable.me> wrote:
On Fri, 14 Feb 2020 at 11:57, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Feb 14, 2020 at 3:36 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
...
Another approach that came up during an offlist discussion with Robert
is that we have one dedicated worker for reading the chunks from file
and it copies the complete tuples of one chunk in the shared memory
and once that is done, a handover that chunks to another worker which
can process tuples in that area. We can imagine that the reader
worker is responsible to form some sort of work queue that can be
processed by the other workers. In this idea, we won't be able to get
the benefit of initial tokenization (forming tuple boundaries) via
parallel workers and might need some additional memory processing as
after reader worker has handed the initial shared memory segment, we
need to somehow identify tuple boundaries and then process them.Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
AFAIU, the whole of this in-quote/out-of-quote state is manged inside
CopyReadLineText which will be done by each of the parallel workers,
something on the lines of what Thomas did in his patch [1]/messages/by-id/CA+hUKGKZu8fpZo0W=POmQEN46kXhLedzqqAnt5iJZy7tD0x6sw@mail.gmail.com.
Basically, we need to invent a mechanism to allocate chunks to
individual workers and then the whole processing will be done as we
are doing now except for special handling for partial tuples which I
have explained in my previous email. Am, I missing something here?
...
Another thing we need to figure out is the how many workers to use for
the copy command. I think we can use it based on the file size which
needs some experiments or may be based on user input.It seems like we don't even really have a general model for that sort
of thing in the rest of the system yet, and I guess some kind of
fairly dumb explicit system would make sense in the early days...makes sense.
The ratio between chunking or line parsing processes and the parallel worker pool would vary with the width of the table, complexity of the data or file (dates, encoding conversions), complexity of constraints and acceptable impact of the load. Being able to control it through user input would be great.
Okay, I think one simple way could be that we compute the number of
workers based on filesize (some experiments are required to determine
this) unless the user has given the input. If the user has provided
the input then we can use that with an upper limit to
max_parallel_workers.
[1]: /messages/by-id/CA+hUKGKZu8fpZo0W=POmQEN46kXhLedzqqAnt5iJZy7tD0x6sw@mail.gmail.com
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Sat, 15 Feb 2020 at 04:55, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <minion@decodable.me> wrote:
...
Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
AFAIU, the whole of this in-quote/out-of-quote state is manged inside
CopyReadLineText which will be done by each of the parallel workers,
something on the lines of what Thomas did in his patch [1].
Basically, we need to invent a mechanism to allocate chunks to
individual workers and then the whole processing will be done as we
are doing now except for special handling for partial tuples which I
have explained in my previous email. Am, I missing something here?
The problem case that I see is the chunk boundary falling in the
middle of a quoted field where
- The quote opens in chunk 1
- The quote closes in chunk 2
- There is an EoL character between the start of chunk 2 and the closing quote
When the worker processing chunk 2 starts, it believes itself to be in
out-of-quote state, so only data between the start of the chunk and
the EoL is regarded as belonging to the partial line. From that point
on the parsing of the rest of the chunk goes off track.
Some of the resulting errors can be avoided by, for instance,
requiring a quote to be preceded by a delimiter or EoL. That answer
fails when fields end with EoL characters, which happens often enough
in the wild.
Recovering from an incorrect in-quote/out-of-quote state assumption at
the start of parsing a chunk just seems like a hole with no bottom. So
it looks to me like it's best done in a single process which can keep
track of that state reliably.
--
Aastair
On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <minion@decodable.me> wrote:
On Sat, 15 Feb 2020 at 04:55, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <minion@decodable.me> wrote:
...
Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
AFAIU, the whole of this in-quote/out-of-quote state is manged inside
CopyReadLineText which will be done by each of the parallel workers,
something on the lines of what Thomas did in his patch [1].
Basically, we need to invent a mechanism to allocate chunks to
individual workers and then the whole processing will be done as we
are doing now except for special handling for partial tuples which I
have explained in my previous email. Am, I missing something here?The problem case that I see is the chunk boundary falling in the
middle of a quoted field where
- The quote opens in chunk 1
- The quote closes in chunk 2
- There is an EoL character between the start of chunk 2 and the closing quoteWhen the worker processing chunk 2 starts, it believes itself to be in
out-of-quote state, so only data between the start of the chunk and
the EoL is regarded as belonging to the partial line. From that point
on the parsing of the rest of the chunk goes off track.Some of the resulting errors can be avoided by, for instance,
requiring a quote to be preceded by a delimiter or EoL. That answer
fails when fields end with EoL characters, which happens often enough
in the wild.Recovering from an incorrect in-quote/out-of-quote state assumption at
the start of parsing a chunk just seems like a hole with no bottom. So
it looks to me like it's best done in a single process which can keep
track of that state reliably.
Good point and I agree with you that having a single process would
avoid any such stuff. However, I will think some more on it and if
you/anyone else gets some idea on how to deal with this in a
multi-worker system (where we can allow each worker to read and
process the chunk) then feel free to share your thoughts.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Sat, Feb 15, 2020 at 06:02:06PM +0530, Amit Kapila wrote:
On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <minion@decodable.me> wrote:
On Sat, 15 Feb 2020 at 04:55, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <minion@decodable.me> wrote:
...
Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
AFAIU, the whole of this in-quote/out-of-quote state is manged inside
CopyReadLineText which will be done by each of the parallel workers,
something on the lines of what Thomas did in his patch [1].
Basically, we need to invent a mechanism to allocate chunks to
individual workers and then the whole processing will be done as we
are doing now except for special handling for partial tuples which I
have explained in my previous email. Am, I missing something here?The problem case that I see is the chunk boundary falling in the
middle of a quoted field where
- The quote opens in chunk 1
- The quote closes in chunk 2
- There is an EoL character between the start of chunk 2 and the closing quoteWhen the worker processing chunk 2 starts, it believes itself to be in
out-of-quote state, so only data between the start of the chunk and
the EoL is regarded as belonging to the partial line. From that point
on the parsing of the rest of the chunk goes off track.Some of the resulting errors can be avoided by, for instance,
requiring a quote to be preceded by a delimiter or EoL. That answer
fails when fields end with EoL characters, which happens often enough
in the wild.Recovering from an incorrect in-quote/out-of-quote state assumption at
the start of parsing a chunk just seems like a hole with no bottom. So
it looks to me like it's best done in a single process which can keep
track of that state reliably.Good point and I agree with you that having a single process would
avoid any such stuff. However, I will think some more on it and if
you/anyone else gets some idea on how to deal with this in a
multi-worker system (where we can allow each worker to read and
process the chunk) then feel free to share your thoughts.
I see two pieces of this puzzle: an input format we control, and the
ones we don't.
In the former case, we could encode all fields with base85 (or
something similar that reduces the input alphabet efficiently), then
reserve bytes that denote delimiters of various types. ASCII has
separators for file, group, record, and unit that we could use as
inspiration.
I don't have anything to offer for free-form input other than to agree
that it looks like a hole with no bottom, and maybe we should just
keep that process serial, at least until someone finds a bottom.
Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
On 2/15/20 7:32 AM, Amit Kapila wrote:
On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <minion@decodable.me> wrote:
On Sat, 15 Feb 2020 at 04:55, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <minion@decodable.me> wrote:
...
Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
AFAIU, the whole of this in-quote/out-of-quote state is manged inside
CopyReadLineText which will be done by each of the parallel workers,
something on the lines of what Thomas did in his patch [1].
Basically, we need to invent a mechanism to allocate chunks to
individual workers and then the whole processing will be done as we
are doing now except for special handling for partial tuples which I
have explained in my previous email. Am, I missing something here?The problem case that I see is the chunk boundary falling in the
middle of a quoted field where
- The quote opens in chunk 1
- The quote closes in chunk 2
- There is an EoL character between the start of chunk 2 and the closing quoteWhen the worker processing chunk 2 starts, it believes itself to be in
out-of-quote state, so only data between the start of the chunk and
the EoL is regarded as belonging to the partial line. From that point
on the parsing of the rest of the chunk goes off track.Some of the resulting errors can be avoided by, for instance,
requiring a quote to be preceded by a delimiter or EoL. That answer
fails when fields end with EoL characters, which happens often enough
in the wild.Recovering from an incorrect in-quote/out-of-quote state assumption at
the start of parsing a chunk just seems like a hole with no bottom. So
it looks to me like it's best done in a single process which can keep
track of that state reliably.Good point and I agree with you that having a single process would
avoid any such stuff. However, I will think some more on it and if
you/anyone else gets some idea on how to deal with this in a
multi-worker system (where we can allow each worker to read and
process the chunk) then feel free to share your thoughts.
IIRC, in_quote only matters here in CSV mode (because CSV fields can
have embedded newlines). So why not just forbid parallel copy in CSV
mode, at least for now? I guess it depends on the actual use case. If we
expect to be parallel loading humungous CSVs then that won't fly.
cheers
andrew
--
Andrew Dunstan https://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Sun, Feb 16, 2020 at 12:21 PM Andrew Dunstan
<andrew.dunstan@2ndquadrant.com> wrote:
On 2/15/20 7:32 AM, Amit Kapila wrote:
On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <minion@decodable.me> wrote:
The problem case that I see is the chunk boundary falling in the
middle of a quoted field where
- The quote opens in chunk 1
- The quote closes in chunk 2
- There is an EoL character between the start of chunk 2 and the closing quoteWhen the worker processing chunk 2 starts, it believes itself to be in
out-of-quote state, so only data between the start of the chunk and
the EoL is regarded as belonging to the partial line. From that point
on the parsing of the rest of the chunk goes off track.Some of the resulting errors can be avoided by, for instance,
requiring a quote to be preceded by a delimiter or EoL. That answer
fails when fields end with EoL characters, which happens often enough
in the wild.Recovering from an incorrect in-quote/out-of-quote state assumption at
the start of parsing a chunk just seems like a hole with no bottom. So
it looks to me like it's best done in a single process which can keep
track of that state reliably.Good point and I agree with you that having a single process would
avoid any such stuff. However, I will think some more on it and if
you/anyone else gets some idea on how to deal with this in a
multi-worker system (where we can allow each worker to read and
process the chunk) then feel free to share your thoughts.IIRC, in_quote only matters here in CSV mode (because CSV fields can
have embedded newlines).
AFAIU, that is correct.
So why not just forbid parallel copy in CSV
mode, at least for now? I guess it depends on the actual use case. If we
expect to be parallel loading humungous CSVs then that won't fly.
I am not sure about this part. However, I guess we should at the very
least have some extendable solution that can deal with csv, otherwise,
we might end up re-designing everything if someday we want to deal
with CSV. One naive idea is that in csv mode, we can set up the
things slightly differently like the worker, won't start processing
the chunk unless the previous chunk is completely parsed. So each
worker would first parse and tokenize the entire chunk and then start
writing it. So, this will make the reading/parsing part serialized,
but writes can still be parallel. Now, I don't know if it is a good
idea to process in a different way for csv mode.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Sat, 15 Feb 2020 at 14:32, Amit Kapila <amit.kapila16@gmail.com> wrote:
Good point and I agree with you that having a single process would
avoid any such stuff. However, I will think some more on it and if
you/anyone else gets some idea on how to deal with this in a
multi-worker system (where we can allow each worker to read and
process the chunk) then feel free to share your thoughts.
I think having a single process handle splitting the input into tuples makes
most sense. It's possible to parse csv at multiple GB/s rates [1]https://github.com/geofflangdale/simdcsv/, finding
tuple boundaries is a subset of that task.
My first thought for a design would be to have two shared memory ring buffers,
one for data and one for tuple start positions. Reader process reads the CSV
data into the main buffer, finds tuple start locations in there and writes
those to the secondary buffer.
Worker processes claim a chunk of tuple positions from the secondary buffer and
update their "keep this data around" position with the first position. Then
proceed to parse and insert the tuples, updating their position until they find
the end of the last tuple in the chunk.
Buffer size, maximum and minimum chunk size could be tunable. Ideally the
buffers would be at least big enough to absorb one of the workers getting
scheduled out for a timeslice, which could be up to tens of megabytes.
Regards,
Ants Aasma
At Mon, 17 Feb 2020 16:49:22 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in
On Sun, Feb 16, 2020 at 12:21 PM Andrew Dunstan
<andrew.dunstan@2ndquadrant.com> wrote:On 2/15/20 7:32 AM, Amit Kapila wrote:
On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <minion@decodable.me> wrot> > So why not just forbid parallel copy in CSV
mode, at least for now? I guess it depends on the actual use case. If we
expect to be parallel loading humungous CSVs then that won't fly.I am not sure about this part. However, I guess we should at the very
least have some extendable solution that can deal with csv, otherwise,
we might end up re-designing everything if someday we want to deal
with CSV. One naive idea is that in csv mode, we can set up the
things slightly differently like the worker, won't start processing
the chunk unless the previous chunk is completely parsed. So each
worker would first parse and tokenize the entire chunk and then start
writing it. So, this will make the reading/parsing part serialized,
but writes can still be parallel. Now, I don't know if it is a good
idea to process in a different way for csv mode.
In an extreme case, if we didn't see a QUOTE in a chunk, we cannot
know the chunk is in a quoted section or not, until all the past
chunks are parsed. After all we are forced to parse fully
sequentially as far as we allow QUOTE.
On the other hand, if we allowed "COPY t FROM f WITH (FORMAT CSV,
QUOTE '')" in order to signal that there's no quoted section in the
file then all chunks would be fully concurrently parsable.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Tue, Feb 18, 2020 at 4:04 AM Ants Aasma <ants@cybertec.at> wrote:
On Sat, 15 Feb 2020 at 14:32, Amit Kapila <amit.kapila16@gmail.com> wrote:
Good point and I agree with you that having a single process would
avoid any such stuff. However, I will think some more on it and if
you/anyone else gets some idea on how to deal with this in a
multi-worker system (where we can allow each worker to read and
process the chunk) then feel free to share your thoughts.I think having a single process handle splitting the input into tuples makes
most sense. It's possible to parse csv at multiple GB/s rates [1], finding
tuple boundaries is a subset of that task.
Yeah, this is compelling. Even though it has to read the file
serially, the real gains from parallel COPY should come from doing the
real work in parallel: data-type parsing, tuple forming, WHERE clause
filtering, partition routing, buffer management, insertion and
associated triggers, FKs and index maintenance.
The reason I used the other approach for the file_fdw patch is that I
was trying to make it look as much as possible like parallel
sequential scan and not create an extra worker, because I didn't feel
like an FDW should be allowed to do that (what if executor nodes all
over the query tree created worker processes willy-nilly?). Obviously
it doesn't work correctly for embedded newlines, and even if you
decree that multi-line values aren't allowed in parallel COPY, the
stuff about tuples crossing chunk boundaries is still a bit unpleasant
(whether solved by double reading as I showed, or a bunch of tap
dancing in shared memory) and creates overheads.
My first thought for a design would be to have two shared memory ring buffers,
one for data and one for tuple start positions. Reader process reads the CSV
data into the main buffer, finds tuple start locations in there and writes
those to the secondary buffer.Worker processes claim a chunk of tuple positions from the secondary buffer and
update their "keep this data around" position with the first position. Then
proceed to parse and insert the tuples, updating their position until they find
the end of the last tuple in the chunk.
+1. That sort of two-queue scheme is exactly how I sketched out a
multi-consumer queue for a hypothetical Parallel Scatter node. It
probably gets a bit trickier when the payload has to be broken up into
fragments to wrap around the "data" buffer N times.
On Tue, 18 Feb 2020 at 04:40, Thomas Munro <thomas.munro@gmail.com> wrote:
+1. That sort of two-queue scheme is exactly how I sketched out a
multi-consumer queue for a hypothetical Parallel Scatter node. It
probably gets a bit trickier when the payload has to be broken up into
fragments to wrap around the "data" buffer N times.
At least for copy it should be easy enough - it already has to handle reading
data block by block. If worker updates its position while doing so the reader
can wrap around the data buffer.
There will be no parallelism while one worker is buffering up a line larger
than the data buffer, but that doesn't seem like a major issue. Once the line is
buffered and begins inserting next worker can start buffering the next tuple.
Regards,
Ants Aasma
On Mon, Feb 17, 2020 at 8:34 PM Ants Aasma <ants@cybertec.at> wrote:
On Sat, 15 Feb 2020 at 14:32, Amit Kapila <amit.kapila16@gmail.com> wrote:
Good point and I agree with you that having a single process would
avoid any such stuff. However, I will think some more on it and if
you/anyone else gets some idea on how to deal with this in a
multi-worker system (where we can allow each worker to read and
process the chunk) then feel free to share your thoughts.I think having a single process handle splitting the input into tuples makes
most sense. It's possible to parse csv at multiple GB/s rates [1], finding
tuple boundaries is a subset of that task.My first thought for a design would be to have two shared memory ring buffers,
one for data and one for tuple start positions. Reader process reads the CSV
data into the main buffer, finds tuple start locations in there and writes
those to the secondary buffer.Worker processes claim a chunk of tuple positions from the secondary buffer and
update their "keep this data around" position with the first position. Then
proceed to parse and insert the tuples, updating their position until they find
the end of the last tuple in the chunk.
This is something similar to what I had also in mind for this idea. I
had thought of handing over complete chunk (64K or whatever we
decide). The one thing that slightly bothers me is that we will add
some additional overhead of copying to and from shared memory which
was earlier from local process memory. And, the tokenization (finding
line boundaries) would be serial. I think that tokenization should be
a small part of the overall work we do during the copy operation, but
will do some measurements to ascertain the same.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Tue, Feb 18, 2020 at 7:28 AM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
At Mon, 17 Feb 2020 16:49:22 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in
On Sun, Feb 16, 2020 at 12:21 PM Andrew Dunstan
<andrew.dunstan@2ndquadrant.com> wrote:On 2/15/20 7:32 AM, Amit Kapila wrote:
On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <minion@decodable.me> wrot> > So why not just forbid parallel copy in CSV
mode, at least for now? I guess it depends on the actual use case. If we
expect to be parallel loading humungous CSVs then that won't fly.I am not sure about this part. However, I guess we should at the very
least have some extendable solution that can deal with csv, otherwise,
we might end up re-designing everything if someday we want to deal
with CSV. One naive idea is that in csv mode, we can set up the
things slightly differently like the worker, won't start processing
the chunk unless the previous chunk is completely parsed. So each
worker would first parse and tokenize the entire chunk and then start
writing it. So, this will make the reading/parsing part serialized,
but writes can still be parallel. Now, I don't know if it is a good
idea to process in a different way for csv mode.In an extreme case, if we didn't see a QUOTE in a chunk, we cannot
know the chunk is in a quoted section or not, until all the past
chunks are parsed. After all we are forced to parse fully
sequentially as far as we allow QUOTE.
Right, I think the benefits of this as compared to single reader idea
would be (a) we can save accessing shared memory for the most part of
the chunk (b) for non-csv mode, even the tokenization (finding line
boundaries) would also be parallel. OTOH, doing processing
differently for csv and non-csv mode might not be good.
On the other hand, if we allowed "COPY t FROM f WITH (FORMAT CSV,
QUOTE '')" in order to signal that there's no quoted section in the
file then all chunks would be fully concurrently parsable.
Yeah, if we can provide such an option, we can probably make parallel
csv processing equivalent to non-csv. However, users might not like
this as I think in some cases it won't be easier for them to tell
whether the file has quoted fields or not. I am not very sure of this
point.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
At Tue, 18 Feb 2020 15:59:36 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in
On Tue, Feb 18, 2020 at 7:28 AM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:In an extreme case, if we didn't see a QUOTE in a chunk, we cannot
know the chunk is in a quoted section or not, until all the past
chunks are parsed. After all we are forced to parse fully
sequentially as far as we allow QUOTE.Right, I think the benefits of this as compared to single reader idea
would be (a) we can save accessing shared memory for the most part of
the chunk (b) for non-csv mode, even the tokenization (finding line
boundaries) would also be parallel. OTOH, doing processing
differently for csv and non-csv mode might not be good.
Agreed. So I think it's a good point of compromize.
On the other hand, if we allowed "COPY t FROM f WITH (FORMAT CSV,
QUOTE '')" in order to signal that there's no quoted section in the
file then all chunks would be fully concurrently parsable.Yeah, if we can provide such an option, we can probably make parallel
csv processing equivalent to non-csv. However, users might not like
this as I think in some cases it won't be easier for them to tell
whether the file has quoted fields or not. I am not very sure of this
point.
I'm not sure how large portion of the usage contains quoted sections,
so I'm not sure how it is useful..
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Tue, 18 Feb 2020 at 12:20, Amit Kapila <amit.kapila16@gmail.com> wrote:
This is something similar to what I had also in mind for this idea. I
had thought of handing over complete chunk (64K or whatever we
decide). The one thing that slightly bothers me is that we will add
some additional overhead of copying to and from shared memory which
was earlier from local process memory. And, the tokenization (finding
line boundaries) would be serial. I think that tokenization should be
a small part of the overall work we do during the copy operation, but
will do some measurements to ascertain the same.
I don't think any extra copying is needed. The reader can directly
fread()/pq_copymsgbytes() into shared memory, and the workers can run
CopyReadLineText() inner loop directly off of the buffer in shared memory.
For serial performance of tokenization into lines, I really think a SIMD
based approach will be fast enough for quite some time. I hacked up the code in
the simdcsv project to only tokenize on line endings and it was able to
tokenize a CSV file with short lines at 8+ GB/s. There are going to be many
other bottlenecks before this one starts limiting. Patch attached if you'd
like to try that out.
Regards,
Ants Aasma
Attachments:
simdcsv-find-only-lineendings.difftext/x-patch; charset=US-ASCII; name=simdcsv-find-only-lineendings.diffDownload+2-4
On Tue, Feb 18, 2020 at 5:59 PM Ants Aasma <ants@cybertec.at> wrote:
On Tue, 18 Feb 2020 at 12:20, Amit Kapila <amit.kapila16@gmail.com> wrote:
This is something similar to what I had also in mind for this idea. I
had thought of handing over complete chunk (64K or whatever we
decide). The one thing that slightly bothers me is that we will add
some additional overhead of copying to and from shared memory which
was earlier from local process memory. And, the tokenization (finding
line boundaries) would be serial. I think that tokenization should be
a small part of the overall work we do during the copy operation, but
will do some measurements to ascertain the same.I don't think any extra copying is needed.
I am talking about access to shared memory instead of the process
local memory. I understand that an extra copy won't be required.
The reader can directly
fread()/pq_copymsgbytes() into shared memory, and the workers can run
CopyReadLineText() inner loop directly off of the buffer in shared memory.
I am slightly confused here. AFAIU, the for(;;) loop in
CopyReadLineText is about finding the line endings which we thought
that the reader process will do.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Sun, Feb 16, 2020 at 12:51 AM Andrew Dunstan <
andrew.dunstan@2ndquadrant.com> wrote:
IIRC, in_quote only matters here in CSV mode (because CSV fields can
have embedded newlines). So why not just forbid parallel copy in CSV
mode, at least for now? I guess it depends on the actual use case. If we
expect to be parallel loading humungous CSVs then that won't fly.
Loading large CSV files is pretty common here. I hope this can be
supported.
MIKE BLACKWELL
* <Mike.Blackwell@rrd.com>*