From 2be7b039e926e80488f7f6d84033e016048f9ab7 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 23 Feb 2016 16:04:05 +0800
Subject: [PATCH 4/8] Add the UI for failover slots

Expose failover slots to the user.

Add a new optional 'failover' argument (default false) to
pg_create_logical_replication_slot and
pg_create_physical_replication_slot. Accept a new optional FAILOVER
keyword in CREATE_REPLICATION_SLOT on the walsender protocol.
---
 contrib/test_decoding/expected/ddl.out |  3 +++
 contrib/test_decoding/sql/ddl.sql      |  2 ++
 src/backend/catalog/system_views.sql   | 11 ++++++++++-
 src/backend/replication/repl_gram.y    | 13 +++++++++++--
 src/backend/replication/repl_scanner.l |  1 +
 src/backend/replication/slotfuncs.c    |  7 +++++--
 src/backend/replication/walsender.c    |  4 ++--
 src/include/catalog/pg_proc.h          |  4 ++--
 src/include/nodes/replnodes.h          |  1 +
 src/include/replication/slot.h         |  1 +
 10 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 57a1289..5fed500 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -9,6 +9,9 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 -- fail because of an already existing slot
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ERROR:  replication slot "regression_slot" already exists
+-- fail because a failover slot can't replace a normal slot on the master
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true);
+ERROR:  replication slot "regression_slot" already exists
 -- fail because of an invalid name
 SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
 ERROR:  replication slot name "Invalid Name" contains invalid character
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index e311c59..dc61ef4 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -4,6 +4,8 @@ SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 -- fail because of an already existing slot
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+-- fail because a failover slot can't replace a normal slot on the master
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true);
 -- fail because of an invalid name
 SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index abf9a70..fcb877d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,12 +949,21 @@ AS 'pg_logical_slot_peek_binary_changes';
 
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
-    OUT slot_name name, OUT xlog_position pg_lsn)
+    IN failover boolean DEFAULT false, OUT slot_name name,
+    OUT xlog_position pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
 STRICT VOLATILE
 AS 'pg_create_physical_replication_slot';
 
+CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
+    IN slot_name name, IN plugin name, IN failover boolean DEFAULT false,
+    OUT slot_name text, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_create_logical_replication_slot';
+
 CREATE OR REPLACE FUNCTION
   make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
                 days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index d93db88..1574f24 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -77,6 +77,7 @@ Node *replication_parse_result;
 %token K_LOGICAL
 %token K_SLOT
 %token K_RESERVE_WAL
+%token K_FAILOVER
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -90,6 +91,7 @@ Node *replication_parse_result;
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot
 %type <boolval>	opt_reserve_wal
+%type <boolval> opt_failover
 
 %%
 
@@ -184,23 +186,25 @@ base_backup_opt:
 
 create_replication_slot:
 			/* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */
-			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal
+			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal opt_failover
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_PHYSICAL;
 					cmd->slotname = $2;
 					cmd->reserve_wal = $4;
+					cmd->failover = $5;
 					$$ = (Node *) cmd;
 				}
 			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT opt_failover
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
 					cmd->plugin = $4;
+					cmd->failover = $5;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -276,6 +280,11 @@ opt_reserve_wal:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_failover:
+			K_FAILOVER						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index f83ec53..a1d9f10 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -98,6 +98,7 @@ PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
+FAILOVER			{ return K_FAILOVER; }
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f430714..a2dfc40 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -18,6 +18,7 @@
 
 #include "access/htup_details.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "utils/builtins.h"
@@ -41,6 +42,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	bool 		immediately_reserve = PG_GETARG_BOOL(1);
+	bool		failover = PG_GETARG_BOOL(2);
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -57,7 +59,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	CheckSlotRequirements();
 
 	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, false);
+	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, failover);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
@@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	Name		plugin = PG_GETARG_NAME(1);
+	bool		failover = PG_GETARG_BOOL(2);
 
 	LogicalDecodingContext *ctx = NULL;
 
@@ -120,7 +123,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * errors during initialization because it'll get dropped if this
 	 * transaction fails. We'll make it persistent at the end.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, false);
+	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, failover);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1583862..efdbfd1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -792,7 +792,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
-		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, false);
+		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, cmd->failover);
 	}
 	else
 	{
@@ -803,7 +803,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * handle errors during initialization because it'll get dropped if
 		 * this transaction fails. We'll make it persistent at the end.
 		 */
-		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, false);
+		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, cmd->failover);
 	}
 
 	initStringInfo(&output_message);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 62b9125..af2b214 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5068,13 +5068,13 @@ DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0
 DESCR("SP-GiST support for quad tree over range");
 
 /* replication slots */
-DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,failover,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
 DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
-DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,failover,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
 DATA(insert OID = 3782 (  pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ ));
 DESCR("get changes from replication slot");
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index d2f1edb..a8fa9d5 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		reserve_wal;
+	bool		failover;
 } CreateReplicationSlotCmd;
 
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index cdcbd37..9e23a29 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -4,6 +4,7 @@
  *
  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
  *
+ * src/include/replication/slot.h
  *-------------------------------------------------------------------------
  */
 #ifndef SLOT_H
-- 
2.1.0

