From d657f080e9bdda296010b74d9f0c4964ccbe13e2 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 28 Jan 2021 18:42:25 +0200
Subject: [PATCH v2 2/2] Do COPY FROM encoding conversion/verification in
 larger chunks.

NOTE: This changes behavior in one corner-case: if client and server
encodings are the same single-byte encoding (e.g. latin1), previously the
input would not be checked for zero bytes ('\0'). Any fields containing
zero bytes would be truncated at the zero. But if encoding conversion was
needed, the conversion routine would throw an error on the zero. After
this commit, the input is always checked for zeros.
---
 src/backend/commands/copyfrom.c          |  41 +++--
 src/backend/commands/copyfromparse.c     | 220 +++++++++++++++++------
 src/backend/utils/mb/mbutils.c           |  55 ++++++
 src/include/commands/copyfrom_internal.h |  29 ++-
 src/include/mb/pg_wchar.h                |   6 +
 5 files changed, 270 insertions(+), 81 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index c39cc736ed2..d34b034be2e 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -23,6 +23,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "catalog/namespace.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
@@ -149,15 +150,9 @@ CopyFromErrorCallback(void *arg)
 			/*
 			 * Error is relevant to a particular line.
 			 *
-			 * If line_buf still contains the correct line, and it's already
-			 * transcoded, print it. If it's still in a foreign encoding, it's
-			 * quite likely that the error is precisely a failure to do
-			 * encoding conversion (ie, bad data). We dare not try to convert
-			 * it, and at present there's no way to regurgitate it without
-			 * conversion. So we have to punt and just report the line number.
+			 * If line_buf still contains the correct line, print it.
 			 */
-			if (cstate->line_buf_valid &&
-				(cstate->line_buf_converted || !cstate->need_transcoding))
+			if (cstate->line_buf_valid)
 			{
 				char	   *lineval;
 
@@ -1305,15 +1300,22 @@ BeginCopyFrom(ParseState *pstate,
 		cstate->file_encoding = cstate->opts.file_encoding;
 
 	/*
-	 * Set up encoding conversion info.  Even if the file and server encodings
-	 * are the same, we must apply pg_any_to_server() to validate data in
-	 * multibyte encodings.
+	 * Set up encoding conversion info.  If the file and server encodings are
+	 * the same, no conversion is required but we must still validate that the
+	 * data is valid for the encoding.
 	 */
-	cstate->need_transcoding =
-		(cstate->file_encoding != GetDatabaseEncoding() ||
-		 pg_database_encoding_max_length() > 1);
-	/* See Multibyte encoding comment above */
-	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
+	if (cstate->file_encoding == GetDatabaseEncoding() ||
+		cstate->file_encoding == PG_SQL_ASCII ||
+		GetDatabaseEncoding() == PG_SQL_ASCII)
+	{
+		cstate->need_transcoding = false;
+	}
+	else
+	{
+		cstate->need_transcoding = true;
+		cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
+															GetDatabaseEncoding());
+	}
 
 	cstate->copy_src = COPY_FILE;	/* default */
 
@@ -1342,7 +1344,12 @@ BeginCopyFrom(ParseState *pstate,
 	if (!cstate->opts.binary)
 	{
 		initStringInfo(&cstate->line_buf);
-		cstate->line_buf_converted = false;
+
+		if (cstate->need_transcoding)
+		{
+			cstate->conversion_buf = palloc(CONVERSION_BUF_SIZE + 1);
+			cstate->conversion_buf_index = cstate->conversion_buf_len = 0;
+		}
 	}
 
 	/* Assign range table, we'll need it in CopyFrom. */
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 4c74067f849..60dfebb0bdb 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -118,7 +118,8 @@ static int	CopyGetData(CopyFromState cstate, void *databuf,
 						int minread, int maxread);
 static inline bool CopyGetInt32(CopyFromState cstate, int32 *val);
 static inline bool CopyGetInt16(CopyFromState cstate, int16 *val);
-static bool CopyLoadRawBuf(CopyFromState cstate);
+static bool CopyLoadRawBufText(CopyFromState cstate);
+static bool CopyLoadRawBufBinary(CopyFromState cstate);
 static int	CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes);
 
 void
@@ -359,6 +360,65 @@ CopyGetInt16(CopyFromState cstate, int16 *val)
 	return true;
 }
 
