From 344c629ff43d9a44cff55eea5968e693ba1ee4f4 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 17 Nov 2016 15:25:29 +0800
Subject: [PATCH 16/21] Drop logical replication slots when redoing database
 drop

When a standby has logical replication slots on a database that is
dropped on the master, drop those slots as part of redoing the
database drop.
---
 src/backend/commands/dbcommands.c                  |   6 ++
 src/backend/replication/slot.c                     |  72 +++++++++++++
 src/include/replication/slot.h                     |   1 +
 .../recovery/t/010_logical_decoding_on_replica.pl  | 118 ++++++++++++++++++---
 4 files changed, 182 insertions(+), 15 deletions(-)

diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 0919ad8..3efc833 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -2119,11 +2119,17 @@ dbase_redo(XLogReaderState *record)
 			 * InitPostgres() cannot fully re-execute concurrently. This
 			 * avoids backends re-connecting automatically to same database,
 			 * which can happen in some cases.
+			 *
+			 * This will lock out walsenders trying to connect to db-specific
+			 * slots for logical decoding too, so it's safe for us to drop slots.
 			 */
 			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
 			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
 		}
 
+		/* Drop any database-specific replication slots */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* Drop pages for this database that are in the shared buffer cache */
 		DropDatabaseBuffers(xlrec->db_id);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0b2575e..426f0d0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -758,6 +758,90 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid.
