From bce33b0732e0498b25d6673b49b86c1eb09ab894 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 15 Feb 2016 12:00:59 +0800
Subject: [PATCH 3/4] Add the UI and documentation for failover slots

Expose failover slots to the user.

Add a new 'failover' argument to pg_create_logical_replication_slot and
pg_create_physical_replication_slot. Report if a slot is a failover
slot in pg_catalog.pg_replication_slots. Accept a new FAILOVER keyword
argument in CREATE_REPLICATION_SLOT on the walsender protocol.

Document the existence of failover slots support and how to use them.
---
 contrib/test_decoding/expected/ddl.out | 41 ++++++++++++++++++---
 contrib/test_decoding/sql/ddl.sql      | 17 ++++++++-
 doc/src/sgml/catalogs.sgml             | 10 +++++
 doc/src/sgml/func.sgml                 | 24 ++++++++----
 doc/src/sgml/high-availability.sgml    | 67 ++++++++++++++++++++++++++++++++--
 doc/src/sgml/logicaldecoding.sgml      | 52 +++++++++++++++++---------
 doc/src/sgml/protocol.sgml             | 24 ++++++++++--
 src/backend/catalog/system_views.sql   | 12 +++++-
 src/backend/replication/repl_gram.y    | 13 ++++++-
 src/backend/replication/slotfuncs.c    | 13 +++++--
 src/backend/replication/walsender.c    |  4 +-
 src/include/catalog/pg_proc.h          |  6 +--
 src/include/nodes/replnodes.h          |  1 +
 src/include/replication/slot.h         |  1 +
 src/test/regress/expected/rules.out    |  3 +-
 15 files changed, 237 insertions(+), 51 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 57a1289..5b2f34a 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -9,6 +9,9 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 -- fail because of an already existing slot
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ERROR:  replication slot "regression_slot" already exists
+-- fail because a failover slot can't replace a normal slot on the master
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true);
+ERROR:  replication slot "regression_slot" already exists
 -- fail because of an invalid name
 SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
 ERROR:  replication slot name "Invalid Name" contains invalid character
@@ -58,11 +61,37 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 SELECT slot_name, plugin, slot_type, active,
     NOT catalog_xmin IS NULL AS catalog_xmin_set,
     xmin IS NULl  AS data_xmin_not_set,
-    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
 FROM pg_replication_slots;