+/*
+ * Convert input data from 'conversion_buf', writing it into
+ * 'raw_buf'.
+ *
+ * 'conversion_buf' mustn't be empty.
+ */
+static void
+CopyConvertBuf(CopyFromState cstate)
+{
+	int			convertedbytes;
+	int			srclen;
+	int			dstlen;
+
+	Assert(cstate->raw_buf_index == 0);
+
+	srclen = cstate->conversion_buf_len - cstate->conversion_buf_index;
+	dstlen = RAW_BUF_SIZE - cstate->raw_buf_len + 1;
+
+	/*
+	 * Do the conversion. This might stop short, if there is an invalid byte
+	 * sequence in the input. We'll convert as much as we can in that case.
+	 *
+	 * Note: Even if we hit an invalid byte sequence, we don't report the error
+	 * until all the valid bytes have been consumed. The input might contain
+	 * an end-of-input marker (\.), and we don't want to report an error if
+	 * the invalid byte sequence is after the end-of-input marker. We might
+	 * still convert extra data after the end-of-input marker if it's valid
+	 * for the encoding, but that's harmless.
+	 */
+	convertedbytes = pg_do_encoding_conversion_buf(cstate->conversion_proc,
+												   cstate->file_encoding,
+												   GetDatabaseEncoding(),
+												   (unsigned char *) cstate->conversion_buf + cstate->conversion_buf_index,
+												   srclen,
+												   (unsigned char *) cstate->raw_buf + cstate->raw_buf_len,
+												   dstlen,
+												   true);
+	if (convertedbytes == 0)
+	{
+		/*
+		 * No more valid input in the buffer, and we have hit an invalid byte sequence.
+		 * Let the conversion function throw the error.
+		 */
+		convertedbytes = pg_do_encoding_conversion_buf(cstate->conversion_proc,
+													   cstate->file_encoding,
+													   GetDatabaseEncoding(),
+													   (unsigned char *) cstate->conversion_buf + cstate->conversion_buf_index,
+													   srclen,
+													   (unsigned char *) cstate->raw_buf + cstate->raw_buf_len,
+													   dstlen,
+													   false);
+		/* pg_do_encoding_conversion_buf should've reported the error */
+		Assert(convertedbytes == 0);
+		elog(ERROR, "conversion error");
+	}
+	cstate->conversion_buf_index += convertedbytes;
+	cstate->raw_buf_len += strlen(cstate->raw_buf + cstate->raw_buf_len);
+	cstate->valid_raw_buf_len = cstate->raw_buf_len;
+}
 
 /*
  * CopyLoadRawBuf loads some more data into raw_buf
@@ -370,7 +430,96 @@ CopyGetInt16(CopyFromState cstate, int16 *val)
  * when a multibyte character crosses a bufferload boundary.
  */
 static bool
