Review for GetWALAvailability()

Started by Fujii Masao over 5 years ago · 38 messages
#1 Fujii Masao
masao.fujii@oss.nttdata.com
1 attachment(s)

Hi,

The document explains that the "lost" value that
pg_replication_slots.wal_status reports means

some WAL files are definitely lost and this slot cannot be used to resume replication anymore.

However, I observed the "lost" value while inserting lots of records,
and yet replication could continue normally. So I wonder whether
pg_replication_slots.wal_status may have a bug.

wal_status is calculated in GetWALAvailability(), and I probably
found some issues in it.

keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
wal_segment_size) + 1;

max_wal_size_mb is a number of megabytes, while wal_keep_segments is
a number of WAL segment files, so it's strange to take the max of them.
Shouldn't the above be the following?

Max(ConvertToXSegs(max_wal_size_mb, wal_segment_size), wal_keep_segments) + 1
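
For illustration (the numbers are mine, only to show the unit mismatch):
with wal_segment_size = 16MB, max_wal_size_mb = 1024 and
wal_keep_segments = 100, the current code computes
ConvertToXSegs(Max(1024, 100), 16MB) + 1 = 64 + 1 = 65 segments and
silently ignores wal_keep_segments, while the corrected expression
computes Max(64, 100) + 1 = 101 segments.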

if ((max_slot_wal_keep_size_mb <= 0 ||
max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
oldestSegMaxWalSize <= targetSeg)
return WALAVAIL_NORMAL;

This code means that wal_status reports "normal" only when
max_slot_wal_keep_size is negative or larger than max_wal_size.
Why is this condition necessary? The document explains "normal
means that the claimed files are within max_wal_size". So whatever
the max_slot_wal_keep_size value is, IMO "normal" should be
reported if the WAL files claimed by the slot are within max_wal_size.
Thoughts?

Or, if that condition is really necessary, the document should be
updated to add a note about the condition.

If the WAL files claimed by the slot exceed max_slot_wal_keep_size
but none of those WAL files have been removed yet, wal_status seems
to report "lost". Is this expected behavior? Per the meaning of "lost"
described in the document, "lost" should be reported only when
some claimed files have actually been removed, I think. Thoughts?

Or is this behavior expected and the document incorrect?

BTW, if we want to implement GetWALAvailability() as the document
advertises, we can simplify it like the attached POC patch.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

simple_getwalavailability_wip.patch (text/plain)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..0b9cca2173 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9504,62 +9504,29 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	XLogSegNo	currSeg;		/* segid of currpos */
 	XLogSegNo	targetSeg;		/* segid of targetLSN */
 	XLogSegNo	oldestSeg;		/* actual oldest segid */
-	XLogSegNo	oldestSegMaxWalSize;	/* oldest segid kept by max_wal_size */
-	XLogSegNo	oldestSlotSeg = InvalidXLogRecPtr;	/* oldest segid kept by
-													 * slot */
 	uint64		keepSegs;
 
 	/* slot does not reserve WAL. Either deactivated, or has never been active */
 	if (XLogRecPtrIsInvalid(targetLSN))
 		return WALAVAIL_INVALID_LSN;
 
-	currpos = GetXLogWriteRecPtr();
-
 	/* calculate oldest segment currently needed by slots */
 	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
-	KeepLogSeg(currpos, &oldestSlotSeg);
 
-	/*
-	 * Find the oldest extant segment file. We get 1 until checkpoint removes
-	 * the first WAL segment file since startup, which causes the status being
-	 * wrong under certain abnormal conditions but that doesn't actually harm.
-	 */
-	oldestSeg = XLogGetLastRemovedSegno() + 1;
+	/* Find the oldest extant segment file */
+	oldestSeg = XLogGetLastRemovedSegno();
 
-	/* calculate oldest segment by max_wal_size and wal_keep_segments */
+	if (targetSeg <= oldestSeg)
+		return WALAVAIL_REMOVED;
+
+	currpos = GetXLogWriteRecPtr();
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
-	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
-							  wal_segment_size) + 1;
+	keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size);
 
-	if (currSeg > keepSegs)
-		oldestSegMaxWalSize = currSeg - keepSegs;
-	else
-		oldestSegMaxWalSize = 1;
-
-	/*
-	 * If max_slot_wal_keep_size has changed after the last call, the segment
-	 * that would been kept by the current setting might have been lost by the
-	 * previous setting. No point in showing normal or keeping status values
-	 * if the targetSeg is known to be lost.
-	 */
-	if (targetSeg >= oldestSeg)
-	{
-		/*
-		 * show "normal" when targetSeg is within max_wal_size, even if
-		 * max_slot_wal_keep_size is smaller than max_wal_size.
-		 */
-		if ((max_slot_wal_keep_size_mb <= 0 ||
-			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
-			oldestSegMaxWalSize <= targetSeg)
+	if (currSeg - targetSeg <= keepSegs)
 			return WALAVAIL_NORMAL;
 
-		/* being retained by slots */
-		if (oldestSlotSeg <= targetSeg)
-			return WALAVAIL_RESERVED;
-	}
-
-	/* Definitely lost */
-	return WALAVAIL_REMOVED;
+	return WALAVAIL_RESERVED;
 }
 
 
#2 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#1)
1 attachment(s)
Re: Review for GetWALAvailability()

At Sat, 13 Jun 2020 01:38:49 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

Hi,

The document explains that "lost" value that
pg_replication_slots.wal_status reports means

some WAL files are definitely lost and this slot cannot be used to
resume replication anymore.

However, I observed "lost" value while inserting lots of records,
but replication could continue normally. So I wonder if
pg_replication_slots.wal_status may have a bug.

wal_status is calculated in GetWALAvailability(), and probably I found
some issues in it.

keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
wal_segment_size) +
1;

max_wal_size_mb is the number of megabytes. wal_keep_segments is
the number of WAL segment files. So it's strange to calculate max of
them.

Oops! I don't want to believe I did that but it's definitely wrong.

The above should be the following?

Max(ConvertToXSegs(max_wal_size_mb, wal_segment_size),
wal_keep_segments) + 1

Looks reasonable.

if ((max_slot_wal_keep_size_mb <= 0 ||
max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
oldestSegMaxWalSize <= targetSeg)
return WALAVAIL_NORMAL;

This code means that wal_status reports "normal" only when
max_slot_wal_keep_size is negative or larger than max_wal_size.
Why is this condition necessary? The document explains "normal
means that the claimed files are within max_wal_size". So whatever
max_slot_wal_keep_size value is, IMO that "normal" should be
reported if the WAL files claimed by the slot are within max_wal_size.
Thought?

It was kind of hard to decide. Even when max_slot_wal_keep_size is
smaller than max_wal_size, the segments beyond
max_slot_wal_keep_size are not guaranteed to be kept. In that case
the state transitions as NORMAL->LOST, skipping the "RESERVED" state.
Putting aside whether the setting is useful or not, I thought that the
state transition is somewhat abrupt.

Or, if that condition is really necessary, the document should be
updated so that the note about the condition is added.

Does the following make sense?

https://www.postgresql.org/docs/13/view-pg-replication-slots.html

normal means that the claimed files are within max_wal_size.
+ If max_slot_wal_keep_size is smaller than max_wal_size, this state
+ will not appear.

If the WAL files claimed by the slot exceeds max_slot_wal_keep_size
but any those WAL files have not been removed yet, wal_status seems
to report "lost". Is this expected behavior? Per the meaning of "lost"
described in the document, "lost" should be reported only when
any claimed files are removed, I think. Thought?

Or this behavior is expected and the document is incorrect?

In short, it is known behavior but it was judged as useless to prevent
that.

That can happen when the checkpointer removes up to the segment that
is being read by a walsender. I think that doesn't happen (or happens
within a narrow time window?) for physical replication but happens
for logical replication.

During development, I once added code to walsender to exit for that
reason, but finally it was moved to InvalidateObsoleteReplicationSlots
as a somewhat different function. With the current mechanism, there's
a case where a once-invalidated slot can revive; we decided to allow
that behavior, but forgot to document it.

Anyway, if you see "lost", something bad is happening.

- lost means that some WAL files are definitely lost and this slot
- cannot be used to resume replication anymore.
+ lost means that some required WAL files are removed and this slot is
+ no longer usable after once disconnected during this status.

If it is crucial that the "lost" state may come back to reserved or
normal state,

+ Note that there are cases where the state moves back to reserved or
+ normal state when all wal senders have left the just removed segment
+ before being terminated.

There is a case where the state moves back to the reserved or normal
state when wal senders leave the just-removed segment before being
terminated.

BTW, if we want to implement GetWALAvailability() as the document
advertises, we can simplify it like the attached POC patch.

I'm not sure it is right that the patch removes wal_keep_segments from
the function.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

fix_GetWalAvailability.patch (text/x-patch)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 700271fd40..199053dd4a 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -11240,18 +11240,25 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        <itemizedlist>
         <listitem>
          <para><literal>normal</literal> means that the claimed files
-          are within <varname>max_wal_size</varname>.</para>
+          are within <varname>max_wal_size</varname>. If
+          <varname>max_slot_wal_keep_size</varname> is smaller than
+          <varname>max_wal_size</varname>, this state will not appear.</para>
         </listitem>
         <listitem>
          <para><literal>reserved</literal> means
           that <varname>max_wal_size</varname> is exceeded but the files are
           still held, either by some replication slot or
-          by <varname>wal_keep_segments</varname>.</para>
+          by <varname>wal_keep_segments</varname>.
+          </para>
         </listitem>
         <listitem>
-         <para><literal>lost</literal> means that some WAL files are
-          definitely lost and this slot cannot be used to resume replication
-          anymore.</para>
+          <para>
+            <literal>lost</literal> means that some required WAL files are
+			removed and this slot is no longer usable after once disconnected
+			during this state. Note that there are cases where the state moves
+			back to reserved or normal state when all wal senders have left
+			the just removed segment before being terminated.
+          </para>
         </listitem>
        </itemizedlist>
        The last two states are seen only when
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..d1501d0cf7 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9528,8 +9528,8 @@ GetWALAvailability(XLogRecPtr targetLSN)
 
 	/* calculate oldest segment by max_wal_size and wal_keep_segments */
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
-	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
-							  wal_segment_size) + 1;
+	keepSegs = Max(ConvertToXSegs(max_wal_size_mb, wal_keep_segments),
+				   wal_segment_size) + 1;
 
 	if (currSeg > keepSegs)
 		oldestSegMaxWalSize = currSeg - keepSegs;
#3 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#2)
1 attachment(s)
Re: Review for GetWALAvailability()

At Mon, 15 Jun 2020 13:42:25 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

Oops! I don't want to believe I did that but it's definitely wrong.

Hmm. Quite disappointing. The patch was just crap.
This is the right patch.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

fix_GetWalAvailability.patch (text/x-patch)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 700271fd40..199053dd4a 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -11240,18 +11240,25 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        <itemizedlist>
         <listitem>
          <para><literal>normal</literal> means that the claimed files
-          are within <varname>max_wal_size</varname>.</para>
+          are within <varname>max_wal_size</varname>. If
+          <varname>max_slot_wal_keep_size</varname> is smaller than
+          <varname>max_wal_size</varname>, this state will not appear.</para>
         </listitem>
         <listitem>
          <para><literal>reserved</literal> means
           that <varname>max_wal_size</varname> is exceeded but the files are
           still held, either by some replication slot or
-          by <varname>wal_keep_segments</varname>.</para>
+          by <varname>wal_keep_segments</varname>.
+          </para>
         </listitem>
         <listitem>
-         <para><literal>lost</literal> means that some WAL files are
-          definitely lost and this slot cannot be used to resume replication
-          anymore.</para>
+          <para>
+            <literal>lost</literal> means that some required WAL files are
+			removed and this slot is no longer usable after once disconnected
+			during this state. Note that there are cases where the state moves
+			back to reserved or normal state when all wal senders have left
+			the just removed segment before being terminated.
+          </para>
         </listitem>
        </itemizedlist>
        The last two states are seen only when
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..d6fe205eb4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9528,8 +9528,8 @@ GetWALAvailability(XLogRecPtr targetLSN)
 
 	/* calculate oldest segment by max_wal_size and wal_keep_segments */
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
-	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
-							  wal_segment_size) + 1;
+	keepSegs = Max(ConvertToXSegs(max_wal_size_mb, wal_segment_size),
+				   wal_keep_segments) + 1;
 
 	if (currSeg > keepSegs)
 		oldestSegMaxWalSize = currSeg - keepSegs;
#4 Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Kyotaro Horiguchi (#2)
Re: Review for GetWALAvailability()

On 2020/06/15 13:42, Kyotaro Horiguchi wrote:

At Sat, 13 Jun 2020 01:38:49 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

Hi,

The document explains that "lost" value that
pg_replication_slots.wal_status reports means

some WAL files are definitely lost and this slot cannot be used to
resume replication anymore.

However, I observed "lost" value while inserting lots of records,
but replication could continue normally. So I wonder if
pg_replication_slots.wal_status may have a bug.

wal_status is calculated in GetWALAvailability(), and probably I found
some issues in it.

keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
wal_segment_size) +
1;

max_wal_size_mb is the number of megabytes. wal_keep_segments is
the number of WAL segment files. So it's strange to calculate max of
them.

Oops! I don't want to believe I did that but it's definitely wrong.

The above should be the following?

Max(ConvertToXSegs(max_wal_size_mb, wal_segment_size),
wal_keep_segments) + 1

Looks reasonable.

if ((max_slot_wal_keep_size_mb <= 0 ||
max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
oldestSegMaxWalSize <= targetSeg)
return WALAVAIL_NORMAL;

This code means that wal_status reports "normal" only when
max_slot_wal_keep_size is negative or larger than max_wal_size.
Why is this condition necessary? The document explains "normal
means that the claimed files are within max_wal_size". So whatever
max_slot_wal_keep_size value is, IMO that "normal" should be
reported if the WAL files claimed by the slot are within max_wal_size.
Thought?

It was kind of hard to decide. Even when max_slot_wal_keep_size is
smaller than max_wal_size, the segments beyond
max_slot_wal_keep_size are not guaranteed to be kept. In that case
the state transitions as NORMAL->LOST, skipping the "RESERVED" state.
Putting aside whether the setting is useful or not, I thought that the
state transition is somewhat abrupt.

The direct transition of the state from normal to lost is OK to me
if each state is clearly defined.

Or, if that condition is really necessary, the document should be
updated so that the note about the condition is added.

Does the following make sense?

https://www.postgresql.org/docs/13/view-pg-replication-slots.html

normal means that the claimed files are within max_wal_size.
+ If max_slot_wal_keep_size is smaller than max_wal_size, this state
+ will not appear.

I don't think this change is enough. For example, when max_slot_wal_keep_size
is smaller than max_wal_size and the amount of WAL files claimed by the slot
is smaller than max_slot_wal_keep_size, "reserved" is reported. But that is
inconsistent with the meaning of "reserved" in the docs.

To consider what should be reported in wal_status, could you tell me
for what purpose and how users are expected to use this information?

If the WAL files claimed by the slot exceeds max_slot_wal_keep_size
but any those WAL files have not been removed yet, wal_status seems
to report "lost". Is this expected behavior? Per the meaning of "lost"
described in the document, "lost" should be reported only when
any claimed files are removed, I think. Thought?

Or this behavior is expected and the document is incorrect?

In short, it is known behavior but it was judged as useless to prevent
that.

That can happen when the checkpointer removes up to the segment that
is being read by a walsender. I think that doesn't happen (or happens
within a narrow time window?) for physical replication but happens
for logical replication.

During development, I once added code to walsender to exit for that
reason, but finally it was moved to InvalidateObsoleteReplicationSlots
as a somewhat different function. With the current mechanism, there's
a case where a once-invalidated slot can revive; we decided to allow
that behavior, but forgot to document it.

Anyway, if you see "lost", something bad is happening.

- lost means that some WAL files are definitely lost and this slot
- cannot be used to resume replication anymore.
+ lost means that some required WAL files are removed and this slot is
+ no longer usable after once disconnected during this status.

If it is crucial that the "lost" state may come back to reserved or
normal state,

+ Note that there are cases where the state moves back to reserved or
+ normal state when all wal senders have left the just removed segment
+ before being terminated.

There is a case where the state moves back to the reserved or normal
state when wal senders leave the just-removed segment before being
terminated.

Even if the walsender is terminated during the "lost" state, unless
the checkpointer removes the required WAL files, the state can go back
to "reserved" after a new replication connection is established. Is
this the same as what you're explaining above?

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

#5 Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Kyotaro Horiguchi (#2)
1 attachment(s)
Re: Review for GetWALAvailability()

On 2020/06/15 13:42, Kyotaro Horiguchi wrote:

At Sat, 13 Jun 2020 01:38:49 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

Hi,

The document explains that "lost" value that
pg_replication_slots.wal_status reports means

some WAL files are definitely lost and this slot cannot be used to
resume replication anymore.

However, I observed "lost" value while inserting lots of records,
but replication could continue normally. So I wonder if
pg_replication_slots.wal_status may have a bug.

wal_status is calculated in GetWALAvailability(), and probably I found
some issues in it.

keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
wal_segment_size) +
1;

max_wal_size_mb is the number of megabytes. wal_keep_segments is
the number of WAL segment files. So it's strange to calculate max of
them.

Oops! I don't want to believe I did that but it's definitely wrong.

The above should be the following?

Max(ConvertToXSegs(max_wal_size_mb, wal_segment_size),
wal_keep_segments) + 1

Looks reasonable.

if ((max_slot_wal_keep_size_mb <= 0 ||
max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
oldestSegMaxWalSize <= targetSeg)
return WALAVAIL_NORMAL;

This code means that wal_status reports "normal" only when
max_slot_wal_keep_size is negative or larger than max_wal_size.
Why is this condition necessary? The document explains "normal
means that the claimed files are within max_wal_size". So whatever
max_slot_wal_keep_size value is, IMO that "normal" should be
reported if the WAL files claimed by the slot are within max_wal_size.
Thought?

It was kind of hard to decide. Even when max_slot_wal_keep_size is
smaller than max_wal_size, the segments beyond
max_slot_wal_keep_size are not guaranteed to be kept. In that case
the state transitions as NORMAL->LOST, skipping the "RESERVED" state.
Putting aside whether the setting is useful or not, I thought that the
state transition is somewhat abrupt.

Or, if that condition is really necessary, the document should be
updated so that the note about the condition is added.

Does the following make sense?

https://www.postgresql.org/docs/13/view-pg-replication-slots.html

normal means that the claimed files are within max_wal_size.
+ If max_slot_wal_keep_size is smaller than max_wal_size, this state
+ will not appear.

If the WAL files claimed by the slot exceeds max_slot_wal_keep_size
but any those WAL files have not been removed yet, wal_status seems
to report "lost". Is this expected behavior? Per the meaning of "lost"
described in the document, "lost" should be reported only when
any claimed files are removed, I think. Thought?

Or this behavior is expected and the document is incorrect?

In short, it is known behavior but it was judged as useless to prevent
that.

That can happen when the checkpointer removes up to the segment that
is being read by a walsender. I think that doesn't happen (or happens
within a narrow time window?) for physical replication but happens
for logical replication.

During development, I once added code to walsender to exit for that
reason, but finally it was moved to InvalidateObsoleteReplicationSlots
as a somewhat different function.

BTW, I read the code of InvalidateObsoleteReplicationSlots() and probably
found some issues in it.

1. Each cycle of the "for" loop in InvalidateObsoleteReplicationSlots()
emits the log message "terminating walsender ...". This means that
if it takes more than 10ms for walsender to exit after it's signaled,
the second and subsequent cycles would happen and output the same
log message several times. IMO that log message should be output
only once.

2. InvalidateObsoleteReplicationSlots() scans the replication slots
array repeatedly, using a "for" loop in each scan. Also it calls
ReplicationSlotAcquire() in each "for" loop cycle, and
ReplicationSlotAcquire() uses yet another loop to scan the replication
slots array. I don't think this is good design.

ISTM that we can get rid of ReplicationSlotAcquire()'s loop because
InvalidateObsoleteReplicationSlots() already knows the index of the
slot that we want to find. The attached patch does that. Thoughts?

3. There is a corner case where the termination of walsender cleans
up the temporary replication slot while
InvalidateObsoleteReplicationSlots() is sleeping on
ConditionVariableTimedSleep(). In this case, ReplicationSlotAcquire()
is called in the subsequent cycle of the "for" loop, cannot find the
slot and then emits an ERROR message. This leads to the failure of the
checkpoint by the checkpointer.

To avoid this case, if SAB_Inquire is specified,
ReplicationSlotAcquire() should return a special value instead of
emitting an ERROR even when it cannot find the slot. Also
InvalidateObsoleteReplicationSlots() should handle that special return
value.
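
For example, the not-found path in ReplicationSlotAcquire() could look
like this (only a sketch, not the final shape of the fix):

	if (slot == NULL)
	{
		if (behavior == SAB_Inquire)
			return -1;			/* sketch: tell the caller the slot is gone */
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));
	}

and the loop in InvalidateObsoleteReplicationSlots() would treat a
negative return value the same way as "no process is using the slot".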

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

replication_slot_acquire.patch (text/plain)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..72aa5de60c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
+static int ReplicationSlotAcquireInternal(const char *name,
+										  SlotAcquireBehavior behavior, int index);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -332,25 +334,45 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  */
 int
 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
+{
+	int			i;
+
+	/*
+	 * Search for the named slot and mark it active if we find it.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+			break;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	return ReplicationSlotAcquireInternal(name, behavior, i);
+}
+
+static int
+ReplicationSlotAcquireInternal(const char *name,
+							   SlotAcquireBehavior behavior, int index)
 {
 	ReplicationSlot *slot;
 	int			active_pid;
-	int			i;
 
 retry:
 	Assert(MyReplicationSlot == NULL);
 
 	/*
-	 * Search for the named slot and mark it active if we find it.  If the
-	 * slot is already active, we exit the loop with active_pid set to the PID
+	 * If the slot is already active, we set active_pid to the PID
 	 * of the backend that owns it.
 	 */
 	active_pid = 0;
 	slot = NULL;
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	if (index >= 0 && index < max_replication_slots)
 	{
-		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[index];
 
 		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
 		{
@@ -378,8 +400,6 @@ retry:
 			else
 				active_pid = MyProcPid;
 			slot = s;
-
-			break;
 		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
@@ -1120,8 +1140,9 @@ restart:
 
 		for (;;)
 		{
-			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
-													   SAB_Inquire);
+			int			wspid =
+				ReplicationSlotAcquireInternal(NameStr(slotname),
+											   SAB_Inquire, i);
 
 			/* no walsender? success! */
 			if (wspid == 0)
#6 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#4)
Re: Review for GetWALAvailability()

At Mon, 15 Jun 2020 18:59:49 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

It was kind of hard to decide. Even when max_slot_wal_keep_size is
smaller than max_wal_size, the segments beyond
max_slot_wal_keep_size are not guaranteed to be kept. In that case
the state transitions as NORMAL->LOST, skipping the "RESERVED" state.
Putting aside whether the setting is useful or not, I thought that the
state transition is somewhat abrupt.

The direct transition of the state from normal to lost is OK to me
if each state is clearly defined.

Or, if that condition is really necessary, the document should be
updated so that the note about the condition is added.

Does the following make sense?
https://www.postgresql.org/docs/13/view-pg-replication-slots.html
normal means that the claimed files are within max_wal_size.
+ If max_slot_wal_keep_size is smaller than max_wal_size, this state
+ will not appear.

I don't think this change is enough. For example, when
max_slot_wal_keep_size is smaller than max_wal_size and the amount of
WAL files claimed by the slot is smaller than max_slot_wal_keep_size,
"reserved" is reported. But that is inconsistent with the meaning of
"reserved" in the docs.

You're right.

To consider what should be reported in wal_status, could you tell me
for what purpose and how users are expected to use this information?

I saw "reserved" as the state where slots are working to
retain segments, and "normal" as the state indicating that "WAL
segments are within max_wal_size", which is orthogonal to the notion
of "reserved". So it seems to me useless when the retained WAL
segments cannot exceed max_wal_size.

With longer descriptions they would be:

"reserved under max_wal_size"
"reserved over max_wal_size"
"lost some segments"

Come to think of it, I realized that my trouble was just the
wording. Do the following wordings make sense to you?

"reserved" - retained within max_wal_size
"extended" - retained over max_wal_size
"lost" - lost some segments

With these wordings I can live with "not extended"=>"lost". Of course
more appropriate wordings are welcome.
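
Just to make the proposal concrete, the WALAvailability enum might
then read like this (a sketch only; the names are tentative):

	typedef enum WALAvailability
	{
		WALAVAIL_INVALID_LSN,	/* slot does not reserve WAL */
		WALAVAIL_RESERVED,		/* retained within max_wal_size */
		WALAVAIL_EXTENDED,		/* retained over max_wal_size */
		WALAVAIL_REMOVED		/* lost some segments */
	} WALAvailability;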

Even if the walsender is terminated during the "lost" state, unless
the checkpointer removes the required WAL files, the state can go back
to "reserved" after a new replication connection is established. Is
this the same as what you're explaining above?

GetWALAvailability checks restart_lsn against lastRemovedSegNo, thus
"lost" cannot be seen unless the checkpointer has actually removed
the segment at restart_lsn (and restart_lsn has not been invalidated).
However, walsenders are killed before those segments are actually
removed, so there are cases where the physical walreceiver reconnects
before RemoveOldXlogFiles removes all the segments, which are then
removed after reconnection. "lost" can go back to "reserved" in that
case. (A physical walreceiver can connect to a slot with an invalid
restart_lsn.)

I noticed another issue. If some required WALs are removed, the
slot will be "invalidated", that is, restart_lsn is set to an invalid
value. As a result we hardly ever see the "lost" state.

It can be "fixed" by remembering the validity of a slot separately
from restart_lsn. Is that worth doing?

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#7 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#5)
Re: Review for GetWALAvailability()

At Tue, 16 Jun 2020 01:46:21 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

In short, it is known behavior but it was judged as useless to prevent
that.
That can happen when the checkpointer removes up to the segment that
is being read by a walsender. I think that doesn't happen (or happens
within a narrow time window?) for physical replication but happens
for logical replication.
During development, I once added code to walsender to exit for that
reason, but finally it was moved to InvalidateObsoleteReplicationSlots
as a somewhat different function.

BTW, I read the code of InvalidateObsoleteReplicationSlots() and
probably found some issues in it.

1. Each cycle of the "for" loop in InvalidateObsoleteReplicationSlots()
emits the log message "terminating walsender ...". This means that
if it takes more than 10ms for walsender to exit after it's signaled,
the second and subsequent cycles would happen and output the same
log message several times. IMO that log message should be output
only once.

Sounds reasonable.

2. InvalidateObsoleteReplicationSlots() scans the replication slots
array repeatedly, using a "for" loop in each scan. Also it calls
ReplicationSlotAcquire() in each "for" loop cycle, and
ReplicationSlotAcquire() uses yet another loop to scan the replication
slots array. I don't think this is good design.

ISTM that we can get rid of ReplicationSlotAcquire()'s loop because
InvalidateObsoleteReplicationSlots() already knows the index of the
slot that we want to find. The attached patch does that. Thoughts?

The inner loop is expected to run at most several times per
checkpoint, which won't be a serious problem. However, it is better if
we can get rid of that in a reasonable way.

The attached patch changes the behavior for SAB_Block. Before the
patch, it rescans from the first slot for the same name, but with the
patch it just rechecks the same slot. The only caller of the function
with SAB_Block is ReplicationSlotDrop, and I can't come up with a case
where another slot with the same name is created at a different place
before the condition variable fires. But I'm not sure the change is
completely safe. Maybe some assertion is needed?

3. There is a corner case where the termination of walsender cleans
up the temporary replication slot while
InvalidateObsoleteReplicationSlots() is sleeping on
ConditionVariableTimedSleep(). In this case, ReplicationSlotAcquire()
is called in the subsequent cycle of the "for" loop, cannot find the
slot and then emits an ERROR message. This leads to the failure of the
checkpoint by the checkpointer.

Agreed.

To avoid this case, if SAB_Inquire is specified,
ReplicationSlotAcquire() should return a special value instead of
emitting an ERROR even when it cannot find the slot. Also
InvalidateObsoleteReplicationSlots() should handle that special return
value.

I thought the same thing hearing that.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#8 Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Kyotaro Horiguchi (#7)
1 attachment(s)
Re: Review for GetWALAvailability()

On 2020/06/16 14:00, Kyotaro Horiguchi wrote:

At Tue, 16 Jun 2020 01:46:21 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

In short, it is known behavior but it was judged as useless to prevent
that.
That can happen when the checkpointer removes up to the segment that
is being read by a walsender. I think that doesn't happen (or happens
within a narrow time window?) for physical replication but happens
for logical replication.
During development, I once added code to walsender to exit for that
reason, but finally it was moved to InvalidateObsoleteReplicationSlots
as a somewhat different function.

BTW, I read the code of InvalidateObsoleteReplicationSlots() and
probably found some issues in it.

1. Each cycle of the "for" loop in InvalidateObsoleteReplicationSlots()
emits the log message "terminating walsender ...". This means that
if it takes more than 10ms for walsender to exit after it's signaled,
the second and subsequent cycles would happen and output the same
log message several times. IMO that log message should be output
only once.

Sounds reasonable.

2. InvalidateObsoleteReplicationSlots() scans the replication slots
array repeatedly, using a "for" loop in each scan. Also it calls
ReplicationSlotAcquire() in each "for" loop cycle, and
ReplicationSlotAcquire() uses yet another loop to scan the replication
slots array. I don't think this is good design.

ISTM that we can get rid of ReplicationSlotAcquire()'s loop because
InvalidateObsoleteReplicationSlots() already knows the index of the
slot that we want to find. The attached patch does that. Thoughts?

The inner loop is expected to run at most several times per
checkpoint, which won't be a serious problem. However, it is better if
we can get rid of that in a reasonable way.

The attached patch changes the behavior for SAB_Block. Before the
patch, it rescans from the first slot for the same name, but with the
patch it just rechecks the same slot. The only caller of the function
with SAB_Block is ReplicationSlotDrop and I don't come up with a case
where another slot with the same name is created at different place
before the condition variable fires. But I'm not sure the change is
completely safe.

Yes, that change might not be safe. So I'm thinking another approach to
fix the issues.

Maybe some assertion is needed?

3. There is a corner case where the termination of walsender cleans
up the temporary replication slot while
InvalidateObsoleteReplicationSlots() is sleeping on
ConditionVariableTimedSleep(). In this case, ReplicationSlotAcquire()
is called in the subsequent cycle of the "for" loop, cannot find the
slot and then emits an ERROR message. This leads to the failure of the
checkpoint by the checkpointer.

Agreed.

To avoid this case, if SAB_Inquire is specified,
ReplicationSlotAcquire() should return a special value instead of
emitting an ERROR even when it cannot find the slot. Also
InvalidateObsoleteReplicationSlots() should handle that special return
value.

I thought the same thing hearing that.

While reading the InvalidateObsoleteReplicationSlots() code, I found another issue.

ereport(LOG,
(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
wspid, NameStr(slotname))));
(void) kill(wspid, SIGTERM);

wspid indicates the PID of process using the slot. That process can be
a backend, for example, executing pg_replication_slot_advance().
So "walsender" in the above log message is not always correct.

int wspid = ReplicationSlotAcquire(NameStr(slotname),
SAB_Inquire);

Why do we need to call ReplicationSlotAcquire() here and mark the slot as
used by the checkpointer? Isn't it enough to check the slot's
active_pid directly, instead?

Maybe ReplicationSlotAcquire() is necessary because
ReplicationSlotRelease() is called later? If so, why do we need to call
ReplicationSlotRelease()? ISTM that we don't need to do that if the slot's
active_pid is zero. No?

If my understanding is right, I'd like to propose the attached patch.
It introduces DeactivateReplicationSlot() and replaces the "for" loop
in InvalidateObsoleteReplicationSlots() with it. ReplicationSlotAcquire()
and ReplicationSlotRelease() are no longer called there.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

invalidate_obsolete_replication_slots.patch (text/plain)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..b89b6da768 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
+static bool ReplicationSlotIsActive(ReplicationSlot *slot, int *active_pid);
+static void DeactivateReplicationSlot(ReplicationSlot *slot);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -1080,6 +1082,61 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Is the specified replication slot currently actively being used?
+ *
+ * Set *active_pid to the PID of the process using this slot if active.
+ */
+static bool
+ReplicationSlotIsActive(ReplicationSlot *slot, int *active_pid)
+{
+	int		pid;
+
+	SpinLockAcquire(&slot->mutex);
+	pid = slot->active_pid;
+	SpinLockRelease(&slot->mutex);
+
+	if (active_pid != NULL)
+		*active_pid = pid;
+
+	return (pid != 0);
+}
+
+/*
+ * Deactivate the specified replication slot.
+ *
+ * Terminate the process using this slot if active.
+ */
+static void
+DeactivateReplicationSlot(ReplicationSlot *slot)
+{
+	int		active_pid;
+	bool	killed = false;
+
+	/* Quick exit if already inactive */
+	if (!ReplicationSlotIsActive(slot, &active_pid))
+		return;
+
+	ereport(LOG,
+			(errmsg("terminating the process %d using replication slot \"%s\"",
+					active_pid, NameStr(slot->data.name))));
+
+	ConditionVariablePrepareToSleep(&slot->active_cv);
+	do
+	{
+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));
+	ConditionVariableCancelSleep();
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1105,37 +1162,18 @@ restart:
 			continue;
 
 		SpinLockAcquire(&s->mutex);
-		if (s->data.restart_lsn == InvalidXLogRecPtr ||
-			s->data.restart_lsn >= oldestLSN)
-		{
-			SpinLockRelease(&s->mutex);
-			continue;
-		}
-
 		slotname = s->data.name;
 		restart_lsn = s->data.restart_lsn;
-
 		SpinLockRelease(&s->mutex);
+
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+			continue;
 		LWLockRelease(ReplicationSlotControlLock);
 
-		for (;;)
-		{
-			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
-													   SAB_Inquire);
-
-			/* no walsender? success! */
-			if (wspid == 0)
-				break;
-
-			ereport(LOG,
-					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-							wspid, NameStr(slotname))));
-			(void) kill(wspid, SIGTERM);
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
+		DeactivateReplicationSlot(s);
+		SpinLockAcquire(&s->mutex);
+		s->data.restart_lsn = InvalidXLogRecPtr;
+		SpinLockRelease(&s->mutex);
 
 		ereport(LOG,
 				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
@@ -1143,11 +1181,6 @@ restart:
 						(uint32) (restart_lsn >> 32),
 						(uint32) restart_lsn)));
 
-		SpinLockAcquire(&s->mutex);
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-		ReplicationSlotRelease();
-
 		/* if we did anything, start from scratch */
 		CHECK_FOR_INTERRUPTS();
 		goto restart;
#9 Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Kyotaro Horiguchi (#6)
Re: Review for GetWALAvailability()

On 2020-Jun-16, Kyotaro Horiguchi wrote:

I noticed another issue. If some required WALs are removed, the
slot will be "invalidated", that is, restart_lsn is set to an invalid
value. As a result we hardly ever see the "lost" state.

It can be "fixed" by remembering the validity of a slot separately
from restart_lsn. Is that worth doing?

We discussed this before. I agree it would be better to do this
in some way, but I fear that if we do it naively, some code might exist
that reads the LSN without realizing that it needs to check the validity
flag first.

On the other hand, maybe this is not a problem in practice, because if
such a bug occurs, what will happen is that trying to read WAL from such
a slot will return the error message that the WAL file cannot be found.
Maybe this is acceptable?

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#10 Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Fujii Masao (#8)
Re: Review for GetWALAvailability()

On 2020-Jun-17, Fujii Masao wrote:

While reading InvalidateObsoleteReplicationSlots() code, I found another issue.

ereport(LOG,
(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
wspid, NameStr(slotname))));
(void) kill(wspid, SIGTERM);

wspid indicates the PID of process using the slot. That process can be
a backend, for example, executing pg_replication_slot_advance().
So "walsender" in the above log message is not always correct.

Good point.

int wspid = ReplicationSlotAcquire(NameStr(slotname),
SAB_Inquire);

Why do we need to call ReplicationSlotAcquire() here and mark the slot as
used by the checkpointer? Isn't it enough to check directly the slot's
active_pid, instead?

Maybe ReplicationSlotAcquire() is necessary because
ReplicationSlotRelease() is called later? If so, why do we need to call
ReplicationSlotRelease()? ISTM that we don't need to do that if the slot's
active_pid is zero. No?

I think the point here was that in order to modify the slot you have to
acquire it -- it's not valid to modify a slot you don't own.

+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));

Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.
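
To illustrate (a sketch only, on top of the proposed
DeactivateReplicationSlot(); "signaling" is a new local variable):

	bool		signaling = true;

	ConditionVariablePrepareToSleep(&slot->active_cv);
	while (ReplicationSlotIsActive(slot, &active_pid))
	{
		/*
		 * Re-signal on each iteration.  Once kill() fails, the process
		 * is already gone, so stop signalling and just wait for the
		 * slot to be released.
		 */
		if (signaling && kill(active_pid, SIGTERM) != 0)
			signaling = false;
		ConditionVariableTimedSleep(&slot->active_cv, 100,
									WAIT_EVENT_REPLICATION_SLOT_DROP);
	}
	ConditionVariableCancelSleep();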

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#11 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#8)
Re: Review for GetWALAvailability()

At Wed, 17 Jun 2020 00:46:38 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

2. InvalidateObsoleteReplicationSlots() scans the replication slots
array repeatedly, using a "for" loop in each scan. Also it calls
ReplicationSlotAcquire() in each "for" loop cycle, and
ReplicationSlotAcquire() uses yet another loop to scan the replication
slots array. I don't think this is good design.

ISTM that we can get rid of ReplicationSlotAcquire()'s loop because
InvalidateObsoleteReplicationSlots() already knows the index of the
slot that we want to find. The attached patch does that. Thoughts?

The inner loop is expected to run at most several times per
checkpoint, which won't be a serious problem. However, it is better if
we can get rid of that in a reasonable way.
The attached patch changes the behavior for SAB_Block. Before the
patch, it rescans from the first slot for the same name, but with the
patch it just rechecks the same slot. The only caller of the function
with SAB_Block is ReplicationSlotDrop and I don't come up with a case
where another slot with the same name is created at different place
before the condition variable fires. But I'm not sure the change is
completely safe.

Yes, that change might not be safe. So I'm thinking another approach
to fix the issues.

Maybe some assertion is needed?

3. There is a corner case where the termination of walsender cleans
up the temporary replication slot while
InvalidateObsoleteReplicationSlots() is sleeping on
ConditionVariableTimedSleep(). In this case, ReplicationSlotAcquire()
is called in the subsequent cycle of the "for" loop, cannot find the
slot and then emits an ERROR message. This leads to the failure of the
checkpoint by the checkpointer.

Agreed.

To avoid this case, if SAB_Inquire is specified,
ReplicationSlotAcquire() should return a special value instead of
emitting an ERROR even when it cannot find the slot. Also
InvalidateObsoleteReplicationSlots() should handle that special return
value.

I thought the same thing hearing that.

While reading InvalidateObsoleteReplicationSlots() code, I found
another issue.

ereport(LOG,
(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
wspid, NameStr(slotname))));
(void) kill(wspid, SIGTERM);

wspid indicates the PID of process using the slot. That process can be
a backend, for example, executing pg_replication_slot_advance().
So "walsender" in the above log message is not always correct.

Agreed.

int wspid = ReplicationSlotAcquire(NameStr(slotname),
SAB_Inquire);

Why do we need to call ReplicationSlotAcquire() here and mark the
slot as used by the checkpointer? Isn't it enough to check the slot's
active_pid directly, instead?
Maybe ReplicationSlotAcquire() is necessary because
ReplicationSlotRelease() is called later? If so, why do we need to
call ReplicationSlotRelease()? ISTM that we don't need to do that if
the slot's active_pid is zero. No?

My understanding of the reason is that we update a slot value here.
The restriction allows the owner of a slot to assume that none of the
slot's values change spontaneously.

slot.h:104
| * - Individual fields are protected by mutex where only the backend owning
| * the slot is authorized to update the fields from its own slot. The
| * backend owning the slot does not need to take this lock when reading its
| * own fields, while concurrent backends not owning this slot should take the
| * lock when reading this slot's data.

If my understanding is right, I'd like to propose the attached patch.
It introduces DeactivateReplicationSlot() and replaces the "for" loop
in InvalidateObsoleteReplicationSlots() with it. ReplicationSlotAcquire()
and ReplicationSlotRelease() are no longer called there.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#12 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#9)
Re: Review for GetWALAvailability()

At Tue, 16 Jun 2020 14:31:43 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in

On 2020-Jun-16, Kyotaro Horiguchi wrote:

I noticed another issue. If some required WALs are removed, the
slot will be "invalidated", that is, restart_lsn is set to an invalid
value. As a result we hardly ever see the "lost" state.

It can be "fixed" by remembering the validity of a slot separately
from restart_lsn. Is that worth doing?

We discussed this before. I agree it would be better to do this
in some way, but I fear that if we do it naively, some code might exist
that reads the LSN without realizing that it needs to check the validity
flag first.

Yes, that was my main concern about it. That's error-prone. How about
remembering the LSN where the invalidation happened? It's safe since
nothing other than the slot-monitoring functions would look at
last_invalidated_lsn. It can be reset when active_pid is a valid pid.

InvalidateObsoleteReplicationSlots:
...
SpinLockAcquire(&s->mutex);
+ s->data.last_invalidated_lsn = s->data.restart_lsn;
s->data.restart_lsn = InvalidXLogRecPtr;
SpinLockRelease(&s->mutex);
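
The monitoring side could then, for example, report "lost" like this
(a sketch; last_invalidated_lsn is the hypothetical new field):

	SpinLockAcquire(&s->mutex);
	restart_lsn = s->data.restart_lsn;
	last_invalidated_lsn = s->data.last_invalidated_lsn;
	SpinLockRelease(&s->mutex);

	if (XLogRecPtrIsInvalid(restart_lsn) &&
		!XLogRecPtrIsInvalid(last_invalidated_lsn))
		walstate = WALAVAIL_REMOVED;	/* invalidated; WAL is lost */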

On the other hand, maybe this is not a problem in practice, because if
such a bug occurs, what will happen is that trying to read WAL from such
a slot will return the error message that the WAL file cannot be found.
Maybe this is acceptable?

I'm not sure. For my part, a problem with that would be that we need
to look into the server logs to know what is actually going on.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#13 Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Alvaro Herrera (#10)
Re: Review for GetWALAvailability()

On 2020/06/17 3:50, Alvaro Herrera wrote:

On 2020-Jun-17, Fujii Masao wrote:

While reading InvalidateObsoleteReplicationSlots() code, I found another issue.

ereport(LOG,
(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
wspid, NameStr(slotname))));
(void) kill(wspid, SIGTERM);

wspid indicates the PID of process using the slot. That process can be
a backend, for example, executing pg_replication_slot_advance().
So "walsender" in the above log message is not always correct.

Good point.

So InvalidateObsoleteReplicationSlots() can terminate normal backends.
But do we want to do this? If we do, shouldn't we add a note about this
case to the docs? Otherwise users would be surprised at the termination
of backends by max_slot_wal_keep_size. I guess that it basically rarely
happens, though.
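
If we document it, maybe something like "Note that this termination is
not limited to walsenders; any session holding the slot may be
terminated" (just a draft wording, of course).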

int wspid = ReplicationSlotAcquire(NameStr(slotname),
SAB_Inquire);

Why do we need to call ReplicationSlotAcquire() here and mark the slot as
used by the checkpointer? Isn't it enough to check directly the slot's
active_pid, instead?

Maybe ReplicationSlotAcquire() is necessary because
ReplicationSlotRelease() is called later? If so, why do we need to call
ReplicationSlotRelease()? ISTM that we don't need to do that if the slot's
active_pid is zero. No?

I think the point here was that in order to modify the slot you have to
acquire it -- it's not valid to modify a slot you don't own.

Understood. Thanks!

+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));

Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.

You mean: in this code path, signaling fails only when the target process
disappears just before signaling. So if it fails, slot->active_pid is
expected to become 0 even without further signaling. Right?

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

#14 Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Fujii Masao (#13)
Re: Review for GetWALAvailability()

On 2020-Jun-17, Fujii Masao wrote:

On 2020/06/17 3:50, Alvaro Herrera wrote:

So InvalidateObsoleteReplicationSlots() can terminate normal backends.
But do we want to do this? If we do, shouldn't we add a note about this
case to the docs? Otherwise users would be surprised at the termination
of backends by max_slot_wal_keep_size. I guess that it basically rarely
happens, though.

Well, if we could distinguish a walsender from a non-walsender process,
then maybe it would make sense to leave backends alive. But do we want
that? I admit I don't know what would be the reason to have a
non-walsender process with an active slot, so I don't have a good
opinion on what to do in this case.

+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));

Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.

You mean; in this code path, signaling fails only when the target process
disappears just before signaling. So if it fails, slot->active_pid is
expected to become 0 even without signaling more. Right?

I guess kill() can also fail if the PID now belongs to a process owned
by a different user. I think we've disregarded very quick reuse of
PIDs, so we needn't concern ourselves with it.

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#15 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#14)
Re: Review for GetWALAvailability()

At Tue, 16 Jun 2020 22:40:56 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in

On 2020-Jun-17, Fujii Masao wrote:

On 2020/06/17 3:50, Alvaro Herrera wrote:

So InvalidateObsoleteReplicationSlots() can terminate normal backends.
But do we want to do this? If we do, shouldn't we add a note about this
case to the docs? Otherwise users would be surprised at the termination
of backends by max_slot_wal_keep_size. I guess that it basically rarely
happens, though.

Well, if we could distinguish a walsender from a non-walsender process,
then maybe it would make sense to leave backends alive. But do we want
that? I admit I don't know what would be the reason to have a
non-walsender process with an active slot, so I don't have a good
opinion on what to do in this case.

The non-walsender backend is actually doing replication work, so
shouldn't it rather be killed as well?

+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));

Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.

You mean: in this code path, signaling fails only when the target process
disappears just before signaling. So if it fails, slot->active_pid is
expected to become 0 even without further signaling. Right?

I guess kill() can also fail if the PID now belongs to a process owned
by a different user. I think we've disregarded very quick reuse of
PIDs, so we needn't concern ourselves with it.

The first call to ConditionVariableTimedSleep doesn't actually sleep,
so the loop works as expected. But we may make an extra call to
kill(2). Calling ConditionVariablePrepareToSleep before the loop would
make it better.
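
To illustrate, the shape I have in mind is roughly this (only a sketch,
reusing the names from the quoted hunk; untested):

	ConditionVariablePrepareToSleep(&slot->active_cv);
	do
	{
		if (!killed && kill(active_pid, SIGTERM) == 0)
			killed = true;

		/*
		 * Because the sleep was prepared above, this call actually
		 * sleeps even on the first iteration, instead of returning
		 * immediately and letting the loop call kill(2) once more.
		 */
		ConditionVariableTimedSleep(&slot->active_cv, 100,
									WAIT_EVENT_REPLICATION_SLOT_DROP);
	} while (ReplicationSlotIsActive(slot, NULL));
	ConditionVariableCancelSleep();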

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#16Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#12)
1 attachment(s)
Re: Review for GetWALAvailability()

At Wed, 17 Jun 2020 10:17:07 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

At Tue, 16 Jun 2020 14:31:43 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in

On 2020-Jun-16, Kyotaro Horiguchi wrote:

I noticed another issue. If some required WAL segments are removed, the
slot will be "invalidated", that is, restart_lsn is set to an invalid
value. As a result we hardly ever see the "lost" state.

It can be "fixed" by remembering the validity of a slot separately
from restart_lsn. Is that worth doing?

We discussed this before. I agree it would be better to do this
in some way, but I fear that if we do it naively, some code might exist
that reads the LSN without realizing that it needs to check the validity
flag first.

Yes, that was my main concern about it; it's error-prone. How about
remembering the LSN at which the invalidation happened? That is safe,
since nothing other than the slot-monitoring functions would look at
last_invalidated_lsn. It can be reset when active_pid is a valid PID.

InvalidateObsoleteReplicationSlots:
...
SpinLockAcquire(&s->mutex);
+ s->data.last_invalidated_lsn = s->data.restart_lsn;
s->data.restart_lsn = InvalidXLogRecPtr;
SpinLockRelease(&s->mutex);
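
For the reset side, something as simple as this could work (a
hypothetical sketch; where exactly to do it, e.g. at slot acquisition,
is debatable):

	/* the slot is owned again, so the old invalidation LSN is moot */
	SpinLockAcquire(&s->mutex);
	s->last_invalidated_lsn = InvalidXLogRecPtr;
	SpinLockRelease(&s->mutex);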

The attached does that (PoC). No documentation fix is included.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

GetWalAvailability_change_statuses_fix_lost.patch (text/x-patch; charset=us-ascii)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d6fe205eb4..d3240d1e38 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9485,20 +9485,25 @@ CreateRestartPoint(int flags)
  *		(typically a slot's restart_lsn)
  *
  * Returns one of the following enum values:
- * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
- *   of max_wal_size.
  *
- * * WALAVAIL_PRESERVED means it is still available by preserving extra
+ * * WALAVAIL_RESERVED means targetLSN is available and it is in the range of
+ *   max_wal_size.
+ *
+ * * WALAVAIL_EXTENDED means it is still available by preserving extra
  *   segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
  *   than max_wal_size, this state is not returned.
  *
+ * * WALAVAIL_BEING_REMOVED means it is being lost. The walsender using this
+ *   slot may return to one of the above states.
+ *
  * * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
  *   a slot with this LSN cannot continue.
  *
  * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
  */
 WALAvailability
-GetWALAvailability(XLogRecPtr targetLSN)
+GetWALAvailability(XLogRecPtr targetLSN, XLogSegNo last_removed_seg,
+				   bool slot_is_active)
 {
 	XLogRecPtr	currpos;		/* current write LSN */
 	XLogSegNo	currSeg;		/* segid of currpos */
@@ -9509,7 +9514,11 @@ GetWALAvailability(XLogRecPtr targetLSN)
 													 * slot */
 	uint64		keepSegs;
 
-	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	/*
+	 * slot does not reserve WAL. Either deactivated, or has never been active
+	 * The caller should have passed last_invalidated_lsn as targetLSN if the
+	 * slot has been invalidated.
+	 */
 	if (XLogRecPtrIsInvalid(targetLSN))
 		return WALAVAIL_INVALID_LSN;
 
@@ -9524,7 +9533,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 * the first WAL segment file since startup, which causes the status being
 	 * wrong under certain abnormal conditions but that doesn't actually harm.
 	 */
-	oldestSeg = XLogGetLastRemovedSegno() + 1;
+	oldestSeg = last_removed_seg + 1;
 
 	/* calculate oldest segment by max_wal_size and wal_keep_segments */
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
@@ -9544,20 +9553,21 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 */
 	if (targetSeg >= oldestSeg)
 	{
-		/*
-		 * show "normal" when targetSeg is within max_wal_size, even if
-		 * max_slot_wal_keep_size is smaller than max_wal_size.
-		 */
-		if ((max_slot_wal_keep_size_mb <= 0 ||
-			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
-			oldestSegMaxWalSize <= targetSeg)
-			return WALAVAIL_NORMAL;
-
-		/* being retained by slots */
-		if (oldestSlotSeg <= targetSeg)
+		/* show "reserved" when targetSeg is within max_wal_size */
+		if (oldestSegMaxWalSize <= targetSeg)
 			return WALAVAIL_RESERVED;
+
+		/* being retained by slots exceeding max_wal_size */
+		return WALAVAIL_EXTENDED;
 	}
 
+	/*
+	 * Even though segments required by the slot have been removed, a
+	 * walsender that is still active may be able to continue reading them.
+	 */
+	if (slot_is_active)
+		return WALAVAIL_BEING_REMOVED;
+
 	/* Definitely lost */
 	return WALAVAIL_REMOVED;
 }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..f141b29d28 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -285,6 +285,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_invalidated_lsn = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -1144,6 +1145,7 @@ restart:
 						(uint32) restart_lsn)));
 
 		SpinLockAcquire(&s->mutex);
+		s->last_invalidated_lsn = s->data.restart_lsn;
 		s->data.restart_lsn = InvalidXLogRecPtr;
 		SpinLockRelease(&s->mutex);
 		ReplicationSlotRelease();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 1b929a603e..ed0abe0c39 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -243,6 +243,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int			slotno;
+	XLogSegNo	last_removed_seg;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -272,6 +273,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	rsinfo->setResult = tupstore;
 	rsinfo->setDesc = tupdesc;
 
+	/*
+	 * Remember the last removed segment at this point for the consistency in
+	 * this table. Since there's no interlock between slot data and
+	 * checkpointer, the segment can be removed in-between, but that doesn't
+	 * make any practical difference.
+	 */
+	last_removed_seg = XLogGetLastRemovedSegno();
+
 	MemoryContextSwitchTo(oldcontext);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -282,7 +291,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Datum		values[PG_GET_REPLICATION_SLOTS_COLS];
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
-		XLogSegNo	last_removed_seg;
+		XLogRecPtr	targetLSN;
 		int			i;
 
 		if (!slot->in_use)
@@ -342,7 +351,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
-		walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+		/* use last_invalidated_lsn when the slot is invalidated */
+		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+			targetLSN = slot_contents.last_invalidated_lsn;
+		else
+			targetLSN = slot_contents.data.restart_lsn;
+
+		walstate = GetWALAvailability(targetLSN, last_removed_seg,
+									  slot_contents.active_pid != 0);
 
 		switch (walstate)
 		{
@@ -350,14 +366,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				nulls[i++] = true;
 				break;
 
-			case WALAVAIL_NORMAL:
-				values[i++] = CStringGetTextDatum("normal");
-				break;
-
 			case WALAVAIL_RESERVED:
 				values[i++] = CStringGetTextDatum("reserved");
 				break;
 
+			case WALAVAIL_EXTENDED:
+				values[i++] = CStringGetTextDatum("extended");
+				break;
+
+			case WALAVAIL_BEING_REMOVED:
+				values[i++] = CStringGetTextDatum("being lost");
+				break;
+
 			case WALAVAIL_REMOVED:
 				values[i++] = CStringGetTextDatum("lost");
 				break;
@@ -367,8 +387,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		}
 
 		if (max_slot_wal_keep_size_mb >= 0 &&
-			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
-			((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+			(walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
+			(last_removed_seg != 0))
 		{
 			XLogRecPtr	min_safe_lsn;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e917dfe92d..49d9578bc5 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -270,8 +270,9 @@ extern CheckpointStatsData CheckpointStats;
 typedef enum WALAvailability
 {
 	WALAVAIL_INVALID_LSN,		/* parameter error */
-	WALAVAIL_NORMAL,			/* WAL segment is within max_wal_size */
-	WALAVAIL_RESERVED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_RESERVED,			/* WAL segment is within max_wal_size */
+	WALAVAIL_EXTENDED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_BEING_REMOVED,		/* WAL segment is being removed */
 	WALAVAIL_REMOVED			/* WAL segment has been removed */
 } WALAvailability;
 
@@ -326,7 +327,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN,
+										  XLogSegNo last_removed_seg,
+										  bool slot_is_active);
 extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 917876010e..8090ca81fe 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -156,6 +156,9 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/* restart_lsn is copied here when the slot is invalidated */
+	XLogRecPtr	last_invalidated_lsn;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#17Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Kyotaro Horiguchi (#15)
1 attachment(s)
Re: Review for GetWALAvailability()

On 2020/06/17 12:10, Kyotaro Horiguchi wrote:

At Tue, 16 Jun 2020 22:40:56 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in

On 2020-Jun-17, Fujii Masao wrote:

On 2020/06/17 3:50, Alvaro Herrera wrote:

So InvalidateObsoleteReplicationSlots() can terminate normal backends.
But do we want to do this? If we do, we should add a note about this
case to the docs; otherwise users would be surprised at backends being
terminated because of max_slot_wal_keep_size. I guess it rarely happens
in practice, though.

Well, if we could distinguish a walsender from a non-walsender process,
then maybe it would make sense to leave backends alive. But do we want
that? I admit I don't know what would be the reason to have a
non-walsender process with an active slot, so I don't have a good
opinion on what to do in this case.

The non-walsender backend is actually doing replication work, so
shouldn't it rather be killed?

I have no better opinion about this. So I agree to leave the logic as it is
at least for now, i.e., we terminate the process owning the slot whatever
the type of process is.

+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));

Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.

You mean: in this code path, signaling fails only when the target process
disappears just before signaling. So if it fails, slot->active_pid is
expected to become 0 even without further signaling. Right?

I guess kill() can also fail if the PID now belongs to a process owned
by a different user.

Yes. This case means that the PostgreSQL process using the slot disappeared
and the same PID was assigned to a non-PostgreSQL process. So if kill() fails
for this reason, we don't need to kill() again.
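
For reference, the two failure modes look like this (a sketch; ESRCH
and EPERM are the POSIX errno values, nothing PostgreSQL-specific):

	if (kill(active_pid, SIGTERM) < 0)
	{
		/*
		 * ESRCH: the process is already gone.  EPERM: the PID was
		 * reused by a process owned by another user.  Either way the
		 * original owner of the slot has exited, so signaling again
		 * is pointless.
		 */
	}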

I think we've disregarded very quick reuse of PIDs, so we needn't concern ourselves with it.

The first call to ConditionVariableTimedSleep doesn't actually sleep,
so the loop works as expected. But we may make an extra call to
kill(2). Calling ConditionVariablePrepareToSleep before the loop would
make it better.

Sorry I failed to understand your point...

Anyway, the attached is the updated version of the patch. This fixes
all the issues in InvalidateObsoleteReplicationSlots() that I reported
upthread.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

invalidate_obsolete_replication_slots_v2.patch (text/plain; charset=UTF-8)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..a065d41d76 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+										  const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -322,77 +325,104 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 }
 
 /*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
+ *
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
+ */
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
+{
+	int			i;
+	ReplicationSlot	*slot = NULL;
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+		{
+			slot = s;
+			break;
+		}
+	}
+
+	return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
  *
  * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise.  If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error.  If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise.  If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
  */
 int
 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
-	ReplicationSlot *slot;
+	return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * If *slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+							   SlotAcquireBehavior behavior)
+{
+	ReplicationSlot *s;
 	int			active_pid;
-	int			i;
 
 retry:
 	Assert(MyReplicationSlot == NULL);
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
 	/*
-	 * Search for the named slot and mark it active if we find it.  If the
-	 * slot is already active, we exit the loop with active_pid set to the PID
-	 * of the backend that owns it.
+	 * Search for the slot with the specified name if the slot to
+	 * acquire is not given. If the slot is not found, we either
+	 * return -1 or error out.
 	 */
-	active_pid = 0;
-	slot = NULL;
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+	if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)
 	{
-		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-
-		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+		if (behavior == SAB_Inquire)
 		{
-			/*
-			 * This is the slot we want; check if it's active under some other
-			 * process.  In single user mode, we don't need this check.
-			 */
-			if (IsUnderPostmaster)
-			{
-				/*
-				 * Get ready to sleep on it in case it is active.  (We may end
-				 * up not sleeping, but we don't want to do this while holding
-				 * the spinlock.)
-				 */
-				ConditionVariablePrepareToSleep(&s->active_cv);
-
-				SpinLockAcquire(&s->mutex);
-
-				active_pid = s->active_pid;
-				if (active_pid == 0)
-					active_pid = s->active_pid = MyProcPid;
-
-				SpinLockRelease(&s->mutex);
-			}
-			else
-				active_pid = MyProcPid;
-			slot = s;
-
-			break;
+			LWLockRelease(ReplicationSlotControlLock);
+			return -1;
 		}
-	}
-	LWLockRelease(ReplicationSlotControlLock);
-
-	/* If we did not find the slot, error out. */
-	if (slot == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+	}
+
+	/*
+	 * This is the slot we want; check if it's active under some other
+	 * process.  In single user mode, we don't need this check.
+	 */
+	if (IsUnderPostmaster)
+	{
+		SpinLockAcquire(&s->mutex);
+		if (s->active_pid == 0)
+			s->active_pid = MyProcPid;
+		active_pid = s->active_pid;
+		SpinLockRelease(&s->mutex);
+	}
+	else
+		active_pid = MyProcPid;
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/*
 	 * If we found the slot but it's already active in another backend, we
-	 * either error out or retry after a short wait, as caller specified.
+	 * either error out, return the PID of the owning process, or retry
+	 * after a short wait, as caller specified.
 	 */
 	if (active_pid != MyProcPid)
 	{
@@ -405,19 +435,17 @@ retry:
 			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
-		ConditionVariableSleep(&slot->active_cv,
+		ConditionVariableSleep(&s->active_cv,
 							   WAIT_EVENT_REPLICATION_SLOT_DROP);
 		ConditionVariableCancelSleep();
 		goto retry;
 	}
-	else
-		ConditionVariableCancelSleep(); /* no sleep needed after all */
 
 	/* Let everybody know we've modified this slot */
-	ConditionVariableBroadcast(&slot->active_cv);
+	ConditionVariableBroadcast(&s->active_cv);
 
 	/* We made this slot active, so it's ours now. */
-	MyReplicationSlot = slot;
+	MyReplicationSlot = s;
 
 	/* success */
 	return 0;
@@ -1100,43 +1128,71 @@ restart:
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
 		NameData	slotname;
+		int		wspid;
+		int		last_signaled_pid = 0;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
-		if (s->data.restart_lsn == InvalidXLogRecPtr ||
-			s->data.restart_lsn >= oldestLSN)
-		{
-			SpinLockRelease(&s->mutex);
-			continue;
-		}
-
 		slotname = s->data.name;
 		restart_lsn = s->data.restart_lsn;
-
 		SpinLockRelease(&s->mutex);
+
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+			continue;
 		LWLockRelease(ReplicationSlotControlLock);
 
 		for (;;)
 		{
-			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
-													   SAB_Inquire);
+			wspid = ReplicationSlotAcquireInternal(s, NameStr(slotname),
+												   SAB_Inquire);
 
-			/* no walsender? success! */
-			if (wspid == 0)
+			/*
+			 * Exit the loop if we successfully acquired the slot or the
+			 * slot was dropped while waiting for the owning process to
+			 * be terminated. For example, the latter case is likely to
+			 * happen when the slot is temporary, because such a slot is
+			 * automatically dropped when the owning process terminates.
+			 */
+			if (wspid <= 0)
 				break;
 
-			ereport(LOG,
-					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-							wspid, NameStr(slotname))));
-			(void) kill(wspid, SIGTERM);
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is a race condition where another process may own
+			 * the slot after the process using it was terminated and before
+			 * this process owns it. To handle this case, we signal again
+			 * if the PID of the owning process has changed from the last one.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != wspid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								wspid, NameStr(slotname))));
+				(void) kill(wspid, SIGTERM);
+				last_signaled_pid = wspid;
+			}
 
 			ConditionVariableTimedSleep(&s->active_cv, 10,
 										WAIT_EVENT_REPLICATION_SLOT_DROP);
 		}
 		ConditionVariableCancelSleep();
 
+		/*
+		 * Do nothing here and start from scratch if the slot has
+		 * already been dropped.
+		 */
+		if (wspid == -1)
+		{
+			CHECK_FOR_INTERRUPTS();
+			goto restart;
+		}
+
 		ereport(LOG,
 				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
 						NameStr(slotname),
#18Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#17)
Re: Review for GetWALAvailability()

At Wed, 17 Jun 2020 17:01:11 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

On 2020/06/17 12:10, Kyotaro Horiguchi wrote:

At Tue, 16 Jun 2020 22:40:56 -0400, Alvaro Herrera
<alvherre@2ndquadrant.com> wrote in

On 2020-Jun-17, Fujii Masao wrote:

On 2020/06/17 3:50, Alvaro Herrera wrote:

So InvalidateObsoleteReplicationSlots() can terminate normal backends.
But do we want to do this? If we do, we should add a note about this
case to the docs; otherwise users would be surprised at backends being
terminated because of max_slot_wal_keep_size. I guess it rarely happens
in practice, though.

Well, if we could distinguish a walsender from a non-walsender process,
then maybe it would make sense to leave backends alive. But do we want
that? I admit I don't know what would be the reason to have a
non-walsender process with an active slot, so I don't have a good
opinion on what to do in this case.

The non-walsender backend is actually doing replication work, so
shouldn't it rather be killed?

I have no better opinion about this. So I agree to leave the logic as
it is at least for now, i.e., we terminate the process owning the slot
whatever the type of process is.

Agreed.

+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));

Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.

You mean: in this code path, signaling fails only when the target process
disappears just before signaling. So if it fails, slot->active_pid is
expected to become 0 even without further signaling. Right?

I guess kill() can also fail if the PID now belongs to a process owned
by a different user.

Yes. This case means that the PostgreSQL process using the slot
disappeared and the same PID was assigned to a non-PostgreSQL process.
So if kill() fails for this reason, we don't need to kill() again.

I think we've disregarded very quick reuse of PIDs, so we needn't concern ourselves with it.

The first call to ConditionVariableTimedSleep doesn't actually sleep,
so the loop works as expected. But we may make an extra call to
kill(2). Calling ConditionVariablePrepareToSleep before the loop would
make it better.

Sorry I failed to understand your point...

My point is that ConditionVariableTimedSleep does *not* sleep on the CV
the first time in this usage. The new version avoids the useless
kill(2) call anyway, but may still make an extra call to
ReplicationSlotAcquireInternal. I think we should call
ConditionVariablePrepareToSleep before the surrounding for statement
block.

Anyway, the attached is the updated version of the patch. This fixes
all the issues in InvalidateObsoleteReplicationSlots() that I reported
upthread.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#19Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Kyotaro Horiguchi (#18)
1 attachment(s)
Re: Review for GetWALAvailability()

On 2020/06/17 17:30, Kyotaro Horiguchi wrote:

At Wed, 17 Jun 2020 17:01:11 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

On 2020/06/17 12:10, Kyotaro Horiguchi wrote:

At Tue, 16 Jun 2020 22:40:56 -0400, Alvaro Herrera
<alvherre@2ndquadrant.com> wrote in

On 2020-Jun-17, Fujii Masao wrote:

On 2020/06/17 3:50, Alvaro Herrera wrote:

So InvalidateObsoleteReplicationSlots() can terminate normal backends.
But do we want to do this? If we do, we should add a note about this
case to the docs; otherwise users would be surprised at backends being
terminated because of max_slot_wal_keep_size. I guess it rarely happens
in practice, though.

Well, if we could distinguish a walsender from a non-walsender process,
then maybe it would make sense to leave backends alive. But do we want
that? I admit I don't know what would be the reason to have a
non-walsender process with an active slot, so I don't have a good
opinion on what to do in this case.

The non-walsender backend is actually doing replication work, so
shouldn't it rather be killed?

I have no better opinion about this. So I agree to leave the logic as
it is at least for now, i.e., we terminate the process owning the slot
whatever the type of process is.

Agreed.

+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(active_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, NULL));

Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.

You mean: in this code path, signaling fails only when the target process
disappears just before signaling. So if it fails, slot->active_pid is
expected to become 0 even without further signaling. Right?

I guess kill() can also fail if the PID now belongs to a process owned
by a different user.

Yes. This case means that the PostgreSQL process using the slot
disappeared and the same PID was assigned to a non-PostgreSQL process.
So if kill() fails for this reason, we don't need to kill() again.

I think we've disregarded very quick reuse of PIDs, so we needn't concern ourselves with it.

The first call to ConditionVariableTimedSleep doesn't actually sleep,
so the loop works as expected. But we may make an extra call to
kill(2). Calling ConditionVariablePrepareToSleep before the loop would
make it better.

Sorry I failed to understand your point...

My point is that ConditionVariableTimedSleep does *not* sleep on the CV
the first time in this usage. The new version avoids the useless
kill(2) call anyway, but may still make an extra call to
ReplicationSlotAcquireInternal. I think we should call
ConditionVariablePrepareToSleep before the surrounding for statement
block.

OK, so what about the attached patch? I added ConditionVariablePrepareToSleep()
just before entering the "for" loop in InvalidateObsoleteReplicationSlots().

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

invalidate_obsolete_replication_slots_v3.patch (text/plain; charset=UTF-8)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..b94e11a8e7 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+										  const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -322,77 +325,107 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 }
 
 /*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
+ *
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
+ */
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
+{
+	int			i;
+	ReplicationSlot	*slot = NULL;
+
+	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
+								LW_SHARED));
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+		{
+			slot = s;
+			break;
+		}
+	}
+
+	return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
  *
  * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise.  If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error.  If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise.  If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
  */
 int
 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
-	ReplicationSlot *slot;
+	return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * If *slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+							   SlotAcquireBehavior behavior)
+{
+	ReplicationSlot *s;
 	int			active_pid;
-	int			i;
 
 retry:
 	Assert(MyReplicationSlot == NULL);
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
 	/*
-	 * Search for the named slot and mark it active if we find it.  If the
-	 * slot is already active, we exit the loop with active_pid set to the PID
-	 * of the backend that owns it.
+	 * Search for the slot with the specified name if the slot to
+	 * acquire is not given. If the slot is not found, we either
+	 * return -1 or error out.
 	 */
-	active_pid = 0;
-	slot = NULL;
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+	if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)
 	{
-		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-
-		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+		if (behavior == SAB_Inquire)
 		{
-			/*
-			 * This is the slot we want; check if it's active under some other
-			 * process.  In single user mode, we don't need this check.
-			 */
-			if (IsUnderPostmaster)
-			{
-				/*
-				 * Get ready to sleep on it in case it is active.  (We may end
-				 * up not sleeping, but we don't want to do this while holding
-				 * the spinlock.)
-				 */
-				ConditionVariablePrepareToSleep(&s->active_cv);
-
-				SpinLockAcquire(&s->mutex);
-
-				active_pid = s->active_pid;
-				if (active_pid == 0)
-					active_pid = s->active_pid = MyProcPid;
-
-				SpinLockRelease(&s->mutex);
-			}
-			else
-				active_pid = MyProcPid;
-			slot = s;
-
-			break;
+			LWLockRelease(ReplicationSlotControlLock);
+			return -1;
 		}
-	}
-	LWLockRelease(ReplicationSlotControlLock);
-
-	/* If we did not find the slot, error out. */
-	if (slot == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+	}
 
 	/*
-	 * If we found the slot but it's already active in another backend, we
-	 * either error out or retry after a short wait, as caller specified.
+	 * This is the slot we want; check if it's active under some other
+	 * process.  In single user mode, we don't need this check.
+	 */
+	if (IsUnderPostmaster)
+	{
+		SpinLockAcquire(&s->mutex);
+		if (s->active_pid == 0)
+			s->active_pid = MyProcPid;
+		active_pid = s->active_pid;
+		SpinLockRelease(&s->mutex);
+	}
+	else
+		active_pid = MyProcPid;
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/*
+	 * If we found the slot but it's already active in another process, we
+	 * either error out, return the PID of the owning process, or retry
+	 * after a short wait, as caller specified.
 	 */
 	if (active_pid != MyProcPid)
 	{
@@ -405,19 +438,18 @@ retry:
 			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
-		ConditionVariableSleep(&slot->active_cv,
+		ConditionVariablePrepareToSleep(&s->active_cv);
+		ConditionVariableSleep(&s->active_cv,
 							   WAIT_EVENT_REPLICATION_SLOT_DROP);
 		ConditionVariableCancelSleep();
 		goto retry;
 	}
-	else
-		ConditionVariableCancelSleep(); /* no sleep needed after all */
 
 	/* Let everybody know we've modified this slot */
-	ConditionVariableBroadcast(&slot->active_cv);
+	ConditionVariableBroadcast(&s->active_cv);
 
 	/* We made this slot active, so it's ours now. */
-	MyReplicationSlot = slot;
+	MyReplicationSlot = s;
 
 	/* success */
 	return 0;
@@ -1100,43 +1132,83 @@ restart:
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
 		NameData	slotname;
+		int		wspid;
+		int		last_signaled_pid = 0;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
-		if (s->data.restart_lsn == InvalidXLogRecPtr ||
-			s->data.restart_lsn >= oldestLSN)
-		{
-			SpinLockRelease(&s->mutex);
-			continue;
-		}
-
 		slotname = s->data.name;
 		restart_lsn = s->data.restart_lsn;
-
 		SpinLockRelease(&s->mutex);
+
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+			continue;
 		LWLockRelease(ReplicationSlotControlLock);
 
+		/* Get ready to sleep on the slot in case it is active */
+		ConditionVariablePrepareToSleep(&s->active_cv);
+
 		for (;;)
 		{
-			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
-													   SAB_Inquire);
+			/*
+			 * Try to mark this slot as used by this process.
+			 *
+			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
+			 * should not cancel the prepared condition variable
+			 * if this slot is active in another process, because in
+			 * that case we have to wait on that CV for the process
+			 * owning the slot to be terminated later.
+			 */
+			wspid = ReplicationSlotAcquireInternal(s, NameStr(slotname),
+												   SAB_Inquire);
 
-			/* no walsender? success! */
-			if (wspid == 0)
+			/*
+			 * Exit the loop if we successfully acquired the slot or the
+			 * slot was dropped while waiting for the owning process to
+			 * be terminated. For example, the latter case is likely to
+			 * happen when the slot is temporary, because such a slot is
+			 * automatically dropped when the owning process terminates.
+			 */
+			if (wspid <= 0)
 				break;
 
-			ereport(LOG,
-					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-							wspid, NameStr(slotname))));
-			(void) kill(wspid, SIGTERM);
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is a race condition where another process may own
+			 * the slot after the process using it was terminated and before
+			 * this process owns it. To handle this case, we signal again
+			 * if the PID of the owning process has changed from the last one.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != wspid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								wspid, NameStr(slotname))));
+				(void) kill(wspid, SIGTERM);
+				last_signaled_pid = wspid;
+			}
 
 			ConditionVariableTimedSleep(&s->active_cv, 10,
 										WAIT_EVENT_REPLICATION_SLOT_DROP);
 		}
 		ConditionVariableCancelSleep();
 
+		/*
+		 * Do nothing here and start from scratch if the slot has
+		 * already been dropped.
+		 */
+		if (wspid == -1)
+		{
+			CHECK_FOR_INTERRUPTS();
+			goto restart;
+		}
+
 		ereport(LOG,
 				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
 						NameStr(slotname),
#20Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Fujii Masao (#19)
1 attachment(s)
Re: Review for GetWALAvailability()

I think passing the slot name when the slot is also passed is useless
and wasteful; it'd be better to pass NULL for the name and ignore the
strcmp() in that case -- in fact I suggest forbidding passing both name
and slot. (Any failure there would risk raising an error during
checkpoint, which is undesirable.)

So I propose the following tweaks to your patch, and otherwise +1.

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachments:

tweak.patch (text/x-diff; charset=us-ascii)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b94e11a8e7..d99d0e9b42 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -383,24 +383,25 @@ ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
 	ReplicationSlot *s;
 	int			active_pid;
 
+	AssertArg((slot == NULL) ^ (name == NULL));
+
 retry:
 	Assert(MyReplicationSlot == NULL);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	/*
-	 * Search for the slot with the specified name if the slot to
-	 * acquire is not given. If the slot is not found, we either
-	 * return -1 or error out.
+	 * Search for the slot with the specified name if the slot to acquire is
+	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
-	if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)
+	s = slot ? slot : SearchNamedReplicationSlot(name);
+	if (s == NULL || !s->in_use ||
+		(name && strcmp(name, NameStr(s->data.name)) != 0))
 	{
+		LWLockRelease(ReplicationSlotControlLock);
+
 		if (behavior == SAB_Inquire)
-		{
-			LWLockRelease(ReplicationSlotControlLock);
 			return -1;
-		}
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
@@ -1161,8 +1162,7 @@ restart:
 			 * we have to wait on that CV for the process owning
 			 * the slot to be terminated, later.
 			 */
-			wspid = ReplicationSlotAcquireInternal(s, NameStr(slotname),
-												   SAB_Inquire);
+			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
 
 			/*
 			 * Exit the loop if we successfully acquired the slot or
#21Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Kyotaro Horiguchi (#16)
Re: Review for GetWALAvailability()

On 2020-Jun-17, Kyotaro Horiguchi wrote:

@@ -342,7 +351,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
nulls[i++] = true;

-		walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+		/* use last_invalidated_lsn when the slot is invalidated */
+		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+			targetLSN = slot_contents.last_invalidated_lsn;
+		else
+			targetLSN = slot_contents.data.restart_lsn;
+
+		walstate = GetWALAvailability(targetLSN, last_removed_seg,
+									  slot_contents.active_pid != 0);

Yeah, this approach seems better overall. I'll see if I can get this
done after lunch.

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#22Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Alvaro Herrera (#20)
Re: Review for GetWALAvailability()

On 2020/06/18 3:04, Alvaro Herrera wrote:

I think passing the slot name when the slot is also passed is useless
and wasteful; it'd be better to pass NULL for the name and ignore the
strcmp() in that case -- in fact I suggest forbidding passing both name
and slot. (Any failure there would risk raising an error during
checkpoint, which is undesirable.)

Sounds reasonable.

So I propose the following tweaks to your patch, and otherwise +1.

Thanks for the patch! It looks good to me.

Barring any objections, I will commit the patches in the master and
v13 branches later.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

#23Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#19)
3 attachment(s)
Re: Review for GetWALAvailability()

At Wed, 17 Jun 2020 20:13:01 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

ReplicationSlotAcquireInternal. I think we should call
ConditionVariablePrepareToSleep before the surrounding for statement
block.

OK, so what about the attached patch? I added ConditionVariablePrepareToSleep()
just before entering the "for" loop in InvalidateObsoleteReplicationSlots().

Thanks.

ReplicationSlotAcquireInternal:
+ * If *slot == NULL, search for the slot with the given name.

'*' seems needless here.

The patch moves ConditionVariablePrepareToSleep. We need to call the
function before looking into active_pid, as originally commented.
Since active_pid is not protected by ReplicationSlotControlLock, calling
it just before releasing the lock is not correct.

The attached on top of the v3 fixes that.

+   s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+   if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)

The conditions in the second line are needed for the case where slot is
given, but they are already checked in SearchNamedReplicationSlot when
slot is not given. I would like something like the following instead,
but I don't insist on it.

ReplicationSlot *s = NULL;
...
if (!slot)
s = SearchNamedReplicationSlot(name);
else if (slot->in_use && strcmp(name, NameStr(slot->data.name)) == 0)
s = slot;

+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("replication slot \"%s\" does not exist", name)));

The error message is not right when the given slot doesn't match the
given name. It might be better to leave it to the caller. Currently
no such caller exists, so I don't insist on this, but otherwise the
message should be revised.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

0001.patch (text/x-patch; charset=us-ascii)
From 8f1913ec7ff3809d11adf1611aba57e44cb42a82 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Thu, 18 Jun 2020 10:55:49 +0900
Subject: [PATCH 1/3] 001

---
 src/backend/replication/slot.c | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b94e11a8e7..dcc76c4783 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -412,6 +412,14 @@ retry:
 	 */
 	if (IsUnderPostmaster)
 	{
+		/*
+		 * If needed, get ready to sleep on the slot in case it is active.
+		 * (We may end up not sleeping, but we don't want to do this while
+		 * holding the spinlock.)
+		 */
+		if (behavior != SAB_Inquire)
+			ConditionVariablePrepareToSleep(&s->active_cv);
+
 		SpinLockAcquire(&s->mutex);
 		if (s->active_pid == 0)
 			s->active_pid = MyProcPid;
@@ -438,12 +446,13 @@ retry:
 			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
-		ConditionVariablePrepareToSleep(&s->active_cv);
 		ConditionVariableSleep(&s->active_cv,
 							   WAIT_EVENT_REPLICATION_SLOT_DROP);
 		ConditionVariableCancelSleep();
 		goto retry;
 	}
+	else if (behavior != SAB_Inquire)
+		ConditionVariableCancelSleep();
 
 	/* Let everybody know we've modified this slot */
 	ConditionVariableBroadcast(&s->active_cv);
-- 
2.18.4

0002.patch (text/x-patch; charset=us-ascii)
From 1b2f94d6bfb4508b2cbf3d552a5615ae2959e90c Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Thu, 18 Jun 2020 11:25:25 +0900
Subject: [PATCH 2/3] 002

---
 src/backend/replication/slot.c | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dcc76c4783..8893516f00 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -380,7 +380,7 @@ static int
 ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
 							   SlotAcquireBehavior behavior)
 {
-	ReplicationSlot *s;
+	ReplicationSlot *s = NULL;
 	int			active_pid;
 
 retry:
@@ -393,8 +393,12 @@ retry:
 	 * acquire is not given. If the slot is not found, we either
 	 * return -1 or error out.
 	 */
-	s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
-	if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)
+	if (!slot)
+		s = SearchNamedReplicationSlot(name);
+	else if (slot->in_use && strcmp(name, NameStr(slot->data.name)) == 0)
+		s = slot;
+
+	if (s == NULL)
 	{
 		if (behavior == SAB_Inquire)
 		{
-- 
2.18.4

0003.patch (text/x-patch; charset=us-ascii)
From 2d4a83abd2f278a9f9dc6e9329aed1acc191f2c8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Thu, 18 Jun 2020 11:33:29 +0900
Subject: [PATCH 3/3] 003

---
 src/backend/replication/slot.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 8893516f00..25ae334a29 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -373,8 +373,10 @@ ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
  * Mark the specified slot as used by this process.
  *
  * If *slot == NULL, search for the slot with the given name.
- *
  * See comments about the return value in ReplicationSlotAcquire().
+ *
+ * If slot is not NULL, returns -1 if the slot is not in use or doesn't match
+ * the given name.
  */
 static int
 ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
@@ -400,7 +402,7 @@ retry:
 
 	if (s == NULL)
 	{
-		if (behavior == SAB_Inquire)
+		if (behavior == SAB_Inquire || slot)
 		{
 			LWLockRelease(ReplicationSlotControlLock);
 			return -1;
-- 
2.18.4

#24Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Kyotaro Horiguchi (#23)
1 attachment(s)
Re: Review for GetWALAvailability()

On 2020/06/18 11:44, Kyotaro Horiguchi wrote:

At Wed, 17 Jun 2020 20:13:01 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

ReplicationSlotAcquireInternal. I think we should call
ConditionVariablePrepareToSleep before the surrounding for statement
block.

OK, so what about the attached patch? I added ConditionVariablePrepareToSleep()
just before entering the "for" loop in InvalidateObsoleteReplicationSlots().

Thanks.

Thanks for the review!

ReplicationSlotAcquireInternal:
+ * If *slot == NULL, search for the slot with the given name.

'*' seems needless here.

Fixed.

Also I added "Only one of slot and name can be specified." into
the comments of ReplicationSlotAcquireInternal().

The patch moves ConditionVariablePrepareToSleep. We need to call the
function before looking into active_pid, as originally commented.
Since active_pid is not protected by ReplicationSlotControlLock, calling
it just before releasing the lock is not correct.

The attached on top of the v3 fixes that.

Yes, you're right. I merged your 0001.patch into mine.

+		if (behavior != SAB_Inquire)
+			ConditionVariablePrepareToSleep(&s->active_cv);
+	else if (behavior != SAB_Inquire)

Isn't "behavior == SAB_Block" condition better here?
I changed the patch that way.

The attached is the updated version of the patch.
I also merged Alvaro's patch into this.

+   s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+   if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)

The conditions in the second line are needed for the case where slot is
given, but they are already checked in SearchNamedReplicationSlot when
slot is not given. I would like something like the following instead,
but I don't insist on it.

Yes, I got rid of the strcmp() check, but left the in_use check as it is.
I like that because it's simpler.

ReplicationSlot *s = NULL;
...
if (!slot)
s = SearchNamedReplicationSlot(name);
else if (slot->in_use && strcmp(name, NameStr(slot->data.name)) == 0)
s = slot;

+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("replication slot \"%s\" does not exist", name)));

The error message is not right when the given slot doesn't match the
given name.

This doesn't happen after applying Alvaro's patch.

BTW, using "name" here is not valid because it may be NULL.
So I added the following code and used "slot_name" in log messages.

+ slot_name = name ? name : NameStr(slot->data.name);

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

invalidate_obsolete_replication_slots_v4.patch (text/plain; charset=UTF-8)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..77cf366ef1 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+										  const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -322,77 +325,123 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 }
 
 /*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
+ *
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
+ */
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
+{
+	int			i;
+	ReplicationSlot	*slot = NULL;
+
+	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
+								LW_SHARED));
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+		{
+			slot = s;
+			break;
+		}
+	}
+
+	return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
  *
  * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise.  If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error.  If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise.  If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
  */
 int
 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
-	ReplicationSlot *slot;
+	return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * Only one of slot and name can be specified.
+ * If slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+							   SlotAcquireBehavior behavior)
+{
+	ReplicationSlot *s;
 	int			active_pid;
-	int			i;
+	char	*slot_name;
+
+	AssertArg((slot == NULL) ^ (name == NULL));
+
+	/*
+	 * Determine the name of the slot to acquire. This name is used in
+	 * log messages.
+	 */
+	slot_name = name ? name : NameStr(slot->data.name);
 
 retry:
 	Assert(MyReplicationSlot == NULL);
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
 	/*
-	 * Search for the named slot and mark it active if we find it.  If the
-	 * slot is already active, we exit the loop with active_pid set to the PID
-	 * of the backend that owns it.
+	 * Search for the slot with the specified name if the slot to acquire is
+	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	active_pid = 0;
-	slot = NULL;
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	s = slot ? slot : SearchNamedReplicationSlot(name);
+	if (s == NULL || !s->in_use)
 	{
-		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		LWLockRelease(ReplicationSlotControlLock);
 
-		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
-		{
-			/*
-			 * This is the slot we want; check if it's active under some other
-			 * process.  In single user mode, we don't need this check.
-			 */
-			if (IsUnderPostmaster)
-			{
-				/*
-				 * Get ready to sleep on it in case it is active.  (We may end
-				 * up not sleeping, but we don't want to do this while holding
-				 * the spinlock.)
-				 */
-				ConditionVariablePrepareToSleep(&s->active_cv);
-
-				SpinLockAcquire(&s->mutex);
-
-				active_pid = s->active_pid;
-				if (active_pid == 0)
-					active_pid = s->active_pid = MyProcPid;
-
-				SpinLockRelease(&s->mutex);
-			}
-			else
-				active_pid = MyProcPid;
-			slot = s;
-
-			break;
-		}
-	}
-	LWLockRelease(ReplicationSlotControlLock);
-
-	/* If we did not find the slot, error out. */
-	if (slot == NULL)
+		if (behavior == SAB_Inquire)
+			return -1;
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
-				 errmsg("replication slot \"%s\" does not exist", name)));
+				 errmsg("replication slot \"%s\" does not exist", slot_name)));
+	}
 
 	/*
-	 * If we found the slot but it's already active in another backend, we
-	 * either error out or retry after a short wait, as caller specified.
+	 * This is the slot we want; check if it's active under some other
+	 * process.  In single user mode, we don't need this check.
+	 */
+	if (IsUnderPostmaster)
+	{
+		/*
+		 * Get ready to sleep on the slot in case it is active if SAB_Block.
+		 * (We may end up not sleeping, but we don't want to do this while
+		 * holding the spinlock.)
+		 */
+		if (behavior == SAB_Block)
+			ConditionVariablePrepareToSleep(&s->active_cv);
+
+		SpinLockAcquire(&s->mutex);
+		if (s->active_pid == 0)
+			s->active_pid = MyProcPid;
+		active_pid = s->active_pid;
+		SpinLockRelease(&s->mutex);
+	}
+	else
+		active_pid = MyProcPid;
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/*
+	 * If we found the slot but it's already active in another process, we
+	 * either error out, return the PID of the owning process, or retry
+	 * after a short wait, as caller specified.
 	 */
 	if (active_pid != MyProcPid)
 	{
@@ -400,24 +449,24 @@ retry:
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("replication slot \"%s\" is active for PID %d",
-							name, active_pid)));
+							slot_name, active_pid)));
 		else if (behavior == SAB_Inquire)
 			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
-		ConditionVariableSleep(&slot->active_cv,
+		ConditionVariableSleep(&s->active_cv,
 							   WAIT_EVENT_REPLICATION_SLOT_DROP);
 		ConditionVariableCancelSleep();
 		goto retry;
 	}
-	else
-		ConditionVariableCancelSleep(); /* no sleep needed after all */
+	else if (behavior == SAB_Block)
+		ConditionVariableCancelSleep();	/* no sleep needed after all */
 
 	/* Let everybody know we've modified this slot */
-	ConditionVariableBroadcast(&slot->active_cv);
+	ConditionVariableBroadcast(&s->active_cv);
 
 	/* We made this slot active, so it's ours now. */
-	MyReplicationSlot = slot;
+	MyReplicationSlot = s;
 
 	/* success */
 	return 0;
@@ -1100,43 +1149,82 @@ restart:
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
 		NameData	slotname;
+		int		wspid;
+		int		last_signaled_pid = 0;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
-		if (s->data.restart_lsn == InvalidXLogRecPtr ||
-			s->data.restart_lsn >= oldestLSN)
-		{
-			SpinLockRelease(&s->mutex);
-			continue;
-		}
-
 		slotname = s->data.name;
 		restart_lsn = s->data.restart_lsn;
-
 		SpinLockRelease(&s->mutex);
+
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+			continue;
 		LWLockRelease(ReplicationSlotControlLock);
 
+		/* Get ready to sleep on the slot in case it is active */
+		ConditionVariablePrepareToSleep(&s->active_cv);
+
 		for (;;)
 		{
-			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
-													   SAB_Inquire);
+			/*
+			 * Try to mark this slot as used by this process.
+			 *
+			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
+			 * should not cancel the prepared condition variable
+			 * if this slot is active in another process, because in that
+			 * case we have to wait on that CV for the process owning
+			 * the slot to terminate later.
+			 */
+			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
 
-			/* no walsender? success! */
-			if (wspid == 0)
+			/*
+			 * Exit the loop if we successfully acquired the slot or
+			 * the slot was dropped during waiting for the owning process
+			 * to be terminated. For example, the latter case is likely to
+			 * happen when the slot is temporary because it's automatically
+			 * dropped by the termination of the owning process.
+			 */
+			if (wspid <= 0)
 				break;
 
-			ereport(LOG,
-					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-							wspid, NameStr(slotname))));
-			(void) kill(wspid, SIGTERM);
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is a race condition where another process may acquire
+			 * the slot after the process using it was terminated and before
+			 * this process acquires it. To handle this case, we signal again
+			 * if the PID of the owning process has changed from the last one.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != wspid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								wspid, NameStr(slotname))));
+				(void) kill(wspid, SIGTERM);
+				last_signaled_pid = wspid;
+			}
 
 			ConditionVariableTimedSleep(&s->active_cv, 10,
 										WAIT_EVENT_REPLICATION_SLOT_DROP);
 		}
 		ConditionVariableCancelSleep();
 
+		/*
+		 * Do nothing here and start from scratch if the slot has
+		 * already been dropped.
+		 */
+		if (wspid == -1)
+		{
+			CHECK_FOR_INTERRUPTS();
+			goto restart;
+		}
+
 		ereport(LOG,
 				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
 						NameStr(slotname),
#25Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Fujii Masao (#24)
1 attachment(s)
Re: Review for GetWALAvailability()

On 2020/06/18 14:40, Fujii Masao wrote:

On 2020/06/18 11:44, Kyotaro Horiguchi wrote:

At Wed, 17 Jun 2020 20:13:01 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

ReplicationSlotAcquireInternal.  I think we should call
ConditionVariablePrepareToSleep before the surrounding for statement
block.

OK, so what about the attached patch? I added
ConditionVariablePrepareToSleep()
just before entering the "for" loop in
InvalidateObsoleteReplicationSlots().

Thanks.

Thanks for the review!

ReplicationSlotAcquireInternal:
+ * If *slot == NULL, search for the slot with the given name.

'*' seems needless here.

Fixed.

Also I added "Only one of slot and name can be specified." into
the comments of ReplicationSlotAcquireInternal().

The patch moves ConditionVariablePrepareToSleep. We need to call the
function before looking into active_pid as originally commented.
Since active_pid is not protected by ReplicationSlotControlLock, calling it
just before releasing the lock is not correct.

The attached on top of the v3 fixes that.

Yes, you're right. I merged your 0001.patch into mine.
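
As a minimal sketch of the condition-variable protocol under discussion
(the ConditionVariable* calls are the real API from
src/include/storage/condition_variable.h; slot_held_by_other_process()
is a hypothetical stand-in for the active_pid check), the point is
ordering: the prepare step must come before the condition is tested,
otherwise a wakeup that fires between the test and the sleep is missed.

	/* Register as a waiter FIRST, so no broadcast can be missed... */
	ConditionVariablePrepareToSleep(&s->active_cv);

	/* ...then test the condition, sleeping until it changes. */
	while (slot_held_by_other_process(s))	/* hypothetical predicate */
		ConditionVariableSleep(&s->active_cv,
							   WAIT_EVENT_REPLICATION_SLOT_DROP);

	ConditionVariableCancelSleep();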

+		if (behavior != SAB_Inquire)
+			ConditionVariablePrepareToSleep(&s->active_cv);
+	else if (behavior != SAB_Inquire)

Isn't the "behavior == SAB_Block" condition better here?
I changed the patch that way.

The attached is the updated version of the patch.
I also merged Alvaro's patch into this.

+	s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+	if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)

The conditions in the second line are needed for the case where slot is
given, but the check is already done in SearchNamedReplicationSlot if slot is
not given.  I would like something like the following instead, but I
don't insist on it.

Yes, I got rid of the strcmp() check, but left the in_use check as it is.
I like that because it's simpler.

	ReplicationSlot *s = NULL;
	...
	if (!slot)
		s = SearchNamedReplicationSlot(name);
	else if (s->in_use && strcmp(name, NameStr(s->data.name)))
		s = slot;

+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("replication slot \"%s\" does not exist", name)));

The error message is not right when the given slot doesn't match the
given name.

This doesn't happen after applying Alvaro's patch.

BTW, using "name" here is not valid because it may be NULL.
So I added the following code and used "slot_name" in log messages.

+	slot_name = name ? name : NameStr(slot->data.name);

Sorry, this caused a compiler failure. So I fixed that and
attached the updated version of the patch.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

invalidate_obsolete_replication_slots_v5.patchtext/plain; charset=UTF-8; name=invalidate_obsolete_replication_slots_v5.patch; x-mac-creator=0; x-mac-type=0Download
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..a7bbcf3499 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+										  const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -322,77 +325,117 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 }
 
 /*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
+ *
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
+ */
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
+{
+	int			i;
+	ReplicationSlot	*slot = NULL;
+
+	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
+								LW_SHARED));
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+		{
+			slot = s;
+			break;
+		}
+	}
+
+	return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
  *
  * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise.  If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error.  If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise.  If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
  */
 int
 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
-	ReplicationSlot *slot;
+	return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * Only one of slot and name can be specified.
+ * If slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+							   SlotAcquireBehavior behavior)
+{
+	ReplicationSlot *s;
 	int			active_pid;
-	int			i;
+
+	AssertArg((slot == NULL) ^ (name == NULL));
 
 retry:
 	Assert(MyReplicationSlot == NULL);
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
 	/*
-	 * Search for the named slot and mark it active if we find it.  If the
-	 * slot is already active, we exit the loop with active_pid set to the PID
-	 * of the backend that owns it.
+	 * Search for the slot with the specified name if the slot to acquire is
+	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	active_pid = 0;
-	slot = NULL;
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	s = slot ? slot : SearchNamedReplicationSlot(name);
+	if (s == NULL || !s->in_use)
 	{
-		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		LWLockRelease(ReplicationSlotControlLock);
 
-		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
-		{
-			/*
-			 * This is the slot we want; check if it's active under some other
-			 * process.  In single user mode, we don't need this check.
-			 */
-			if (IsUnderPostmaster)
-			{
-				/*
-				 * Get ready to sleep on it in case it is active.  (We may end
-				 * up not sleeping, but we don't want to do this while holding
-				 * the spinlock.)
-				 */
-				ConditionVariablePrepareToSleep(&s->active_cv);
-
-				SpinLockAcquire(&s->mutex);
-
-				active_pid = s->active_pid;
-				if (active_pid == 0)
-					active_pid = s->active_pid = MyProcPid;
-
-				SpinLockRelease(&s->mutex);
-			}
-			else
-				active_pid = MyProcPid;
-			slot = s;
-
-			break;
-		}
-	}
-	LWLockRelease(ReplicationSlotControlLock);
-
-	/* If we did not find the slot, error out. */
-	if (slot == NULL)
+		if (behavior == SAB_Inquire)
+			return -1;
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
-				 errmsg("replication slot \"%s\" does not exist", name)));
+				 errmsg("replication slot \"%s\" does not exist",
+						name ? name : NameStr(slot->data.name))));
+	}
 
 	/*
-	 * If we found the slot but it's already active in another backend, we
-	 * either error out or retry after a short wait, as caller specified.
+	 * This is the slot we want; check if it's active under some other
+	 * process.  In single user mode, we don't need this check.
+	 */
+	if (IsUnderPostmaster)
+	{
+		/*
+		 * Get ready to sleep on the slot in case it is active if SAB_Block.
+		 * (We may end up not sleeping, but we don't want to do this while
+		 * holding the spinlock.)
+		 */
+		if (behavior == SAB_Block)
+			ConditionVariablePrepareToSleep(&s->active_cv);
+
+		SpinLockAcquire(&s->mutex);
+		if (s->active_pid == 0)
+			s->active_pid = MyProcPid;
+		active_pid = s->active_pid;
+		SpinLockRelease(&s->mutex);
+	}
+	else
+		active_pid = MyProcPid;
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/*
+	 * If we found the slot but it's already active in another process, we
+	 * either error out, return the PID of the owning process, or retry
+	 * after a short wait, as caller specified.
 	 */
 	if (active_pid != MyProcPid)
 	{
@@ -400,24 +443,24 @@ retry:
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("replication slot \"%s\" is active for PID %d",
-							name, active_pid)));
+							NameStr(s->data.name), active_pid)));
 		else if (behavior == SAB_Inquire)
 			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
-		ConditionVariableSleep(&slot->active_cv,
+		ConditionVariableSleep(&s->active_cv,
 							   WAIT_EVENT_REPLICATION_SLOT_DROP);
 		ConditionVariableCancelSleep();
 		goto retry;
 	}
-	else
-		ConditionVariableCancelSleep(); /* no sleep needed after all */
+	else if (behavior == SAB_Block)
+		ConditionVariableCancelSleep();	/* no sleep needed after all */
 
 	/* Let everybody know we've modified this slot */
-	ConditionVariableBroadcast(&slot->active_cv);
+	ConditionVariableBroadcast(&s->active_cv);
 
 	/* We made this slot active, so it's ours now. */
-	MyReplicationSlot = slot;
+	MyReplicationSlot = s;
 
 	/* success */
 	return 0;
@@ -1100,43 +1143,82 @@ restart:
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
 		NameData	slotname;
+		int		wspid;
+		int		last_signaled_pid = 0;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
-		if (s->data.restart_lsn == InvalidXLogRecPtr ||
-			s->data.restart_lsn >= oldestLSN)
-		{
-			SpinLockRelease(&s->mutex);
-			continue;
-		}
-
 		slotname = s->data.name;
 		restart_lsn = s->data.restart_lsn;
-
 		SpinLockRelease(&s->mutex);
+
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+			continue;
 		LWLockRelease(ReplicationSlotControlLock);
 
+		/* Get ready to sleep on the slot in case it is active */
+		ConditionVariablePrepareToSleep(&s->active_cv);
+
 		for (;;)
 		{
-			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
-													   SAB_Inquire);
+			/*
+			 * Try to mark this slot as used by this process.
+			 *
+			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
+			 * should not cancel the prepared condition variable
+			 * if this slot is active in another process, because in that
+			 * case we have to wait on that CV for the process owning
+			 * the slot to terminate later.
+			 */
+			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
 
-			/* no walsender? success! */
-			if (wspid == 0)
+			/*
+			 * Exit the loop if we successfully acquired the slot or
+			 * the slot was dropped during waiting for the owning process
+			 * to be terminated. For example, the latter case is likely to
+			 * happen when the slot is temporary because it's automatically
+			 * dropped by the termination of the owning process.
+			 */
+			if (wspid <= 0)
 				break;
 
-			ereport(LOG,
-					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-							wspid, NameStr(slotname))));
-			(void) kill(wspid, SIGTERM);
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is a race condition where another process may acquire
+			 * the slot after the process using it was terminated and before
+			 * this process acquires it. To handle this case, we signal again
+			 * if the PID of the owning process has changed from the last one.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != wspid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								wspid, NameStr(slotname))));
+				(void) kill(wspid, SIGTERM);
+				last_signaled_pid = wspid;
+			}
 
 			ConditionVariableTimedSleep(&s->active_cv, 10,
 										WAIT_EVENT_REPLICATION_SLOT_DROP);
 		}
 		ConditionVariableCancelSleep();
 
+		/*
+		 * Do nothing here and start from scratch if the slot has
+		 * already been dropped.
+		 */
+		if (wspid == -1)
+		{
+			CHECK_FOR_INTERRUPTS();
+			goto restart;
+		}
+
 		ereport(LOG,
 				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
 						NameStr(slotname),
#26Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#23)
Re: Review for GetWALAvailability()

At Thu, 18 Jun 2020 14:54:47 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

Sorry, this caused a compiler failure. So I fixed that and
attached the updated version of the patch.

At Thu, 18 Jun 2020 14:40:55 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

+ errmsg("replication slot \"%s\" does not exist", name)));
The error message is not right when the given slot doesn't match the
given name.

This doesn't happen after applying Alvaro's patch.

If name is specified (so slot is NULL) to
ReplicationSlotAcquireInternal and the slot is not found, the ereport
in the following code dereferences NULL.

====
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		if (behavior == SAB_Inquire)
			return -1;
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name ? name : NameStr(slot->data.name))));
	}
====

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#27Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#26)
Re: Review for GetWALAvailability()

Mmm. I hurried too much..

At Thu, 18 Jun 2020 16:32:59 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

If name is specified (so slot is NULL) to
ReplicationSlotAcquireInternal and the slot is not found, the ereport
in the following code dereferences NULL.

That's bogus. It is using name in that case. Sorry for the noise.

I don't find a problem from a brief look at it.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#28Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Kyotaro Horiguchi (#27)
Re: Review for GetWALAvailability()

On 2020/06/18 16:36, Kyotaro Horiguchi wrote:

Mmm. I hurried too much..

At Thu, 18 Jun 2020 16:32:59 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

If name is specified (so slot is NULL) to
ReplicationSlotAcquireInternal and the slot is not found, the ereport
in the following code dereferences NULL.

That's bogus. It is using name in that case. Sorry for the noise.

I don't find a problem from a brief look at it.

Thanks for the review! I pushed the patch.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

#29Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Kyotaro Horiguchi (#16)
Re: Review for GetWALAvailability()

On 2020-Jun-17, Kyotaro Horiguchi wrote:

@@ -9524,7 +9533,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
* the first WAL segment file since startup, which causes the status being
* wrong under certain abnormal conditions but that doesn't actually harm.
*/
-	oldestSeg = XLogGetLastRemovedSegno() + 1;
+	oldestSeg = last_removed_seg + 1;

/* calculate oldest segment by max_wal_size and wal_keep_segments */
XLByteToSeg(currpos, currSeg, wal_segment_size);

This hunk should have updated the comment two lines above. However:

@@ -272,6 +273,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;

+	/*
+	 * Remember the last removed segment at this point for the consistency in
+	 * this table. Since there's no interlock between slot data and
+	 * checkpointer, the segment can be removed in-between, but that doesn't
+	 * make any practical difference.
+	 */
+	last_removed_seg = XLogGetLastRemovedSegno();

I am mystified as to why you added this change. I understand that your
point here is to make all slots report their state as compared to the
same LSN, but why do it like that? If a segment is removed in between,
it could mean that the view reports more lies than it would if we update
the segno for each slot. I mean, suppose two slots are lagging behind
and one is reported as 'extended' because when we compute it it's still
in range; then a segment is removed. With your coding, we'll report
both as extended, but with the original coding, we'll report the new one
as lost. By the time the user reads the result, they'll read one
incorrect report with the original code, and two incorrect reports with
your code. So ... yes it might be more consistent, but what does that
buy the user?
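
To make the tradeoff concrete, here is a sketch of the two loop shapes
(report_slot() and NUM_SLOTS are hypothetical names, not from the patch;
XLogGetLastRemovedSegno() is the real function):

	/* Original coding: each row uses the freshest value, so later rows
	 * can be more up to date, but the rows are mutually inconsistent. */
	for (int i = 0; i < NUM_SLOTS; i++)
		report_slot(i, XLogGetLastRemovedSegno());

	/* Proposed coding: one snapshot, so the rows are mutually
	 * consistent, but all of them may be stale by the time the user
	 * reads the result. */
	XLogSegNo	last_removed_seg = XLogGetLastRemovedSegno();

	for (int i = 0; i < NUM_SLOTS; i++)
		report_slot(i, last_removed_seg);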

OTOH it makes GetWALAvailability gain a new argument, which we have to
document.

+	/*
+	 * However segments required by the slot has been lost, if walsender is
+	 * active the walsender can read into the first reserved slot.
+	 */
+	if (slot_is_active)
+		return WALAVAIL_BEING_REMOVED;

I don't understand this comment; can you please clarify what you mean?

I admit I don't like this slot_is_active argument you propose to add to
GetWALAvailability either; previously the function could be called with
an LSN coming from anywhere, not just a slot; the new argument implies
that the LSN comes from a slot. (Your proposed patch doesn't document
this one either.)
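
For clarity, the two shapes of the function being compared (the second
signature is an assumed rendering of the proposal, not a quote from the
patch):

	/* existing declaration in src/include/access/xlog.h; the LSN may
	 * come from anywhere, not only from a slot */
	extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN);

	/* proposed shape (assumed); the extra flag only makes sense when
	 * the LSN comes from a slot */
	extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN,
											  bool slot_is_active);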

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#30Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#29)
1 attachment(s)
Re: Review for GetWALAvailability()

Thanks for looking this.

At Fri, 19 Jun 2020 18:23:59 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in

On 2020-Jun-17, Kyotaro Horiguchi wrote:

@@ -9524,7 +9533,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
* the first WAL segment file since startup, which causes the status being
* wrong under certain abnormal conditions but that doesn't actually harm.
*/
-	oldestSeg = XLogGetLastRemovedSegno() + 1;
+	oldestSeg = last_removed_seg + 1;

/* calculate oldest segment by max_wal_size and wal_keep_segments */
XLByteToSeg(currpos, currSeg, wal_segment_size);

This hunk should have updated the comment two lines above. However:

@@ -272,6 +273,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;

+	/*
+	 * Remember the last removed segment at this point for the consistency in
+	 * this table. Since there's no interlock between slot data and
+	 * checkpointer, the segment can be removed in-between, but that doesn't
+	 * make any practical difference.
+	 */
+	last_removed_seg = XLogGetLastRemovedSegno();

I am mystified as to why you added this change. I understand that your
point here is to make all slots report their state as compared to the
same LSN, but why do it like that? If a segment is removed in between,
it could mean that the view reports more lies than it would if we update
the segno for each slot. I mean, suppose two slots are lagging behind
and one is reported as 'extended' because when we compute it it's still
in range; then a segment is removed. With your coding, we'll report
both as extended, but with the original coding, we'll report the new one
as lost. By the time the user reads the result, they'll read one
incorrect report with the original code, and two incorrect reports with
your code. So ... yes it might be more consistent, but what does that
buy the user?

I agree with you. Either way the view may show "wrong" statuses if
concurrent WAL-file removal is running, but I can understand that it is
better for the numbers in a view to be mutually consistent. The change
contributes only to that point, which is why I noted that it "doesn't
make any practical difference". Since it should be removed, I removed
the changes for that part.

/messages/by-id/9ddfbf8c-2f67-904d-44ed-cf8bc5916228@oss.nttdata.com

OTOH it makes GetWALAvailability gain a new argument, which we have to
document.

+	/*
+	 * However segments required by the slot has been lost, if walsender is
+	 * active the walsender can read into the first reserved slot.
+	 */
+	if (slot_is_active)
+		return WALAVAIL_BEING_REMOVED;

I don't understand this comment; can you please clarify what you mean?

I have had comments that the "lost" state should be a definite state,
that is, a state that mustn't go back to other states. I got the same
comment from Fujii-san again.

Suppose we are starting from the following situation:

State A:
  |---- seg n-1 ----|---- seg n ----|
       ^
       X (restart_lsn of slot S) - max_slot_wal_keep_size

If the segment n-1 is removed, slot S's status becomes
"lost". However, if the walsender that is using the slot has not been
killed yet, the point X can move forward into segment n (State B).

State B:
  |XXXX seg n-1 XXXX|---- seg n ----|
                        ^
                        X (restart_lsn of slot S) - max_slot_wal_keep_size

This is the normal (or extended) state. If we want the state "lost"
to be definitive, we cannot apply the label "lost" to State A while
the slot is active.

WALAVAIL_BEING_REMOVED (I noticed it has been removed for a wrong
reason so I revived it in this patch [1].) was used for the same state,
that is, the segment at restart_lsn will be removed soon but not yet.

1: /messages/by-id/20200406.185027.648866525989475817.horikyota.ntt@gmail.com
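
To illustrate, a hypothetical helper (not part of any posted patch;
XLByteToSeg, XLogGetLastRemovedSegno and wal_segment_size are the real
xlog facilities) capturing when a slot is in State A, where the "lost"
label would be premature:

	/*
	 * True in State A above: the restart_lsn segment has already been
	 * removed, but a live process may still advance restart_lsn into a
	 * retained segment, returning the slot to a working state.
	 */
	static bool
	slot_may_still_recover(ReplicationSlot *s)
	{
		XLogSegNo	targetSeg;
		XLogSegNo	oldestSeg = XLogGetLastRemovedSegno() + 1;

		XLByteToSeg(s->data.restart_lsn, targetSeg, wal_segment_size);

		return targetSeg < oldestSeg && s->active_pid != 0;
	}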

I admit I don't like this slot_is_active argument you propose to add to
GetWALAvailability either; previously the function could be called with
an LSN coming from anywhere, not just a slot; the new argument implies
that the LSN comes from a slot. (Your proposed patch doesn't document
this one either.)

Agreed. I felt the same way at the time. I came up with another
approach after hearing that from you.

In the attached patch, GetWALAvailability() returns the state assuming
the walsender is not active, and the caller (pg_get_replication_slots())
handles the case where the walsender is active.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

0001-Some-fixes-of-pg_replication_slots.wal_status.patchtext/x-patch; charset=us-asciiDownload
From 74fb205fc87397671392f4f877b2466d1d19869c Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Mon, 15 Jun 2020 16:18:23 +0900
Subject: [PATCH] Some fixes of pg_replication_slots.wal_status

The column is shown as "lost" when the segment at the given LSN has
been removed by a checkpoint. But in certain situations the state can
come back to the "normal" state. To avoid that transition, a new state
"being lost" is added, which means "the slot is about to lose required
WAL segments, but has not definitely lost them yet." Along with that
change, the other status words are renamed, "normal"->"reserved" and
"reserved"->"extended", to clarify the meaning of each word.

This also fixes the state recognition logic so that the "lost" state
persists after slot invalidation happens.
---
 doc/src/sgml/catalogs.sgml                | 24 +++++++---
 src/backend/access/transam/xlog.c         | 58 +++++++++++++----------
 src/backend/replication/slot.c            |  2 +
 src/backend/replication/slotfuncs.c       | 38 ++++++++++++---
 src/include/access/xlog.h                 |  7 +--
 src/include/replication/slot.h            |  3 ++
 src/test/recovery/t/019_replslot_limit.pl | 25 +++++-----
 7 files changed, 103 insertions(+), 54 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 5a66115df1..2c0b51bbd8 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -11239,19 +11239,29 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        Possible values are:
        <itemizedlist>
         <listitem>
-         <para><literal>normal</literal> means that the claimed files
+         <para><literal>reserved</literal> means that the claimed files
           are within <varname>max_wal_size</varname>.</para>
         </listitem>
         <listitem>
-         <para><literal>reserved</literal> means
+         <para><literal>extended</literal> means
           that <varname>max_wal_size</varname> is exceeded but the files are
-          still held, either by some replication slot or
-          by <varname>wal_keep_segments</varname>.</para>
+          still retained, either by some replication slot or
+          by <varname>wal_keep_segments</varname>.
+          </para>
         </listitem>
         <listitem>
-         <para><literal>lost</literal> means that some WAL files are
-          definitely lost and this slot cannot be used to resume replication
-          anymore.</para>
+         <para><literal>being lost</literal> means that the slot no longer
+          retains all required WAL files and some of them are to be removed at
+          the next checkpoint.  This state can return
+          to <literal>reserved</literal> or <literal>extended</literal>
+          states.
+          </para>
+        </listitem>
+        <listitem>
+          <para>
+            <literal>lost</literal> means that some required WAL files have
+            been removed and this slot is no longer usable.
+          </para>
         </listitem>
        </itemizedlist>
        The last two states are seen only when
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a1256a103b..9d94d21d5e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9485,15 +9485,20 @@ CreateRestartPoint(int flags)
  *		(typically a slot's restart_lsn)
  *
  * Returns one of the following enum values:
- * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
- *   of max_wal_size.
  *
- * * WALAVAIL_PRESERVED means it is still available by preserving extra
+ * * WALAVAIL_RESERVED means targetLSN is available and it is in the range of
+ *   max_wal_size.
+ *
+ * * WALAVAIL_EXTENDED means it is still available by preserving extra
  *   segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
  *   than max_wal_size, this state is not returned.
  *
- * * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
- *   a slot with this LSN cannot continue.
+ * * WALAVAIL_BEING_REMOVED means it is being lost and the next checkpoint will
+ *   remove reserved segments. The walsender using this slot may return to the
+ *   above.
+ *
+ * * WALAVAIL_REMOVED means it has been removed. A replication stream on
+ *   a slot with this LSN cannot continue after a restart.
  *
  * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
  */
@@ -9509,13 +9514,19 @@ GetWALAvailability(XLogRecPtr targetLSN)
 													 * slot */
 	uint64		keepSegs;
 
-	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	/*
+	 * slot does not reserve WAL. Either deactivated, or has never been
+	 * active
+	 */
 	if (XLogRecPtrIsInvalid(targetLSN))
 		return WALAVAIL_INVALID_LSN;
 
 	currpos = GetXLogWriteRecPtr();
 
-	/* calculate oldest segment currently needed by slots */
+	/*
+	 * calculate the oldest segment currently reserved by all slots,
+	 * considering wal_keep_segments and max_slot_wal_keep_size
+	 */
 	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
 	KeepLogSeg(currpos, &oldestSlotSeg);
 
@@ -9526,10 +9537,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 */
 	oldestSeg = XLogGetLastRemovedSegno() + 1;
 
-	/* calculate oldest segment by max_wal_size and wal_keep_segments */
+	/* calculate oldest segment by max_wal_size */
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
-	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
-							  wal_segment_size) + 1;
+	keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
 
 	if (currSeg > keepSegs)
 		oldestSegMaxWalSize = currSeg - keepSegs;
@@ -9537,27 +9547,23 @@ GetWALAvailability(XLogRecPtr targetLSN)
 		oldestSegMaxWalSize = 1;
 
 	/*
-	 * If max_slot_wal_keep_size has changed after the last call, the segment
-	 * that would been kept by the current setting might have been lost by the
-	 * previous setting. No point in showing normal or keeping status values
-	 * if the targetSeg is known to be lost.
+	 * No point in returning reserved or extended status values if the
+	 * targetSeg is known to be lost.
 	 */
-	if (targetSeg >= oldestSeg)
+	if (targetSeg >= oldestSlotSeg)
 	{
-		/*
-		 * show "normal" when targetSeg is within max_wal_size, even if
-		 * max_slot_wal_keep_size is smaller than max_wal_size.
-		 */
-		if ((max_slot_wal_keep_size_mb <= 0 ||
-			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
-			oldestSegMaxWalSize <= targetSeg)
-			return WALAVAIL_NORMAL;
-
-		/* being retained by slots */
-		if (oldestSlotSeg <= targetSeg)
+		/* show "reserved" when targetSeg is within max_wal_size */
+		if (targetSeg >= oldestSegMaxWalSize)
 			return WALAVAIL_RESERVED;
+
+		/* being retained by slots exceeding max_wal_size */
+		return WALAVAIL_EXTENDED;
 	}
 
+	/* WAL segments are no longer retained but haven't been removed yet */
+	if (targetSeg >= oldestSeg)
+		return WALAVAIL_BEING_REMOVED;
+
 	/* Definitely lost */
 	return WALAVAIL_REMOVED;
 }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a7bbcf3499..a1786489ad 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -288,6 +288,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_invalidated_lsn = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -1226,6 +1227,7 @@ restart:
 						(uint32) restart_lsn)));
 
 		SpinLockAcquire(&s->mutex);
+		s->last_invalidated_lsn = s->data.restart_lsn;
 		s->data.restart_lsn = InvalidXLogRecPtr;
 		SpinLockRelease(&s->mutex);
 		ReplicationSlotRelease();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 06e4955de7..3deccf3448 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -283,6 +283,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
 		XLogSegNo	last_removed_seg;
+		XLogRecPtr	targetLSN;
 		int			i;
 
 		if (!slot->in_use)
@@ -342,7 +343,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
-		walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+		/* use last_invalidated_lsn when the slot is invalidated */
+		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+			targetLSN = slot_contents.last_invalidated_lsn;
+		else
+			targetLSN = slot_contents.data.restart_lsn;
+
+		walstate = GetWALAvailability(targetLSN);
 
 		switch (walstate)
 		{
@@ -350,16 +357,33 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				nulls[i++] = true;
 				break;
 
-			case WALAVAIL_NORMAL:
-				values[i++] = CStringGetTextDatum("normal");
-				break;
-
 			case WALAVAIL_RESERVED:
 				values[i++] = CStringGetTextDatum("reserved");
 				break;
 
+			case WALAVAIL_EXTENDED:
+				values[i++] = CStringGetTextDatum("extended");
+				break;
+
+			case WALAVAIL_BEING_REMOVED:
+				values[i++] = CStringGetTextDatum("being lost");
+				break;
+
 			case WALAVAIL_REMOVED:
-				values[i++] = CStringGetTextDatum("lost");
+				/*
+				 * If the segment that walsender is currently reading has been
+				 * just removed, but the walsender goes into the next segment
+				 * just after, the state goes back to "reserved" or
+				 * "extended". We regard that state as "being lost", rather
+				 * than "lost" since the slot has not definitely dead yet. The
+				 * than "lost", since the slot is not definitely dead yet. The
+				 * after invalidation.
+				 */
+				if (slot_contents.active_pid != 0)
+					values[i++] = CStringGetTextDatum("being lost");
+				else
+					values[i++] = CStringGetTextDatum("lost");
+
 				break;
 
 			default:
@@ -367,7 +391,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		}
 
 		if (max_slot_wal_keep_size_mb >= 0 &&
-			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
+			(walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
 			((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
 		{
 			XLogRecPtr	min_safe_lsn;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 347a38f57c..85c1f67e57 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -270,8 +270,9 @@ extern CheckpointStatsData CheckpointStats;
 typedef enum WALAvailability
 {
 	WALAVAIL_INVALID_LSN,		/* parameter error */
-	WALAVAIL_NORMAL,			/* WAL segment is within max_wal_size */
-	WALAVAIL_RESERVED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_RESERVED,			/* WAL segment is within max_wal_size */
+	WALAVAIL_EXTENDED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_BEING_REMOVED,		/* WAL segment is being removed */
 	WALAVAIL_REMOVED			/* WAL segment has been removed */
 } WALAvailability;
 
@@ -326,7 +327,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN);
 extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 917876010e..8090ca81fe 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -156,6 +156,9 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/* restart_lsn is copied here when the slot is invalidated */
+	XLogRecPtr	last_invalidated_lsn;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index cba7df920c..ac0059bc7e 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -56,7 +56,7 @@ $node_standby->stop;
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check the catching-up state');
+is($result, "reserved|t", 'check the catching-up state');
 
 # Advance WAL by five segments (= 5MB) on master
 advance_wal($node_master, 1);
@@ -66,7 +66,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check that it is safe if WAL fits in max_wal_size');
+is($result, "reserved|t", 'check that it is safe if WAL fits in max_wal_size');
 
 advance_wal($node_master, 4);
 $node_master->safe_psql('postgres', "CHECKPOINT;");
@@ -75,7 +75,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check that slot is working');
+is($result, "reserved|t", 'check that slot is working');
 
 # The standby can reconnect to master
 $node_standby->start;
@@ -99,7 +99,7 @@ $node_master->reload;
 
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "normal", 'check that max_slot_wal_keep_size is working');
+is($result, "reserved", 'check that max_slot_wal_keep_size is working');
 
 # Advance WAL again then checkpoint, reducing remain by 2 MB.
 advance_wal($node_master, 2);
@@ -108,7 +108,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 # The slot is still working
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "normal",
+is($result, "reserved",
 	'check that min_safe_lsn gets close to the current LSN');
 
 # The standby can reconnect to master
@@ -125,7 +125,7 @@ advance_wal($node_master, 6);
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal",
+is($result, "extended",
 	'check that wal_keep_segments overrides max_slot_wal_keep_size');
 # restore wal_keep_segments
 $result = $node_master->safe_psql('postgres',
@@ -138,12 +138,15 @@ $node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
 $node_standby->stop;
 
 # Advance WAL again without checkpoint, reducing remain by 6 MB.
+$result = $node_master->safe_psql('postgres',
+								  "SELECT wal_status, restart_lsn, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+print $result, "\n";
 advance_wal($node_master, 6);
 
 # Slot gets into 'reserved' state
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "reserved", 'check that the slot state changes to "reserved"');
+is($result, "extended", 'check that the slot state changes to "extended"');
 
 # do checkpoint so that the next checkpoint runs too early
 $node_master->safe_psql('postgres', "CHECKPOINT;");
@@ -151,11 +154,11 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 # Advance WAL again without checkpoint; remain goes to 0.
 advance_wal($node_master, 1);
 
-# Slot gets into 'lost' state
+# Slot gets into 'being lost' state
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "lost|t", 'check that the slot state changes to "lost"');
+is($result, "being lost|t", 'check that the slot state changes to "being lost"');
 
 # The standby still can connect to master before a checkpoint
 $node_standby->start;
@@ -186,7 +189,7 @@ ok( find_in_log(
 $result = $node_master->safe_psql('postgres',
 	"SELECT slot_name, active, restart_lsn IS NULL, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "rep1|f|t||", 'check that the slot became inactive');
+is($result, "rep1|f|t|lost|", 'check that the slot became inactive and the state "lost" persists');
 
 # The standby no longer can connect to the master
 $logstart = get_log_size($node_standby);
@@ -259,7 +262,7 @@ sub advance_wal
 	for (my $i = 0; $i < $n; $i++)
 	{
 		$node->safe_psql('postgres',
-			"CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+						 "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
 	}
 	return;
 }
-- 
2.18.4

#31Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Kyotaro Horiguchi (#6)
1 attachment(s)
Re: Review for GetWALAvailability()

On 2020-Jun-16, Kyotaro Horiguchi wrote:

I saw that the "reserved" is the state where slots are working to
retain segments, and "normal" is the state to indicate that "WAL
segments are within max_wal_size", which is orthogonal to the notion
of "reserved". So it seems to me useless when the retained WAL
segments cannot exceed max_wal_size.

With longer descriptions they would be:

"reserved under max_wal_size"
"reserved over max_wal_size"
"lost some segments"

Come to think of it, I realized that my trouble was just the
wording. Do the following wordings make sense to you?

"reserved" - retained within max_wal_size
"extended" - retained over max_wal_size
"lost" - lost some segments

So let's add Unreserved to denote the state where WAL is over the slot size
but no segments have been removed yet:

* Reserved      under max_wal_size
* Extended      past max_wal_size, but still within wal_keep_segments or
                the maximum slot size.
* Unreserved    past wal_keep_segments and the maximum slot size, but
                not yet removed.  Recoverable condition.
* Lost          some segments lost.  Unrecoverable condition.

It seems better to me to save the invalidation LSN in the persistent
data rather than the in-memory data that's lost on restart. As is, we
would lose the status in a restart, which doesn't seem good to me. It's
just eight extra bytes to write ... should be pretty much free.

This version I propose is based on the one you posted earlier today and
is what I propose for commit.

Thanks!

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachments:

wal_status.patchtext/x-diff; charset=us-asciiDownload
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 700271fd40..8945959f9d 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -11239,19 +11239,29 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        Possible values are:
        <itemizedlist>
         <listitem>
-         <para><literal>normal</literal> means that the claimed files
+         <para><literal>reserved</literal> means that the claimed files
           are within <varname>max_wal_size</varname>.</para>
         </listitem>
         <listitem>
-         <para><literal>reserved</literal> means
+         <para><literal>extended</literal> means
           that <varname>max_wal_size</varname> is exceeded but the files are
-          still held, either by some replication slot or
-          by <varname>wal_keep_segments</varname>.</para>
+          still retained, either by the replication slot or
+          by <varname>wal_keep_segments</varname>.
+         </para>
         </listitem>
         <listitem>
-         <para><literal>lost</literal> means that some WAL files are
-          definitely lost and this slot cannot be used to resume replication
-          anymore.</para>
+         <para>
+          <literal>unreserved</literal> means that the slot no longer
+          retains the required WAL files and some of them are to be removed at
+          the next checkpoint.  This state can return
+          to <literal>reserved</literal> or <literal>extended</literal>.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>lost</literal> means that some required WAL files have
+          been removed and this slot is no longer usable.
+         </para>
         </listitem>
        </itemizedlist>
        The last two states are seen only when
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 34ede80c44..e455384b5b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9488,15 +9488,20 @@ CreateRestartPoint(int flags)
  *		(typically a slot's restart_lsn)
  *
  * Returns one of the following enum values:
- * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
- *   of max_wal_size.
  *
- * * WALAVAIL_PRESERVED means it is still available by preserving extra
+ * * WALAVAIL_RESERVED means targetLSN is available and it is in the range of
+ *   max_wal_size.
+ *
+ * * WALAVAIL_EXTENDED means it is still available by preserving extra
  *   segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
  *   than max_wal_size, this state is not returned.
  *
- * * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
- *   a slot with this LSN cannot continue.
+ * * WALAVAIL_UNRESERVED means it is being lost and the next checkpoint will
+ *   remove reserved segments. The walsender using this slot may return to the
+ *   above.
+ *
+ * * WALAVAIL_REMOVED means it has been removed. A replication stream on
+ *   a slot with this LSN cannot continue after a restart.
  *
  * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
  */
@@ -9512,13 +9517,18 @@ GetWALAvailability(XLogRecPtr targetLSN)
 													 * slot */
 	uint64		keepSegs;
 
-	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	/*
+	 * slot does not reserve WAL. Either deactivated, or has never been active
+	 */
 	if (XLogRecPtrIsInvalid(targetLSN))
 		return WALAVAIL_INVALID_LSN;
 
 	currpos = GetXLogWriteRecPtr();
 
-	/* calculate oldest segment currently needed by slots */
+	/*
+	 * calculate the oldest segment currently reserved by all slots,
+	 * considering wal_keep_segments and max_slot_wal_keep_size
+	 */
 	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
 	KeepLogSeg(currpos, &oldestSlotSeg);
 
@@ -9529,10 +9539,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 */
 	oldestSeg = XLogGetLastRemovedSegno() + 1;
 
-	/* calculate oldest segment by max_wal_size and wal_keep_segments */
+	/* calculate oldest segment by max_wal_size */
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
-	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
-							  wal_segment_size) + 1;
+	keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
 
 	if (currSeg > keepSegs)
 		oldestSegMaxWalSize = currSeg - keepSegs;
@@ -9540,27 +9549,23 @@ GetWALAvailability(XLogRecPtr targetLSN)
 		oldestSegMaxWalSize = 1;
 
 	/*
-	 * If max_slot_wal_keep_size has changed after the last call, the segment
-	 * that would been kept by the current setting might have been lost by the
-	 * previous setting. No point in showing normal or keeping status values
-	 * if the targetSeg is known to be lost.
+	 * No point in returning reserved or extended status values if the
+	 * targetSeg is known to be lost.
 	 */
-	if (targetSeg >= oldestSeg)
+	if (targetSeg >= oldestSlotSeg)
 	{
-		/*
-		 * show "normal" when targetSeg is within max_wal_size, even if
-		 * max_slot_wal_keep_size is smaller than max_wal_size.
-		 */
-		if ((max_slot_wal_keep_size_mb <= 0 ||
-			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
-			oldestSegMaxWalSize <= targetSeg)
-			return WALAVAIL_NORMAL;
-
-		/* being retained by slots */
-		if (oldestSlotSeg <= targetSeg)
+		/* show "reserved" when targetSeg is within max_wal_size */
+		if (targetSeg >= oldestSegMaxWalSize)
 			return WALAVAIL_RESERVED;
+
+		/* being retained by slots exceeding max_wal_size */
+		return WALAVAIL_EXTENDED;
 	}
 
+	/* WAL segments are no longer retained but haven't been removed yet */
+	if (targetSeg >= oldestSeg)
+		return WALAVAIL_UNRESERVED;
+
 	/* Definitely lost */
 	return WALAVAIL_REMOVED;
 }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a7bbcf3499..e8761f3a18 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1226,6 +1226,7 @@ restart:
 						(uint32) restart_lsn)));
 
 		SpinLockAcquire(&s->mutex);
+		s->data.invalidated_at = s->data.restart_lsn;
 		s->data.restart_lsn = InvalidXLogRecPtr;
 		SpinLockRelease(&s->mutex);
 		ReplicationSlotRelease();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 06e4955de7..df854bc6e3 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -283,6 +283,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
 		XLogSegNo	last_removed_seg;
+		XLogRecPtr	targetLSN;
 		int			i;
 
 		if (!slot->in_use)
@@ -342,7 +343,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
-		walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+		/*
+		 * Report availability from invalidated_at when the slot has been
+		 * invalidated; otherwise slots would appear as invalid without any
+		 * more clues as to what happened.
+		 */
+		targetLSN = XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) ?
+			slot_contents.data.invalidated_at :
+			slot_contents.data.restart_lsn;
+		walstate = GetWALAvailability(targetLSN);
 
 		switch (walstate)
 		{
@@ -350,24 +359,47 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				nulls[i++] = true;
 				break;
 
-			case WALAVAIL_NORMAL:
-				values[i++] = CStringGetTextDatum("normal");
-				break;
-
 			case WALAVAIL_RESERVED:
 				values[i++] = CStringGetTextDatum("reserved");
 				break;
 
-			case WALAVAIL_REMOVED:
-				values[i++] = CStringGetTextDatum("lost");
+			case WALAVAIL_EXTENDED:
+				values[i++] = CStringGetTextDatum("extended");
 				break;
 
-			default:
-				elog(ERROR, "invalid walstate: %d", (int) walstate);
+			case WALAVAIL_UNRESERVED:
+				values[i++] = CStringGetTextDatum("unreserved");
+				break;
+
+			case WALAVAIL_REMOVED:
+
+				/*
+				 * If we read the restart_lsn long enough ago, maybe that file
+				 * has been removed by now.  However, the walsender could have
+				 * moved forward enough that it jumped to another file after
+				 * we looked.  If checkpointer signalled the process to
+				 * termination, then it's definitely lost; but if a process is
+				 * still alive, then "unreserved" seems more appropriate.
+				 */
+				if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+				{
+					int			pid;
+
+					SpinLockAcquire(&slot->mutex);
+					pid = slot->active_pid;
+					SpinLockRelease(&slot->mutex);
+					if (pid != 0)
+					{
+						values[i++] = CStringGetTextDatum("unreserved");
+						break;
+					}
+				}
+				values[i++] = CStringGetTextDatum("lost");
+				break;
 		}
 
 		if (max_slot_wal_keep_size_mb >= 0 &&
-			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
+			(walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
 			((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
 		{
 			XLogRecPtr	min_safe_lsn;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 347a38f57c..77ac4e785f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -270,8 +270,10 @@ extern CheckpointStatsData CheckpointStats;
 typedef enum WALAvailability
 {
 	WALAVAIL_INVALID_LSN,		/* parameter error */
-	WALAVAIL_NORMAL,			/* WAL segment is within max_wal_size */
-	WALAVAIL_RESERVED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_RESERVED,			/* WAL segment is within max_wal_size */
+	WALAVAIL_EXTENDED,			/* WAL segment is reserved by a slot or
+								 * wal_keep_segments */
+	WALAVAIL_UNRESERVED,		/* no longer reserved, but not removed yet */
 	WALAVAIL_REMOVED			/* WAL segment has been removed */
 } WALAvailability;
 
@@ -326,7 +328,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN);
 extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 917876010e..31362585ec 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -79,6 +79,9 @@ typedef struct ReplicationSlotPersistentData
 	/* oldest LSN that might be required by this replication slot */
 	XLogRecPtr	restart_lsn;
 
+	/* restart_lsn is copied here when the slot is invalidated */
+	XLogRecPtr	invalidated_at;
+
 	/*
 	 * Oldest LSN that the client has acked receipt for.  This is used as the
 	 * start_lsn point in case the client doesn't specify one, and also as a
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index cba7df920c..4e6ad86bc3 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -56,7 +56,7 @@ $node_standby->stop;
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check the catching-up state');
+is($result, "reserved|t", 'check the catching-up state');
 
 # Advance WAL by five segments (= 5MB) on master
 advance_wal($node_master, 1);
@@ -66,7 +66,8 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check that it is safe if WAL fits in max_wal_size');
+is($result, "reserved|t",
+	'check that it is safe if WAL fits in max_wal_size');
 
 advance_wal($node_master, 4);
 $node_master->safe_psql('postgres', "CHECKPOINT;");
@@ -75,7 +76,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check that slot is working');
+is($result, "reserved|t", 'check that slot is working');
 
 # The standby can reconnect to master
 $node_standby->start;
@@ -99,7 +100,7 @@ $node_master->reload;
 
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "normal", 'check that max_slot_wal_keep_size is working');
+is($result, "reserved", 'check that max_slot_wal_keep_size is working');
 
 # Advance WAL again then checkpoint, reducing remain by 2 MB.
 advance_wal($node_master, 2);
@@ -108,7 +109,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 # The slot is still working
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "normal",
+is($result, "reserved",
 	'check that min_safe_lsn gets close to the current LSN');
 
 # The standby can reconnect to master
@@ -125,7 +126,7 @@ advance_wal($node_master, 6);
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal",
+is($result, "extended",
 	'check that wal_keep_segments overrides max_slot_wal_keep_size');
 # restore wal_keep_segments
 $result = $node_master->safe_psql('postgres',
@@ -138,12 +139,16 @@ $node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
 $node_standby->stop;
 
 # Advance WAL again without checkpoint, reducing remain by 6 MB.
+$result = $node_master->safe_psql('postgres',
+	"SELECT wal_status, restart_lsn, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
+);
+print $result, "\n";
 advance_wal($node_master, 6);
 
 # Slot gets into 'reserved' state
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "reserved", 'check that the slot state changes to "reserved"');
+is($result, "extended", 'check that the slot state changes to "extended"');
 
 # do checkpoint so that the next checkpoint runs too early
 $node_master->safe_psql('postgres', "CHECKPOINT;");
@@ -151,11 +156,12 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 # Advance WAL again without checkpoint; remain goes to 0.
 advance_wal($node_master, 1);
 
-# Slot gets into 'lost' state
+# Slot gets into 'unreserved' state
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "lost|t", 'check that the slot state changes to "lost"');
+is($result, "unreserved|t",
+	'check that the slot state changes to "unreserved"');
 
 # The standby still can connect to master before a checkpoint
 $node_standby->start;
@@ -186,7 +192,8 @@ ok( find_in_log(
 $result = $node_master->safe_psql('postgres',
 	"SELECT slot_name, active, restart_lsn IS NULL, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "rep1|f|t||", 'check that the slot became inactive');
+is($result, "rep1|f|t|lost|",
+	'check that the slot became inactive and the state "lost" persists');
 
 # The standby no longer can connect to the master
 $logstart = get_log_size($node_standby);
#32Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#31)
1 attachment(s)
Re: Review for GetWALAvailability()

At Tue, 23 Jun 2020 19:06:25 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in

On 2020-Jun-16, Kyotaro Horiguchi wrote:

I saw that "reserved" is the state where slots are working to
retain segments, and "normal" is the state indicating that "WAL
segments are within max_wal_size", which is orthogonal to the notion
of "reserved". So it seems useless to me when the retained WAL
segments cannot exceed max_wal_size.

With longer descriptions they would be:

"reserved under max_wal_size"
"reserved over max_wal_size"
"lost some segments"

Come to think of it, I realized that my trouble was just the
wording. Do the following wordings make sense to you?

"reserved" - retained within max_wal_size
"extended" - retained over max_wal_size
"lost" - lost some segments

So let's add Unreserved to denote the state where it's over the slot size
but no segments have been removed yet:

Oh! Thanks for the more proper word. It looks good to me.

* Reserved: under max_wal_size
* Extended: past max_wal_size, but still within wal_keep_segments or the
maximum slot size.
* Unreserved: past wal_keep_segments and the maximum slot size, but
not yet removed. Recoverable condition.
* Lost: lost segments. Unrecoverable condition.

Those look good, too.
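
For quick reference, the classification we just agreed on could be
condensed like this (only a sketch; classify_segment is a hypothetical
helper and the computation of the segment numbers is elided, see the
attached patch for the real code):

static WALAvailability
classify_segment(XLogSegNo targetSeg,
				 XLogSegNo oldestSegMaxWalSize,	/* kept by max_wal_size */
				 XLogSegNo oldestSlotSeg,	/* kept by slots/wal_keep_segments */
				 XLogSegNo oldestSeg)		/* oldest segment still on disk */
{
	if (targetSeg >= oldestSlotSeg)
	{
		/* still retained; within max_wal_size or beyond it? */
		if (targetSeg >= oldestSegMaxWalSize)
			return WALAVAIL_RESERVED;
		return WALAVAIL_EXTENDED;
	}

	/* no longer retained, but the files may not have been removed yet */
	if (targetSeg >= oldestSeg)
		return WALAVAIL_UNRESERVED;

	/* definitely lost */
	return WALAVAIL_REMOVED;
}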

It seems better to me to save the invalidation LSN in the persistent
data rather than the in-memory data that's lost on restart. As is, we
would lose the status in a restart, which doesn't seem good to me. It's
just eight extra bytes to write ... should be pretty much free.

Agreed.

This version I propose is based on the one you posted earlier today and
is what I propose for commit.

-	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	/*
+	 * slot does not reserve WAL. Either deactivated, or has never been active
+	 */

Sorry, this is my fault. The change is useless. The code for
WALAVAIL_REMOVED looks good.

 # Advance WAL again without checkpoint, reducing remain by 6 MB.
+$result = $node_master->safe_psql('postgres',
+	"SELECT wal_status, restart_lsn, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
+);
+print $result, "\n";

Sorry, this is my fault, too. Removed in the attached.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

wal_status_v2.patch (text/x-patch; charset=us-ascii)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 5a66115df1..49a881b262 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -11239,19 +11239,29 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        Possible values are:
        <itemizedlist>
         <listitem>
-         <para><literal>normal</literal> means that the claimed files
+         <para><literal>reserved</literal> means that the claimed files
           are within <varname>max_wal_size</varname>.</para>
         </listitem>
         <listitem>
-         <para><literal>reserved</literal> means
+         <para><literal>extended</literal> means
           that <varname>max_wal_size</varname> is exceeded but the files are
-          still held, either by some replication slot or
-          by <varname>wal_keep_segments</varname>.</para>
+          still retained, either by the replication slot or
+          by <varname>wal_keep_segments</varname>.
+         </para>
         </listitem>
         <listitem>
-         <para><literal>lost</literal> means that some WAL files are
-          definitely lost and this slot cannot be used to resume replication
-          anymore.</para>
+         <para>
+          <literal>unreserved</literal> means that the slot no longer
+          retains the required WAL files and some of them are to be removed at
+          the next checkpoint.  This state can return
+          to <literal>reserved</literal> or <literal>extended</literal>.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>lost</literal> means that some required WAL files have
+          been removed and this slot is no longer usable.
+         </para>
         </listitem>
        </itemizedlist>
        The last two states are seen only when
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a1256a103b..4a4bb30418 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9485,15 +9485,20 @@ CreateRestartPoint(int flags)
  *		(typically a slot's restart_lsn)
  *
  * Returns one of the following enum values:
- * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
- *   of max_wal_size.
  *
- * * WALAVAIL_PRESERVED means it is still available by preserving extra
+ * * WALAVAIL_RESERVED means targetLSN is available and it is in the range of
+ *   max_wal_size.
+ *
+ * * WALAVAIL_EXTENDED means it is still available by preserving extra
  *   segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
  *   than max_wal_size, this state is not returned.
  *
- * * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
- *   a slot with this LSN cannot continue.
+ * * WALAVAIL_UNRESERVED means it is being lost and the next checkpoint will
+ *   remove reserved segments. The walsender using this slot may return to the
+ *   above.
+ *
+ * * WALAVAIL_REMOVED means it has been removed. A replication stream on
+ *   a slot with this LSN cannot continue after a restart.
  *
  * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
  */
@@ -9509,13 +9514,18 @@ GetWALAvailability(XLogRecPtr targetLSN)
 													 * slot */
 	uint64		keepSegs;
 
-	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	/*
+	 * slot does not reserve WAL. Either deactivated, or has never been active
+	 */
 	if (XLogRecPtrIsInvalid(targetLSN))
 		return WALAVAIL_INVALID_LSN;
 
 	currpos = GetXLogWriteRecPtr();
 
-	/* calculate oldest segment currently needed by slots */
+	/*
+	 * calculate the oldest segment currently reserved by all slots,
+	 * considering wal_keep_segments and max_slot_wal_keep_size
+	 */
 	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
 	KeepLogSeg(currpos, &oldestSlotSeg);
 
@@ -9526,10 +9536,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 */
 	oldestSeg = XLogGetLastRemovedSegno() + 1;
 
-	/* calculate oldest segment by max_wal_size and wal_keep_segments */
+	/* calculate oldest segment by max_wal_size */
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
-	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
-							  wal_segment_size) + 1;
+	keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
 
 	if (currSeg > keepSegs)
 		oldestSegMaxWalSize = currSeg - keepSegs;
@@ -9537,27 +9546,23 @@ GetWALAvailability(XLogRecPtr targetLSN)
 		oldestSegMaxWalSize = 1;
 
 	/*
-	 * If max_slot_wal_keep_size has changed after the last call, the segment
-	 * that would been kept by the current setting might have been lost by the
-	 * previous setting. No point in showing normal or keeping status values
-	 * if the targetSeg is known to be lost.
+	 * No point in returning reserved or extended status values if the
+	 * targetSeg is known to be lost.
 	 */
-	if (targetSeg >= oldestSeg)
+	if (targetSeg >= oldestSlotSeg)
 	{
-		/*
-		 * show "normal" when targetSeg is within max_wal_size, even if
-		 * max_slot_wal_keep_size is smaller than max_wal_size.
-		 */
-		if ((max_slot_wal_keep_size_mb <= 0 ||
-			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
-			oldestSegMaxWalSize <= targetSeg)
-			return WALAVAIL_NORMAL;
-
-		/* being retained by slots */
-		if (oldestSlotSeg <= targetSeg)
+		/* show "reserved" when targetSeg is within max_wal_size */
+		if (targetSeg >= oldestSegMaxWalSize)
 			return WALAVAIL_RESERVED;
+
+		/* being retained by slots exceeding max_wal_size */
+		return WALAVAIL_EXTENDED;
 	}
 
+	/* WAL segments are no longer retained but haven't been removed yet */
+	if (targetSeg >= oldestSeg)
+		return WALAVAIL_UNRESERVED;
+
 	/* Definitely lost */
 	return WALAVAIL_REMOVED;
 }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a7bbcf3499..e8761f3a18 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1226,6 +1226,7 @@ restart:
 						(uint32) restart_lsn)));
 
 		SpinLockAcquire(&s->mutex);
+		s->data.invalidated_at = s->data.restart_lsn;
 		s->data.restart_lsn = InvalidXLogRecPtr;
 		SpinLockRelease(&s->mutex);
 		ReplicationSlotRelease();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 06e4955de7..df854bc6e3 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -283,6 +283,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
 		XLogSegNo	last_removed_seg;
+		XLogRecPtr	targetLSN;
 		int			i;
 
 		if (!slot->in_use)
@@ -342,7 +343,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
-		walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+		/*
+		 * Report availability from invalidated_at when the slot has been
+		 * invalidated; otherwise slots would appear as invalid without any
+		 * more clues as to what happened.
+		 */
+		targetLSN = XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) ?
+			slot_contents.data.invalidated_at :
+			slot_contents.data.restart_lsn;
+		walstate = GetWALAvailability(targetLSN);
 
 		switch (walstate)
 		{
@@ -350,24 +359,47 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				nulls[i++] = true;
 				break;
 
-			case WALAVAIL_NORMAL:
-				values[i++] = CStringGetTextDatum("normal");
-				break;
-
 			case WALAVAIL_RESERVED:
 				values[i++] = CStringGetTextDatum("reserved");
 				break;
 
+			case WALAVAIL_EXTENDED:
+				values[i++] = CStringGetTextDatum("extended");
+				break;
+
+			case WALAVAIL_UNRESERVED:
+				values[i++] = CStringGetTextDatum("unreserved");
+				break;
+
 			case WALAVAIL_REMOVED:
+
+				/*
+				 * If we read the restart_lsn long enough ago, maybe that file
+				 * has been removed by now.  However, the walsender could have
+				 * moved forward enough that it jumped to another file after
+				 * we looked.  If checkpointer signalled the process to
+				 * we looked.  If checkpointer signalled the process to
+				 * terminate, then it's definitely lost; but if a process is
+				 */
+				if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+				{
+					int			pid;
+
+					SpinLockAcquire(&slot->mutex);
+					pid = slot->active_pid;
+					SpinLockRelease(&slot->mutex);
+					if (pid != 0)
+					{
+						values[i++] = CStringGetTextDatum("unreserved");
+						break;
+					}
+				}
 				values[i++] = CStringGetTextDatum("lost");
 				break;
-
-			default:
-				elog(ERROR, "invalid walstate: %d", (int) walstate);
 		}
 
 		if (max_slot_wal_keep_size_mb >= 0 &&
-			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
+			(walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
 			((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
 		{
 			XLogRecPtr	min_safe_lsn;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 347a38f57c..77ac4e785f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -270,8 +270,10 @@ extern CheckpointStatsData CheckpointStats;
 typedef enum WALAvailability
 {
 	WALAVAIL_INVALID_LSN,		/* parameter error */
-	WALAVAIL_NORMAL,			/* WAL segment is within max_wal_size */
-	WALAVAIL_RESERVED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_RESERVED,			/* WAL segment is within max_wal_size */
+	WALAVAIL_EXTENDED,			/* WAL segment is reserved by a slot or
+								 * wal_keep_segments */
+	WALAVAIL_UNRESERVED,		/* no longer reserved, but not removed yet */
 	WALAVAIL_REMOVED			/* WAL segment has been removed */
 } WALAvailability;
 
@@ -326,7 +328,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN);
 extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 917876010e..31362585ec 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -79,6 +79,9 @@ typedef struct ReplicationSlotPersistentData
 	/* oldest LSN that might be required by this replication slot */
 	XLogRecPtr	restart_lsn;
 
+	/* restart_lsn is copied here when the slot is invalidated */
+	XLogRecPtr	invalidated_at;
+
 	/*
 	 * Oldest LSN that the client has acked receipt for.  This is used as the
 	 * start_lsn point in case the client doesn't specify one, and also as a
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index cba7df920c..7d22ae5720 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -56,7 +56,7 @@ $node_standby->stop;
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check the catching-up state');
+is($result, "reserved|t", 'check the catching-up state');
 
 # Advance WAL by five segments (= 5MB) on master
 advance_wal($node_master, 1);
@@ -66,7 +66,8 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check that it is safe if WAL fits in max_wal_size');
+is($result, "reserved|t",
+	'check that it is safe if WAL fits in max_wal_size');
 
 advance_wal($node_master, 4);
 $node_master->safe_psql('postgres', "CHECKPOINT;");
@@ -75,7 +76,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal|t", 'check that slot is working');
+is($result, "reserved|t", 'check that slot is working');
 
 # The standby can reconnect to master
 $node_standby->start;
@@ -99,7 +100,7 @@ $node_master->reload;
 
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "normal", 'check that max_slot_wal_keep_size is working');
+is($result, "reserved", 'check that max_slot_wal_keep_size is working');
 
 # Advance WAL again then checkpoint, reducing remain by 2 MB.
 advance_wal($node_master, 2);
@@ -108,7 +109,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 # The slot is still working
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "normal",
+is($result, "reserved",
 	'check that min_safe_lsn gets close to the current LSN');
 
 # The standby can reconnect to master
@@ -125,7 +126,7 @@ advance_wal($node_master, 6);
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status as remain FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "normal",
+is($result, "extended",
 	'check that wal_keep_segments overrides max_slot_wal_keep_size');
 # restore wal_keep_segments
 $result = $node_master->safe_psql('postgres',
@@ -143,7 +144,7 @@ advance_wal($node_master, 6);
 # Slot gets into 'reserved' state
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
-is($result, "reserved", 'check that the slot state changes to "reserved"');
+is($result, "extended", 'check that the slot state changes to "extended"');
 
 # do checkpoint so that the next checkpoint runs too early
 $node_master->safe_psql('postgres', "CHECKPOINT;");
@@ -151,11 +152,12 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 # Advance WAL again without checkpoint; remain goes to 0.
 advance_wal($node_master, 1);
 
-# Slot gets into 'lost' state
+# Slot gets into 'unreserved' state
 $result = $node_master->safe_psql('postgres',
 	"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "lost|t", 'check that the slot state changes to "lost"');
+is($result, "unreserved|t",
+	'check that the slot state changes to "unreserved"');
 
 # The standby still can connect to master before a checkpoint
 $node_standby->start;
@@ -186,7 +188,8 @@ ok( find_in_log(
 $result = $node_master->safe_psql('postgres',
 	"SELECT slot_name, active, restart_lsn IS NULL, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
-is($result, "rep1|f|t||", 'check that the slot became inactive');
+is($result, "rep1|f|t|lost|",
+	'check that the slot became inactive and the state "lost" persists');
 
 # The standby no longer can connect to the master
 $logstart = get_log_size($node_standby);
#33Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Kyotaro Horiguchi (#32)
Re: Review for GetWALAvailability()

Thanks for those corrections.

I have pushed this. I think all problems Masao-san reported have been
dealt with, so we're done here.

Thanks!

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#34Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Alvaro Herrera (#33)
Re: Review for GetWALAvailability()

On 2020/06/25 3:27, Alvaro Herrera wrote:

Thanks for those corrections.

I have pushed this. I think all problems Masao-san reported have been
dealt with, so we're done here.

Sorry for my late reply here...

Thanks for committing the patch and improving the feature!

/*
* Find the oldest extant segment file. We get 1 until checkpoint removes
* the first WAL segment file since startup, which causes the status being
* wrong under certain abnormal conditions but that doesn't actually harm.
*/
oldestSeg = XLogGetLastRemovedSegno() + 1;

I see the point of the above comment, but this can cause wal_status to be
changed from "lost" to "unreserved" after the server restart. Isn't this
really confusing? At least it seems better to document that behavior.
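
Concretely, just after a restart the shared counter starts from zero, so
the calculation degenerates to (a sketch of the effect, reusing the
variables quoted above):

	oldestSeg = XLogGetLastRemovedSegno() + 1;	/* == 0 + 1 == 1 */

	/*
	 * Every slot then satisfies targetSeg >= oldestSeg, so a slot that
	 * was "lost" before the restart is reported as "unreserved" again
	 * until a checkpoint removes the first segment.
	 */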

Or if we *can ensure* that a slot with invalidated_at set always means
a "lost" slot, we can judge that wal_status is "lost" without using the
fragile XLogGetLastRemovedSegno(). Thought?

Or XLogGetLastRemovedSegno() should be fixed so that it returns valid
value even after the restart?

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

#35Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Fujii Masao (#34)
Re: Review for GetWALAvailability()

On 2020-Jun-25, Fujii Masao wrote:

/*
* Find the oldest extant segment file. We get 1 until checkpoint removes
* the first WAL segment file since startup, which causes the status being
* wrong under certain abnormal conditions but that doesn't actually harm.
*/
oldestSeg = XLogGetLastRemovedSegno() + 1;

I see the point of the above comment, but this can cause wal_status to be
changed from "lost" to "unreserved" after the server restart. Isn't this
really confusing? At least it seems better to document that behavior.

Hmm.

Or if we *can ensure* that a slot with invalidated_at set always means
a "lost" slot, we can judge that wal_status is "lost" without using the
fragile XLogGetLastRemovedSegno(). Thought?

Hmm, this sounds compelling -- I think it just means we need to ensure
we reset invalidated_at to zero if the slot's restart_lsn is set to a
correct position afterwards. I don't think we have any operation that
does that, so it should be safe -- hopefully I didn't overlook anything?
Neither copy nor advance seem to work with a slot that has invalid
restart_lsn.
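
For illustration, such an operation would have to clear the mark under
the slot's spinlock, something like this (a sketch only; new_restart_lsn
is hypothetical, since no existing code path re-validates an invalidated
slot):

	SpinLockAcquire(&s->mutex);
	s->data.restart_lsn = new_restart_lsn;	/* hypothetical new position */
	s->data.invalidated_at = InvalidXLogRecPtr;	/* clear the invalidation */
	SpinLockRelease(&s->mutex);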

Or XLogGetLastRemovedSegno() should be fixed so that it returns valid
value even after the restart?

This seems more work to implement.

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#36Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Alvaro Herrera (#35)
Re: Review for GetWALAvailability()

On 2020/06/25 12:57, Alvaro Herrera wrote:

On 2020-Jun-25, Fujii Masao wrote:

/*
* Find the oldest extant segment file. We get 1 until checkpoint removes
* the first WAL segment file since startup, which causes the status being
* wrong under certain abnormal conditions but that doesn't actually harm.
*/
oldestSeg = XLogGetLastRemovedSegno() + 1;

I see the point of the above comment, but this can cause wal_status to be
changed from "lost" to "unreserved" after the server restart. Isn't this
really confusing? At least it seems better to document that behavior.

Hmm.

Or if we *can ensure* that a slot with invalidated_at set always means
a "lost" slot, we can judge that wal_status is "lost" without using the
fragile XLogGetLastRemovedSegno(). Thought?

Hmm, this sounds compelling -- I think it just means we need to ensure
we reset invalidated_at to zero if the slot's restart_lsn is set to a
correct position afterwards.

Yes.

I don't think we have any operation that
does that, so it should be safe -- hopefully I didn't overlook anything?

We need to call ReplicationSlotMarkDirty() and ReplicationSlotSave()
just after setting invalidated_at and restart_lsn in InvalidateObsoleteReplicationSlots()?
Otherwise, restart_lsn can go back to the previous value after the restart.

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e8761f3a18..5584e5dd2c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1229,6 +1229,13 @@ restart:
                 s->data.invalidated_at = s->data.restart_lsn;
                 s->data.restart_lsn = InvalidXLogRecPtr;
                 SpinLockRelease(&s->mutex);
+
+               /*
+                * Save this invalidated slot to disk, to ensure that the slot
+                * is still invalid even after the server restart.
+                */
+               ReplicationSlotMarkDirty();
+               ReplicationSlotSave();
                 ReplicationSlotRelease();

/* if we did anything, start from scratch */

Maybe we don't need to do this if the slot is temporary?
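
If so, the extra flush could presumably be restricted to persistent
slots, something like this (a sketch, assuming the persistency field of
ReplicationSlotPersistentData):

	/* only persistent slots must stay invalidated across a restart */
	if (s->data.persistency == RS_PERSISTENT)
	{
		ReplicationSlotMarkDirty();
		ReplicationSlotSave();
	}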

Neither copy nor advance seem to work with a slot that has invalid
restart_lsn.

Or XLogGetLastRemovedSegno() should be fixed so that it returns valid
value even after the restart?

This seems more work to implement.

Yes.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

#37Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#36)
2 attachment(s)
Re: Review for GetWALAvailability()

At Thu, 25 Jun 2020 14:35:34 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in

On 2020/06/25 12:57, Alvaro Herrera wrote:

On 2020-Jun-25, Fujii Masao wrote:

/*
* Find the oldest extant segment file. We get 1 until checkpoint removes
* the first WAL segment file since startup, which causes the status being
* wrong under certain abnormal conditions but that doesn't actually harm.
*/
oldestSeg = XLogGetLastRemovedSegno() + 1;

I see the point of the above comment, but this can cause wal_status to be
changed from "lost" to "unreserved" after the server restart. Isn't this
really confusing? At least it seems better to document that behavior.

Hmm.

Or if we *can ensure* that a slot with invalidated_at set always means
a "lost" slot, we can judge that wal_status is "lost" without using the
fragile XLogGetLastRemovedSegno(). Thought?

Hmm, this sounds compelling -- I think it just means we need to ensure
we reset invalidated_at to zero if the slot's restart_lsn is set to a
correct position afterwards.

Yes.

It is an error-prone restriction, as discussed before.

Without changing the updater side, an invalid restart_lsn AND a valid
invalidated_at can be regarded as the lost state. With the following
change suggested by Fujii-san, we can avoid the confusing status.

With the attached first patch on top of the slot-dirtify fix below, we
get "lost" for invalidated slots after restart.

I don't think we have any operation that
does that, so it should be safe -- hopefully I didn't overlook
anything?

We need to call ReplicationSlotMarkDirty() and ReplicationSlotSave()
just after setting invalidated_at and restart_lsn in
InvalidateObsoleteReplicationSlots()?
Otherwise, restart_lsn can go back to the previous value after the
restart.

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e8761f3a18..5584e5dd2c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1229,6 +1229,13 @@ restart:
 		s->data.invalidated_at = s->data.restart_lsn;
 		s->data.restart_lsn = InvalidXLogRecPtr;
 		SpinLockRelease(&s->mutex);
+
+		/*
+		 * Save this invalidated slot to disk, to ensure that the slot
+		 * is still invalid even after the server restart.
+		 */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
 		ReplicationSlotRelease();

 		/* if we did anything, start from scratch */

Maybe we don't need to do this if the slot is temporary?

The only difference of temporary slots from persistent ones seems to be
the "persistency" attribute. Actually,
create_physical_replication_slot() does the above for temporary slots.

Neither copy nor advance seem to work with a slot that has invalid
restart_lsn.

Or XLogGetLastRemovedSegno() should be fixed so that it returns valid
value even after the restart?

This seems more work to implement.

Yes.

The confusing status can be avoided without fixing it, but I prefer to
fix it. As Fujii-san suggested upthread, couldn't we remember
lastRemovedSegNo in the control file? (Yeah, it causes a bump of
PG_CONTROL_VERSION and CATALOG_VERSION_NO.)
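
A minimal sketch of that idea (hypothetical; the field name and the
setter are made up, and PG_CONTROL_VERSION would indeed need a bump):

	/* new field in ControlFileData (src/include/catalog/pg_control.h) */
	XLogSegNo	lastRemovedSegNo;	/* newest segment removed so far */

	/* at checkpoint time in xlog.c, before writing pg_control */
	ControlFile->lastRemovedSegNo = XLogGetLastRemovedSegno();
	UpdateControlFile();

	/* in StartupXLOG(), re-prime the shared counter (hypothetical setter) */
	XLogSetLastRemovedSegno(ControlFile->lastRemovedSegNo);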

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

0001-Make-slot-invalidation-persistent.patch (text/x-patch; charset=us-ascii)
From 7c8803f2bd7267d166f8a6f1a4ca5b129aa5ae96 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Thu, 25 Jun 2020 17:03:24 +0900
Subject: [PATCH 1/2] Make slot invalidation persistent

---
 src/backend/replication/slot.c | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e8761f3a18..26f874e3cb 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1229,7 +1229,15 @@ restart:
 		s->data.invalidated_at = s->data.restart_lsn;
 		s->data.restart_lsn = InvalidXLogRecPtr;
 		SpinLockRelease(&s->mutex);
+
+		/*
+		 * Save this invalidated slot to disk, to ensure that the slot
+		 * is still invalid even after the server restart.
+		 */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
 		ReplicationSlotRelease();
+		
 
 		/* if we did anything, start from scratch */
 		CHECK_FOR_INTERRUPTS();
-- 
2.18.4

0002-Show-correct-value-in-pg_replication_slots.wal_statu.patch (text/x-patch; charset=us-ascii)
From 0792c41c915b87474958689a2acd5c214b393015 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Thu, 25 Jun 2020 17:04:01 +0900
Subject: [PATCH 2/2] Show correct value in pg_replication_slots.wal_status
 after restart

The column may show a bogus status until checkpoint removes at least one
WAL segment. This patch changes the logic to detect the state so that
the column shows the correct status after restart.
---
 src/backend/replication/slotfuncs.c | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index fca18ffae5..889d6f53b6 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -283,7 +283,6 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
 		XLogSegNo	last_removed_seg;
-		XLogRecPtr	targetLSN;
 		int			i;
 
 		if (!slot->in_use)
@@ -348,10 +347,19 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		 * invalidated; otherwise slots would appear as invalid without any
 		 * more clues as to what happened.
 		 */
-		targetLSN = XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) ?
-			slot_contents.data.invalidated_at :
-			slot_contents.data.restart_lsn;
-		walstate = GetWALAvailability(targetLSN);
+
+		/*
+		 * A valid invalidated_at means the slot has been invalidated before.
+		 * If restart_lsn is invalid in addition to that, the slot has lost
+		 * required segments.  We could just pass invalidated_at to
+		 * GetWALAvailability() if lastRemovedSegNo had been initialized, but
+		 * the result is not reliable until checkpoint removes some segments.
+		 */
+		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
+			!XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+			walstate = WALAVAIL_REMOVED;
+		else
+			walstate = GetWALAvailability(slot_contents.data.restart_lsn);
 
 		switch (walstate)
 		{
-- 
2.18.4

#38Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Kyotaro Horiguchi (#37)
Re: Review for GetWALAvailability()

On 2020-Jun-25, Kyotaro Horiguchi wrote:

It is an error-prone restriction, as discussed before.

Without changing the updater side, an invalid restart_lsn AND a valid
invalidated_at can be regarded as the lost state. With the following
change suggested by Fujii-san, we can avoid the confusing status.

With the attached first patch on top of the slot-dirtify fix below, we
get "lost" for invalidated slots after restart.

Makes sense. I pushed this change, thanks.

The confusing status can be avoided without fixing it, but I prefer to
fix it. As Fujii-san suggested upthread, couldn't we remember
lastRemovedSegNo in the control file? (Yeah, it causes a bump of
PG_CONTROL_VERSION and CATALOG_VERSION_NO.)

I think that's a pg14 change. Feel free to submit a patch to the
commitfest.

--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services