From e41389c4a873fbf7dd28907cb4624dffc56cb3d9 Mon Sep 17 00:00:00 2001
From: nkey <nkey@toloka.ai>
Date: Fri, 16 Aug 2024 11:19:37 +0200
Subject: [PATCH v2] specs to reproduce issues with CREATE INDEX/REINDEX
 CONCURRENTLY for UNIQUE indexes and INSERT ON CONFLICT UPDATE using injection
 points

---
 src/backend/commands/indexcmds.c              |   5 +-
 src/backend/executor/execIndexing.c           |   3 +
 src/backend/executor/nodeModifyTable.c        |   2 +
 src/backend/utils/time/snapmgr.c              |   2 +
 src/test/modules/injection_points/Makefile    |   2 +-
 .../expected/index_concurrently_upsert.out    |  80 ++++++
 .../expected/reindex_concurrently_upsert.out  | 238 ++++++++++++++++++
 src/test/modules/injection_points/meson.build |   2 +
 .../specs/index_concurrently_upsert.spec      |  68 +++++
 .../specs/reindex_concurrently_upsert.spec    |  86 +++++++
 10 files changed, 486 insertions(+), 2 deletions(-)
 create mode 100644 src/test/modules/injection_points/expected/index_concurrently_upsert.out
 create mode 100644 src/test/modules/injection_points/expected/reindex_concurrently_upsert.out
 create mode 100644 src/test/modules/injection_points/specs/index_concurrently_upsert.spec
 create mode 100644 src/test/modules/injection_points/specs/reindex_concurrently_upsert.spec

diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 2caab88aa5..822467bdd5 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -69,6 +69,7 @@
 #include "utils/regproc.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/injection_point.h"
 
 
 /* non-export function prototypes */
@@ -1775,6 +1776,7 @@ DefineIndex(Oid tableId,
 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
 								 PROGRESS_CREATEIDX_PHASE_WAIT_3);
 	WaitForOlderSnapshots(limitXmin, true);
+	INJECTION_POINT("define_index_before_set_valid");
 
 	/*
 	 * Index can now be marked valid -- update its pg_index entry
@@ -4078,7 +4080,7 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
 	 * the same time to make sure we only get constraint violations from the
 	 * indexes with the correct names.
 	 */
-
+	INJECTION_POINT("reindex_relation_concurrently_before_swap");
 	StartTransactionCommand();
 
 	/*
@@ -4152,6 +4154,7 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
 								 PROGRESS_CREATEIDX_PHASE_WAIT_4);
 	WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
+	INJECTION_POINT("reindex_relation_concurrently_before_set_dead");
 
 	foreach(lc, indexIds)
 	{
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 9f05b3654c..1d451a329a 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -115,6 +115,7 @@
 #include "nodes/nodeFuncs.h"
 #include "storage/lmgr.h"
 #include "utils/snapmgr.h"
+#include "utils/injection_point.h"
 
 /* waitMode argument to check_exclusion_or_unique_constraint() */
 typedef enum
@@ -901,6 +902,8 @@ retry:
 	econtext->ecxt_scantuple = save_scantuple;
 
 	ExecDropSingleTupleTableSlot(existing_slot);
+	if (!conflict)
+		INJECTION_POINT("check_exclusion_or_unique_constraint_no_conflict");
 
 	return !conflict;
 }
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4913e49319..65bc63c612 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -69,6 +69,7 @@
 #include "utils/datum.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
+#include "utils/injection_point.h"
 
 
 typedef struct MTTargetRelLookup
@@ -1084,6 +1085,7 @@ ExecInsert(ModifyTableContext *context,
 					return NULL;
 				}
 			}