+ *
+ * Called while redoing a database drop.  The caller should hold an exclusive
+ * lock on the database to ensure no replication slots on the database are in
+ * use; finding an active one anyway is reported as an ERROR.
+ *
+ * Each matching logical slot is marked free in shared memory and its
+ * pg_replslot directory is removed, after which the slot-derived xmin and
+ * LSN limits are recomputed once.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+	char		path[MAXPGPATH];
+
+	if (max_replication_slots <= 0)
+		return;
+
+	/*
+	 * We clear in_use below, so ReplicationSlotControlLock must be held in
+	 * exclusive mode: code holding it in shared mode relies on in_use not
+	 * changing underneath it.  Keep holding it until the directories are
+	 * gone, so nobody can see a slot as free while its old on-disk state
+	 * still exists.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData	slotname;
+		int			active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific, skip */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Remember the name, and free the slot in memory unless it's active */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN - 1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+			s->in_use = false;
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * The caller should have an exclusive lock on the database so we'll
+		 * never have any in-use slots.  If we do, the slot was left untouched
+		 * and shared memory is consistent, so ERROR (not PANIC) suffices.
+		 */
+		if (active_pid)
+			elog(ERROR, "replication slot %s is in use by pid %d",
+				 NameStr(slotname), active_pid);
+
+		/* and purge it from disk */
+		snprintf(path, sizeof(path), "pg_replslot/%s", NameStr(slotname));
+
+		/* if deletion fails we want to bail out and force retry of recovery */
+		if (!rmtree(path, true))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove directory \"%s\" for slot \"%s\"",
+							path, NameStr(slotname))));
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
 
 /*
  * Check whether the server's configuration supports using replication
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index e00562d..4ad2bcf 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -175,6 +175,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/test/recovery/t/010_logical_decoding_on_replica.pl b/src/test/recovery/t/010_logical_decoding_on_replica.pl
index 0d869e4..7934e9f 100644
--- a/src/test/recovery/t/010_logical_decoding_on_replica.pl
+++ b/src/test/recovery/t/010_logical_decoding_on_replica.pl
@@ -7,7 +7,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 28;
+use Test::More tests => 43;
 use RecursiveCopy;
 use File::Copy;
 
@@ -28,10 +28,12 @@ $node_master->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n
 $node_master->dump_info;
 $node_master->start;
 
-$node_master->safe_psql('postgres', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$node_master->safe_psql('postgres', q[CREATE DATABASE testdb]);
+# the tests use their own database so that later steps can drop it
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
 $backup_name = 'b1';
 my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
-TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('postgres'), '--xlog-method=stream', '--write-recovery-conf', '--slot=decoding_standby');
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--xlog-method=stream', '--write-recovery-conf', '--slot=decoding_standby');
 
 open(my $fh, "<", $backup_dir . "/recovery.conf")
   or die "can't open recovery.conf";
@@ -50,8 +52,8 @@ ok($found, "using physical slot for standby");
 
 sub print_phys_xmin
 {
-    my ($xmin, $catalog_xmin) = split(qr/\|/, $node_master->safe_psql('postgres', q[SELECT xmin, catalog_xmin FROM pg_replication_slots WHERE slot_name = 'decoding_standby';]));
-	return ($xmin, $catalog_xmin);
+	# (xmin, catalog_xmin) of the master's physical slot 'decoding_standby'
+	return @{ $node_master->slot('decoding_standby') }{qw(xmin catalog_xmin)};
 }
 
 my ($xmin, $catalog_xmin) = print_phys_xmin();
@@ -77,13 +79,13 @@ ok($xmin, "xmin not null");
 ok(!$catalog_xmin, "catalog_xmin null");
 
 # Create new slots on the replica, ignoring the ones on the master completely.
-is($node_replica->psql('postgres', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
    0, 'logical slot creation on standby succeeded');
 
 sub print_logical_xmin
 {
-    my ($xmin, $catalog_xmin) = split(qr/\|/, $node_replica->safe_psql('postgres', q[SELECT xmin, catalog_xmin FROM pg_replication_slots WHERE slot_name = 'standby_logical';]));
-	return ($xmin, $catalog_xmin);
+	# (xmin, catalog_xmin) of the standby's logical slot 'standby_logical'
+	return @{ $node_replica->slot('standby_logical') }{qw(xmin catalog_xmin)};
 }
 
 $node_master->wait_for_catchup($node_replica);
@@ -97,8 +99,8 @@ isnt($catalog_xmin, '', "physical catalog_xmin not null");
 is($xmin, '', "logical xmin null");
 isnt($catalog_xmin, '', "logical catalog_xmin not null");
 
-$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
-$node_master->safe_psql('postgres', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
 
 $node_master->wait_for_catchup($node_replica);
 sleep(2); # ensure walreceiver feedback sent
@@ -110,7 +112,7 @@ isnt($catalog_xmin, '', "physical catalog_xmin not null");
 $node_master->wait_for_catchup($node_replica);
 sleep(2); # ensure walreceiver feedback sent
 
-($ret, $stdout, $stderr) = $node_replica->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
 is($ret, 0, 'replay from slot succeeded');
 is($stdout, q{BEGIN
 table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
@@ -133,11 +135,11 @@ isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
 # we hold down xmin.
 for my $i (0 .. 1000)
 {
-    $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
 }
-$node_master->safe_psql('postgres', 'VACUUM');
+$node_master->safe_psql('testdb', 'VACUUM');
 
-($ret, $stdout, $stderr) = $node_replica->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
 is($ret, 0, 'replay of big series succeeded');
 
 $node_master->wait_for_catchup($node_replica);
@@ -158,7 +160,9 @@ isnt($new_physical_xmin, '', "physical xmin not null");
 isnt($new_physical_xmin, $physical_xmin, "physical xmin changed");
 isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
 
-$node_replica->psql('postgres', q[SELECT pg_drop_replication_slot('standby_logical')]);
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
 
 $node_master->wait_for_catchup($node_replica);
 sleep(2); # ensure walreceiver feedback sent
@@ -166,3 +170,90 @@ sleep(2); # ensure walreceiver feedback sent
 ($xmin, $catalog_xmin) = print_phys_xmin();
 isnt($xmin, '', "physical xmin not null");
 is($catalog_xmin, '', "physical catalog_xmin null");
+
+
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+diag "Testing dropdb when downstream slot is not in-use";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created slot dodropslot');
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created slot otherslot');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica);
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+diag "Testing dropdb when downstream slot is in-use";
+$node_master->safe_psql('postgres', q[CREATE DATABASE testdb2]);
+
+# the standby must replay the CREATE DATABASE before we can connect to testdb2
+$node_master->wait_for_catchup($node_replica);
+
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'], 'pg_recvlogical created slot dodropslot2');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use.  --no-loop makes pg_recvlogical exit when the
+# server terminates its connection instead of reconnecting forever, and the
+# timeout keeps finish() below from hanging if that never happens.
+diag "starting pg_recvlogical";
+my $handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr, IPC::Run::timeout(180));
+$node_replica->poll_query_until('postgres', q[SELECT active FROM pg_replication_slots WHERE slot_name = 'dodropslot2'])
+  or diag "slot dodropslot2 did not become active in time";
+$handle->reap_nb;
+
+if (!$handle->pumpable)
+{
+	$handle->finish;
+	BAIL_OUT("pg_recvlogical already exited with " . (($handle->results())[0]) . " and stderr '$stderr'");
+}
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+  or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+diag "pg_recvlogical backend pid is " . $node_replica->slot('dodropslot2')->{'active_pid'};
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+# Redoing the drop on the standby resolves the recovery conflict with the
+# walsender using the slot before dropping the database's slots, so replay
+# should still catch up.
+$node_master->wait_for_catchup($node_replica);
+
+# our client should've been terminated by the server.  finish() reports the
+# child's exit status through the harness rather than by dying, so an
+# exception here means something else went wrong, e.g. the timeout fired.
+{
+	my $finished = eval { $handle->finish; 1 };
+	my $finish_exc = $@;
+	if ($finished)
+	{
+		my $exit_code = ($handle->results())[0];
+		diag "pg_recvlogical terminated with $exit_code and stderr '$stderr'";
+		is($exit_code, 1, "pg_recvlogical terminated by server");
+	}
+	else
+	{
+		fail("pg_recvlogical not terminated? $finish_exc");
+	}
+}
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.5.5