-CopyLoadRawBuf(CopyFromState cstate)
+CopyLoadRawBufText(CopyFromState cstate)
+{
+	int			nbytes = RAW_BUF_BYTES(cstate);
+	int			inbytes;
+
+	/* Copy down the unprocessed data if any. */
+	if (nbytes > 0)
+	{
+		memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
+				nbytes);
+	}
+	cstate->raw_buf_index = 0;
+	cstate->raw_buf_len = nbytes;
+
+	if (cstate->need_transcoding)
+	{
+		for (;;)
+		{
+			/* If we still have a good amount of unconverted data left, convert it. */
+			nbytes = cstate->conversion_buf_len - cstate->conversion_buf_index;
+			if (nbytes >= MAX_CONVERSION_GROWTH)
+			{
+				CopyConvertBuf(cstate);
+				return true;
+			}
+
+			/* Load more raw bytes to the conversion buffer */
+			if (nbytes > 0 && cstate->conversion_buf_index > 0)
+			{
+				memmove(cstate->conversion_buf, cstate->conversion_buf + cstate->conversion_buf_index,
+						nbytes);
+			}
+			cstate->conversion_buf_index = 0;
+			cstate->conversion_buf_len = nbytes;
+			inbytes = CopyGetData(cstate, cstate->conversion_buf + cstate->conversion_buf_len,
+								  1, CONVERSION_BUF_SIZE - cstate->conversion_buf_len);
+			cstate->conversion_buf_len += inbytes;
+
+			cstate->bytes_processed += inbytes;
+			pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
+			if (inbytes == 0)
+			{
+				/* Hit EOF. If we have any unconverted bytes left, convert them now */
+				if (cstate->conversion_buf_index < cstate->conversion_buf_len)
+				{
+					CopyConvertBuf(cstate);
+					return true;
+				}
+
+				/* truly hit EOF */
+				cstate->valid_raw_buf_len = 0;
+				return false;
+			}
+		}
+	}
+	else
+	{
+		/*
+		 * No encoding conversion required. But we still need to verify that the input is
+		 * valid.
+		 *
+		 * XXX: for single-byte encoding, the verification only needs to check that the
+		 * input doesn't contain any zero bytes. Could we skip that altogether?
+		 */
+		int			validbytes;
+
+		inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
+							  1, RAW_BUF_SIZE - nbytes);
+		nbytes += inbytes;
+		cstate->raw_buf[nbytes] = '\0';
+		cstate->raw_buf_len = nbytes;
+
+		cstate->bytes_processed += inbytes;
+		pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
+		validbytes = pg_encoding_verifymbstr(cstate->file_encoding, cstate->raw_buf, nbytes);
+		if (validbytes == 0 && nbytes > 0)
+		{
+			report_invalid_encoding(cstate->file_encoding, cstate->raw_buf, nbytes);
+		}
+
+		cstate->valid_raw_buf_len = validbytes;
+	}
+
+	return (inbytes > 0);
+}
+
+static bool
+CopyLoadRawBufBinary(CopyFromState cstate)
 {
 	int			nbytes = RAW_BUF_BYTES(cstate);
 	int			inbytes;
@@ -386,8 +535,10 @@ CopyLoadRawBuf(CopyFromState cstate)
 	cstate->raw_buf[nbytes] = '\0';
 	cstate->raw_buf_index = 0;
 	cstate->raw_buf_len = nbytes;
+
 	cstate->bytes_processed += nbytes;
 	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
 	return (inbytes > 0);
 }
 
@@ -423,7 +574,7 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
 			/* Load more data if buffer is empty. */
 			if (RAW_BUF_BYTES(cstate) == 0)
 			{
-				if (!CopyLoadRawBuf(cstate))
+				if (!CopyLoadRawBufBinary(cstate))
 					break;		/* EOF */
 			}
 
@@ -699,9 +850,6 @@ CopyReadLine(CopyFromState cstate)
 	resetStringInfo(&cstate->line_buf);
 	cstate->line_buf_valid = true;
 
-	/* Mark that encoding conversion hasn't occurred yet */
-	cstate->line_buf_converted = false;
-
 	/* Parse data and transfer into line_buf */
 	result = CopyReadLineText(cstate);
 
@@ -714,10 +862,13 @@ CopyReadLine(CopyFromState cstate)
 		 */
 		if (cstate->copy_src == COPY_NEW_FE)
 		{
+			int			inbytes;
+
 			do
 			{
-				cstate->raw_buf_index = cstate->raw_buf_len;
-			} while (CopyLoadRawBuf(cstate));
+				inbytes = CopyGetData(cstate, cstate->raw_buf,
+									  1, RAW_BUF_SIZE);
+			} while (inbytes > 0);
 		}
 	}
 	else
@@ -754,26 +905,6 @@ CopyReadLine(CopyFromState cstate)
 		}
 	}
 
-	/* Done reading the line.  Convert it to server encoding. */
-	if (cstate->need_transcoding)
-	{
-		char	   *cvt;
-
-		cvt = pg_any_to_server(cstate->line_buf.data,
-							   cstate->line_buf.len,
-							   cstate->file_encoding);
-		if (cvt != cstate->line_buf.data)
-		{
-			/* transfer converted data back to line_buf */
-			resetStringInfo(&cstate->line_buf);
-			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
-			pfree(cvt);
-		}
-	}
-
-	/* Now it's safe to use the buffer in error messages */
-	cstate->line_buf_converted = true;
-
 	return result;
 }
 
