diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 13a8b68d95..bf21abd8e0 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -364,6 +364,17 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>IGNORE_CONFLICTS</literal></term>
+    <listitem>
+     <para>
+      specifies ignore to error up to specified amount .
+      Instead write the error record to failed record file and 
+      precede to the next record
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect1>
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9bc67ce60f..ffa6aecbd5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -43,6 +43,7 @@
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -118,6 +119,7 @@ typedef struct CopyStateData
 	int			file_encoding;	/* file or remote side's character encoding */
 	bool		need_transcoding;	/* file encoding diff from server? */
 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+	FILE	   *failed_rec_file;
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy to or from */
@@ -147,6 +149,9 @@ typedef struct CopyStateData
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select; /* list of column names (can be NIL) */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
+	char	   *failed_rec_filename;
+	bool	   ignore_conflict;
+	int	   error_limit;	/* total # of error to log */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
@@ -766,6 +771,21 @@ CopyLoadRawBuf(CopyState cstate)
 	return (inbytes > 0);
 }
 
+/*
+ * LogCopyError log error in to error log file
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+	appendBinaryStringInfo(&cstate->line_buf, str, strlen(str));
+#ifndef WIN32
+	appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+	appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+	fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+	cstate->error_limit--;
+}
 
 /*
  *	 DoCopy executes the SQL COPY statement
@@ -1226,6 +1246,19 @@ ProcessCopyOptions(ParseState *pstate,
 								defel->defname),
 						 parser_errposition(pstate, defel->location)));
 		}
+		else if (strcmp(defel->defname, "ignore_conflicts") == 0)
+		{
+			List       *conflictOption;
+			if (cstate->ignore_conflict)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->ignore_conflict = true;
+			conflictOption = (List *) defel->arg;
+			cstate->error_limit = intVal(list_nth(conflictOption, 0));
+			cstate->failed_rec_filename = strVal(list_nth(conflictOption, 1));
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1749,6 +1782,11 @@ EndCopy(CopyState cstate)
 					(errcode_for_file_access(),
 					 errmsg("could not close file \"%s\": %m",
 							cstate->filename)));
+		if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"%s\": %m",
+							cstate->failed_rec_filename)));
 	}
 
 	MemoryContextDelete(cstate->copycontext);
@@ -2461,6 +2499,8 @@ CopyFrom(CopyState cstate)
 		hi_options |= HEAP_INSERT_FROZEN;
 	}
 
+	if (!cstate->ignore_conflict)
+		cstate->error_limit = 0;
 	/*
 	 * We need a ResultRelInfo so we can use the regular executor's
 	 * index-entry-making machinery.  (There used to be a huge amount of code
@@ -2579,6 +2619,10 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
+	else if (cstate->ignore_conflict)
+	{
+		insertMethod = CIM_SINGLE;
+	}
 	else
 	{
 		/*
@@ -2968,12 +3012,59 @@ CopyFrom(CopyState cstate)
 						 */
 						tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
+					else if (cstate->ignore_conflict && cstate->error_limit > 0)
+					{
+						bool		specConflict;
+						uint32		specToken;
+						specConflict = false;
+
+						specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+						HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+						/* insert the tuple, with the speculative token */
+						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+									estate->es_output_cid,
+									HEAP_INSERT_SPECULATIVE,
+									NULL);
+
+						/* insert index entries for tuple */
+						recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+													   estate, true, &specConflict,
+													   NIL);
+
+						/* adjust the tuple's state accordingly */
+						if (!specConflict)
+						{
+							heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+							processed++;
+						}
+						else
+						{
+							heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+#ifndef WIN32
+							appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+							appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+							fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+							cstate->error_limit--;
+
+						}
+
+				/*
+				 * Wake up anyone waiting for our decision.  They will re-check
+				 * the tuple, see that it's no longer speculative, and wait on our
+				 * XID as if this was a regularly inserted tuple all along.
+				 */
+						SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+				}
 					else
 						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
 									mycid, hi_options, bistate);
 
 					/* And create index entries for it */
-					if (resultRelInfo->ri_NumIndices > 0)
+					if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
 															   &(tuple->t_self),
 															   estate,
@@ -2994,7 +3085,8 @@ CopyFrom(CopyState cstate)
 			 * or FDW; this is the same definition used by nodeModifyTable.c
 			 * for counting tuples inserted by an INSERT command.
 			 */
-			processed++;
+				if(!cstate->ignore_conflict)
+				processed++;
 		}
 	}
 
@@ -3286,6 +3378,48 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->num_defaults = num_defaults;
 	cstate->is_program = is_program;
 
