>From 91900b87f099aaf736df6f6b5a6465ae5b334405 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 10 Aug 2015 16:10:13 +0200
Subject: [PATCH 2/2] Allow pg_create_physical_replication_slot() to reserve
 WAL.

When creating a physical slot it's often useful to immediately reserve
the current WAL position instead of only doing so after the first feedback
message arrives. That e.g. allows slots to guarantee that all the WAL
for a base backup will be available afterwards.

Logical slots already have to reserve WAL during creation, so generalize
that logic into being usable for both physical and logical slots.

Author: Gurjeet Singh
Reviewed-By: Andres Freund
Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5iCw@mail.gmail.com
---
 doc/src/sgml/func.sgml                    |  8 +++-
 src/backend/catalog/system_views.sql      |  7 +++
 src/backend/replication/logical/logical.c | 47 +-------------------
 src/backend/replication/slot.c            | 71 +++++++++++++++++++++++++++++++
 src/backend/replication/slotfuncs.c       | 21 ++++++++-
 src/include/catalog/catversion.h          |  2 +-
 src/include/catalog/pg_proc.h             |  2 +-
 src/include/replication/slot.h            |  1 +
 8 files changed, 107 insertions(+), 52 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4fcc4fe..b1df55d 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17211,7 +17211,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17221,7 +17221,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <parameter>slot_name</parameter>. Streaming changes from a physical slot
         is only possible with the streaming-replication protocol - see <xref
         linkend="protocol-replication">. Corresponds to the replication protocol
-        command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
+        command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional
+        second parameter, when <literal>true</>, specifies that the <acronym>LSN</>
+        for this replication slot be reserved immediately; the <acronym>LSN</>
+        is otherwise reserved on first connection from a streaming replication
+        client.
        </entry>
       </row>
       <row>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3190c7f..ccc030f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -917,6 +917,13 @@ LANGUAGE INTERNAL
 VOLATILE ROWS 1000 COST 1000
 AS 'pg_logical_slot_peek_binary_changes';
 
+CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
+    IN slot_name name, IN immediately_reserve boolean DEFAULT false,
+    OUT slot_name name, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+AS 'pg_create_physical_replication_slot';
+
 CREATE OR REPLACE FUNCTION
   make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
                 days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d737288..3e84116 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -253,52 +253,7 @@ CreateInitDecodingContext(char *plugin,
 	StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
 	SpinLockRelease(&slot->mutex);
 
-	/*
-	 * The replication slot mechanism is used to prevent removal of required
-	 * WAL. As there is no interlock between this and checkpoints required WAL
-	 * could be removed before ReplicationSlotsComputeRequiredLSN() has been
-	 * called to prevent that. In the very unlikely case that this happens
-	 * we'll just retry.
-	 */
-	while (true)
-	{
-		XLogSegNo	segno;
-
-		/*
-		 * Let's start with enough information if we can, so log a standby
-		 * snapshot and start decoding at exactly that position.
-		 */
-		if (!RecoveryInProgress())
-		{
-			XLogRecPtr	flushptr;
-
-			/* start at current insert position */
-			slot->data.restart_lsn = GetXLogInsertRecPtr();
-
-			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
-
-			/* and make sure it's fsynced to disk */
-			XLogFlush(flushptr);
-		}
-		else
-			slot->data.restart_lsn = GetRedoRecPtr();
-
-		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
-
-		/*
-		 * If all required WAL is still there, great, otherwise retry. The
-		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
-		 * the new restart_lsn above, so normally we should never need to loop
-		 * more than twice.
-		 */
-		XLByteToSeg(slot->data.restart_lsn, segno);
-		if (XLogGetLastRemovedSegno() < segno)
-			break;
-	}
-
+	ReplicationSlotReserveWal();
 
 	/* ----
 	 * This is a bit tricky: We need to determine a safe xmin horizon to start
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4d33629..6bd6505 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -40,6 +40,7 @@
 #include <sys/stat.h>
 
 #include "access/transam.h"
+#include "access/xlog_internal.h"
 #include "common/string.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
@@ -782,6 +783,76 @@ CheckSlotRequirements(void)
 }
 
 /*
+ * Reserve WAL for the currently active slot (MyReplicationSlot).
+ *
+ * Compute and set restart_lsn in a manner that's appropriate for the type of
+ * the slot and safe against concurrent removal of WAL segments.
+ */
+void
+ReplicationSlotReserveWal(void)
+{
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(slot != NULL);
+	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+
+	/*
+	 * The replication slot mechanism is used to prevent removal of required
+	 * WAL. As there is no interlock between this routine and checkpoints, WAL
+	 * segments could concurrently be removed when a now stale return value of
+	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
+	 * this happens we'll just retry.
+	 */
+	while (true)
+	{
+		XLogSegNo	segno;
+
+		/*
+		 * For logical slots, outside of recovery, log a standby snapshot and
+		 * start logical decoding at exactly that position. That allows the
+		 * slot to start up more quickly.
+		 *
+		 * That's not needed (or indeed helpful) for physical slots as they'll
+		 * start replay at the last logged checkpoint anyway. Instead use
+		 * the location of the last redo LSN. While that slightly increases
+		 * the chance that we have to retry, it's where a base backup has to
+		 * start replay at.
+		 */
+		if (!RecoveryInProgress() && SlotIsLogical(slot))
+		{
+			XLogRecPtr	flushptr;
+
+			/* start at current insert position */
+			slot->data.restart_lsn = GetXLogInsertRecPtr();
+
+			/* make sure we have enough information to start */
+			flushptr = LogStandbySnapshot();
+
+			/* and make sure it's fsynced to disk */
+			XLogFlush(flushptr);
+		}
+		else
+		{
+			slot->data.restart_lsn = GetRedoRecPtr();
+		}
+
+		/* prevent WAL removal as fast as possible */
+		ReplicationSlotsComputeRequiredLSN();
+
+		/*
+		 * If all required WAL is still there, great, otherwise retry. The
+		 * slot should prevent further removal of WAL, unless there's a
+		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+		 * the new restart_lsn above, so normally we should never need to loop
+		 * more than twice.
+		 */
+		XLByteToSeg(slot->data.restart_lsn, segno);
+		if (XLogGetLastRemovedSegno() < segno)
+			break;
+	}
+}
+
+/*
  * Flush all replication slots to disk.
  *
  * This needn't actually be part of a checkpoint, but it's a convenient
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ecfcb07..2dc6827 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -40,6 +40,7 @@ Datum
 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
+	bool 		immediately_reserve = PG_GETARG_BOOL(1);
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-
 	nulls[0] = false;
-	nulls[1] = true;
+
+	if (immediately_reserve)
+	{
+		/* Reserve WAL as the user asked for it */
+		ReplicationSlotReserveWal();
+
+		/* Write this slot to disk */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+
+		values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
+		nulls[1] = false;
+	}
+	else
+	{
+		values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+		nulls[1] = true;
+	}
 
 	tuple = heap_form_tuple(tupdesc, values, nulls);
 	result = HeapTupleGetDatum(tuple);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 8cd6772..16e9c14 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201508101
+#define CATALOG_VERSION_NO	201508102
 
 #endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 5163962..ddf7c67 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5193,7 +5193,7 @@ DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0
 DESCR("SP-GiST support for quad tree over range");
 
 /* replication slots */
-DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 367ef0a..20dd7a2 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -166,6 +166,7 @@ extern void ReplicationSlotMarkDirty(void);
 
 /* misc stuff */
 extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotReserveWal(void);
 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
-- 
2.3.0.149.gf3f4077.dirty