@@ -789,7 +920,6 @@ CopyReadLineText(CopyFromState cstate)
 	bool		need_data = false;
 	bool		hit_eof = false;
 	bool		result = false;
-	char		mblen_str[2];
 
 	/* CSV variables */
 	bool		first_char_in_line = true;
@@ -807,8 +937,6 @@ CopyReadLineText(CopyFromState cstate)
 			escapec = '\0';
 	}
 
-	mblen_str[1] = '\0';
-
 	/*
 	 * The objective of this loop is to transfer the entire next input line
 	 * into line_buf.  Hence, we only care for detecting newlines (\r and/or
@@ -832,7 +960,7 @@ CopyReadLineText(CopyFromState cstate)
 	 */
 	copy_raw_buf = cstate->raw_buf;
 	raw_buf_ptr = cstate->raw_buf_index;
-	copy_buf_len = cstate->raw_buf_len;
+	copy_buf_len = cstate->valid_raw_buf_len;
 
 	for (;;)
 	{
@@ -857,10 +985,10 @@ CopyReadLineText(CopyFromState cstate)
 			 * Try to read some more data.  This will certainly reset
 			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
 			 */
-			if (!CopyLoadRawBuf(cstate))
+			if (!CopyLoadRawBufText(cstate))
 				hit_eof = true;
 			raw_buf_ptr = 0;
-			copy_buf_len = cstate->raw_buf_len;
+			copy_buf_len = cstate->valid_raw_buf_len;
 
 			/*
 			 * If we are completely out of data, break out of the loop,
@@ -1106,30 +1234,6 @@ CopyReadLineText(CopyFromState cstate)
 		 * value, while in non-CSV mode, \. cannot be a data value.
 		 */
 not_end_of_copy:
-
-		/*
-		 * Process all bytes of a multi-byte character as a group.
-		 *
-		 * We only support multi-byte sequences where the first byte has the
-		 * high-bit set, so as an optimization we can avoid this block
-		 * entirely if it is not set.
-		 */
-		if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
-		{
-			int			mblen;
-
-			/*
-			 * It is enough to look at the first byte in all our encodings, to
-			 * get the length.  (GB18030 is a bit special, but still works for
-			 * our purposes; see comment in pg_gb18030_mblen())
-			 */
-			mblen_str[0] = c;
-			mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
-
-			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
-			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
-			raw_buf_ptr += mblen - 1;
-		}
 		first_char_in_line = false;
 	}							/* end of outer loop */
 
diff --git a/src/backend/utils/mb/mbutils.c b/src/backend/utils/mb/mbutils.c
index b41d3e0bb9a..c7fab04b9b4 100644
--- a/src/backend/utils/mb/mbutils.c
+++ b/src/backend/utils/mb/mbutils.c
@@ -436,6 +436,61 @@ pg_do_encoding_conversion(unsigned char *src, int len,
 	return result;
 }
 
