From 6b83385b76adb9db26483499290eb3621c415ace Mon Sep 17 00:00:00 2001
From: Jehan-Guillaume de Rorthais <jgdr@dalibo.com>
Date: Tue, 4 Feb 2020 18:05:17 +0100
Subject: [PATCH] Fix TRUNCATE on a partition to apply CASCADE to partitioned
 table.

Previously, when running TRUNCATE CASCADE on a child of a
partitioned table referenced by another partitioned table, the
truncate was not applied to referencing children. This leaves a
FK constraint violation in the referencing partitioned table.

This commit fixes that bug.

Reported-by: Christophe Courtois
Author: Jehan-Guillaume de Rorthais
---
 doc/src/sgml/ref/truncate.sgml         |  3 +
 src/backend/catalog/heap.c             | 81 +++++++++++++++++++++-----
 src/test/regress/expected/truncate.out | 50 ++++++++++++++++
 src/test/regress/sql/truncate.sql      | 38 ++++++++++++
 4 files changed, 157 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/ref/truncate.sgml b/doc/src/sgml/ref/truncate.sgml
index c1e42376ab..86296d0699 100644
--- a/doc/src/sgml/ref/truncate.sgml
+++ b/doc/src/sgml/ref/truncate.sgml
@@ -124,6 +124,9 @@ TRUNCATE [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [
    option can be used to automatically include all dependent tables &mdash;
    but be very careful when using this option, or else you might lose data you
    did not intend to!
+   Note in particular that when the table to be truncated is a partition,
+   siblings partitions are left untouched, but cascading occurs to all
+   referencing tables and all their partitions wit no distinction.
   </para>
 
   <para>
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 046b3d37ce..7e8baf63dd 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -3396,39 +3396,90 @@ List *
 heap_truncate_find_FKs(List *relationIds)
 {
 	List	   *result = NIL;
+	List	   *oids = NIL;
+	List	   *parent_cons = NIL;
+	ListCell   *cell;
+	ScanKeyData key;
 	Relation	fkeyRel;
 	SysScanDesc fkeyScan;
 	HeapTuple	tuple;
 
+	oids = list_copy(relationIds);
+
 	/*
 	 * Must scan pg_constraint.  Right now, it is a seqscan because there is
 	 * no available index on confrelid.
 	 */
 	fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
 
-	fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
-								  NULL, 0, NULL);
+	for (;;) {
+		int num_oids = oids->length;
 
-	while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
-	{
-		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
+		fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
+									  NULL, 0, NULL);
 
-		/* Not a foreign key */
-		if (con->contype != CONSTRAINT_FOREIGN)
-			continue;
+		while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
+		{
+			Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
 
-		/* Not referencing one of our list of tables */
-		if (!list_member_oid(relationIds, con->confrelid))
-			continue;
+			/* Not a foreign key */
+			if (con->contype != CONSTRAINT_FOREIGN)
+				continue;
+
+			/* Not referencing one of our list of tables */
+			if (!list_member_oid(oids, con->confrelid))
+				continue;
+
+			/* have a parent constraint */
+			if (OidIsValid(con->conparentid) &&
+				!list_member_oid(parent_cons, con->conparentid))
+				parent_cons = lappend_oid(parent_cons, con->conparentid);
+
+			/* Add referencer to result, unless present in input list */
+			if (!list_member_oid(relationIds, con->conrelid) )
+				result = lappend_oid(result, con->conrelid);
+		}
+
+		systable_endscan(fkeyScan);
 
-		/* Add referencer to result, unless present in input list */
-		if (!list_member_oid(relationIds, con->conrelid))
-			result = lappend_oid(result, con->conrelid);
+		if (parent_cons == NIL)
+			break;
+
+		foreach(cell, parent_cons)
+		{
+			Oid		parent = lfirst_oid(cell);
+
+			ScanKeyInit(&key,
+					Anum_pg_constraint_oid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(parent));
+
+			fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
+										  true, NULL, 1, &key);
+
+			while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
+			{
+				Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
+				/* Add referencer to result, unless present in input list */
+				if (!list_member_oid(oids, con->confrelid))
+					oids = lappend_oid(oids, con->confrelid);
+			}
+
+			systable_endscan(fkeyScan);
+		}
+
+		if (num_oids == oids->length)
+			break;
+
+		list_free(parent_cons);
+		parent_cons = NIL;
 	}
 
-	systable_endscan(fkeyScan);
 	table_close(fkeyRel, AccessShareLock);
 
+	list_free(parent_cons);
+	list_free(oids);
+
 	/* Now sort and de-duplicate the result list */
 	list_sort(result, list_oid_cmp);
 	list_deduplicate_oid(result);
diff --git a/src/test/regress/expected/truncate.out b/src/test/regress/expected/truncate.out
index cc68274dca..1e88e867bf 100644
--- a/src/test/regress/expected/truncate.out
+++ b/src/test/regress/expected/truncate.out
@@ -542,3 +542,53 @@ SELECT * FROM tp_chk_data();
 
 DROP TABLE truncprim, truncpart;
 DROP FUNCTION tp_ins_data(), tp_chk_data();
+-- test cascade when referencing a partitioned table
+CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
+CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
+  PARTITION BY RANGE (a);
+CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
+CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
+CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
+CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
+INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
+-- truncate a partition cascading to a table
+CREATE TABLE ref_b (
+    b INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+);
+INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
+TRUNCATE TABLE trunc_a1 CASCADE;
+NOTICE:  truncate cascades to table "ref_b"
+SELECT a FROM ref_b;
+ a 
+---
+(0 rows)
+
+DROP TABLE ref_b;
+-- truncate a partition cascading to a partitioned table
+CREATE TABLE ref_c (
+    c INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+) PARTITION BY RANGE (c);
+CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
+CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
+INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
+TRUNCATE TABLE trunc_a21 CASCADE;
+NOTICE:  truncate cascades to table "ref_c"
+NOTICE:  truncate cascades to table "ref_c1"
+NOTICE:  truncate cascades to table "ref_c2"
+SELECT a as "from table ref_c" FROM ref_c;
+ from table ref_c 
+------------------
+(0 rows)
+
+SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
+ from table trunc_a 
+--------------------
+                 15
+                 20
+                 25
+(3 rows)
+
+DROP TABLE trunc_a, ref_c;
diff --git a/src/test/regress/sql/truncate.sql b/src/test/regress/sql/truncate.sql
index 28395e82bf..54f26e3077 100644
--- a/src/test/regress/sql/truncate.sql
+++ b/src/test/regress/sql/truncate.sql
@@ -289,3 +289,41 @@ TRUNCATE TABLE truncpart;
 SELECT * FROM tp_chk_data();
 DROP TABLE truncprim, truncpart;
 DROP FUNCTION tp_ins_data(), tp_chk_data();
+
+-- test cascade when referencing a partitioned table
+CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
+CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
+  PARTITION BY RANGE (a);
+CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
+CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
+CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
+CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
+INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
+
+-- truncate a partition cascading to a table
+CREATE TABLE ref_b (
+    b INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+);
+INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
+
+TRUNCATE TABLE trunc_a1 CASCADE;
+SELECT a FROM ref_b;
+
+DROP TABLE ref_b;
+
+-- truncate a partition cascading to a partitioned table
+CREATE TABLE ref_c (
+    c INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+) PARTITION BY RANGE (c);
+CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
+CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
+INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
+
+TRUNCATE TABLE trunc_a21 CASCADE;
+SELECT a as "from table ref_c" FROM ref_c;
+SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
+
+DROP TABLE trunc_a, ref_c;
-- 
2.20.1

