From eb68d572c815c34500f6ed720860f24386afe9cd Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Mon, 12 Nov 2018 16:18:45 -0300
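Subject: [PATCH 2/5] move CloneForeignKeyConstraints to tablecmds.c

Move CloneForeignKeyConstraints and its recursive subroutine from
pg_constraint.c to tablecmds.c.  Along the way, make
CloneForeignKeyConstraints static (dropping its prototype from
pg_constraint.h) and rename clone_fk_constraints to CloneFkReferencing.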
---
 src/backend/catalog/pg_constraint.c | 304 -----------------------------------
 src/backend/commands/tablecmds.c    | 306 ++++++++++++++++++++++++++++++++++++
 src/include/catalog/pg_constraint.h |   3 -
 3 files changed, 306 insertions(+), 307 deletions(-)

diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index f71d89ff17..2833e45cff 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -39,10 +39,6 @@
 #include "utils/tqual.h"
 
 
-static void clone_fk_constraints(Relation pg_constraint, Relation parentRel,
-					 Relation partRel, List *clone, List **cloned);
-
-
 /*
  * CreateConstraintEntry
  *	Create a constraint table entry.
@@ -378,306 +374,6 @@ CreateConstraintEntry(const char *constraintName,
 }
 
 /*
- * CloneForeignKeyConstraints
- *		Clone foreign keys from a partitioned table to a newly acquired
- *		partition.
- *
- * relationId is a partition of parentId, so we can be certain that it has the
- * same columns with the same datatypes.  The columns may be in different
- * order, though.
- *
- * The *cloned list is appended ClonedConstraint elements describing what was
- * created.
- */
-void
-CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
-{
-	Relation	pg_constraint;
-	Relation	parentRel;
-	Relation	rel;
-	ScanKeyData key;
-	SysScanDesc scan;
-	HeapTuple	tuple;
-	List	   *clone = NIL;
-
-	parentRel = heap_open(parentId, NoLock);	/* already got lock */
-	/* see ATAddForeignKeyConstraint about lock level */
-	rel = heap_open(relationId, AccessExclusiveLock);
-	pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
-
-	/* Obtain the list of constraints to clone or attach */
-	ScanKeyInit(&key,
-				Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
-				F_OIDEQ, ObjectIdGetDatum(parentId));
-	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
-							  NULL, 1, &key);
-	while ((tuple = systable_getnext(scan)) != NULL)
-	{
-		Oid		oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
-
-		clone = lappend_oid(clone, oid);
-	}
-	systable_endscan(scan);
-
-	/* Do the actual work, recursing to partitions as needed */
-	clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned);
-
-	/* We're done.  Clean up */
-	heap_close(parentRel, NoLock);
-	heap_close(rel, NoLock);	/* keep lock till commit */
-	heap_close(pg_constraint, RowShareLock);
-}
-
-/*
- * clone_fk_constraints
- *		Recursive subroutine for CloneForeignKeyConstraints
- *
- * Clone the given list of FK constraints when a partition is attached.
- *
- * When cloning foreign keys to a partition, it may happen that equivalent
- * constraints already exist in the partition for some of them.  We can skip
- * creating a clone in that case, and instead just attach the existing
- * constraint to the one in the parent.
- *
- * This function recurses to partitions, if the new partition is partitioned;
- * of course, only do this for FKs that were actually cloned.
- */
-static void
-clone_fk_constraints(Relation pg_constraint, Relation parentRel,
-					 Relation partRel, List *clone, List **cloned)
-{
-	AttrNumber *attmap;
-	List	   *partFKs;
-	List	   *subclone = NIL;
-	ListCell   *cell;
-
-	/*
-	 * The constraint key may differ, if the columns in the partition are
-	 * different.  This map is used to convert them.
-	 */
-	attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
-										RelationGetDescr(parentRel),
-										gettext_noop("could not convert row type"));
-
-	partFKs = copyObject(RelationGetFKeyList(partRel));
-
-	foreach(cell, clone)
-	{
-		Oid			parentConstrOid = lfirst_oid(cell);
-		Form_pg_constraint constrForm;
-		HeapTuple	tuple;
-		AttrNumber	conkey[INDEX_MAX_KEYS];
-		AttrNumber	mapped_conkey[INDEX_MAX_KEYS];
-		AttrNumber	confkey[INDEX_MAX_KEYS];
-		Oid			conpfeqop[INDEX_MAX_KEYS];
-		Oid			conppeqop[INDEX_MAX_KEYS];
-		Oid			conffeqop[INDEX_MAX_KEYS];
-		Constraint *fkconstraint;
-		bool		attach_it;
-		Oid			constrOid;
-		ObjectAddress parentAddr,
-					childAddr;
-		int			nelem;
-		ListCell   *cell;
-		int			i;
-
-		tuple = SearchSysCache1(CONSTROID, parentConstrOid);
-		if (!tuple)
-			elog(ERROR, "cache lookup failed for constraint %u",
-				 parentConstrOid);
-		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
-
-		/* only foreign keys */
-		if (constrForm->contype != CONSTRAINT_FOREIGN)
-		{
-			ReleaseSysCache(tuple);
-			continue;
-		}
-
-		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
-
-		DeconstructConstraintRow(tuple, &nelem, conkey, confkey,
-								 conpfeqop, conppeqop, conffeqop);
-		for (i = 0; i < nelem; i++)
-			mapped_conkey[i] = attmap[conkey[i] - 1];
-
-		/*
-		 * Before creating a new constraint, see whether any existing FKs are
-		 * fit for the purpose.  If one is, attach the parent constraint to it,
-		 * and don't clone anything.  This way we avoid the expensive
-		 * verification step and don't end up with a duplicate FK.  This also
-		 * means we don't consider this constraint when recursing to
-		 * partitions.
-		 */
-		attach_it = false;
-		foreach(cell, partFKs)
-		{
-			ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
-			Form_pg_constraint partConstr;
-			HeapTuple	partcontup;
-
-			attach_it = true;
-
-			/*
-			 * Do some quick & easy initial checks.  If any of these fail, we
-			 * cannot use this constraint, but keep looking.
-			 */
-			if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem)
-			{
-				attach_it = false;
-				continue;
-			}
-			for (i = 0; i < nelem; i++)
-			{
-				if (fk->conkey[i] != mapped_conkey[i] ||
-					fk->confkey[i] != confkey[i] ||
-					fk->conpfeqop[i] != conpfeqop[i])
-				{
-					attach_it = false;
-					break;
-				}
-			}
-			if (!attach_it)
-				continue;
-
-			/*
-			 * Looks good so far; do some more extensive checks.  Presumably
-			 * the check for 'convalidated' could be dropped, since we don't
-			 * really care about that, but let's be careful for now.
-			 */
-			partcontup = SearchSysCache1(CONSTROID,
-										 ObjectIdGetDatum(fk->conoid));
-			if (!partcontup)
-				elog(ERROR, "cache lookup failed for constraint %u",
-					 fk->conoid);
-			partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
-			if (OidIsValid(partConstr->conparentid) ||
-				!partConstr->convalidated ||
-				partConstr->condeferrable != constrForm->condeferrable ||
-				partConstr->condeferred != constrForm->condeferred ||
-				partConstr->confupdtype != constrForm->confupdtype ||
-				partConstr->confdeltype != constrForm->confdeltype ||
-				partConstr->confmatchtype != constrForm->confmatchtype)
-			{
-				ReleaseSysCache(partcontup);
-				attach_it = false;
-				continue;
-			}
-
-			ReleaseSysCache(partcontup);
-
-			/* looks good!  Attach this constraint */
-			ConstraintSetParentConstraint(fk->conoid, constrForm->oid);
-			CommandCounterIncrement();
-			attach_it = true;
-			break;
-		}
-
-		/*
-		 * If we attached to an existing constraint, there is no need to
-		 * create a new one.  In fact, there's no need to recurse for this
-		 * constraint to partitions, either.
-		 */
-		if (attach_it)
-		{
-			ReleaseSysCache(tuple);
-			continue;
-		}
-
-		constrOid =
-			CreateConstraintEntry(NameStr(constrForm->conname),
-								  constrForm->connamespace,
-								  CONSTRAINT_FOREIGN,
-								  constrForm->condeferrable,
-								  constrForm->condeferred,
-								  constrForm->convalidated,
-								  constrForm->oid,
-								  RelationGetRelid(partRel),
-								  mapped_conkey,
-								  nelem,
-								  nelem,
-								  InvalidOid,	/* not a domain constraint */
-								  constrForm->conindid, /* same index */
-								  constrForm->confrelid,	/* same foreign rel */
-								  confkey,
-								  conpfeqop,
-								  conppeqop,
-								  conffeqop,
-								  nelem,
-								  constrForm->confupdtype,
-								  constrForm->confdeltype,
-								  constrForm->confmatchtype,
-								  NULL,
-								  NULL,
-								  NULL,
-								  false,
-								  1, false, true);
-		subclone = lappend_oid(subclone, constrOid);
-
-		ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
-		recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
-
-		fkconstraint = makeNode(Constraint);
-		/* for now this is all we need */
-		fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
-		fkconstraint->fk_upd_action = constrForm->confupdtype;
-		fkconstraint->fk_del_action = constrForm->confdeltype;
-		fkconstraint->deferrable = constrForm->condeferrable;
-		fkconstraint->initdeferred = constrForm->condeferred;
-
-		createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
-								 constrOid, constrForm->conindid, false);
-
-		if (cloned)
-		{
-			ClonedConstraint *newc;
-
-			/*
-			 * Feed back caller about the constraints we created, so that they
-			 * can set up constraint verification.
-			 */
-			newc = palloc(sizeof(ClonedConstraint));
-			newc->relid = RelationGetRelid(partRel);
-			newc->refrelid = constrForm->confrelid;
-			newc->conindid = constrForm->conindid;
-			newc->conid = constrOid;
-			newc->constraint = fkconstraint;
-
-			*cloned = lappend(*cloned, newc);
-		}
-
-		ReleaseSysCache(tuple);
-	}
-
-	pfree(attmap);
-	list_free_deep(partFKs);
-
-	/*
-	 * If the partition is partitioned, recurse to handle any constraints that
-	 * were cloned.
-	 */
-	if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
-		subclone != NIL)
-	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
-		int			i;
-
-		for (i = 0; i < partdesc->nparts; i++)
-		{
-			Relation	childRel;
-
-			childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
-			clone_fk_constraints(pg_constraint,
-								 partRel,
-								 childRel,
-								 subclone,
-								 cloned);
-			heap_close(childRel, NoLock);	/* keep lock till commit */
-		}
-	}
-}
-
-/*
  * Test whether given name is currently used as a constraint name
  * for the given object (relation or domain).
  *
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 843ed48aa7..31ca651213 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -412,6 +412,10 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
 						  Relation rel, Constraint *fkconstraint, Oid parentConstr,
 						  bool recurse, bool recursing,
 						  LOCKMODE lockmode);
+static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
+						   List **cloned);
+static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
+				   Relation partRel, List *clone, List **cloned);
 static void ATExecDropConstraint(Relation rel, const char *constrName,
 					 DropBehavior behavior,
 					 bool recurse, bool recursing,
@@ -7631,6 +7635,308 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 }
 
 /*
+ * CloneForeignKeyConstraints
+ *		Clone foreign keys from a partitioned table to a newly acquired
+ *		partition.
+ *
+ * relationId is a partition of parentId, so we can be certain that it has the
+ * same columns with the same datatypes.  The columns may be in different
+ * order, though.
+ *
+ * ClonedConstraint elements describing what was created are appended to the
+ * *cloned list, so that the new constraints can be validated in ALTER TABLE's
+ * Phase 3.
+ */
+static void
+CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+{
+	Relation	pg_constraint;
+	Relation	parentRel;
+	Relation	rel;
+	ScanKeyData key;
+	SysScanDesc scan;
+	HeapTuple	tuple;
+	List	   *clone = NIL;
+
+	parentRel = heap_open(parentId, NoLock);	/* already got lock */
+	/* see ATAddForeignKeyConstraint about lock level */
+	rel = heap_open(relationId, AccessExclusiveLock);
+	pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+
+	/* Obtain the list of constraints to clone or attach */
+	ScanKeyInit(&key,
+				Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(parentId));
+	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+							  NULL, 1, &key);
+	while ((tuple = systable_getnext(scan)) != NULL)
+	{
+		Oid		oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+		clone = lappend_oid(clone, oid);
+	}
+	systable_endscan(scan);
+
+	/* Do the actual work, recursing to partitions as needed */
+	CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
+
+	/* We're done.  Clean up */
+	heap_close(parentRel, NoLock);
+	heap_close(rel, NoLock);	/* keep lock till commit */
+	heap_close(pg_constraint, RowShareLock);
+}
+
+/*
+ * CloneFkReferencing
+ *		Recursive subroutine for CloneForeignKeyConstraints, referencing side
+ *
+ * Clone the given list of FK constraints when a partition is attached on the
+ * referencing side of those constraints.
+ *
+ * When cloning foreign keys to a partition, it may happen that equivalent
+ * constraints already exist in the partition for some of them.  We can skip
+ * creating a clone in that case, and instead just attach the existing
+ * constraint to the one in the parent.
+ *
+ * If the new partition is itself partitioned, this function recurses to its
+ * partitions, but only for the FKs that were actually cloned.
+ */
+static void
+CloneFkReferencing(Relation pg_constraint, Relation parentRel,
+				   Relation partRel, List *clone, List **cloned)
+{
+	AttrNumber *attmap;
+	List	   *partFKs;
+	List	   *subclone = NIL;
+	ListCell   *cell;
+
+	/*
+	 * The constraint key may differ, if the columns in the partition are
+	 * different.  This map is used to convert them.
+	 */
+	attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
+										RelationGetDescr(parentRel),
+										gettext_noop("could not convert row type"));
+
+	partFKs = copyObject(RelationGetFKeyList(partRel));
+
+	foreach(cell, clone)
+	{
+		Oid			parentConstrOid = lfirst_oid(cell);
+		Form_pg_constraint constrForm;
+		HeapTuple	tuple;
+		int			numfks;
+		AttrNumber	conkey[INDEX_MAX_KEYS];
+		AttrNumber	mapped_conkey[INDEX_MAX_KEYS];
+		AttrNumber	confkey[INDEX_MAX_KEYS];
+		Oid			conpfeqop[INDEX_MAX_KEYS];
+		Oid			conppeqop[INDEX_MAX_KEYS];
+		Oid			conffeqop[INDEX_MAX_KEYS];
+		Constraint *fkconstraint;
+		bool		attach_it;
+		Oid			constrOid;
+		ObjectAddress parentAddr,
+					childAddr;
+		ListCell   *cell;
+		int			i;
+
+		tuple = SearchSysCache1(CONSTROID, parentConstrOid);
+		if (!tuple)
+			elog(ERROR, "cache lookup failed for constraint %u",
+				 parentConstrOid);
+		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/* only foreign keys */
+		if (constrForm->contype != CONSTRAINT_FOREIGN)
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
+
+		DeconstructConstraintRow(tuple, &numfks, conkey, confkey,
+								 conpfeqop, conppeqop, conffeqop);
+		for (i = 0; i < numfks; i++)
+			mapped_conkey[i] = attmap[conkey[i] - 1];
+
+		/*
+		 * Before creating a new constraint, see whether any existing FKs are
+		 * fit for the purpose.  If one is, attach it to the parent constraint
+		 * and don't clone anything.  This way we avoid the expensive
+		 * verification step and don't end up with a duplicate FK.  This also
+		 * means we don't consider this constraint when recursing to
+		 * partitions.
+		 */
+		attach_it = false;
+		foreach(cell, partFKs)
+		{
+			ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
+			Form_pg_constraint partConstr;
+			HeapTuple	partcontup;
+
+			attach_it = true;
+
+			/*
+			 * Do some quick & easy initial checks.  If any of these fail, we
+			 * cannot use this constraint, but keep looking.
+			 */
+			if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
+			{
+				attach_it = false;
+				continue;
+			}
+			for (i = 0; i < numfks; i++)
+			{
+				if (fk->conkey[i] != mapped_conkey[i] ||
+					fk->confkey[i] != confkey[i] ||
+					fk->conpfeqop[i] != conpfeqop[i])
+				{
+					attach_it = false;
+					break;
+				}
+			}
+			if (!attach_it)
+				continue;
+
+			/*
+			 * Looks good so far; do some more extensive checks.  Presumably
+			 * the check for 'convalidated' could be dropped, since we don't
+			 * really care about that, but let's be careful for now.
+			 */
+			partcontup = SearchSysCache1(CONSTROID,
+										 ObjectIdGetDatum(fk->conoid));
+			if (!partcontup)
+				elog(ERROR, "cache lookup failed for constraint %u",
+					 fk->conoid);
+			partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+			if (OidIsValid(partConstr->conparentid) ||
+				!partConstr->convalidated ||
+				partConstr->condeferrable != constrForm->condeferrable ||
+				partConstr->condeferred != constrForm->condeferred ||
+				partConstr->confupdtype != constrForm->confupdtype ||
+				partConstr->confdeltype != constrForm->confdeltype ||
+				partConstr->confmatchtype != constrForm->confmatchtype)
+			{
+				ReleaseSysCache(partcontup);
+				attach_it = false;
+				continue;
+			}
+
+			ReleaseSysCache(partcontup);
+
+			/* looks good!  Attach this constraint */
+			ConstraintSetParentConstraint(fk->conoid, parentConstrOid);
+			CommandCounterIncrement();
+			attach_it = true;
+			break;
+		}
+
+		/*
+		 * If we attached to an existing constraint, there is no need to
+		 * create a new one.  In fact, there's no need to recurse for this
+		 * constraint to partitions, either.
+		 */
+		if (attach_it)
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		constrOid =
+			CreateConstraintEntry(NameStr(constrForm->conname),
+								  constrForm->connamespace,
+								  CONSTRAINT_FOREIGN,
+								  constrForm->condeferrable,
+								  constrForm->condeferred,
+								  constrForm->convalidated,
+								  parentConstrOid,
+								  RelationGetRelid(partRel),
+								  mapped_conkey,
+								  numfks,
+								  numfks,
+								  InvalidOid,	/* not a domain constraint */
+								  constrForm->conindid, /* same index */
+								  constrForm->confrelid,	/* same foreign rel */
+								  confkey,
+								  conpfeqop,
+								  conppeqop,
+								  conffeqop,
+								  numfks,
+								  constrForm->confupdtype,
+								  constrForm->confdeltype,
+								  constrForm->confmatchtype,
+								  NULL,
+								  NULL,
+								  NULL,
+								  false,
+								  1, false, true);
+		subclone = lappend_oid(subclone, constrOid);
+
+		ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
+		recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
+
+		fkconstraint = makeNode(Constraint);
+		/* for now this is all we need */
+		fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
+		fkconstraint->fk_upd_action = constrForm->confupdtype;
+		fkconstraint->fk_del_action = constrForm->confdeltype;
+		fkconstraint->deferrable = constrForm->condeferrable;
+		fkconstraint->initdeferred = constrForm->condeferred;
+
+		createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
+								 constrOid, constrForm->conindid, false);
+
+		if (cloned)
+		{
+			ClonedConstraint *newc;
+
+			/*
+			 * Report the constraints we created back to the caller, so that it
+			 * can set up constraint verification.
+			 */
+			newc = palloc(sizeof(ClonedConstraint));
+			newc->relid = RelationGetRelid(partRel);
+			newc->refrelid = constrForm->confrelid;
+			newc->conindid = constrForm->conindid;
+			newc->conid = constrOid;
+			newc->constraint = fkconstraint;
+
+			*cloned = lappend(*cloned, newc);
+		}
+
+		ReleaseSysCache(tuple);
+	}
+
+	pfree(attmap);
+	list_free_deep(partFKs);
+
+	/*
+	 * If the partition is partitioned, recurse to handle any constraints that
+	 * were cloned.
+	 */
+	if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+		subclone != NIL)
+	{
+		PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
+		int			i;
+
+		for (i = 0; i < partdesc->nparts; i++)
+		{
+			Relation	childRel;
+
+			childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
+			CloneFkReferencing(pg_constraint,
+							   partRel,
+							   childRel,
+							   subclone,
+							   cloned);
+			heap_close(childRel, NoLock);	/* keep lock till commit */
+		}
+	}
+}
+
+/*
  * ALTER TABLE ALTER CONSTRAINT
  *
  * Update the attributes of a constraint.
diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h
index 8405fe2eea..8b2af36ba7 100644
--- a/src/include/catalog/pg_constraint.h
+++ b/src/include/catalog/pg_constraint.h
@@ -226,9 +226,6 @@ extern Oid CreateConstraintEntry(const char *constraintName,
 					  bool conNoInherit,
 					  bool is_internal);
 
-extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
-						   List **cloned);
-
 extern void RemoveConstraintById(Oid conId);
 extern void RenameConstraintById(Oid conId, const char *newname);
 
-- 
2.11.0

