From 8a20341eaf165252c685d92c01718222923c4799 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 9 Mar 2017 10:53:55 +0800
Subject: [PATCH 3/3] Documentation and comments fixes relating to replication
 position tracking

There is some confusing code in the walsender and in logical decoding that
treats the "sentPtr" variable as "sent up to but not including", and treats the
"confirmed_flush" value in a replication slot as "confirmed up to but not
including".

The latter has already resulted in an off-by-one error where confirmed_flush
was assumed to mean "confirmed flushed up to and including" in some places.

Additionally, some log message output for logical decoding stated that
commits "after" the specified LSN would be replayed, when in fact it is commits
beginning at or after the specified LSN that will be replayed.

Improve comments, messages and documentation to emphasise that sentPtr and
confirmed_flush are "up to and excluding", that the pg_stat_replication fields
are the first un-sent/un-flushed/un-committed LSN, that replication functions
start at the LSN specified inclusive, etc.

Additionally, document the restart_lsn and confirmed_flush lsn in struct
ReplicationSlotPersistentData.
---
 doc/src/sgml/func.sgml                      |  8 +++---
 doc/src/sgml/logicaldecoding.sgml           |  3 ++-
 doc/src/sgml/monitoring.sgml                | 15 +++++++----
 doc/src/sgml/protocol.sgml                  | 20 +++++++++-----
 src/backend/replication/logical/logical.c   | 21 ++++++++++-----
 src/backend/replication/walsender.c         | 42 +++++++++++++++++++++++------
 src/include/replication/reorderbuffer.h     |  4 +--
 src/include/replication/slot.h              | 25 +++++++++++++----
 src/include/replication/walsender_private.h |  2 +-
 9 files changed, 103 insertions(+), 37 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 583b3b2..f6f74cd 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -18815,9 +18815,11 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
         (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>text</type>)
        </entry>
        <entry>
-        Returns changes in the slot <parameter>slot_name</parameter>, starting
-        from the point at which since changes have been consumed last.  If
-        <parameter>upto_lsn</> and <parameter>upto_nchanges</> are NULL,
+        Returns changes in the <link
+        linkend="logicaldecoding-replication-slots">logical slot</>
+        <parameter>slot_name</parameter>, starting from the point at which
+        changes were last consumed.  If <parameter>upto_lsn</> and
+        <parameter>upto_nchanges</> are NULL,
         logical decoding will continue until end of WAL.  If
         <parameter>upto_lsn</> is non-NULL, decoding will include only
         those transactions which commit prior to the specified LSN.  If
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 03c2c69..30874ce 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -11,7 +11,8 @@
   </para>
 
   <para>
-   Changes are sent out in streams identified by logical replication slots.
+   Changes are sent out in streams identified by <link
+   linkend="logicaldecoding-replication-slots">logical replication slots</>.
   </para>
 
   <para>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4d03531..7f33f91 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1404,24 +1404,29 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
     <row>
      <entry><structfield>sent_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position sent on this connection</entry>
+     <entry>
+      Last transaction log position + 1 sent on this connection, or, for
+      logical decoding sessions, the log position of the last record processed
+      and buffered for sending at commit-time.
+     </entry>
     </row>
     <row>
      <entry><structfield>write_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position written to disk by this standby
-      server</entry>
+     <entry>
+      Last transaction log position + 1 written to disk by this standby
+      server, i.e. the start of the first record not written.</entry>
     </row>
     <row>
      <entry><structfield>flush_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position flushed to disk by this standby
+     <entry>Last transaction log position + 1 flushed to disk by this standby
       server</entry>
     </row>
     <row>
      <entry><structfield>replay_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position replayed into the database on this
+     <entry>Last transaction log position + 1 replayed into the database on this
       standby server</entry>
     </row>
     <row>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 3d6e8ee..f8233df 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1925,11 +1925,13 @@ The commands accepted in walsender mode are:
     <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slot_name</> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</> [ ( <replaceable>option_name</replaceable> [ <replaceable>option_value</replaceable> ] [, ...] ) ]</term>
     <listitem>
      <para>
