From a1056490595507cc51cf9f6c58308b970f8d1a93 Mon Sep 17 00:00:00 2001 From: Shinya Kato Date: Sun, 4 Jan 2026 15:55:23 +0900 Subject: [PATCH v1 2/2] Add LIMIT option to COPY FROM This commit adds a LIMIT option to COPY FROM so callers can cap the final number of rows loaded, after the WHERE clause and ON_ERROR filtering have been applied. COPY TO could already restrict rows by using LIMIT in the query, but COPY FROM had no equivalent. Author: Shinya Kato Reviewed-by: Discussion: https://postgr.es/m/ --- doc/src/sgml/ref/copy.sgml | 16 +++++++++ src/backend/commands/copy.c | 18 ++++++++++ src/backend/commands/copyfrom.c | 26 ++++++++++++++ src/backend/commands/copyfromparse.c | 28 +++++++++++++++ src/bin/psql/tab-complete.in.c | 2 +- src/include/commands/copy.h | 1 + src/include/commands/copyfrom_internal.h | 1 + src/test/regress/expected/copy2.out | 43 +++++++++++++++++++++++ src/test/regress/sql/copy2.sql | 44 ++++++++++++++++++++++++ 9 files changed, 178 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 53b0ea8f573..d521899b56e 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -38,6 +38,7 @@ COPY { table_name [ ( null_string' DEFAULT 'default_string' HEADER [ boolean | integer | MATCH ] + LIMIT rowcount QUOTE 'quote_character' ESCAPE 'escape_character' FORCE_QUOTE { ( column_name [, ...] ) | * } @@ -335,6 +336,21 @@ COPY { table_name [ ( + + LIMIT + + + Specifies the maximum number of rows that COPY FROM + inserts. Rows filtered out by the WHERE clause or + skipped because of ON_ERROR=ignore + do not count toward the limit. When the limit is reached, any remaining + input is ignored; if the source is STDIN, the server + will read and discard the rest of the stream to finish the copy + protocol. This option is allowed only in COPY FROM.
+ + + + QUOTE diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 5f4bf570fdf..599b7798424 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -568,6 +568,7 @@ ProcessCopyOptions(ParseState *pstate, bool format_specified = false; bool freeze_specified = false; bool header_specified = false; + bool limit_specified = false; bool on_error_specified = false; bool log_verbosity_specified = false; bool reject_limit_specified = false; @@ -635,6 +636,13 @@ ProcessCopyOptions(ParseState *pstate, header_specified = true; opts_out->header_line = defGetCopyHeaderOption(defel, is_from); } + else if (strcmp(defel->defname, "limit") == 0) + { + if (limit_specified) + errorConflictingDefElem(defel, pstate); + limit_specified = true; + opts_out->limit = defGetCopyPositiveInt64Option(defel, "LIMIT"); + } else if (strcmp(defel->defname, "quote") == 0) { if (opts_out->quote) @@ -843,6 +851,16 @@ ProcessCopyOptions(ParseState *pstate, /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */ errmsg("cannot specify %s in BINARY mode", "HEADER"))); + /* Check limit */ + if (opts_out->limit && !is_from) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR, + second %s is a COPY with direction, e.g. COPY TO */ + errmsg("COPY %s cannot be used with %s", "LIMIT", "COPY TO"))); + } + /* Check quote */ if (!opts_out->csv_mode && opts_out->quote != NULL) ereport(ERROR, diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 25ee20b23db..021b42c72c4 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1122,6 +1122,32 @@ CopyFrom(CopyFromState cstate) */ ResetPerTupleExprContext(estate); + /* + * Stop early when LIMIT would be exceeded. In multi-insert mode, + * include already buffered tuples so we don't overshoot.
+ */ + if (cstate->opts.limit > 0) + { + int64 pending_tuples = 0; + + if (insertMethod != CIM_SINGLE) + pending_tuples = multiInsertInfo.bufferedTuples; + + if (processed + pending_tuples >= cstate->opts.limit) + { + /* + * COPY FROM STDIN uses the FE/BE COPY IN protocol. To finish + * cleanly after reaching LIMIT, we must consume incoming + * CopyData up to CopyDone/CopyFail so that protocol state + * stays synchronized. + */ + if (cstate->copy_src == COPY_FRONTEND) + CopyFromDrainInput(cstate); + + break; + } + } + /* select slot to (initially) load row into */ if (insertMethod == CIM_SINGLE || proute) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 5868a7fa11f..31df4131114 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -2066,3 +2066,31 @@ CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, *isnull = false; return result; } + +/* + * Discard remaining COPY FROM STDIN data after reaching a row limit.
+ */ +void +CopyFromDrainInput(CopyFromState cstate) +{ + Assert(cstate->copy_src == COPY_FRONTEND); + + /* Read and discard the rest of the stream until raw EOF (CopyDone) */ + while (!cstate->raw_reached_eof) + { + /* Allow query cancel: the remaining stream may be very large */ + CHECK_FOR_INTERRUPTS(); + + if (CopyGetData(cstate, cstate->raw_buf, 1, RAW_BUF_SIZE) <= 0) + break; + } + + /* Reset buffer pointers */ + cstate->raw_buf_index = 0; + cstate->raw_buf_len = 0; + if (cstate->input_buf != NULL) + { + cstate->input_buf_index = 0; + cstate->input_buf_len = 0; + } +} diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8b91bc00062..f34b7e7ad7d 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -1228,7 +1228,7 @@ Alter_routine_options, "CALLED ON NULL INPUT", "RETURNS NULL ON NULL INPUT", \ /* COPY FROM options */ #define Copy_from_options \ Copy_common_options, "DEFAULT", "FORCE_NOT_NULL", "FORCE_NULL", "FREEZE", \ -"LOG_VERBOSITY", "ON_ERROR", "REJECT_LIMIT" +"LIMIT", "LOG_VERBOSITY", "ON_ERROR", "REJECT_LIMIT" /* COPY TO options */ #define Copy_to_options \ diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 43c2580539f..4c827b0665b 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -63,6 +63,7 @@ typedef struct CopyFormatOptions bool csv_mode; /* Comma Separated Value format? */ int header_line; /* number of lines to skip or COPY_HEADER_XXX * value (see the above) */ + int64 limit; /* maximum rows to load; 0 means no limit */ char *null_print; /* NULL marker string (server encoding!)
*/ int null_print_len; /* length of same */ char *null_print_client; /* same converted to file encoding */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 822ef33cf69..9669b6bbcdf 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -193,5 +193,6 @@ extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); +extern void CopyFromDrainInput(CopyFromState cstate); #endif /* COPYFROM_INTERNAL_H */ diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index 3145b314e48..53ee086dff5 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -85,6 +85,10 @@ COPY x from stdin (log_verbosity default, log_verbosity verbose); ERROR: conflicting or redundant options LINE 1: COPY x from stdin (log_verbosity default, log_verbosity verb...
^ +COPY x from stdin (limit 10, limit 10); +ERROR: conflicting or redundant options +LINE 1: COPY x from stdin (limit 10, limit 10); + ^ -- incorrect options COPY x from stdin (format BINARY, delimiter ','); ERROR: cannot specify DELIMITER in BINARY mode @@ -144,6 +148,12 @@ COPY x from stdin with (header '2.5'); ERROR: header requires a Boolean value, an integer value greater than or equal to zero, or the string "match" COPY x to stdout with (header '2'); ERROR: cannot use multi-line header in COPY TO +COPY x from stdin (limit 0); +ERROR: LIMIT (0) must be greater than zero +COPY x from stdin (limit 1.5); +ERROR: invalid input syntax for type bigint: "1.5" +COPY x to stdout (limit 1); +ERROR: COPY LIMIT cannot be used with COPY TO -- too many columns in column list: should fail COPY x (a, b, c, d, e, d, c) from stdin; ERROR: column "d" specified more than once @@ -192,6 +202,39 @@ COPY x from stdin WHERE a = row_number() over(b); ERROR: window functions are not allowed in COPY FROM WHERE conditions LINE 1: COPY x from stdin WHERE a = row_number() over(b); ^ +-- tests for LIMIT option +CREATE TEMP TABLE copy_limit_basic (a int); +COPY copy_limit_basic FROM stdin (limit 2); +SELECT * FROM copy_limit_basic ORDER BY a; + a +--- + 1 + 2 +(2 rows) + +-- test LIMIT with WHERE clause +CREATE TEMP TABLE copy_limit (a int, b text); +COPY copy_limit FROM stdin (limit 2); +COPY copy_limit FROM stdin (limit 1) WHERE a > 5; +SELECT * FROM copy_limit ORDER BY a; + a | b +---+----- + 1 | one + 2 | two + 6 | six +(3 rows) + +-- test LIMIT with ON_ERROR option +CREATE TEMP TABLE copy_limit_err (a int); +COPY copy_limit_err FROM stdin (on_error ignore, limit 2); +NOTICE: 1 row was skipped due to data type incompatibility +SELECT * FROM copy_limit_err ORDER BY a; + a +--- + 1 + 2 +(2 rows) + -- check results of copy in SELECT * FROM x; a | b | c | d | e diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index 66435167500..f56a111052c 100644 ---
a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -68,6 +68,7 @@ COPY x from stdin (convert_selectively (a), convert_selectively (b)); COPY x from stdin (encoding 'sql_ascii', encoding 'sql_ascii'); COPY x from stdin (on_error ignore, on_error ignore); COPY x from stdin (log_verbosity default, log_verbosity verbose); +COPY x from stdin (limit 10, limit 10); -- incorrect options COPY x from stdin (format BINARY, delimiter ','); @@ -96,6 +97,9 @@ COPY x to stdout with (header 2); COPY x to stdout with (header '-1'); COPY x from stdin with (header '2.5'); COPY x to stdout with (header '2'); +COPY x from stdin (limit 0); +COPY x from stdin (limit 1.5); +COPY x to stdout (limit 1); -- too many columns in column list: should fail COPY x (a, b, c, d, e, d, c) from stdin; @@ -164,6 +168,46 @@ COPY x from stdin WHERE a IN (generate_series(1,5)); COPY x from stdin WHERE a = row_number() over(b); +-- tests for LIMIT option +CREATE TEMP TABLE copy_limit_basic (a int); + +COPY copy_limit_basic FROM stdin (limit 2); +1 +2 +3 +\. + +SELECT * FROM copy_limit_basic ORDER BY a; + +-- test LIMIT with WHERE clause +CREATE TEMP TABLE copy_limit (a int, b text); + +COPY copy_limit FROM stdin (limit 2); +1 one +2 two +3 three +\. + +COPY copy_limit FROM stdin (limit 1) WHERE a > 5; +5 five +6 six +7 seven +\. + +SELECT * FROM copy_limit ORDER BY a; + +-- test LIMIT with ON_ERROR option +CREATE TEMP TABLE copy_limit_err (a int); + +COPY copy_limit_err FROM stdin (on_error ignore, limit 2); +1 +bad +2 +3 +\. + +SELECT * FROM copy_limit_err ORDER BY a; + -- check results of copy in SELECT * FROM x; -- 2.47.3