From 2841fd41db106ae09b68d3b5cf29901e98e2446a Mon Sep 17 00:00:00 2001
From: Elvis Pranskevichus <elvis@magic.io>
Date: Thu, 6 Dec 2018 17:16:28 -0500
Subject: [PATCH] Allow anonymous rowtypes in function return column definition

Currently, the following query

    SELECT q.b = row(2)
    FROM unnest(ARRAY[row(1, row(2))]) AS q(a int, b record);

would fail with

    ERROR:  column "b" has pseudo-type record

This is due to CheckAttributeNamesTypes() being used on a function
coldeflist as if it were a real relation definition.  But in the context
of a query there seems to be no harm in allowing this, as other ways of
manipulating anonymous rowtypes work well, e.g.:

    SELECT (ARRAY[ROW(1, ROW(2))])[1];
---
 src/backend/catalog/heap.c             | 42 ++++++++++++++++++--------
 src/backend/catalog/index.c            |  2 +-
 src/backend/commands/tablecmds.c       |  4 +--
 src/backend/parser/parse_relation.c    |  5 +--
 src/include/catalog/heap.h             |  6 ++--
 src/test/regress/expected/rowtypes.out | 10 ++++++
 src/test/regress/sql/rowtypes.sql      |  5 +++
 7 files changed, 55 insertions(+), 19 deletions(-)

diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index cc865de627..103b4fb96a 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -451,7 +451,8 @@ heap_create(const char *relname,
  */
 void
 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
-						 bool allow_system_table_mods)
+						 bool allow_system_table_mods,
+						 bool in_coldeflist)
 {
 	int			i;
 	int			j;
@@ -509,7 +510,8 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
 						   TupleDescAttr(tupdesc, i)->atttypid,
 						   TupleDescAttr(tupdesc, i)->attcollation,
 						   NIL, /* assume we're creating a new rowtype */
-						   allow_system_table_mods);
+						   allow_system_table_mods,
+						   in_coldeflist);
 	}
 }
 
@@ -532,7 +534,8 @@ void
 CheckAttributeType(const char *attname,
 				   Oid atttypid, Oid attcollation,
 				   List *containing_rowtypes,
-				   bool allow_system_table_mods)
+				   bool allow_system_table_mods,
+				   bool in_coldeflist)
 {
 	char		att_typtype = get_typtype(atttypid);
 	Oid			att_typelem;
@@ -540,12 +543,24 @@ CheckAttributeType(const char *attname,
 	if (att_typtype == TYPTYPE_PSEUDO)
 	{
 		/*
-		 * Refuse any attempt to create a pseudo-type column, except for a
-		 * special hack for pg_statistic: allow ANYARRAY when modifying system
-		 * catalogs (this allows creating pg_statistic and cloning it during
-		 * VACUUM FULL)
+		 * Generally speaking, we don't allow pseudo-types in column
+		 * definitions, with the following exceptions:
+		 *
+		 * First, we allow the record pseudo-type (and its array type) in
+		 * the column definition of a set-returning function.  In this case
+		 * the record type is "blessed", i.e. there is an actual TupleDesc
+		 * associated with it and it can be treated as a normal composite
+		 * type.  Composite type recursion is not a danger here since
+		 * each instance of an anonymous rowtype in this context would have
+		 * a distinct TupleDesc.
+		 *
+		 * Secondly, we allow ANYARRAY when modifying system
+		 * catalogs (this allows creating pg_statistic and cloning
+		 * it during VACUUM FULL).
 		 */
-		if (atttypid != ANYARRAYOID || !allow_system_table_mods)
+		if (!((atttypid == ANYARRAYOID && allow_system_table_mods) ||
+				((atttypid == RECORDOID || atttypid == RECORDARRAYOID) &&
+					in_coldeflist)))
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 					 errmsg("column \"%s\" has pseudo-type %s",
@@ -558,7 +573,8 @@ CheckAttributeType(const char *attname,
 		 */
 		CheckAttributeType(attname, getBaseType(atttypid), attcollation,
 						   containing_rowtypes,
-						   allow_system_table_mods);
+						   allow_system_table_mods,
+						   in_coldeflist);
 	}
 	else if (att_typtype == TYPTYPE_COMPOSITE)
 	{
@@ -596,7 +612,8 @@ CheckAttributeType(const char *attname,
 			CheckAttributeType(NameStr(attr->attname),
 							   attr->atttypid, attr->attcollation,
 							   containing_rowtypes,
-							   allow_system_table_mods);
+							   allow_system_table_mods,
+							   in_coldeflist);
 		}
 
 		relation_close(relation, AccessShareLock);
@@ -610,7 +627,8 @@ CheckAttributeType(const char *attname,
 		 */
 		CheckAttributeType(attname, att_typelem, attcollation,
 						   containing_rowtypes,
-						   allow_system_table_mods);
+						   allow_system_table_mods,
+						   in_coldeflist);
 	}
 
 	/*
@@ -1076,7 +1094,7 @@ heap_create_with_catalog(const char *relname,
 	 */
 	Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
 
-	CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
+	CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods, false);
 
 	/*
 	 * This would fail later on anyway, if the relation already exists.  But
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 225c078018..9a7f7aabbf 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -411,7 +411,7 @@ ConstructTupleDescriptor(Relation heapRelation,
 			 */
 			CheckAttributeType(NameStr(to->attname),
 							   to->atttypid, to->attcollation,
-							   NIL, false);
+							   NIL, false, false);
 		}
 
 		/*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index ff76499137..8383b419aa 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5509,7 +5509,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	/* make sure datatype is legal for a column */
 	CheckAttributeType(colDef->colname, typeOid, collOid,
 					   list_make1_oid(rel->rd_rel->reltype),
-					   false);
+					   false, false);
 
 	/* construct new attribute's pg_attribute entry */
 	attribute.attrelid = myrelid;