+/*
+ * Convert src string to another encoding.
+ *
+ * This function has a different API than the other conversion functions.
+ * The caller should've looked up the conversion function using
+ * FindDefaultConversionProc(). Unlike the other functions, the converted
+ * result is not palloc'd. It is written to a caller-supplied buffer instead.
+ *
+ * src_encoding   - encoding to convert from
+ * dest_encoding  - encoding to convert to
+ * src, srclen    - input buffer and its length in bytes
+ * dest, destlen  - destination buffer and its size in bytes
+ *
+ * The output is null-terminated.
+ *
+ * If destlen < srclen * MAX_CONVERSION_GROWTH + 1, the converted output
+ * wouldn't necessarily fit in the output buffer, and the function will not
+ * convert the whole input.
+ *
+ * TODO: It would be nice to also return to the caller the number of bytes
+ * written, to avoid a call to strlen().
+ */
+int
+pg_do_encoding_conversion_buf(Oid proc,
+							  int src_encoding,
+							  int dest_encoding,
+							  unsigned char *src, int srclen,
+							  unsigned char *dest, int destlen,
+							  bool noError)
+{
+	Datum           result;
+
+	/*
+	 * If the destination buffer is not large enough to hold the
+	 * result in the worst case, limit the input size passed to
+	 * the conversion function.
+	 *
+	 * TODO: It would perhaps be more efficient to pass the destination
+	 * buffer size to the conversion function, so that if the conversion
+	 * expands less than the worst case, it could continue to fill up the
+	 * whole buffer.
+	 */
+	if ((Size) srclen >= ((destlen - 1) / (Size) MAX_CONVERSION_GROWTH))
+		srclen = ((destlen - 1) / (Size) MAX_CONVERSION_GROWTH);
+
+	result = OidFunctionCall6(proc,
+							  Int32GetDatum(src_encoding),
+							  Int32GetDatum(dest_encoding),
+							  CStringGetDatum(src),
+							  CStringGetDatum(dest),
+							  Int32GetDatum(srclen),
+							  BoolGetDatum(noError));
+	return DatumGetInt32(result);
+}
+
 /*
  * Convert string to encoding encoding_name. The source
  * encoding is the DB encoding.
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e37942df391..364e1bc3651 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -77,7 +77,7 @@ typedef struct CopyFromStateData
 	EolType		eol_type;		/* EOL type of input */
 	int			file_encoding;	/* file or remote side's character encoding */
 	bool		need_transcoding;	/* file encoding diff from server? */
-	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+	Oid			conversion_proc;
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy from */
@@ -139,24 +139,41 @@ typedef struct CopyFromStateData
 	 * line_buf is not used.)
 	 */
 	StringInfoData line_buf;
-	bool		line_buf_converted; /* converted to server encoding? */
 	bool		line_buf_valid; /* contains the row being processed? */
 
 	/*
-	 * Finally, raw_buf holds raw data read from the data source (file or
-	 * client connection).  In text mode, CopyReadLine parses this data
+	 * conversion_buf holds raw input data read from the data source (file or
+	 * client connection), not yet converted to the database encoding.
+	 *
+	 * If the encoding conversion is not required, the input data is read
+	 * directly into 'raw_buf', and conversion_buf is not used.
+	 */
+#define CONVERSION_BUF_SIZE 65536		/* we palloc CONVERSION_BUF_SIZE+1 bytes */
+	char	   *conversion_buf;
+	int			conversion_buf_index;
+	int			conversion_buf_len;
+
+	/*
+	 * raw_buf holds input data, already converted to database encoding.
+	 *
+	 * In text mode, CopyReadLine parses this data
 	 * sufficiently to locate line boundaries, then transfers the data to
-	 * line_buf and converts it.  In binary mode, CopyReadBinaryData fetches
+	 * line_buf.  In binary mode, CopyReadBinaryData fetches
 	 * appropriate amounts of data from this buffer.  In both modes, we
 	 * guarantee that there is a \0 at raw_buf[raw_buf_len].
+	 *
+	 * XXX: 'raw_buf' is a bit of a misnomer, since the data in 'conversion_buf'
+	 * is more raw than this.
 	 */
 #define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
-	uint64		bytes_processed;/* number of bytes processed so far */
+	int			valid_raw_buf_len;
 	/* Shorthand for number of unconsumed bytes available in raw_buf */
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
+
+	uint64		bytes_processed; /* number of bytes processed so far */
 } CopyFromStateData;
 
 extern void ReceiveCopyBegin(CopyFromState cstate);
diff --git a/src/include/mb/pg_wchar.h b/src/include/mb/pg_wchar.h
index 346a41a1f3d..9a22a6461d6 100644
--- a/src/include/mb/pg_wchar.h
+++ b/src/include/mb/pg_wchar.h
@@ -616,6 +616,12 @@ extern int	pg_bind_textdomain_codeset(const char *domainname);
 extern unsigned char *pg_do_encoding_conversion(unsigned char *src, int len,
 												int src_encoding,
 												int dest_encoding);
+extern int pg_do_encoding_conversion_buf(Oid proc,
+										 int src_encoding,
+										 int dest_encoding,
+										 unsigned char *src, int srclen,
+										 unsigned char *dst, int dstlen,
+										 bool noError);
 
 extern char *pg_client_to_server(const char *s, int len);
 extern char *pg_server_to_client(const char *s, int len);
-- 
2.29.2

