From 87894c42dfa222a273170f4783ef8f3fae9537f5 Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikoshia@oss.nttdata.com>
Date: Mon, 22 Jul 2024 21:23:42 +0900
Subject: [PATCH v3] Add new COPY option REJECT_LIMIT ratio

---
 doc/src/sgml/ref/copy.sgml          | 11 ++++++++++-
 src/backend/commands/copy.c         | 18 +++++++++++++-----
 src/backend/commands/copyfrom.c     | 10 ++++++++++
 src/include/commands/copy.h         |  1 +
 src/test/regress/expected/copy2.out |  7 +++++++
 src/test/regress/sql/copy2.sql      | 19 +++++++++++++++++++
 6 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index beb455372d..e2e0cc9665 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,7 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
-    REJECT_LIMIT { <replaceable class="parameter">integer</replaceable> | INFINITY }
+    REJECT_LIMIT { <replaceable class="parameter">integer</replaceable> | <replaceable class="parameter">floating point</replaceable> | INFINITY }
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
 </synopsis>
@@ -426,6 +426,15 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
       This option must be used with <literal>ON_ERROR</literal> to be set to
       other than <literal>stop</literal>.
      </para>
+     <para>
+      When a floating point value greater than 0 and less than 1 is specified,
+      <command>COPY</command> limits the maximum ratio of errors while
+      converting a column's input value into its data type.
+      If the input data causes an error ratio greater than the specified value,
+      the entire <command>COPY</command> fails.
+      Otherwise, <command>COPY</command> discards only the input rows where
+      errors occurred and copies all the other rows.
+     </para>
      <para>
       When specified <literal>INFINITY</literal>, <command>COPY</command> ignores all
       the errors. This is a synonym for <literal>ON_ERROR</literal> <literal>ignore</literal>.
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8fbab9336f..b3cc63d44f 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -422,7 +422,8 @@ static CopyRejectLimits
 defGetCopyRejectLimitOptions(DefElem *def)
 {
 	CopyRejectLimits	limits;
-	int64					num_err;
+	uint64				num_err = 0;
+	double				ratio_err = 0;
 
 	switch(nodeTag(def->arg))
 	{
@@ -433,14 +434,20 @@ defGetCopyRejectLimitOptions(DefElem *def)
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("number for REJECT_LIMIT must be greater than zero")));
 			break;
+		case T_Float:
+			ratio_err = defGetNumeric(def);
+			if (ratio_err <= 0 || ratio_err >= 1)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("float for REJECT_LIMIT must be greater than zero and smaller than 1")));
+			break;
 		case T_String:
-			if (pg_strcasecmp(defGetString(def), "INFINITY") == 0)
-				/* when set to 0, it is treated as no limit */
-				num_err = 0;
-			else
+			if (pg_strcasecmp(defGetString(def), "INFINITY") != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("string for REJECT_LIMIT must be 'INFINITY'")));
+
+			/* INFINITY: both limits stay at their initial 0, treated as no limit */
 			break;
 		default:
 			ereport(ERROR,
@@ -448,6 +455,7 @@ defGetCopyRejectLimitOptions(DefElem *def)
 					 errmsg("value for REJECT_LIMIT must be positive integer or 'INFINITY'")));
 	}
 	limits.num_err = num_err;
+	limits.ratio_err = ratio_err;
 
 	return limits;
 }
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 65d950ef07..69485a25a1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -646,6 +646,7 @@ CopyFrom(CopyFromState cstate)
 	int64		processed = 0;
 	int64		excluded = 0;
 	int64		skipped = 0;
+	double		ratio_err = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
@@ -1310,6 +1311,15 @@ CopyFrom(CopyFromState cstate)
 			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
 	}
 
+	ratio_err = (double) skipped / (processed + skipped);
+	if (cstate->opts.reject_limits.ratio_err &&
+		cstate->opts.reject_limits.ratio_err < ratio_err)
+		ereport(ERROR,
+			(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+				errmsg("exceeded the ratio specified by REJECT_LIMIT \"%f\", the error ratio was: \"%f\"",
+				cstate->opts.reject_limits.ratio_err,
+				ratio_err)));
+
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 86ba1cbe16..7ea88da894 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -56,6 +56,7 @@ typedef enum CopyLogVerbosityChoice
 typedef struct CopyRejectLimits
 {
 	int64		num_err;	/* maximum tolerable number of errors */
+	double		ratio_err;	/* maximum tolerable ratio of errors */
 } CopyRejectLimits;
 
 /*
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 8ce12f5b6f..173bd7170e 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -122,6 +122,8 @@ LINE 1: COPY x to stdout (log_verbosity unsupported);
                           ^
 COPY x from stdin with (on_error ignore, reject_limit 0);
 ERROR:  number for REJECT_LIMIT must be greater than zero
+COPY x from stdin with (on_error ignore, reject_limit 1.1);
+ERROR:  float for REJECT_LIMIT must be greater than zero and smaller than 1
 -- too many columns in column list: should fail
 COPY x (a, b, c, d, e, d, c) from stdin;
 ERROR:  column "d" specified more than once
@@ -803,6 +805,11 @@ COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
 NOTICE:  4 rows were skipped due to data type incompatibility
 COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 'INFINITY');
 NOTICE:  4 rows were skipped due to data type incompatibility
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0.6);
+ERROR:  exceeded the ratio specified by REJECT_LIMIT "0.600000", the error ratio was: "0.666667"
+CONTEXT:  COPY check_ign_err, line 7: ""
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0.7);
+NOTICE:  4 rows were skipped due to data type incompatibility
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 8cc82990f4..1b90ba0b35 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -84,6 +84,7 @@ COPY x to stdin (format CSV, force_null(a));
 COPY x to stdin (format BINARY, on_error unsupported);
 COPY x to stdout (log_verbosity unsupported);
 COPY x from stdin with (on_error ignore, reject_limit 0);
+COPY x from stdin with (on_error ignore, reject_limit 1.1);
 
 -- too many columns in column list: should fail
 COPY x (a, b, c, d, e, d, c) from stdin;
@@ -587,6 +588,24 @@ a	{7}	7
 10	{10}	10
 \.
 
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0.6);
+6	{6}	6
+a	{7}	7
+8	{8}	8888888888
+9	{a, 9}	9
+
+10	{10}	10
+\.
+
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0.7);
+6	{6}	6
+a	{7}	7
+8	{8}	8888888888
+9	{a, 9}	9
+
+10	{10}	10
+\.
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
-- 
2.39.2