-    slot_name    |    plugin     | slot_type | active | catalog_xmin_set | data_xmin_not_set | some_wal 
------------------+---------------+-----------+--------+------------------+-------------------+----------
- regression_slot | test_decoding | logical   | f      | t                | t                 | t
+    slot_name    |    plugin     | slot_type | active | catalog_xmin_set | data_xmin_not_set | some_wal | failover 
+-----------------+---------------+-----------+--------+------------------+-------------------+----------+----------
+ regression_slot | test_decoding | logical   | f      | t                | t                 | t        | f
+(1 row)
+
+/* same for a failover slot */
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT slot_name, plugin, slot_type, active,
+    NOT catalog_xmin IS NULL AS catalog_xmin_set,
+    xmin IS NULl  AS data_xmin_not_set,
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
+FROM pg_replication_slots
+WHERE slot_name = 'failover_slot';
+   slot_name   |    plugin     | slot_type | active | catalog_xmin_set | data_xmin_not_set | some_wal | failover 
+---------------+---------------+-----------+--------+------------------+-------------------+----------+----------
+ failover_slot | test_decoding | logical   | f      | t                | t                 | t        | t
+(1 row)
+
+SELECT pg_drop_replication_slot('failover_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
 (1 row)
 
 /*
@@ -673,7 +702,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | active | active_pid | failover | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
+-----------+--------+-----------+--------+----------+--------+------------+----------+------+--------------+-------------+---------------------
 (0 rows)
 
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index e311c59..f64b21c 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -4,6 +4,8 @@ SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 -- fail because of an already existing slot
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+-- fail because a failover slot can't replace a normal slot on the master
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true);
 -- fail because of an invalid name
 SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
 
@@ -22,16 +24,27 @@ SELECT 'init' FROM pg_create_physical_replication_slot('repl');
 SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 SELECT pg_drop_replication_slot('repl');
 
-
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
 /* check whether status function reports us, only reproduceable columns */
 SELECT slot_name, plugin, slot_type, active,
     NOT catalog_xmin IS NULL AS catalog_xmin_set,
     xmin IS NULl  AS data_xmin_not_set,
-    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
 FROM pg_replication_slots;
 
+/* same for a failover slot */
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', true);
+SELECT slot_name, plugin, slot_type, active,
+    NOT catalog_xmin IS NULL AS catalog_xmin_set,
+    xmin IS NULl  AS data_xmin_not_set,
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
+FROM pg_replication_slots
+WHERE slot_name = 'failover_slot';
+SELECT pg_drop_replication_slot('failover_slot');
+
 /*
  * Check that changes are handled correctly when interleaved with ddl
  */
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 412c845..053b91a 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5377,6 +5377,16 @@
      </row>
 
      <row>
+      <entry><structfield>failover</structfield></entry>
+      <entry><type>boolean</type></entry>
+      <entry></entry>
+      <entry>
+       True if this slot is a failover slot; see
+       <xref linkend="streaming-replication-slots-failover">.
+      </entry>
+     </row>
+
+     <row>
       <entry><structfield>xmin</structfield></entry>
       <entry><type>xid</type></entry>
       <entry></entry>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index f9eea76..ef49bd7 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17420,7 +17420,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</> <type>boolean</></optional> <optional>, <parameter>failover</> <type>boolean</></optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17431,7 +17431,10 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         when <literal>true</>, specifies that the <acronym>LSN</> for this
         replication slot be reserved immediately; otherwise
         the <acronym>LSN</> is reserved on first connection from a streaming
-        replication client. Streaming changes from a physical slot is only
+        replication client. If <literal>failover</literal> is <literal>true</literal>
+        then the slot is created as a failover slot; see <xref
+        linkend="streaming-replication-slots-failover">.
+        Streaming changes from a physical slot is only
         possible with the streaming-replication protocol &mdash;
         see <xref linkend="protocol-replication">. This function corresponds
         to the replication protocol command <literal>CREATE_REPLICATION_SLOT
@@ -17460,7 +17463,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_logical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type>)</function></literal>
+        <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>failover</> <type>boolean</></optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17468,8 +17471,10 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
        <entry>
         Creates a new logical (decoding) replication slot named
         <parameter>slot_name</parameter> using the output plugin
-        <parameter>plugin</parameter>.  A call to this function has the same
-        effect as the replication protocol command
+        <parameter>plugin</parameter>. If <literal>failover</literal>
+        is <literal>true</literal> the slot is created as a failover
+        slot; see <xref linkend="streaming-replication-slots-failover">. A call to
+        this function has the same effect as the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </entry>
       </row>
@@ -17485,7 +17490,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>text</type>)
        </entry>
        <entry>
-        Returns changes in the slot <parameter>slot_name</parameter>, starting
+        Returns changes in the slot <parameter>slot_name</parameter>, starting
         from the point at which since changes have been consumed last.  If
         <parameter>upto_lsn</> and <parameter>upto_nchanges</> are NULL,
         logical decoding will continue until end of WAL.  If
@@ -17495,7 +17500,12 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         stop when the number of rows produced by decoding exceeds
         the specified value.  Note, however, that the actual number of
         rows returned may be larger, since this limit is only checked after
-        adding the rows produced when decoding each new transaction commit.
+        adding the rows produced when decoding each new transaction commit,
+        so at least one transaction is always returned. The returned changes
+        are consumed and will not be returned by subsequent calls to
+        <function>pg_logical_slot_get_changes</function>, though a server
+        crash may cause recently consumed changes to be replayed again after
+        recovery.
        </entry>
       </row>
 
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 6cb690c..1624d51 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -859,7 +859,8 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
      <xref linkend="functions-recovery-info-table"> for details).
      The last WAL receive location in the standby is also displayed in the
      process status of the WAL receiver process, displayed using the
-     <command>ps</> command (see <xref linkend="monitoring-ps"> for details).
+     <command>ps</> command (see <xref linkend="monitoring-ps"> for details)
+     and in the <literal>pg_stat_replication</literal> view.
     </para>
     <para>
      You can retrieve a list of WAL sender processes via the
@@ -871,10 +872,15 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
      <function>pg_last_xlog_receive_location</> on the standby might indicate
      network delay, or that the standby is under heavy load.
     </para>
+    <para>
+     Compare xlog locations and measure lag using
+     <link linkend="functions-admin-backup"><function>pg_xlog_location_diff(...)</function></link>
+     in a query over <literal>pg_stat_replication</literal>.
+    </para>
    </sect3>
   </sect2>
 
