From dd8f20716524982e3687ea3e6a44a11e5f0b886c Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikoshia@oss.nttdata.com>
Date: Wed, 25 Sep 2024 21:29:48 +0900
Subject: [PATCH v5 2/3] Add on_error and log_verbosity options to file_fdw

Commits 9e2d870, b725b7e and f5a2278 added the on_error and log_verbosity
options to COPY.  These options are also useful for file_fdw when reading
files that contain malformed data, so this patch adds them to file_fdw.
---
 contrib/file_fdw/expected/file_fdw.out | 18 +++++++
 contrib/file_fdw/file_fdw.c            | 74 +++++++++++++++++++++++---
 contrib/file_fdw/sql/file_fdw.sql      |  6 +++
 doc/src/sgml/file-fdw.sgml             | 23 ++++++++
 4 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/contrib/file_fdw/expected/file_fdw.out b/contrib/file_fdw/expected/file_fdw.out
index 86c148a86b..2ea0deeb10 100644
--- a/contrib/file_fdw/expected/file_fdw.out
+++ b/contrib/file_fdw/expected/file_fdw.out
@@ -206,6 +206,24 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 SELECT * FROM agg_bad;               -- ERROR
 ERROR:  invalid input syntax for type real: "aaa"
 CONTEXT:  COPY agg_bad, line 3, column b: "aaa"
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+NOTICE:  1 row was skipped due to data type incompatibility
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
+SELECT * FROM agg_bad;
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index d16821f8e1..543b1ca4f9 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -22,8 +22,10 @@
 #include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_table.h"
 #include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
@@ -34,6 +36,7 @@
 #include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
 #include "utils/acl.h"
+#include "utils/backend_progress.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/sampling.h"
@@ -74,6 +77,8 @@ static const struct FileFdwOption valid_options[] = {
 	{"null", ForeignTableRelationId},
 	{"default", ForeignTableRelationId},
 	{"encoding", ForeignTableRelationId},
+	{"on_error", ForeignTableRelationId},
+	{"log_verbosity", ForeignTableRelationId},
 	{"force_not_null", AttributeRelationId},
 	{"force_null", AttributeRelationId},
 
@@ -726,11 +731,12 @@ fileIterateForeignScan(ForeignScanState *node)
 	MemoryContext oldcontext;
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 	bool		found;
+	CopyFromState cstate = festate->cstate;
 	ErrorContextCallback errcallback;
 
 	/* Set up callback to identify error line number. */
 	errcallback.callback = CopyFromErrorCallback;
-	errcallback.arg = (void *) festate->cstate;
+	errcallback.arg = (void *) cstate;
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
@@ -751,10 +757,36 @@ fileIterateForeignScan(ForeignScanState *node)
 	 * switch in case we are doing that.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	found = NextCopyFrom(festate->cstate, econtext,
-						 slot->tts_values, slot->tts_isnull);
-	if (found)
+
+	for(;;)
+	{
+		found = NextCopyFrom(cstate, econtext,
+					 slot->tts_values, slot->tts_isnull);
+		if (!found)
+			break;
+
+		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+			cstate->escontext->error_occurred)
+		{
+			/*
+			 * Soft error occurred, skip this tuple and just make
+			 * ErrorSaveContext ready for the next NextCopyFrom. Since we don't
+			 * set details_wanted and error_data is not to be filled, just
+			 * resetting error_occurred is enough.
+			 */
+			cstate->escontext->error_occurred = false;
+
+			/* Report that this tuple was skipped by the ON_ERROR clause */
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
+										cstate->num_errors);
+
+			/* Repeat NextCopyFrom() until no soft error occurs */
+			continue;
+		}
+
 		ExecStoreVirtualTuple(slot);
+		break;
+	}
 
 	/* Switch back to original memory context */
 	MemoryContextSwitchTo(oldcontext);
@@ -796,8 +828,19 @@ fileEndForeignScan(ForeignScanState *node)
 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
 
 	/* if festate is NULL, we are in EXPLAIN; nothing to do */
-	if (festate)
-		EndCopyFrom(festate->cstate);
+	if (!festate)
+		return;
+
+	if (festate->cstate->opts.on_error != COPY_ON_ERROR_STOP &&
+		festate->cstate->num_errors > 0 &&
+		festate->cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
+		ereport(NOTICE,
+				errmsg_plural("%llu row was skipped due to data type incompatibility",
+							  "%llu rows were skipped due to data type incompatibility",
+							  (unsigned long long) festate->cstate->num_errors,
+							  (unsigned long long) festate->cstate->num_errors));
+
+	EndCopyFrom(festate->cstate);
 }
 
 /*
@@ -1191,6 +1234,25 @@ file_acquire_sample_rows(Relation onerel, int elevel,
 		if (!found)
 			break;
 
+		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+			cstate->escontext->error_occurred)
+		{
+			/*
+			 * Soft error occurred, skip this tuple and just make
+			 * ErrorSaveContext ready for the next NextCopyFrom. Since we don't
+			 * set details_wanted and error_data is not to be filled, just
+			 * resetting error_occurred is enough.
+			 */
+			cstate->escontext->error_occurred = false;
+
+			/* Report that this tuple was skipped by the ON_ERROR clause */
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
+										cstate->num_errors);
+
+			/* Repeat NextCopyFrom() until no soft error occurs */
+			continue;
+		}
+
 		/*
 		 * The first targrows sample rows are simply copied into the
 		 * reservoir.  Then we start replacing tuples in the sample until we
diff --git a/contrib/file_fdw/sql/file_fdw.sql b/contrib/file_fdw/sql/file_fdw.sql
index f0548e14e1..a417067e54 100644
--- a/contrib/file_fdw/sql/file_fdw.sql
+++ b/contrib/file_fdw/sql/file_fdw.sql
@@ -150,6 +150,12 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 -- error context report tests
 SELECT * FROM agg_bad;               -- ERROR
 
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
+SELECT * FROM agg_bad;
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
diff --git a/doc/src/sgml/file-fdw.sgml b/doc/src/sgml/file-fdw.sgml
index f2f2af9a59..bb3579b077 100644
--- a/doc/src/sgml/file-fdw.sgml
+++ b/doc/src/sgml/file-fdw.sgml
@@ -126,6 +126,29 @@
    </listitem>
   </varlistentry>
 
+  <varlistentry>
+   <term><literal>on_error</literal></term>
+
+   <listitem>
+    <para>
+     Specifies how to behave when an error is encountered while converting a
+     column's input value into its data type.  This works the same way as
+     <command>COPY</command>'s <literal>ON_ERROR</literal> option.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry>
+   <term><literal>log_verbosity</literal></term>
+
+   <listitem>
+    <para>
+     Specifies how many messages are emitted by <literal>file_fdw</literal>.  This
+     works the same way as <command>COPY</command>'s <literal>LOG_VERBOSITY</literal> option.
+    </para>
+   </listitem>
+  </varlistentry>
+
  </variablelist>
 
  <para>
-- 
2.39.2