+	if (cstate->failed_rec_filename)
+	{
+		mode_t		oumask; /* Pre-existing umask value */
+		struct stat st;
+			/*
+			 * Prevent write to relative path ... too easy to shoot oneself in
+			 * the foot by overwriting a database file ...
+			 */
+			if (!is_absolute_path(cstate->failed_rec_filename))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_NAME),
+						 errmsg("relative path not allowed for failed record file")));
+			oumask = umask(S_IWGRP | S_IWOTH);
+			PG_TRY();
+			{
+				cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+			}
+			PG_CATCH();
+			{
+				umask(oumask);
+				PG_RE_THROW();
+			}
+			PG_END_TRY();
+			umask(oumask);
+			if (cstate->failed_rec_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for writing: %m",
+								cstate->failed_rec_filename)));
+
+			if (fstat(fileno(cstate->failed_rec_file), &st))
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file \"%s\": %m",
+								cstate->failed_rec_filename)));
+
+			if (S_ISDIR(st.st_mode))
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						 errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+		}
+
 	if (data_source_cb)
 	{
 		cstate->copy_dest = COPY_CALLBACK;
@@ -3498,7 +3632,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 	/* Initialize all values for row to NULL */
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
 	if (!cstate->binary)
 	{
 		char	  **field_strings;
@@ -3513,9 +3647,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
 		/* check for overflowing fields */
 		if (nfields > 0 && fldct > nfields)
+		{
+			if (cstate->ignore_conflict && cstate->error_limit > 0)
+			{
+				LogCopyError(cstate, " extra data after last expected column");
+				goto next_line;
+			}else
 			ereport(ERROR,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 					 errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -3523,15 +3664,29 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		if (file_has_oids)
 		{
 			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for OID column")));
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " missing data for OID column");
+					goto next_line;
+				}else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("missing data for OID column")));
+			}
 			string = field_strings[fieldno++];
 
 			if (string == NULL)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " null OID in COPY data");
+					goto next_line;
+				}else
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("null OID in COPY data")));
+			}
 			else if (cstate->oids && tupleOid != NULL)
 			{
 				cstate->cur_attname = "oid";
@@ -3539,9 +3694,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
 																 CStringGetDatum(string)));
 				if (*tupleOid == InvalidOid)
+				{
+					if (cstate->ignore_conflict && cstate->error_limit > 0)
+					{
+						LogCopyError(cstate, " invalid OID in COPY data");
+						goto next_line;
+					}else
+
 					ereport(ERROR,
 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 							 errmsg("invalid OID in COPY data")));
+				}
 				cstate->cur_attname = NULL;
 				cstate->cur_attval = NULL;
 			}
@@ -3555,10 +3718,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					appendStringInfo(&cstate->line_buf, " missing data for column %s",
+								NameStr(att->attname));
+					LogCopyError(cstate, " ");
+					goto next_line;
+				}else
+
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("missing data for column \"%s\"",
 								NameStr(att->attname))));
+			}
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -3645,10 +3818,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		}
 
 		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+		{
+			if (cstate->ignore_conflict && cstate->error_limit > 0)
+			{
+				appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d",
+						(int) fld_count, attr_count);
+				LogCopyError(cstate, " ");
+				goto next_line;
+			}else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("row field count is %d, expected %d",
+								(int) fld_count, attr_count)));
+		}
 
 		if (file_has_oids)
 		{
@@ -3663,9 +3845,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 														 -1,
 														 &isnull));
 			if (isnull || loaded_oid == InvalidOid)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("invalid OID in COPY data")));
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " invalid OID in COPY data");
+					goto next_line;
+				}else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("invalid OID in COPY data")));
+			}
 			cstate->cur_attname = NULL;
 			if (cstate->oids && tupleOid != NULL)
 				*tupleOid = loaded_oid;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 87f5e95827..c1084f71bc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -632,7 +632,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -650,7 +650,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -3107,6 +3107,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
 				}
+			| ON CONFLICT LOG_P Iconst ',' LOG_P FILE_P NAME_P Sconst
+				{
+					$$ = makeDefElem("ignore_conflicts", (Node *)list_make2(makeInteger($4), makeString($9)), @1);
+				}
 		;
 
 /* The following exist for backward compatibility with very old versions */
@@ -15086,6 +15090,7 @@ unreserved_keyword:
 			| EXTENSION
 			| EXTERNAL
 			| FAMILY
+			| FILE_P
 			| FILTER
 			| FIRST_P
 			| FOLLOWING
@@ -15134,6 +15139,7 @@ unreserved_keyword:
 			| LOCATION
 			| LOCK_P
 			| LOCKED
+			| LOG_P
 			| LOGGED
 			| MAPPING
 			| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 23db40147b..442562b0fe 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -162,6 +162,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
 PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -242,6 +243,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
 PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