-  <sect2 id="streaming-replication-slots">
+  <sect2 id="streaming-replication-slots" xreflabel="Replication slots">
    <title>Replication Slots</title>
    <indexterm>
     <primary>replication slot</primary>
@@ -885,7 +891,10 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     not remove WAL segments until they have been received by all standbys,
     and that the master does not remove rows which could cause a
     <link linkend="hot-standby-conflict">recovery conflict</> even when the
-    standby is disconnected.
+    standby is disconnected. They allow clients to receive a stream of
+    changes in an ordered, consistent manner &mdash; either raw WAL from a physical
+    replication slot or a logical stream of row changes from a
+    <link linkend="logicaldecoding-slots">logical replication slot</link>.
    </para>
    <para>
     In lieu of using replication slots, it is possible to prevent the removal
@@ -906,6 +915,17 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     and the latter often needs to be set to a high value to provide adequate
     protection.  Replication slots overcome these disadvantages.
    </para>
+   <para>
+    Because replication slots cause the server to retain transaction logs
+    in <filename>pg_xlog</filename> it is important to monitor how far slots
+    are lagging behind the master in order to prevent the disk from filling
+    up and interrupting the master's operation. A query like:
+    <programlisting>
+      SELECT *, pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) AS lag_bytes
+      FROM pg_replication_slots;
+    </programlisting>
+    will provide an indication of how far a slot is lagging.
+   </para>
    <sect3 id="streaming-replication-slots-manipulation">
     <title>Querying and manipulating replication slots</title>
     <para>
@@ -949,6 +969,47 @@ primary_slot_name = 'node_a_slot'
 </programlisting>
     </para>
    </sect3>
+
+   <sect3 id="streaming-replication-slots-failover" xreflabel="Failover slots">
+     <title>Failover slots</title>
+
+     <para>
+      Normally a replication slot is not preserved across backup and restore
+      (such as by <application>pg_basebackup</application>) and is not
+      replicated to standbys. Slots are <emphasis>automatically
+      dropped</emphasis> when starting up as a streaming replica or in archive
+      recovery (PITR) mode.
+     </para>
+
+     <para>
+      To make it possible for an application to consistently follow
+      failover when a replica is promoted to a new master a slot may be
+      created as a <emphasis>failover slot</emphasis>. A failover slot may
+      only be created, replayed from or dropped on a master server. Changes to
+      the slot are written to WAL and replicated to standbys. When a standby
+      is promoted applications may connect to the slot on the standby and
+      resume replay from it at a consistent point, as if it was the original
+      master. Failover slots may not be used to replay from a standby before
+      promotion.
+     </para>
+
+     <para>
+      Non-failover slots may be created on and used from a replica. This is
+      currently limited to physical slots as logical decoding is not supported
+      on a replica server.
+     </para>
+
+     <para>
+      When a failover slot created on the master has the same name as a
+      non-failover slot on a replica server the non-failover slot will be
+      automatically dropped. Any client currently connected will be
+      disconnected with an error indicating a conflict with recovery. It
+      is strongly recommended that you avoid creating failover slots with
+      the same name as slots on replicas.
+     </para>
+
+   </sect3>
+
   </sect2>
 
   <sect2 id="cascading-replication">
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index e841348..7f6a73d 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -12,15 +12,17 @@
 
   <para>
    Changes are sent out in streams identified by logical replication slots.
-   Each stream outputs each change exactly once.
+   Each stream outputs each change once, though repeats are possible after a
+   server crash.
   </para>
 
   <para>
    The format in which those changes are streamed is determined by the output
-   plugin used.  An example plugin is provided in the PostgreSQL distribution.
-   Additional plugins can be
-   written to extend the choice of available formats without modifying any
-   core code.
+   plugin used.  An example plugin (test_decoding) is provided in the
+   PostgreSQL distribution.  Additional plugins can be written to extend the
+   choice of available formats without modifying any core code.
+  </para>
+  <para>
    Every output plugin has access to each individual new row produced
    by <command>INSERT</command> and the new row version created
    by <command>UPDATE</command>.  Availability of old row versions for
@@ -192,7 +194,7 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     </para>
    </sect2>
 
-   <sect2>
+   <sect2 id="logicaldecoding-slots" xreflabel="Logical Replication Slots">
     <title>Replication Slots</title>
 
     <indexterm>