+			INJECTION_POINT("exec_insert_before_insert_speculative");
 
 			/*
 			 * Before we start insertion proper, acquire our "speculative
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 7d2b34d4f2..3a7357a050 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -64,6 +64,7 @@
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/injection_point.h"
 
 
 /*
@@ -426,6 +427,7 @@ InvalidateCatalogSnapshot(void)
 		pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
 		CatalogSnapshot = NULL;
 		SnapshotResetXmin();
+		INJECTION_POINT("invalidate_catalog_snapshot_end");
 	}
 }
 
diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile
index 2ffd2f77ed..9777c48367 100644
--- a/src/test/modules/injection_points/Makefile
+++ b/src/test/modules/injection_points/Makefile
@@ -9,7 +9,7 @@ PGFILEDESC = "injection_points - facility for injection points"
 REGRESS = injection_points
 REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
 
-ISOLATION = inplace
+ISOLATION = inplace reindex_concurrently_upsert index_concurrently_upsert
 
 # The injection points are cluster-wide, so disable installcheck
 NO_INSTALLCHECK = 1
diff --git a/src/test/modules/injection_points/expected/index_concurrently_upsert.out b/src/test/modules/injection_points/expected/index_concurrently_upsert.out
new file mode 100644
index 0000000000..f39a6d452a
--- /dev/null
+++ b/src/test/modules/injection_points/expected/index_concurrently_upsert.out
@@ -0,0 +1,80 @@
+Parsed test spec with 4 sessions
+
+starting permutation: s3_start_create_index s1_start_upsert s4_wakeup_define_index_before_set_valid s2_start_upsert s4_wakeup_s1_from_invalidate_catalog_snapshot s4_wakeup_s2 s4_wakeup_s1
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s3_start_create_index: CREATE UNIQUE INDEX CONCURRENTLY tbl_pkey_duplicate ON test.tbl(i); <waiting ...>
+step s1_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s4_wakeup_define_index_before_set_valid: 
+	SELECT injection_points_wakeup('define_index_before_set_valid');
+	SELECT injection_points_detach('define_index_before_set_valid');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s3_start_create_index: <... completed>
+step s2_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s4_wakeup_s1_from_invalidate_catalog_snapshot: 
+	SELECT injection_points_wakeup('invalidate_catalog_snapshot_end');
+	SELECT injection_points_detach('invalidate_catalog_snapshot_end');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s4_wakeup_s2: 
+	SELECT injection_points_wakeup('exec_insert_before_insert_speculative');
+	SELECT injection_points_detach('exec_insert_before_insert_speculative');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s2_start_upsert: <... completed>
+step s4_wakeup_s1: 
+	SELECT injection_points_wakeup('check_exclusion_or_unique_constraint_no_conflict');
+	SELECT injection_points_detach('check_exclusion_or_unique_constraint_no_conflict');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s1_start_upsert: <... completed>
diff --git a/src/test/modules/injection_points/expected/reindex_concurrently_upsert.out b/src/test/modules/injection_points/expected/reindex_concurrently_upsert.out
new file mode 100644
index 0000000000..b7639ff7e6
--- /dev/null
+++ b/src/test/modules/injection_points/expected/reindex_concurrently_upsert.out
@@ -0,0 +1,238 @@
+Parsed test spec with 4 sessions
+
+starting permutation: s3_start_reindex s1_start_upsert s4_wakeup_to_swap s2_start_upsert s4_wakeup_s1 s4_wakeup_s2 s4_wakeup_to_set_dead
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s3_start_reindex: REINDEX INDEX CONCURRENTLY test.tbl_pkey; <waiting ...>
+step s1_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s4_wakeup_to_swap: 
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_swap');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_swap');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s2_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s4_wakeup_s1: 
+	SELECT injection_points_wakeup('check_exclusion_or_unique_constraint_no_conflict');
+	SELECT injection_points_detach('check_exclusion_or_unique_constraint_no_conflict');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s1_start_upsert: <... completed>
+step s4_wakeup_s2: 
+	SELECT injection_points_wakeup('exec_insert_before_insert_speculative');
+	SELECT injection_points_detach('exec_insert_before_insert_speculative');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s2_start_upsert: <... completed>
+step s4_wakeup_to_set_dead: 
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_set_dead');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_set_dead');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s3_start_reindex: <... completed>
+
+starting permutation: s3_start_reindex s2_start_upsert s4_wakeup_to_swap s1_start_upsert s4_wakeup_s1 s4_wakeup_s2 s4_wakeup_to_set_dead
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s3_start_reindex: REINDEX INDEX CONCURRENTLY test.tbl_pkey; <waiting ...>
+step s2_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s4_wakeup_to_swap: 
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_swap');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_swap');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s1_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s4_wakeup_s1: 
+	SELECT injection_points_wakeup('check_exclusion_or_unique_constraint_no_conflict');
+	SELECT injection_points_detach('check_exclusion_or_unique_constraint_no_conflict');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s1_start_upsert: <... completed>
+step s4_wakeup_s2: 
+	SELECT injection_points_wakeup('exec_insert_before_insert_speculative');
+	SELECT injection_points_detach('exec_insert_before_insert_speculative');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s2_start_upsert: <... completed>
+step s4_wakeup_to_set_dead: 
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_set_dead');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_set_dead');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s3_start_reindex: <... completed>
+
+starting permutation: s3_start_reindex s4_wakeup_to_swap s1_start_upsert s2_start_upsert s4_wakeup_s1 s4_wakeup_to_set_dead s4_wakeup_s2
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s3_start_reindex: REINDEX INDEX CONCURRENTLY test.tbl_pkey; <waiting ...>
+step s4_wakeup_to_swap: 
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_swap');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_swap');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s1_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s2_start_upsert: INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); <waiting ...>
+step s4_wakeup_s1: 
+	SELECT injection_points_wakeup('check_exclusion_or_unique_constraint_no_conflict');
+	SELECT injection_points_detach('check_exclusion_or_unique_constraint_no_conflict');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s1_start_upsert: <... completed>
+step s4_wakeup_to_set_dead: 
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_set_dead');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_set_dead');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s4_wakeup_s2: 
+	SELECT injection_points_wakeup('exec_insert_before_insert_speculative');
+	SELECT injection_points_detach('exec_insert_before_insert_speculative');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+step s3_start_reindex: <... completed>
+step s2_start_upsert: <... completed>
diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build
index 3c23c14d81..f2aa60702b 100644
--- a/src/test/modules/injection_points/meson.build
+++ b/src/test/modules/injection_points/meson.build
@@ -40,6 +40,8 @@ tests += {
   'isolation': {
     'specs': [
       'inplace',
+      'reindex_concurrently_upsert',
+      'index_concurrently_upsert',
     ],
   },
 }
diff --git a/src/test/modules/injection_points/specs/index_concurrently_upsert.spec b/src/test/modules/injection_points/specs/index_concurrently_upsert.spec
new file mode 100644
index 0000000000..5d6aba9073
--- /dev/null
+++ b/src/test/modules/injection_points/specs/index_concurrently_upsert.spec
@@ -0,0 +1,68 @@
+# Test race conditions involving:
+# - s1: UPSERT a tuple
+# - s2: UPSERT the same tuple
+# - s3: CREATE UNIQUE INDEX CONCURRENTLY
+# - s4: operations with injection points
+
+setup
+{
+	CREATE EXTENSION injection_points;
+	CREATE SCHEMA test;
+	CREATE UNLOGGED TABLE test.tbl(i int primary key, updated_at timestamp);
+	ALTER TABLE test.tbl SET (parallel_workers=0);
+}
+
+teardown
+{
+	DROP SCHEMA test CASCADE;
+	DROP EXTENSION injection_points;
+}
+
+session s1
+setup	{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('check_exclusion_or_unique_constraint_no_conflict', 'wait');
+	SELECT injection_points_attach('invalidate_catalog_snapshot_end', 'wait');
+}
+step s1_start_upsert	{ INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); }
+
+session s2
+setup	{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('exec_insert_before_insert_speculative', 'wait');
+}
+step s2_start_upsert	{ INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); }
+
+session s3
+setup	{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('define_index_before_set_valid', 'wait');
+}
+step s3_start_create_index		{ CREATE UNIQUE INDEX CONCURRENTLY tbl_pkey_duplicate ON test.tbl(i); }
+
+session s4
+step s4_wakeup_s1		{
+	SELECT injection_points_wakeup('check_exclusion_or_unique_constraint_no_conflict');
+	SELECT injection_points_detach('check_exclusion_or_unique_constraint_no_conflict');
+}
+step s4_wakeup_s1_from_invalidate_catalog_snapshot	{
+	SELECT injection_points_wakeup('invalidate_catalog_snapshot_end');
+	SELECT injection_points_detach('invalidate_catalog_snapshot_end');
+}
+step s4_wakeup_s2		{
+	SELECT injection_points_wakeup('exec_insert_before_insert_speculative');
+	SELECT injection_points_detach('exec_insert_before_insert_speculative');
+}
+step s4_wakeup_define_index_before_set_valid	{
+	SELECT injection_points_wakeup('define_index_before_set_valid');
+	SELECT injection_points_detach('define_index_before_set_valid');
+}
+
+permutation
+	s3_start_create_index
+	s1_start_upsert
+	s4_wakeup_define_index_before_set_valid
+	s2_start_upsert
+	s4_wakeup_s1_from_invalidate_catalog_snapshot
+	s4_wakeup_s2
+	s4_wakeup_s1
\ No newline at end of file
diff --git a/src/test/modules/injection_points/specs/reindex_concurrently_upsert.spec b/src/test/modules/injection_points/specs/reindex_concurrently_upsert.spec
new file mode 100644
index 0000000000..c6ad2c4198
--- /dev/null
+++ b/src/test/modules/injection_points/specs/reindex_concurrently_upsert.spec
@@ -0,0 +1,86 @@
+# Test race conditions involving:
+# - s1: UPSERT a tuple
+# - s2: UPSERT the same tuple
+# - s3: REINDEX concurrent primary key index
+# - s4: operations with injection points
+
+setup
+{
+	CREATE EXTENSION injection_points;
+	CREATE SCHEMA test;
+	CREATE UNLOGGED TABLE test.tbl(i int primary key, updated_at timestamp);
+	ALTER TABLE test.tbl SET (parallel_workers=0);
+}
+
+teardown
+{
+	DROP SCHEMA test CASCADE;
+	DROP EXTENSION injection_points;
+}
+
+session s1
+setup	{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('check_exclusion_or_unique_constraint_no_conflict', 'wait');
+}
+step s1_start_upsert	{ INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); }
+
+session s2
+setup	{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('exec_insert_before_insert_speculative', 'wait');
+}
+step s2_start_upsert	{ INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); }
+
+session s3
+setup	{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('reindex_relation_concurrently_before_set_dead', 'wait');
+	SELECT injection_points_attach('reindex_relation_concurrently_before_swap', 'wait');
+}
+step s3_start_reindex			{ REINDEX INDEX CONCURRENTLY test.tbl_pkey; }
+
+session s4
+step s4_wakeup_to_swap		{
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_swap');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_swap');
+}
+step s4_wakeup_s1		{
+	SELECT injection_points_wakeup('check_exclusion_or_unique_constraint_no_conflict');
+	SELECT injection_points_detach('check_exclusion_or_unique_constraint_no_conflict');
+}
+step s4_wakeup_s2		{
+	SELECT injection_points_wakeup('exec_insert_before_insert_speculative');
+	SELECT injection_points_detach('exec_insert_before_insert_speculative');
+}
+step s4_wakeup_to_set_dead		{
+	SELECT injection_points_wakeup('reindex_relation_concurrently_before_set_dead');
+	SELECT injection_points_detach('reindex_relation_concurrently_before_set_dead');
+}
+
+permutation
+	s3_start_reindex
+	s1_start_upsert
+	s4_wakeup_to_swap
+	s2_start_upsert
+	s4_wakeup_s1
+	s4_wakeup_s2
+	s4_wakeup_to_set_dead
+
+permutation
+	s3_start_reindex
+	s2_start_upsert
+	s4_wakeup_to_swap
+	s1_start_upsert
+	s4_wakeup_s1
+	s4_wakeup_s2
+	s4_wakeup_to_set_dead
+
+permutation
+	s3_start_reindex
+	s4_wakeup_to_swap
+	s1_start_upsert
+	s2_start_upsert
+	s4_wakeup_s1
+	s4_wakeup_to_set_dead
+	s4_wakeup_s2
\ No newline at end of file
-- 
2.34.1