-      Instructs server to start streaming WAL for logical replication, starting
-      at WAL position <replaceable class="parameter">XXX/XXX</>. The server can
-      reply with an error, for example if the requested section of WAL has already
-      been recycled. On success, server responds with a CopyBothResponse
-      message, and then starts to stream WAL to the frontend.
+      Instructs server to start streaming WAL for <link
+      linkend="logicaldecoding">logical decoding</>, starting at the first
+      commit with starting WAL position equal to or greater than <replaceable
+      class="parameter">XXX/XXX</>. The server can reply with an error, for
+      example if the requested section of WAL has already been recycled. On
+      success, server responds with a <literal>CopyBothResponse</> message, and
+      then starts to stream WAL to the frontend.
      </para>
 
      <para>
@@ -1958,7 +1960,13 @@ The commands accepted in walsender mode are:
        <term><replaceable class="parameter">XXX/XXX</></term>
        <listitem>
         <para>
-         The WAL position to begin streaming at.
+         The WAL position to begin streaming commits at. Inclusive; a commit record
+         beginning at exactly XXX/XXX will be streamed to the client.
+        </para>
+        <para>
+         If the requested WAL position is less than the <literal>confirmed_flush_lsn</>
+         for <replaceable class="parameter">slot_name</>, it will be ignored and the
+         confirmed flush position will be used as the start point.
         </para>
        </listitem>
       </varlistentry>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7e03f33..68193de 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -304,10 +304,13 @@ CreateInitDecodingContext(char *plugin,
  * used already.
  *
  * start_lsn
- *		The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
- *		from the slot's confirmed_flush; otherwise, start from the specified
- *		location (but move it forwards to confirmed_flush if it's older than
- *		that, see below).
+ *		The LSN at which to start sending commits to the output plugin.  If
+ *		InvalidXLogRecPtr, restart from the slot's confirmed_flush; otherwise,
+ *		start from the specified location (but move it forwards to
+ *		confirmed_flush if it's older than that, see below).
+ *
+ *		start_lsn is inclusive, so a commit beginning exactly at start_lsn
+ *		will be sent to the client.
  *
  * output_plugin_options
  *		contains options passed to the output plugin.
@@ -320,6 +323,10 @@ CreateInitDecodingContext(char *plugin,
  * as the decoding context because further memory contexts will be created
  * inside it.
  *
+ * WAL reading and logical decoding always start at restart_lsn and are not
+ * controlled by start_lsn. That argument only controls which decoded commits
+ * are sent to the client.
+ *
  * Returns an initialized decoding context after calling the output plugin's
  * startup function.
  */
@@ -389,7 +396,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ereport(LOG,
 			(errmsg("starting logical decoding for slot \"%s\"",
 					NameStr(slot->data.name)),
-			 errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
+			 errdetail("streaming transactions committing at or after %X/%X, reading WAL from %X/%X",
 					   (uint32) (slot->data.confirmed_flush >> 32),
 					   (uint32) slot->data.confirmed_flush,
 					   (uint32) (slot->data.restart_lsn >> 32),
@@ -446,6 +453,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	/* Start output to client for commits after end of last record */
 	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
 }
 
@@ -885,7 +893,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 }
 
 /*
- * Handle a consumer's confirmation having received all changes up to lsn.
+ * Handle a consumer's confirmation having received all changes up to (but not
+ * including) lsn.
  */
 void
 LogicalConfirmReceivedLocation(XLogRecPtr lsn)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ec5d9db..35a0e7c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -142,8 +142,10 @@ static bool sendTimeLineIsHistoric = false;
 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
 
 /*
- * How far have we sent WAL already? This is also advertised in
- * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
+ * Position up to, but not including, which we have sent WAL already.
+ * The next request will start from this position.
+ *
+ * Also advertised in MyWalSnd->sentPtr.
  */
 static XLogRecPtr sentPtr = 0;
 
@@ -888,6 +890,8 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 /*
  * Load previously initiated logical slot and prepare for sending data (via
  * WalSndLoop).
+ *
+ * Handles START_REPLICATION ... LOGICAL ...
  */
 static void
 StartLogicalReplication(StartReplicationCmd *cmd)
@@ -927,20 +931,28 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	sendTimeLine = ThisTimeLineID;
 
 	/*
-	 * Initialize position to the last ack'ed one, then the xlog records begin
-	 * to be shipped from that position.
+	 * Let logical decoding decide where to start reading WAL and where to
+	 * start sending commits to the client, giving it the client-supplied start
+	 * point so it can skip over any unwanted commits the client has already
+	 * processed.
 	 */
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData);
 
-	/* Start reading WAL from the oldest required WAL. */
+	/*
+	 * Start reading WAL from the oldest required WAL.
+	 *
+	 * This is just a parameter to XLogSendLogical passed via a global.
+	 */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
 
 	/*
-	 * Report the location after which we'll send out further commits as the
-	 * current sentPtr.
+	 * Report the location we start processing WAL from as the "sent" location,
+	 * even though it will generally be well behind cmd->startpoint.
+	 *
+	 * See XLogSendLogical for rationale.
 	 */
 	sentPtr = MyReplicationSlot->data.restart_lsn;
 
@@ -2388,7 +2400,7 @@ XLogSendLogical(void)
 	WalSndCaughtUp = false;
 
 	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
-	logical_startptr = InvalidXLogRecPtr;
+	logical_startptr = InvalidXLogRecPtr; /* no longer used */
 
 	/* xlog record was invalid */
 	if (errm != NULL)
@@ -2398,6 +2410,20 @@ XLogSendLogical(void)
 	{
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
+		/*
+		 * Report the "sent" pointer as the LSN after the end of the most
+		 * recent xlog record we've processed in logical decoding. This is
+		 * somewhat misleading since we have "sent" the record to logical
+		 * decoding, but not yet to the output plugin or the client itself.
+		 *
+		 * Due to reorder buffer processing we can't really report any other
+		 * measure of progress in terms of an LSN, though. If we reported
+		 * the LSN of the last row change during reorder buffer commit
+		 * processing the LSNs would go backwards whenever we started
+		 * processing the next commit (if they were running concurrently),
+		 * and we'd have nothing to report when we weren't processing a
+		 * commit since we're just buffering.
+		 */
 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
 	}
 	else
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 17e47b3..4b2f73c 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -160,8 +160,8 @@ typedef struct ReorderBufferTXN
 	XLogRecPtr	first_lsn;
 
 	/* ----
-	 * LSN of the record that lead to this xact to be committed or
-	 * aborted. This can be a
+	 * LSN of the beginning of the record that led to this xact being
+	 * committed or aborted. This can be a
 	 * * plain commit record
 	 * * plain commit record, of a parent transaction
 	 * * prepared transaction commit
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 62cacdb..81b74e2 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -64,14 +64,29 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	TransactionId catalog_xmin;
 
-	/* oldest LSN that might be required by this replication slot */
+	/*
+	 * Oldest LSN that might be required by this replication slot.
+	 *
+	 * For logical decoding this points to the most recent xl_running_xacts
+	 * record prior to the xid allocation (BEGIN) of the oldest xact the client
+	 * has not yet confirmed replay of. WAL will be re-read from this LSN and
+	 * needed changes and invalidations will be assembled into reorder buffers.
+	 */
 	XLogRecPtr	restart_lsn;
 
 	/*
-	 * Oldest LSN that the client has acked receipt for.  This is used as the
-	 * start_lsn point in case the client doesn't specify one, and also as a
-	 * safety measure to jump forwards in case the client specifies a
-	 * start_lsn that's further in the past than this value.
+	 * The client has acked all records up to but not including confirmed_flush
+	 * as safely flushed to client storage.
+	 *
+	 * This is used as the point at which logical decoding begins sending
+	 * changes to the client if the client doesn't specify one. It also serves
+	 * as a safety measure to (silently) jump forwards in case the client
+	 * specifies a start_lsn that's further in the past than this value.
+	 *
+	 * Logical decoding may only invoke the output plugin for changes where the
+	 * start of the commit record is equal to or greater than this LSN.
+	 * catalog_xmin may have been advanced so that needed catalogs for any
+	 * earlier commits have been vacuumed away.
 	 */
 	XLogRecPtr	confirmed_flush;
 
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5e6ccfc..2db0b2a 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -34,7 +34,7 @@ typedef struct WalSnd
 {
 	pid_t		pid;			/* this walsender's process id, or 0 */
 	WalSndState state;			/* this walsender's state */
-	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
+	XLogRecPtr	sentPtr;		/* WAL has been sent up to (but not including) this point */
 	bool		needreload;		/* does currently-open file need to be
 								 * reloaded? */
 
-- 
2.5.5