@@ -201,20 +203,18 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     </indexterm>
 
     <para>
+     The general concepts of replication slots are discussed under
+     <xref linkend="streaming-replication-slots">. This topic only covers
+     specifics for logical slots.
+    </para>
+
+    <para>
      In the context of logical replication, a slot represents a stream of
      changes that can be replayed to a client in the order they were made on
      the origin server. Each slot streams a sequence of changes from a single
-     database, sending each change exactly once (except when peeking forward
-     in the stream).
+     database, sending each change only once.
     </para>
 
-    <note>
-     <para><productname>PostgreSQL</productname> also has streaming replication slots
-     (see <xref linkend="streaming-replication">), but they are used somewhat
-     differently there.
-     </para>
-    </note>
-
     <para>
      A replication slot has an identifier that is unique across all databases
      in a <productname>PostgreSQL</productname> cluster. Slots persist
@@ -243,9 +243,22 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
       even when there is no connection using them. This consumes storage
       because neither required WAL nor required rows from the system catalogs
       can be removed by <command>VACUUM</command> as long as they are required by a replication
-      slot.  So if a slot is no longer required it should be dropped.
+      slot.  If a slot is no longer required it should be dropped to prevent
+      <filename>pg_xlog</filename> from filling up and (for logical slots)
+      the system catalogs from bloating.
      </para>
     </note>
+
+    <para>
+     A replication slot keeps track of the oldest needed WAL position that the
+     application may need to restart replay from. It does <emphasis>not</emphasis>
+     guarantee never to replay the same data twice and updates to the restart
+     position are not immediately flushed to disk so they may be lost if the
+     server crashes. The client application is responsible for keeping track of
+     the point it has replayed up to and should request that replay restart at
+     that point when it reconnects by passing the last-replayed LSN to the
+     start replication command.
+    </para>
    </sect2>
 
    <sect2>
@@ -268,7 +281,10 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
      SNAPSHOT</literal></link> to read the state of the database at the moment
      the slot was created. This transaction can then be used to dump the
      database's state at that point in time, which afterwards can be updated
-     using the slot's contents without losing any changes.
+     using the slot's contents without losing any changes. The exported snapshot
+     remains valid until the connection that created it runs another command
+     or disconnects. It may be imported into another connection and re-exported
+     to preserve it longer.
     </para>
    </sect2>
   </sect1>
@@ -280,7 +296,7 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     The commands
     <itemizedlist>
      <listitem>
-      <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
+      <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable> <optional>FAILOVER</optional></literal></para>
      </listitem>
 
      <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 1a596cd..cbf523d 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1434,13 +1434,14 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> <optional><literal>RESERVE_WAL</></> | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> } <optional><literal>FAILOVER</></>
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
      <para>
       Create a physical or logical replication
-      slot. See <xref linkend="streaming-replication-slots"> for more about
+      slot. See <xref linkend="streaming-replication-slots"> and
+      <xref linkend="logicaldecoding-slots"> for more about
       replication slots.
      </para>
      <variablelist>
@@ -1468,12 +1469,23 @@ The commands accepted in walsender mode are:
        <term><literal>RESERVE_WAL</></term>
        <listitem>
         <para>
-         Specify that this physical replication reserves <acronym>WAL</>
+         Specify that this physical replication slot reserves <acronym>WAL</>
          immediately.  Otherwise, <acronym>WAL</> is only reserved upon
          connection from a streaming replication client.
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry>
+       <term><literal>FAILOVER</></term>
+       <listitem>
+        <para>
+         Create this slot as a
+         <link linkend="streaming-replication-slots-failover">failover slot</link>.
+        </para>
+       </listitem>
+      </varlistentry>
+
      </variablelist>
     </listitem>
   </varlistentry>
@@ -1829,6 +1841,12 @@ The commands accepted in walsender mode are:
       to process the output for streaming.
      </para>
 
+     <para>
+      Logical replication automatically follows timeline switches. It is
+      not necessary or possible to supply a <literal>TIMELINE</literal>
+      option like in physical replication.
+     </para>
+
      <variablelist>
       <varlistentry>
        <term><literal>SLOT</literal> <replaceable class="parameter">slot_name</></term>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 923fe58..b4f8fbe 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -698,6 +698,7 @@ CREATE VIEW pg_replication_slots AS
             D.datname AS database,
             L.active,
             L.active_pid,
