pg_replication_slot_advance to return NULL instead of 0/0 if slot not advanced
Hi all,
When attempting to use multiple times pg_replication_slot_advance on a
slot, then the caller gets back directly InvalidXLogRecPtr as result,
for example:
=# select * from pg_replication_slot_advance('popo', 'FF/0');
slot_name | end_lsn
-----------+-----------
popo | 0/60021E0
(1 row)
=# select * from pg_replication_slot_advance('popo', 'FF/0');
slot_name | end_lsn
-----------+---------
popo | 0/0
(1 row)
Wouldn't it be more simple to return NULL to mean that the slot could
not be moved forward? That would be easier to parse for clients.
Please see the attached.
Thanks,
--
Michael
Attachments:
slot-advance-null.patchtext/x-diff; charset=us-asciiDownload
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index b851fe023a..df63201eb3 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19329,7 +19329,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
Advances the current confirmed position of a replication slot named
<parameter>slot_name</parameter>. The slot will not be moved backwards,
and it will not be moved beyond the current insert location. Returns
- name of the slot and real position to which it was advanced to.
+ name of the slot and real position to which it was advanced to, or
+ <literal>NULL</literal> if the slot could not be advanced.
</entry>
</row>
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..cd8fc1560a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -502,9 +502,17 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
ReplicationSlotRelease();
- /* Return the reached position. */
- values[1] = LSNGetDatum(endlsn);
- nulls[1] = false;
+ /*
+ * Return the reached position, or NULL if the slot has not been
+ * advanced.
+ */
+ if (XLogRecPtrIsInvalid(endlsn))
+ nulls[1] = true;
+ else
+ {
+ values[1] = LSNGetDatum(endlsn);
+ nulls[1] = false;
+ }
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
On Fri, May 25, 2018 at 7:28 AM, Michael Paquier <michael@paquier.xyz>
wrote:
Hi all,
When attempting to use multiple times pg_replication_slot_advance on a
slot, then the caller gets back directly InvalidXLogRecPtr as result,
for example:
=# select * from pg_replication_slot_advance('popo', 'FF/0');
slot_name | end_lsn
-----------+-----------
popo | 0/60021E0
(1 row)
=# select * from pg_replication_slot_advance('popo', 'FF/0');
slot_name | end_lsn
-----------+---------
popo | 0/0
(1 row)Wouldn't it be more simple to return NULL to mean that the slot could
not be moved forward? That would be easier to parse for clients.
Please see the attached.
I agree that returning 0/0 on this is wrong.
However, can this actually occour for any case other than exactly the case
of "moving the position to where the position already is"? If I look at the
physical slot path at least that seems to eb the only case, and in that
case I think the correct thing to return would be the new position, and not
NULL. If we actually *fail* to move the position, we give an error.
Actually, isn't there also a race there? That is, if we try to move it, we
check that we're not trying to move it backwards, and throw an error, but
that's checked outside the lock. Then later we actually move it, and check
*again* if we try to move it backwards, but if that one fails we return
InvalidXLogRecPtr (which can happen in the case of concurrent activity on
the slot, I think)? In this case, maybe we should just re-check that and
raise an error appropriately?
(I haven't looked at the logical slot path, but I assume it would have
something similar in it)
--
Magnus Hagander
Me: https://www.hagander.net/ <http://www.hagander.net/>
Work: https://www.redpill-linpro.com/ <http://www.redpill-linpro.com/>
On 25 May 2018 at 13:12, Magnus Hagander <magnus@hagander.net> wrote:
On Fri, May 25, 2018 at 7:28 AM, Michael Paquier <michael@paquier.xyz>
wrote:Hi all,
When attempting to use multiple times pg_replication_slot_advance on a
slot, then the caller gets back directly InvalidXLogRecPtr as result,
for example:
=# select * from pg_replication_slot_advance('popo', 'FF/0');
slot_name | end_lsn
-----------+-----------
popo | 0/60021E0
(1 row)
=# select * from pg_replication_slot_advance('popo', 'FF/0');
slot_name | end_lsn
-----------+---------
popo | 0/0
(1 row)Wouldn't it be more simple to return NULL to mean that the slot could
not be moved forward? That would be easier to parse for clients.
Please see the attached.I agree that returning 0/0 on this is wrong.
Agreed
However, can this actually occour for any case other than exactly the case
of "moving the position to where the position already is"? If I look at the
physical slot path at least that seems to eb the only case, and in that case
I think the correct thing to return would be the new position, and not NULL.
Docs say
"Returns name of the slot and real position to which it was advanced to."
so agreed
If we actually *fail* to move the position, we give an error.
Actually, isn't there also a race there? That is, if we try to move it, we
check that we're not trying to move it backwards, and throw an error, but
that's checked outside the lock. Then later we actually move it, and check
*again* if we try to move it backwards, but if that one fails we return
InvalidXLogRecPtr (which can happen in the case of concurrent activity on
the slot, I think)? In this case, maybe we should just re-check that and
raise an error appropriately?
Agreed
(I haven't looked at the logical slot path, but I assume it would have
something similar in it)
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, May 25, 2018 at 02:12:32PM +0200, Magnus Hagander wrote:
I agree that returning 0/0 on this is wrong.
However, can this actually occour for any case other than exactly the case
of "moving the position to where the position already is"? If I look at the
physical slot path at least that seems to be the only case, and in that
case I think the correct thing to return would be the new position, and not
NULL. If we actually *fail* to move the position, we give an error.
Yes, this only returns InvalidXLogRecPtr if the location could not be
moved forward. Still, there is more going on here. For a physical
slot, confirmed_lsn is always 0/0, hence the backward check is never
applied for it. What I think should be used for value assigned to
startlsn is restart_lsn instead. Then what do we do if the position
cannot be moved: should we raise an error, as what my patch attached
does, or just report the existing position the slot is at?
A second error that I can see is in pg_logical_replication_slot_advance,
which does not take the mutex lock of MyReplicationSlot, so concurrent
callers like those of ReplicationSlotsComputeRequiredLSN (applies to
physical slot actually) and pg_get_replication_slots() may read false
data.
On top of that, it seems to me that StartLogicalReplication is reading a
couple of fields from a slot without taking a lock, so at least
pg_get_replication_slots() may read incorrect data.
ReplicationSlotReserveWal also is missing a lock.. Those are older than
v11 though.
Actually, isn't there also a race there? That is, if we try to move it, we
check that we're not trying to move it backwards, and throw an error, but
that's checked outside the lock. Then later we actually move it, and check
*again* if we try to move it backwards, but if that one fails we return
InvalidXLogRecPtr (which can happen in the case of concurrent activity on
the slot, I think)? In this case, maybe we should just re-check that and
raise an error appropriately?
Er, isn't the take here that ReplicationSlotAcquire is used to lock any
replication slot update to happen from other backends? It seems to me
that what counts at the end if if a backend PID is associated to a slot
in memory. If you look at the code paths updating a logical or physical
slot then those imply that the slot is owned, still a spin lock needs to
be taken for concurrent readers.
(I haven't looked at the logical slot path, but I assume it would have
something similar in it)
Found one. All the things I have spotted are in the patch attached.
--
Michael
Attachments:
slot-lock-fixes.patchtext/x-diff; charset=us-asciiDownload
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..4cf2aef95f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -413,7 +413,9 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
ReplicationSlotMarkDirty();
}
+ SpinLockAcquire(&MyReplicationSlot->mutex);
retlsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
/* free context, call shutdown callback */
FreeDecodingContext(ctx);
@@ -472,7 +474,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
- startlsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ if (OidIsValid(MyReplicationSlot->data.database))
+ startlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ startlsn = MyReplicationSlot->data.restart_lsn;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
if (moveto < startlsn)
{
ReplicationSlotRelease();
@@ -488,6 +496,10 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
else
endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+ if (XLogRecPtrIsInvalid(endlsn))
+ ereport(ERROR,
+ (errmsg("slot not moved forward as position is already reached")));
+
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
- StringInfoData buf;
+ StringInfoData buf;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
/* make sure that our requirements are still fulfilled */
CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
WalSndWriteData,
WalSndUpdateProgress);
+ /* Fetch all needed values from the slot */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+ confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
/* Start reading WAL from the oldest required WAL. */
- logical_startptr = MyReplicationSlot->data.restart_lsn;
+ logical_startptr = restart_lsn;
/*
* Report the location after which we'll send out further commits as the
* current sentPtr.
*/
- sentPtr = MyReplicationSlot->data.confirmed_flush;
+ sentPtr = confirmed_lsn;
/* Also update the sent position status in shared memory */
SpinLockAcquire(&MyWalSnd->mutex);
On Mon, May 28, 2018 at 05:57:47PM +0900, Michael Paquier wrote:
Found one. All the things I have spotted are in the patch attached.
Oops, forgot a ReplicationSlotRelease call.
--
Michael
Attachments:
slot-lock-fixes-v2.patchtext/x-diff; charset=us-asciiDownload
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..1b8fd951d5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -413,7 +413,9 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
ReplicationSlotMarkDirty();
}
+ SpinLockAcquire(&MyReplicationSlot->mutex);
retlsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
/* free context, call shutdown callback */
FreeDecodingContext(ctx);
@@ -472,7 +474,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
- startlsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ if (OidIsValid(MyReplicationSlot->data.database))
+ startlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ startlsn = MyReplicationSlot->data.restart_lsn;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
if (moveto < startlsn)
{
ReplicationSlotRelease();
@@ -488,6 +496,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
else
endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+ if (XLogRecPtrIsInvalid(endlsn))
+ {
+ ReplicationSlotRelease();
+ ereport(ERROR,
+ (errmsg("slot not moved forward as position is already reached")));
+ }
+
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
- StringInfoData buf;
+ StringInfoData buf;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
/* make sure that our requirements are still fulfilled */
CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
WalSndWriteData,
WalSndUpdateProgress);
+ /* Fetch all needed values from the slot */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+ confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
/* Start reading WAL from the oldest required WAL. */
- logical_startptr = MyReplicationSlot->data.restart_lsn;
+ logical_startptr = restart_lsn;
/*
* Report the location after which we'll send out further commits as the
* current sentPtr.
*/
- sentPtr = MyReplicationSlot->data.confirmed_flush;
+ sentPtr = confirmed_lsn;
/* Also update the sent position status in shared memory */
SpinLockAcquire(&MyWalSnd->mutex);
On 28 May 2018 at 09:57, Michael Paquier <michael@paquier.xyz> wrote:
On Fri, May 25, 2018 at 02:12:32PM +0200, Magnus Hagander wrote:
I agree that returning 0/0 on this is wrong.
However, can this actually occour for any case other than exactly the case
of "moving the position to where the position already is"? If I look at the
physical slot path at least that seems to be the only case, and in that
case I think the correct thing to return would be the new position, and not
NULL. If we actually *fail* to move the position, we give an error.Yes, this only returns InvalidXLogRecPtr if the location could not be
moved forward. Still, there is more going on here. For a physical
slot, confirmed_lsn is always 0/0, hence the backward check is never
applied for it. What I think should be used for value assigned to
startlsn is restart_lsn instead. Then what do we do if the position
cannot be moved: should we raise an error, as what my patch attached
does, or just report the existing position the slot is at?
I don't see why an ERROR would be appropriate.
A second error that I can see is in pg_logical_replication_slot_advance,
which does not take the mutex lock of MyReplicationSlot, so concurrent
callers like those of ReplicationSlotsComputeRequiredLSN (applies to
physical slot actually) and pg_get_replication_slots() may read false
data.On top of that, it seems to me that StartLogicalReplication is reading a
couple of fields from a slot without taking a lock, so at least
pg_get_replication_slots() may read incorrect data.
ReplicationSlotReserveWal also is missing a lock.. Those are older than
v11 though.Actually, isn't there also a race there? That is, if we try to move it, we
check that we're not trying to move it backwards, and throw an error, but
that's checked outside the lock. Then later we actually move it, and check
*again* if we try to move it backwards, but if that one fails we return
InvalidXLogRecPtr (which can happen in the case of concurrent activity on
the slot, I think)? In this case, maybe we should just re-check that and
raise an error appropriately?Er, isn't the take here that ReplicationSlotAcquire is used to lock any
replication slot update to happen from other backends? It seems to me
that what counts at the end if if a backend PID is associated to a slot
in memory. If you look at the code paths updating a logical or physical
slot then those imply that the slot is owned, still a spin lock needs to
be taken for concurrent readers.(I haven't looked at the logical slot path, but I assume it would have
something similar in it)Found one. All the things I have spotted are in the patch attached.
I think the problem here is there are no comments explaining how to
access the various fields in the structure, so there was no way to
check whether the code was good or not.
If we add corrective code we should clarify that in comments the .h
file also, as is done in XlogCtl
Your points look correct to me, well spotted. I'd like to separate the
correction of these issues from the change of behavior patch. Those
earlier issues can be backpatched, but the change of behavior only
affects PG11.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Thu, May 31, 2018 at 06:31:24PM +0100, Simon Riggs wrote:
Thanks for the input, Simon. I have been able to spend more time
monitoring the slot-related code.
On 28 May 2018 at 09:57, Michael Paquier <michael@paquier.xyz> wrote:
Yes, this only returns InvalidXLogRecPtr if the location could not be
moved forward. Still, there is more going on here. For a physical
slot, confirmed_lsn is always 0/0, hence the backward check is never
applied for it. What I think should be used for value assigned to
startlsn is restart_lsn instead. Then what do we do if the position
cannot be moved: should we raise an error, as what my patch attached
does, or just report the existing position the slot is at?I don't see why an ERROR would be appropriate.
Okay, the caller can always compare if the returned position matches
what happened in the past or not, so that's fine for me. About that,
the LSN used as the initialization should then be startlsn instead of
InvalidXLogRecPtr.
A second error that I can see is in pg_logical_replication_slot_advance,
which does not take the mutex lock of MyReplicationSlot, so concurrent
callers like those of ReplicationSlotsComputeRequiredLSN (applies to
physical slot actually) and pg_get_replication_slots() may read false
data.On top of that, it seems to me that StartLogicalReplication is reading a
couple of fields from a slot without taking a lock, so at least
pg_get_replication_slots() may read incorrect data.
ReplicationSlotReserveWal also is missing a lock.. Those are older than
v11 though.
Actually, there are two extra problems:
- In CreateInitDecodingContext which can be called after the slot is
marked as in use so there could be inconsistencies with
pg_get_replication_slots() as well for catalog_xmin & co.
- In DecodingContextFindStartpoint where confirmed_lsn is updated
without the mutex taken.
I think the problem here is there are no comments explaining how to
access the various fields in the structure, so there was no way to
check whether the code was good or not.If we add corrective code we should clarify that in comments the .h
file also, as is done in XlogCtl
Yes, I agree with you. There are at least two LWLocks used for the
overall slot creation and handling, as well as a set of spin locks used
for the fields. The code also does not mention in its comments that
slots marked with in_use = false are not scanned at all by concurrent
backends, but this is strongly implied in the code, hence you don't need
to care about taking lock for them when working on fields for this slot
as long as its flag in_use has not been marked to true. There is one
code path which bypasses slots with in_use marked to true, but that's
for the startup process recovering the slot data, so there is no need to
care about locking in this case.
Your points look correct to me, well spotted. I'd like to separate the
correction of these issues from the change of behavior patch. Those
earlier issues can be backpatched, but the change of behavior only
affects PG11.
Definitely, while the previous patch was around mainly to show where
things are incorrect, I am attaching a set of patches for HEAD which can
be used for commit:
- One patch which addresses the several lock problems and adds some
instructions about the variables protected by spinlocks for slots in
use, which should be back-patched.
- A second patch for HEAD which addresses what has been noticed for the
new slot advance feature. This addresses as well the lock problems
introduced in the new advance code, hopefully the split makes sense to
others on this thread.
Once again those only apply to HEAD, please feel free to ping me if you
would like versions for back-branches (or anybody picking up those
patches).
Thanks,
--
Michael
Attachments:
0001-Fix-and-document-lock-handling-for-in-memory-replica.patchtext/x-diff; charset=us-asciiDownload
From 69bb70049d8ba74af60e8554fd6379499fbd29ff Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication
xslot data
While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.
The code paths involved in those problems concern the WAL sender,
logical decoding initialization and WAL reservation for slots.
A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
src/backend/replication/logical/logical.c | 13 +++++++++----
src/backend/replication/slot.c | 4 ++++
src/backend/replication/walsender.c | 14 +++++++++++---
src/include/replication/slot.h | 10 ++++++++++
4 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+ SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
if (need_full_snapshot)
slot->effective_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
@@ -445,13 +447,14 @@ void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
+ ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = ctx->slot->data.restart_lsn;
+ startptr = slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32) (ctx->slot->data.restart_lsn >> 32),
- (uint32) ctx->slot->data.restart_lsn);
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
CHECK_FOR_INTERRUPTS();
}
- ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockAcquire(&slot->mutex);
+ slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockRelease(&slot->mutex);
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
- StringInfoData buf;
+ StringInfoData buf;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
/* make sure that our requirements are still fulfilled */
CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
WalSndWriteData,
WalSndUpdateProgress);
+ /* Fetch all needed values from the slot */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+ confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
/* Start reading WAL from the oldest required WAL. */
- logical_startptr = MyReplicationSlot->data.restart_lsn;
+ logical_startptr = restart_lsn;
/*
* Report the location after which we'll send out further commits as the
* current sentPtr.
*/
- sentPtr = MyReplicationSlot->data.confirmed_flush;
+ sentPtr = confirmed_lsn;
/* Also update the sent position status in shared memory */
SpinLockAcquire(&MyWalSnd->mutex);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..6fa9df5553 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,16 @@ typedef struct ReplicationSlotPersistentData
/*
* Shared memory state of a single replication slot.
+ *
+ * The data included in this structure, including the contents within
+ * ReplicationSlotPersistentData, are protected by mutex when read from
+ * other backends than the one registering the slot as in_use. If the
+ * slot is not marked as in_use, then no code paths refer or should refer
+ * to the in-memory data of a slot.
+ *
+ * Note that a slot is switched as in_use only with
+ * ReplicationSlotControlLock held in exclusive mode, protecting from any
+ * while readers have to hold this lock at least in shared mode.
*/
typedef struct ReplicationSlot
{
--
2.17.0
0002-Fix-a-couple-of-bugs-with-replication-slot-advancing.patchtext/x-diff; charset=us-asciiDownload
From 20af21231112a31f9d825d379bf75ce1e0aecf54 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
feature
A review of the code has showed up two issues fixed by this commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point which is always 0/0, instead use the restart LSN
position (logical slots need to use the confirmed LSN position).
- Never return 0/0 is a slot cannot be advanced. This way, if a slot is
advanced while the activity is idle, then the same position is returned
by to caller over and over without raising an error.
Note that as the slot is owned by the backend advancing it, then the
read of those field is fine lockless, while updates need to happen while
the slot mutex is held.
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
src/backend/replication/slotfuncs.c | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..151151b1a8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -322,15 +322,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
{
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr retlsn = startlsn;
- SpinLockAcquire(&MyReplicationSlot->mutex);
if (MyReplicationSlot->data.restart_lsn < moveto)
{
+ SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.restart_lsn = moveto;
+ SpinLockRelease(&MyReplicationSlot->mutex)
retlsn = moveto;
}
- SpinLockRelease(&MyReplicationSlot->mutex);
return retlsn;
}
@@ -343,7 +343,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr retlsn = startlsn;
PG_TRY();
{
@@ -472,7 +472,11 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
- startlsn = MyReplicationSlot->data.confirmed_flush;
+ if (OidIsValid(MyReplicationSlot->data.database))
+ startlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ startlsn = MyReplicationSlot->data.restart_lsn;
+
if (moveto < startlsn)
{
ReplicationSlotRelease();
--
2.17.0
On Fri, Jun 01, 2018 at 02:53:09PM -0400, Michael Paquier wrote:
Definitely, while the previous patch was around mainly to show where
things are incorrect, I am attaching a set of patches for HEAD which can
be used for commit:
- One patch which addresses the several lock problems and adds some
instructions about the variables protected by spinlocks for slots in
use, which should be back-patched.
- A second patch for HEAD which addresses what has been noticed for the
new slot advance feature. This addresses as well the lock problems
introduced in the new advance code, hopefully the split makes sense to
others on this thread.
Once again those only apply to HEAD, please feel free to ping me if you
would like versions for back-branches (or anybody picking up those
patches).
And of course I found a typo just after sending those.. Please use the
attached ones instead.
--
Michael
Attachments:
0001-Fix-and-document-lock-handling-for-in-memory-replica.patchtext/x-diff; charset=us-asciiDownload
From 69bb70049d8ba74af60e8554fd6379499fbd29ff Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication
xslot data
While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.
The code paths involved in those problems concern the WAL sender,
logical decoding initialization and WAL reservation for slots.
A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
src/backend/replication/logical/logical.c | 13 +++++++++----
src/backend/replication/slot.c | 4 ++++
src/backend/replication/walsender.c | 14 +++++++++++---
src/include/replication/slot.h | 10 ++++++++++
4 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+ SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
if (need_full_snapshot)
slot->effective_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
@@ -445,13 +447,14 @@ void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
+ ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = ctx->slot->data.restart_lsn;
+ startptr = slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32) (ctx->slot->data.restart_lsn >> 32),
- (uint32) ctx->slot->data.restart_lsn);
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
CHECK_FOR_INTERRUPTS();
}
- ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockAcquire(&slot->mutex);
+ slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockRelease(&slot->mutex);
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
- StringInfoData buf;
+ StringInfoData buf;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
/* make sure that our requirements are still fulfilled */
CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
WalSndWriteData,
WalSndUpdateProgress);
+ /* Fetch all needed values from the slot */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+ confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
/* Start reading WAL from the oldest required WAL. */
- logical_startptr = MyReplicationSlot->data.restart_lsn;
+ logical_startptr = restart_lsn;
/*
* Report the location after which we'll send out further commits as the
* current sentPtr.
*/
- sentPtr = MyReplicationSlot->data.confirmed_flush;
+ sentPtr = confirmed_lsn;
/* Also update the sent position status in shared memory */
SpinLockAcquire(&MyWalSnd->mutex);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..6fa9df5553 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,16 @@ typedef struct ReplicationSlotPersistentData
/*
* Shared memory state of a single replication slot.
+ *
+ * The data included in this structure, including the contents within
+ * ReplicationSlotPersistentData, are protected by mutex when read from
+ * other backends than the one registering the slot as in_use. If the
+ * slot is not marked as in_use, then no code paths refer or should refer
+ * to the in-memory data of a slot.
+ *
+ * Note that a slot is switched as in_use only with
+ * ReplicationSlotControlLock held in exclusive mode, protecting from any
+ * while readers have to hold this lock at least in shared mode.
*/
typedef struct ReplicationSlot
{
--
2.17.0
0002-Fix-a-couple-of-bugs-with-replication-slot-advancing.patchtext/x-diff; charset=us-asciiDownload
From 7032bb81531527c495c77f97bdc170a6aa7e09ad Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
feature
A review of the code has showed up two issues fixed by this commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point which is always 0/0, instead use the restart LSN
position (logical slots need to use the confirmed LSN position).
- Never return 0/0 is a slot cannot be advanced. This way, if a slot is
advanced while the activity is idle, then the same position is returned
by to caller over and over without raising an error.
Note that as the slot is owned by the backend advancing it, then the
read of those field is fine lockless, while updates need to happen while
the slot mutex is held.
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
src/backend/replication/slotfuncs.c | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..d07d2c432b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -322,15 +322,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
{
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr retlsn = startlsn;
- SpinLockAcquire(&MyReplicationSlot->mutex);
if (MyReplicationSlot->data.restart_lsn < moveto)
{
+ SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.restart_lsn = moveto;
+ SpinLockRelease(&MyReplicationSlot->mutex);
retlsn = moveto;
}
- SpinLockRelease(&MyReplicationSlot->mutex);
return retlsn;
}
@@ -343,7 +343,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr retlsn = startlsn;
PG_TRY();
{
@@ -472,7 +472,11 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
- startlsn = MyReplicationSlot->data.confirmed_flush;
+ if (OidIsValid(MyReplicationSlot->data.database))
+ startlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ startlsn = MyReplicationSlot->data.restart_lsn;
+
if (moveto < startlsn)
{
ReplicationSlotRelease();
--
2.17.0
Hi,
On 01/06/18 21:13, Michael Paquier wrote:
- startlsn = MyReplicationSlot->data.confirmed_flush; + if (OidIsValid(MyReplicationSlot->data.database)) + startlsn = MyReplicationSlot->data.confirmed_flush; + else + startlsn = MyReplicationSlot->data.restart_lsn; + if (moveto < startlsn) { ReplicationSlotRelease();
This part looks correct for the checking that we are not moving
backwards. However, there is another existing issue with this code which
is that we are later using the confirmed_flush (via startlsn) as start
point of logical decoding (XLogReadRecord parameter in
pg_logical_replication_slot_advance) which is not correct. The
restart_lsn should be used for that. I think it would make sense to fix
that as part of this patch as well.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Mon, Jun 04, 2018 at 11:51:35AM +0200, Petr Jelinek wrote:
On 01/06/18 21:13, Michael Paquier wrote:
- startlsn = MyReplicationSlot->data.confirmed_flush; + if (OidIsValid(MyReplicationSlot->data.database)) + startlsn = MyReplicationSlot->data.confirmed_flush; + else + startlsn = MyReplicationSlot->data.restart_lsn; + if (moveto < startlsn) { ReplicationSlotRelease();This part looks correct for the checking that we are not moving
backwards. However, there is another existing issue with this code which
is that we are later using the confirmed_flush (via startlsn) as start
point of logical decoding (XLogReadRecord parameter in
pg_logical_replication_slot_advance) which is not correct. The
restart_lsn should be used for that. I think it would make sense to fix
that as part of this patch as well.
I am not sure I understand what you are coming at here. Could you
explain your point a bit more please as 9c7d06d is yours? When creating
the decoding context, all other code paths use the confirmed LSN as a
start point if not explicitely defined by the caller of
CreateDecodingContext, as it points to the last LSN where a transaction
has been committed and decoded. The backward check is also correct to
me, for which I propose to add a comment block like that:
+ /*
+ * Check if the slot is not moving backwards. Physical slots rely
+ * simply on restart_lsn as a minimum point, while logical slots
+ * have confirmed consumption up to confirmed_lsn, meaning that
+ * in both cases data older than that is not available anymore.
+ */
+ if (OidIsValid(MyReplicationSlot->data.database))
+ minlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ minlsn = MyReplicationSlot->data.restart_lsn;
Any tests I do are showing me that using confirmed_lsn would not matter
much? as we want the slot's consumer to still decode transactions whose
commits happened after the point where the slot has been advanced to.
So let's make sure that we are on the same page for the starting
LSN used.
On top of that, the locking issues in CreateInitDecodingContext() and
DecodingContextFindStartpoint go back to 9.4. So I would be inclined to
get 0001 applied first as a bug fix on all branches, still that's a
minor issue so there could be arguments for just doing it on HEAD. I am
as well fully open to suggestions for the extra comments which document
the use of ReplicationSlotControlLock and mutex for in-memory slot data.
Any thoughts about those two last points?
--
Michael
Attachments:
0001-Fix-and-document-lock-handling-for-in-memory-replica.patchtext/x-diff; charset=us-asciiDownload
From 0cd02e359cbf5b74c8231f6619bc479a314213bc Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH] Fix and document lock handling for in-memory replication slot
data
While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.
The code paths involved in those problems concern logical decoding
initialization (down to 9.4) and WAL reservation for slots (new as of
10).
A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.
Backpatch down to 9.4, where WAL decoding has been introduced.
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
src/backend/replication/logical/logical.c | 13 +++++++++----
src/backend/replication/slot.c | 4 ++++
src/include/replication/slot.h | 13 +++++++++++++
3 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+ SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
if (need_full_snapshot)
slot->effective_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
@@ -445,13 +447,14 @@ void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
+ ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = ctx->slot->data.restart_lsn;
+ startptr = slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32) (ctx->slot->data.restart_lsn >> 32),
- (uint32) ctx->slot->data.restart_lsn);
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
CHECK_FOR_INTERRUPTS();
}
- ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockAcquire(&slot->mutex);
+ slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockRelease(&slot->mutex);
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..2af2a14994 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
/*
* Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use is switched when added or discarded using
+ * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
+ * mode when updating the flag by the backend owning the slot and doing the
+ * operation, while readers (concurrent backends not owning the slot) need
+ * to hold it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by mutex where only the backend owning
+ * the slot is authorized to update the fields from its own slot. The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
*/
typedef struct ReplicationSlot
{
--
2.17.0
On Mon, Jun 04, 2018 at 11:51:35AM +0200, Petr Jelinek wrote:
On 01/06/18 21:13, Michael Paquier wrote:
- startlsn =3D MyReplicationSlot->data.confirmed_flush; + if (OidIsValid(MyReplicationSlot->data.database)) + startlsn =3D MyReplicationSlot->data.confirmed_flush; + else + startlsn =3D MyReplicationSlot->data.restart_lsn; + if (moveto < startlsn) { ReplicationSlotRelease();This part looks correct for the checking that we are not moving
backwards. However, there is another existing issue with this code
which
is that we are later using the confirmed_flush (via startlsn) as start
point of logical decoding (XLogReadRecord parameter in
pg_logical_replication_slot_advance) which is not correct. The
restart_lsn should be used for that. I think it would make sense to
fix
that as part of this patch as well.
I am not sure I understand what you are coming at here. Could you
explain your point a bit more please as 9c7d06d is yours? When creating
the decoding context, all other code paths use the confirmed LSN as a
start point if not explicitely defined by the caller of
CreateDecodingContext, as it points to the last LSN where a transaction
has been committed and decoded. The backward check is also correct to
me, for which I propose to add a comment block like that:
+ /*
+ * Check if the slot is not moving backwards. Physical slots rely
+ * simply on restart_lsn as a minimum point, while logical slots
+ * have confirmed consumption up to confirmed_lsn, meaning that
+ * in both cases data older than that is not available anymore.
+ */
+ if (OidIsValid(MyReplicationSlot->data.database))
+ minlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ minlsn = MyReplicationSlot->data.restart_lsn;
Any tests I do are showing me that using confirmed_lsn would not matter
much? as we want the slot's consumer to still decode transactions whose
commits happened after the point where the slot has been advanced to.
So let's make sure that we are on the same page for the starting
LSN used.
On top of that, the locking issues in CreateInitDecodingContext() and
DecodingContextFindStartpoint go back to 9.4. So I would be inclined to
get 0001 applied first as a bug fix on all branches, still that's a
minor issue so there could be arguments for just doing it on HEAD. I am
as well fully open to suggestions for the extra comments which document
the use of ReplicationSlotControlLock and mutex for in-memory slot data.
Any thoughts about those two last points?
--
Michael
Attachments:
0001-Fix-and-document-lock-handling-for-in-memory-replica.patchtext/x-diff; charset=us-asciiDownload
From 0cd02e359cbf5b74c8231f6619bc479a314213bc Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH] Fix and document lock handling for in-memory replication slot
data
While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.
The code paths involved in those problems concern logical decoding
initialization (down to 9.4) and WAL reservation for slots (new as of
10).
A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.
Backpatch down to 9.4, where WAL decoding has been introduced.
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
src/backend/replication/logical/logical.c | 13 +++++++++----
src/backend/replication/slot.c | 4 ++++
src/include/replication/slot.h | 13 +++++++++++++
3 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+ SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
if (need_full_snapshot)
slot->effective_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
@@ -445,13 +447,14 @@ void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
+ ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = ctx->slot->data.restart_lsn;
+ startptr = slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32) (ctx->slot->data.restart_lsn >> 32),
- (uint32) ctx->slot->data.restart_lsn);
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
CHECK_FOR_INTERRUPTS();
}
- ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockAcquire(&slot->mutex);
+ slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockRelease(&slot->mutex);
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..2af2a14994 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
/*
* Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use is switched when added or discarded using
+ * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
+ * mode when updating the flag by the backend owning the slot and doing the
+ * operation, while readers (concurrent backends not owning the slot) need
+ * to hold it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by mutex where only the backend owning
+ * the slot is authorized to update the fields from its own slot. The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
*/
typedef struct ReplicationSlot
{
--
2.17.0
On 05/06/18 06:28, Michael Paquier wrote:
On Mon, Jun 04, 2018 at 11:51:35AM +0200, Petr Jelinek wrote:
On 01/06/18 21:13, Michael Paquier wrote:
- startlsn =3D MyReplicationSlot->data.confirmed_flush; + if (OidIsValid(MyReplicationSlot->data.database)) + startlsn =3D MyReplicationSlot->data.confirmed_flush; + else + startlsn =3D MyReplicationSlot->data.restart_lsn; + if (moveto < startlsn) { ReplicationSlotRelease();This part looks correct for the checking that we are not moving
backwards. However, there is another existing issue with this code
which
is that we are later using the confirmed_flush (via startlsn) as start
point of logical decoding (XLogReadRecord parameter in
pg_logical_replication_slot_advance) which is not correct. The
restart_lsn should be used for that. I think it would make sense to
fix
that as part of this patch as well.I am not sure I understand what you are coming at here. Could you
explain your point a bit more please as 9c7d06d is yours?
As I said, it's an existing issue. I just had chance to reread the code
when looking and your patch.
When creating
the decoding context, all other code paths use the confirmed LSN as a
start point if not explicitely defined by the caller of
CreateDecodingContext, as it points to the last LSN where a transaction
has been committed and decoded.
I didn't say anything about CreateDecodingContext though. I am talking
about the fact that we then use the same variable as input to
XLogReadRecord later in the logical slot code path. The whole point of
having restart_lsn for logical slot is to have correct start point for
XLogReadRecord so using the confirmed_flush there is wrong (and has been
wrong since the original commit).
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Tue, Jun 05, 2018 at 01:00:30PM +0200, Petr Jelinek wrote:
I didn't say anything about CreateDecodingContext though. I am talking
about the fact that we then use the same variable as input to
XLogReadRecord later in the logical slot code path. The whole point of
having restart_lsn for logical slot is to have correct start point for
XLogReadRecord so using the confirmed_flush there is wrong (and has been
wrong since the original commit).
OK, I finally see the point you are coming at after a couple of hours
brainstorming about it, and indeed using confirmed_lsn is logically
incorrect so the current code is inconsistent with what
DecodingContextFindStartpoint() does, so we rely on restart_lsn to scan
for all the records and the decoder state is here to make sure that the
slot's confirmed_lsn is updated to a consistent position. I have added
your feedback in the attached (indented and such), which results in some
simplifications with a couple of routines.
I am attaching as well the patch I sent yesterday. 0001 is candidate
for a back-patch, 0002 is for HEAD to fix the slot advance stuff.
There is another open item related to slot advancing here:
/messages/by-id/2840048a-1184-417a-9da8-3299d207a1d7@postgrespro.ru
And per my tests the patch is solving this item as well. I have just
used the test mentioned in the thread which:
1) creates a slot
2) does some activity to generate a couple of WAL pages
3) advances the slot at page boundary
4) Moves again the slot.
This test crashes on HEAD at step 4, and not with the attached.
What do you think?
--
Michael
Attachments:
0001-Fix-and-document-lock-handling-for-in-memory-replica.patchtext/x-diff; charset=us-asciiDownload
From 0cd02e359cbf5b74c8231f6619bc479a314213bc Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication
slot data
While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.
The code paths involved in those problems concern logical decoding
initialization (down to 9.4) and WAL reservation for slots (new as of
10).
A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.
Backpatch down to 9.4, where WAL decoding has been introduced.
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
src/backend/replication/logical/logical.c | 13 +++++++++----
src/backend/replication/slot.c | 4 ++++
src/include/replication/slot.h | 13 +++++++++++++
3 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+ SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
if (need_full_snapshot)
slot->effective_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
@@ -445,13 +447,14 @@ void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
+ ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = ctx->slot->data.restart_lsn;
+ startptr = slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32) (ctx->slot->data.restart_lsn >> 32),
- (uint32) ctx->slot->data.restart_lsn);
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
CHECK_FOR_INTERRUPTS();
}
- ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockAcquire(&slot->mutex);
+ slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockRelease(&slot->mutex);
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..2af2a14994 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
/*
* Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use is switched when added or discarded using
+ * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
+ * mode when updating the flag by the backend owning the slot and doing the
+ * operation, while readers (concurrent backends not owning the slot) need
+ * to hold it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by mutex where only the backend owning
+ * the slot is authorized to update the fields from its own slot. The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
*/
typedef struct ReplicationSlot
{
--
2.17.1
0002-Fix-a-couple-of-bugs-with-replication-slot-advancing.patchtext/x-diff; charset=us-asciiDownload
From 98ffc5a3705f3cf99f958c696541896825fb2a01 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
feature
A review of the code has showed up a couple of issues fixed by this
commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point which is always 0/0, instead use the restart LSN
position (logical slots need to use the confirmed LSN position, which
was correct).
- The actual slot update was incorrect for both physical and logical
slots. Physical slots need to use their restart_lsn as base comparison
point (confirmed_flush was used because of previous point), and logical
slots need to begin reading WAL from restart_lsn (confirmed_flush was
used as well), while confirmed_flush is compiled depending on the
decoding context and record read.
- Never return 0/0 is a slot cannot be advanced. This way, if a slot is
advanced while the activity is idle, then the same position is returned
by to caller over and over without raising an error. Instead return the
LSN the slot has been advanced to. With repetitive calls, the same
position is returned.
Note that as the slot is owned by the backend advancing it, then the
read of those field is fine lockless, while updates need to happen while
the slot mutex is held, so fix that on the way.
Author: Michael Paquier
Reviewed-by: Peter Jelinek, Simon Riggs
Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/2840048a-1184-417a-9da8-3299d207a1d7%40postgrespro.ru
---
src/backend/replication/slotfuncs.c | 50 +++++++++++++++++++++--------
1 file changed, 36 insertions(+), 14 deletions(-)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..63cada0786 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
/*
* Helper function for advancing physical replication slot forward.
+ * The LSN position to move to is compared simply to the slot's
+ * restart_lsn, knowing that any position older than that would be
+ * removed by successive checkpoints.
*/
static XLogRecPtr
-pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_physical_replication_slot_advance(XLogRecPtr moveto)
{
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
+ XLogRecPtr retlsn = startlsn;
- SpinLockAcquire(&MyReplicationSlot->mutex);
- if (MyReplicationSlot->data.restart_lsn < moveto)
+ if (startlsn < moveto)
{
+ SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.restart_lsn = moveto;
+ SpinLockRelease(&MyReplicationSlot->mutex);
retlsn = moveto;
}
- SpinLockRelease(&MyReplicationSlot->mutex);
return retlsn;
}
/*
* Helper function for advancing logical replication slot forward.
+ * The slot's restart_lsn is used as start point for reading records,
+ * while confirmed_lsn is used as base point for the decoding context.
+ * The LSN position to move to is checked by doing a per-record scan and
+ * logical decoding which makes sure that confirmed_lsn is updated to a
+ * LSN which allows the future slot consumer to get consistent logical
+ * changes.
*/
static XLogRecPtr
-pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
+ XLogRecPtr retlsn = startlsn;
PG_TRY();
{
@@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
if (record != NULL)
LogicalDecodingProcessRecord(ctx, ctx->reader);
- /* check limits */
+ /* Stop once the moving point wanted by caller has been reached */
if (moveto <= ctx->reader->EndRecPtr)
break;
@@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
Name slotname = PG_GETARG_NAME(0);
XLogRecPtr moveto = PG_GETARG_LSN(1);
XLogRecPtr endlsn;
- XLogRecPtr startlsn;
+ XLogRecPtr minlsn;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2];
@@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
- startlsn = MyReplicationSlot->data.confirmed_flush;
- if (moveto < startlsn)
+ /*
+ * Check if the slot is not moving backwards. Physical slots rely simply
+ * on restart_lsn as a minimum point, while logical slots have confirmed
+ * consumption up to confirmed_lsn, meaning that in both cases data older
+ * than that is not available anymore.
+ */
+ if (OidIsValid(MyReplicationSlot->data.database))
+ minlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ minlsn = MyReplicationSlot->data.restart_lsn;
+
+ if (moveto < minlsn)
{
ReplicationSlotRelease();
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move slot to %X/%X, minimum is %X/%X",
(uint32) (moveto >> 32), (uint32) moveto,
- (uint32) (startlsn >> 32), (uint32) startlsn)));
+ (uint32) (minlsn >> 32), (uint32) minlsn)));
}
+ /* Do the actual slot update, depending on the slot type */
if (OidIsValid(MyReplicationSlot->data.database))
- endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+ endlsn = pg_logical_replication_slot_advance(moveto);
else
- endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+ endlsn = pg_physical_replication_slot_advance(moveto);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
--
2.17.1
On 06/06/18 04:04, Michael Paquier wrote:
On Tue, Jun 05, 2018 at 01:00:30PM +0200, Petr Jelinek wrote:
I didn't say anything about CreateDecodingContext though. I am talking
about the fact that we then use the same variable as input to
XLogReadRecord later in the logical slot code path. The whole point of
having restart_lsn for logical slot is to have correct start point for
XLogReadRecord so using the confirmed_flush there is wrong (and has been
wrong since the original commit).OK, I finally see the point you are coming at after a couple of hours
brainstorming about it, and indeed using confirmed_lsn is logically
incorrect so the current code is inconsistent with what
DecodingContextFindStartpoint() does, so we rely on restart_lsn to scan
for all the records and the decoder state is here to make sure that the
slot's confirmed_lsn is updated to a consistent position. I have added
your feedback in the attached (indented and such), which results in some
simplifications with a couple of routines.I am attaching as well the patch I sent yesterday. 0001 is candidate
for a back-patch, 0002 is for HEAD to fix the slot advance stuff.There is another open item related to slot advancing here:
/messages/by-id/2840048a-1184-417a-9da8-3299d207a1d7@postgrespro.ru
And per my tests the patch is solving this item as well. I have just
used the test mentioned in the thread which:
1) creates a slot
2) does some activity to generate a couple of WAL pages
3) advances the slot at page boundary
4) Moves again the slot.
This test crashes on HEAD at step 4, and not with the attached.What do you think?
Seems reasonable to me.
I think the only thing to note about the patches from my side is that we
probably don't want to default to restart_lsn for the
pg_logical_replication_slot_advance() return value (when nothing was
done) but rather the confirmed_lsn. As it is in current patch if we call
the function repeatedly and one call moved slot forward but the next one
didn't the return value will go backwards as restart_lsn tends to be
behind the confirmed one.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Wed, Jun 06, 2018 at 04:57:22PM +0200, Petr Jelinek wrote:
I think the only thing to note about the patches from my side is that we
probably don't want to default to restart_lsn for the
pg_logical_replication_slot_advance() return value (when nothing was
done) but rather the confirmed_lsn. As it is in current patch if we call
the function repeatedly and one call moved slot forward but the next one
didn't the return value will go backwards as restart_lsn tends to be
behind the confirmed one.
It does not matter much as the PG_TRY loop would still enforce the
result to confirmed_lsn anyway if nothing happens, still let's do as you
suggest as that's more consistent.
--
Michael
On Wed, Jun 06, 2018 at 11:04:39AM +0900, Michael Paquier wrote:
I am attaching as well the patch I sent yesterday. 0001 is candidate
for a back-patch, 0002 is for HEAD to fix the slot advance stuff.
I have been able to look again at 0001 and pushed it as 9e149c8. As
reading inconsistent data from replication slots is rather hard to
trigger, I have just pushed the patch to HEAD. I'll look at 0002
tomorrow.
--
Michael
On Sun, Jun 10, 2018 at 10:19:23PM +0900, Michael Paquier wrote:
I have been able to look again at 0001 and pushed it as 9e149c8. As
reading inconsistent data from replication slots is rather hard to
trigger, I have just pushed the patch to HEAD. I'll look at 0002
tomorrow.
And pushed 0002 as f731cfa9, so we should be good with this open item.
--
Michael