@@ -9449,7 +9449,7 @@ ATPrepAlterColumnType(List **wqueue,
 	/* make sure datatype is legal for a column */
 	CheckAttributeType(colName, targettype, targetcollid,
 					   list_make1_oid(rel->rd_rel->reltype),
-					   false);
+					   false, false);
 
 	if (tab->relkind == RELKIND_RELATION ||
 		tab->relkind == RELKIND_PARTITIONED_TABLE)
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index 09fbb588af..2106d5bcd7 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -1590,9 +1590,10 @@ addRangeTableEntryForFunction(ParseState *pstate,
 
 			/*
 			 * Ensure that the coldeflist defines a legal set of names (no
-			 * duplicates) and datatypes (no pseudo-types, for instance).
+			 * duplicates) and datatypes (no pseudo-types, for instance, except
+			 * that record and record[] are allowed).
 			 */
-			CheckAttributeNamesTypes(tupdesc, RELKIND_COMPOSITE_TYPE, false);
+			CheckAttributeNamesTypes(tupdesc, RELKIND_COMPOSITE_TYPE, false, true);
 		}
 		else
 			ereport(ERROR,
diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h
index 625b7e5c43..962ca08eab 100644
--- a/src/include/catalog/heap.h
+++ b/src/include/catalog/heap.h
@@ -130,12 +130,14 @@ extern const FormData_pg_attribute *SystemAttributeDefinition(AttrNumber attno);
 extern const FormData_pg_attribute *SystemAttributeByName(const char *attname);
 
 extern void CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
-						 bool allow_system_table_mods);
+						 bool allow_system_table_mods,
+						 bool allow_anonymous_records);
 
 extern void CheckAttributeType(const char *attname,
 				   Oid atttypid, Oid attcollation,
 				   List *containing_rowtypes,
-				   bool allow_system_table_mods);
+				   bool allow_system_table_mods,
+				   bool allow_anonymous_records);
 
 /* pg_partitioned_table catalog manipulation functions */
 extern void StorePartitionKey(Relation rel,
diff --git a/src/test/regress/expected/rowtypes.out b/src/test/regress/expected/rowtypes.out
index d6a1a3331e..aaeffbc2a2 100644
--- a/src/test/regress/expected/rowtypes.out
+++ b/src/test/regress/expected/rowtypes.out
@@ -668,6 +668,16 @@ select row(1, '(1,2)')::testtype6 *<> row(1, '(1,3)')::testtype6;
 (1 row)
 
 drop type testtype1, testtype2, testtype3, testtype4, testtype5, testtype6;
+-- Test anonymous rowtype in coldeflist
+SELECT q.b = row(2), q.c = ARRAY[row(3)], q.d = row(row(4)) FROM
+    unnest(ARRAY[
+      row(1, row(2), ARRAY[row(3)], row(row(4)))
+    ]) AS q(a int, b record, c record[], d record);
+ ?column? | ?column? | ?column? 
+----------+----------+----------
+ t        | t        | t
+(1 row)
+
 --
 -- Test case derived from bug #5716: check multiple uses of a rowtype result
 --
diff --git a/src/test/regress/sql/rowtypes.sql b/src/test/regress/sql/rowtypes.sql
index e6d389805c..5fe7a48788 100644
--- a/src/test/regress/sql/rowtypes.sql
+++ b/src/test/regress/sql/rowtypes.sql
@@ -261,6 +261,11 @@ select row(1, '(1,2)')::testtype6 *<> row(1, '(1,3)')::testtype6;
 
 drop type testtype1, testtype2, testtype3, testtype4, testtype5, testtype6;
 
+-- Test anonymous rowtype in coldeflist
+SELECT q.b = row(2), q.c = ARRAY[row(3)], q.d = row(row(4)) FROM
+    unnest(ARRAY[
+      row(1, row(2), ARRAY[row(3)], row(row(4)))
+    ]) AS q(a int, b record, c record[], d record);
 
 --
 -- Test case derived from bug #5716: check multiple uses of a rowtype result
-- 
2.19.2