+            L.failover,
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
@@ -943,12 +944,21 @@ AS 'pg_logical_slot_peek_binary_changes';
 
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
-    OUT slot_name name, OUT xlog_position pg_lsn)
+    IN failover boolean DEFAULT false, OUT slot_name name,
+    OUT xlog_position pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
 STRICT VOLATILE
 AS 'pg_create_physical_replication_slot';
 
+CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
+    IN slot_name name, IN plugin name, IN failover boolean DEFAULT false,
+    OUT slot_name text, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_create_logical_replication_slot';
+
 CREATE OR REPLACE FUNCTION
   make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
                 days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index d93db88..1574f24 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -77,6 +77,7 @@ Node *replication_parse_result;
 %token K_LOGICAL
 %token K_SLOT
 %token K_RESERVE_WAL
+%token K_FAILOVER
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -90,6 +91,7 @@ Node *replication_parse_result;
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot
 %type <boolval>	opt_reserve_wal
+%type <boolval> opt_failover
 
 %%
 
@@ -184,23 +186,25 @@ base_backup_opt:
 
 create_replication_slot:
 			/* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */
-			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal
+			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal opt_failover
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_PHYSICAL;
 					cmd->slotname = $2;
 					cmd->reserve_wal = $4;
+					cmd->failover = $5;
 					$$ = (Node *) cmd;
 				}
 			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT opt_failover
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
 					cmd->plugin = $4;
+					cmd->failover = $5;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -276,6 +280,11 @@ opt_reserve_wal:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_failover:
+			K_FAILOVER						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f430714..abc450d 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -18,6 +18,7 @@
 
 #include "access/htup_details.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "utils/builtins.h"
@@ -41,6 +42,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	bool 		immediately_reserve = PG_GETARG_BOOL(1);
+	bool		failover = PG_GETARG_BOOL(2);
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -57,7 +59,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	CheckSlotRequirements();
 
 	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, false);
+	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, failover);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
@@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	Name		plugin = PG_GETARG_NAME(1);
+	bool		failover = PG_GETARG_BOOL(2);
 
 	LogicalDecodingContext *ctx = NULL;
 
@@ -120,7 +123,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * errors during initialization because it'll get dropped if this
 	 * transaction fails. We'll make it persistent at the end.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, false);
+	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, failover);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
@@ -174,7 +177,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 10
+#define PG_GET_REPLICATION_SLOTS_COLS 11
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -224,6 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		XLogRecPtr	restart_lsn;
 		XLogRecPtr	confirmed_flush_lsn;
 		pid_t		active_pid;
+		bool		failover;
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
@@ -246,6 +250,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			namecpy(&plugin, &slot->data.plugin);
 
 			active_pid = slot->active_pid;
+			failover = slot->data.failover;
 		}
 		SpinLockRelease(&slot->mutex);
 
@@ -276,6 +281,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		values[i++] = BoolGetDatum(failover);
+
 		if (xmin != InvalidTransactionId)
 			values[i++] = TransactionIdGetDatum(xmin);
 		else
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1583862..efdbfd1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -792,7 +792,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
-		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, false);
+		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, cmd->failover);
 	}
 	else
 	{
@@ -803,7 +803,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * handle errors during initialization because it'll get dropped if
 		 * this transaction fails. We'll make it persistent at the end.
 		 */
-		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, false);
+		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, cmd->failover);
 	}
 
 	initStringInfo(&output_message);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 1c0ef9a..d14ff7a 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5064,13 +5064,13 @@ DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0
 DESCR("SP-GiST support for quad tree over range");
 
 /* replication slots */
-DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,failover,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,16,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,failover,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
-DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,failover,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
 DATA(insert OID = 3782 (  pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ ));
 DESCR("get changes from replication slot");
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index d2f1edb..a8fa9d5 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		reserve_wal;
+	bool		failover;
 } CreateReplicationSlotCmd;
 
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index cdcbd37..9e23a29 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -4,6 +4,7 @@
  *
  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
  *
+ * src/include/replication/slot.h
  *-------------------------------------------------------------------------
  */
 #ifndef SLOT_H
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2bdba2d..f5dd4a8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1414,11 +1414,12 @@ pg_replication_slots| SELECT l.slot_name,
     d.datname AS database,
     l.active,
     l.active_pid,
+    l.failover,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
     l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, failover, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.1.0

