[PATCH] Logical decoding timeline following take II
Hi all
Attached is a rebased and updated logical decoding timeline following
patch for 10.0.
This is a pre-requisite for the pending work on logical decoding on
standby servers and simplified failover of logical decoding.
Restating the commit message:
__________
Follow timeline switches in logical decoding
When decoding from a logical slot, it's necessary for xlog reading to
be able to read xlog from historical (i.e. not current) timelines.
Otherwise decoding fails after failover to a physical replica because
the oldest still-needed archives are in the historical timeline.
Supporting logical decoding timeline following is a pre-requisite for
logical decoding on physical standby servers. It also makes it
possible to promote a replica with logical slots to a master and
replay from those slots, allowing logical decoding applications to
follow physical failover.
Logical slots cannot actually be created on a replica without use of
the low-level C slot management APIs so this is mostly foundation work
for subsequent changes to enable logical decoding on standbys.
This commit includes a module in src/test/modules with functions to
manipulate the slots (which is not otherwise possible in SQL code) in
order to enable testing, and a new test in src/test/recovery to ensure
that the behavior is as expected.
Note that an earlier version of logical decoding timeline following
was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5
feature freeze when issues were discovered too late to safely fix them
in the 9.5 release cycle.
The prior approach failed to consider that a record could be split
across pages that are on different segments, where the new segment
contains the start of a new timeline. In that case the old segment
might be missing or renamed with a .partial suffix.
This patch reworks the logic to be page-based and in the process
simplify how the last timeline for a segment is looked up.
Slot timeline following only works in a backend. Frontend support can
be aded separately, where it could be useful for pg_xlogdump etc once
support for timeline.c, List, etc is added for frontend code.
__________
I'm hoping to find time to refactor timeline following so that we
avoid passing timeline information around the xlogreader using
globals, but that'd be a separate change that can be made after this.
I've omitted the --endpos changes for pg_recvlogical, which again can
be added separately.
The test harness code will become unnecessary when proper support for
logical failover or logical decoding on standby is added, so I'm not
really sure it should be committed.
Prior threads:
* /messages/by-id/CAMsr+YG_1FU_-L8QWSk6oKFT4Jt8dpORy2RHXDyMy0B5ZfkpGA@mail.gmail.com
* /messages/by-id/CAMsr+YH-C1-X_+s=2nzAPnR0wwqJa-rUmVHSYyZaNSn93MUBMQ@mail.gmail.com
* /messages/by-id/20160503165812.GA29604@alvherre.pgsql
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Follow-timeline-switches-in-logical-decoding.patchtext/x-patch; charset=US-ASCII; name=0001-Follow-timeline-switches-in-logical-decoding.patchDownload
From 10d5f4652689fbcaf5b14fe6bd991c98dbf60e00 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 1 Sep 2016 10:16:55 +0800
Subject: [PATCH] Follow timeline switches in logical decoding
When decoding from a logical slot, it's necessary for xlog reading to
be able to read xlog from historical (i.e. not current) timelines.
Otherwise decoding fails after failover to a physical replica because
the oldest still-needed archives are in the historical timeline.
Supporting logical decoding timeline following is a pre-requisite for
logical decoding on physical standby servers. It also makes it
possible to promote a replica with logical slots to a master and
replay from those slots, allowing logical decoding applications to
follow physical failover.
Logical slots cannot actually be created on a replica without use of
the low-level C slot management APIs so this is mostly foundation work
for subsequent changes to enable logical decoding on standbys.
This commit includes a module in src/test/modules with functions to
manipulate the slots (which is not otherwise possible in SQL code) in
order to enable testing, and a new test in src/test/recovery to ensure
that the behavior is as expected.
Note that an earlier version of logical decoding timeline following
was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5
feature freeze when issues were discovered too late to safely fix them
in the 9.5 release cycle.
The prior approach failed to consider that a record could be split
across pages that are on different segments, where the new segment
contains the start of a new timeline. In that case the old segment
might be missing or renamed with a .partial suffix.
This patch reworks the logic to be page-based and in the process
simplify how the last timeline for a segment is looked up.
Slot timeline following only works in a backend. Frontend support can
be aded separately, where it could be useful for pg_xlogdump etc once
support for timeline.c, List, etc is added for frontend code.
---
src/backend/access/transam/xlogutils.c | 207 +++++++++++++-
src/backend/replication/logical/logicalfuncs.c | 12 +-
src/include/access/xlogreader.h | 11 +
src/test/modules/Makefile | 1 +
src/test/modules/test_slot_timelines/.gitignore | 3 +
src/test/modules/test_slot_timelines/Makefile | 22 ++
src/test/modules/test_slot_timelines/README | 19 ++
.../expected/load_extension.out | 19 ++
.../expected/load_extension_1.out | 7 +
.../test_slot_timelines/sql/load_extension.sql | 7 +
.../test_slot_timelines--1.0.sql | 16 ++
.../test_slot_timelines/test_slot_timelines.c | 133 +++++++++
.../test_slot_timelines/test_slot_timelines.conf | 2 +
.../test_slot_timelines.control | 5 +
src/test/recovery/Makefile | 2 +
.../recovery/t/006_logical_decoding_timelines.pl | 307 +++++++++++++++++++++
16 files changed, 752 insertions(+), 21 deletions(-)
create mode 100644 src/test/modules/test_slot_timelines/.gitignore
create mode 100644 src/test/modules/test_slot_timelines/Makefile
create mode 100644 src/test/modules/test_slot_timelines/README
create mode 100644 src/test/modules/test_slot_timelines/expected/load_extension.out
create mode 100644 src/test/modules/test_slot_timelines/expected/load_extension_1.out
create mode 100644 src/test/modules/test_slot_timelines/sql/load_extension.sql
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.c
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.conf
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.control
create mode 100644 src/test/recovery/t/006_logical_decoding_timelines.pl
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 51a8e8d..014978f 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,6 +19,7 @@
#include <unistd.h>
+#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
@@ -660,6 +661,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
/* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
+ static TimeLineID sendTLI = 0;
static uint32 sendOff = 0;
p = buf;
@@ -675,7 +677,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
startoff = recptr % XLogSegSize;
/* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+ sendTLI != tli)
{
char path[MAXPGPATH];
@@ -702,6 +705,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
path)));
}
sendOff = 0;
+ sendTLI = tli;
}
/* Need to seek in the file? */
@@ -750,6 +754,142 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
}
/*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's state->currTLI to that timeline ID.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch. The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends and we have to switch to a new one.
+ * Even in the middle of reading a page we could have to dump the cached page
+ * and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current redo pointer
+ * so it doesn't fail to notice that the current timeline became historical.
+ */
+static void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+ const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+ elog(DEBUG4, "Determining timeline for read at %X/%X+%X",
+ (uint32)(wantPage>>32), (uint32)wantPage, wantLength);
+
+ Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+ Assert(wantLength <= XLOG_BLCKSZ);
+ Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+ /*
+ * If the desired page is currently read in and valid, we have nothing to do.
+ *
+ * The caller should've ensured that it didn't previously advance readOff
+ * past the valid limit of this timeline, so it doesn't matter if the current
+ * TLI has since become historical.
+ */
+ if (lastReadPage == wantPage &&
+ state->readLen != 0 &&
+ lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+ {
+ elog(DEBUG4, "Wanted data already valid"); //XXX
+ return;
+ }
+
+ /*
+ * If we're reading from the current timeline, it hasn't become historical
+ * and the page we're reading is after the last page read, we can again
+ * just carry on. (Seeking backwards requires a check to make sure the older
+ * page isn't on a prior timeline).
+ */
+ if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+ {
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+ elog(DEBUG4, "On current timeline");
+ return;
+ }
+
+ /*
+ * If we're just reading pages from a previously validated historical
+ * timeline and the timeline we're reading from is valid until the
+ * end of the current segment we can just keep reading.
+ */
+ if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+ state->currTLI != ThisTimeLineID &&
+ state->currTLI != 0 &&
+ (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+ {
+ elog(DEBUG4, "Still on historical timeline %u until %X/%X",
+ state->currTLI,
+ (uint32)(state->currTLIValidUntil >> 32),
+ (uint32)(state->currTLIValidUntil));
+ return;
+ }
+
+ /*
+ * If we reach this point we're either looking up a page for random access,
+ * the current timeline just became historical, or we're reading from a new
+ * segment containing a timeline switch. In all cases we need to determine
+ * the newest timeline on the segment.
+ *
+ * If it's the current timeline we can just keep reading from here unless
+ * we detect a timeline switch that makes the current timeline historical.
+ * If it's a historical timeline we can read all the segment on the newest
+ * timeline because it contains all the old timelines' data too. So only
+ * one switch check is required.
+ */
+ {
+ /*
+ * We need to re-read the timeline history in case it's been changed
+ * by a promotion or replay from a cascaded replica.
+ */
+ List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+ XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+ Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+ /* Find the timeline of the last LSN on the segment containing wantPage. */
+ state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+ state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory, NULL);
+
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+ wantPage + wantLength < state->currTLIValidUntil);
+
+ list_free_deep(timelineHistory);
+
+ elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+ state->currTLI,
+ (uint32)(state->currTLIValidUntil >> 32),
+ (uint32)(state->currTLIValidUntil));
+ }
+
+ elog(DEBUG3, "page read ptr %X/%X (for record %X/%X) is on segment with TLI %u valid until %X/%X, server current TLI is %u",
+ (uint32) (wantPage >> 32),
+ (uint32) wantPage,
+ (uint32) (state->currRecPtr >> 32),
+ (uint32) state->currRecPtr,
+ state->currTLI,
+ (uint32) (state->currTLIValidUntil >> 32),
+ (uint32) (state->currTLIValidUntil),
+ ThisTimeLineID);
+}
+
+/*
* read_page callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
@@ -770,28 +910,65 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int count;
loc = targetPagePtr + reqLen;
+
+ /* Make sure enough xlog is available... */
while (1)
{
/*
- * TODO: we're going to have to do something more intelligent about
- * timelines on standbys. Use readTimeLineHistory() and
- * tliOfPointInHistory() to get the proper LSN? For now we'll catch
- * that case earlier, but the code and TODO is left in here for when
- * that changes.
+ * Check which timeline to get the record from.
+ *
+ * We have to do it each time through the loop because if we're in
+ * recovery as a cascading standby, the current timeline might've
+ * become historical.
*/
- if (!RecoveryInProgress())
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+ if (state->currTLI == ThisTimeLineID)
{
- *pageTLI = ThisTimeLineID;
- read_upto = GetFlushRecPtr();
+ /*
+ * We're reading from the current timeline so we might have to
+ * wait for the desired record to be generated (or, for a standby,
+ * received & replayed)
+ */
+ if (!RecoveryInProgress())
+ {
+ *pageTLI = ThisTimeLineID;
+ read_upto = GetFlushRecPtr();
+ }
+ else
+ read_upto = GetXLogReplayRecPtr(pageTLI);
+
+ if (loc <= read_upto)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
}
else
- read_upto = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= read_upto)
+ {
+ /*
+ * We're on a historical timeline, so limit reading to the switch
+ * point where we moved to the next timeline.
+ *
+ * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+ * about the new timeline, so we must've received past the end of
+ * it.
+ */
+ read_upto = state->currTLIValidUntil;
+
+ /*
+ * Setting pageTLI to our wanted record's TLI is slightly wrong;
+ * the page might begin on an older timeline if it contains a
+ * timeline switch, since its xlog segment will have been copied
+ * from the prior timeline. This is pretty harmless though, as
+ * nothing cares so long as the timeline doesn't go backwards. We
+ * should read the page header instead; FIXME someday.
+ */
+ *pageTLI = state->currTLI;
+
+ /* No need to wait on a historical timeline */
break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
+ }
}
if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 4e4c8cd..99112ac 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -234,12 +234,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
- /* compute the current end-of-wal */
- if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
- else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
@@ -279,6 +273,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
+ if (!RecoveryInProgress())
+ end_of_wal = GetFlushRecPtr();
+ else
+ end_of_wal = GetXLogReplayRecPtr(NULL);
+
+ /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..caff9a6 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -27,6 +27,10 @@
#include "access/xlogrecord.h"
+#ifndef FRONTEND
+#include "nodes/pg_list.h"
+#endif
+
typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */
@@ -160,6 +164,13 @@ struct XLogReaderState
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
+ /* timeline to read it from, 0 if a lookup is required */
+ TimeLineID currTLI;
+ /*
+ * Safe point to read to in currTLI if current TLI is historical
+ * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+ */
+ XLogRecPtr currTLIValidUntil;
/* Buffer for current ReadRecord result (expandable) */
char *readRecordBuf;
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 3ce9904..106b816 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
test_pg_dump \
test_rls_hooks \
test_shm_mq \
+ test_slot_timelines \
worker_spi
all: submake-generated-headers
diff --git a/src/test/modules/test_slot_timelines/.gitignore b/src/test/modules/test_slot_timelines/.gitignore
new file mode 100644
index 0000000..543c50d
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/.gitignore
@@ -0,0 +1,3 @@
+results/
+tmp_check/
+log/
diff --git a/src/test/modules/test_slot_timelines/Makefile b/src/test/modules/test_slot_timelines/Makefile
new file mode 100644
index 0000000..21757c5
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/test_slot_timelines/Makefile
+
+MODULES = test_slot_timelines
+PGFILEDESC = "test_slot_timelines - test utility for slot timeline following"
+
+EXTENSION = test_slot_timelines
+DATA = test_slot_timelines--1.0.sql
+
+EXTRA_INSTALL=contrib/test_decoding
+REGRESS=load_extension
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_slot_timelines
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README
new file mode 100644
index 0000000..585f02f
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/README
@@ -0,0 +1,19 @@
+A test module for logical decoding failover and timeline following.
+
+This module provides a minimal way to maintain logical slots on replicas
+that mirror the state on the master. It doesn't make decoding possible,
+just tracking slot state so that a decoding client that's using the master
+can follow a physical failover to the standby. The master doesn't know
+about the slots on the standby, they're synced by a client that connects
+to both.
+
+This is intentionally not part of the test_decoding module because that's meant
+to serve as example code, where this module exercises internal server features
+by unsafely exposing internal state to SQL. It's not the right way to do
+failover, it's just a simple way to test it from the perl TAP framework to
+prove the feature works.
+
+In a practical implementation of this approach a bgworker on the master would
+monitor slot positions and relay them to a bgworker on the standby that applies
+the position updates without exposing slot internals to SQL. That's too complex
+for this test framework though.
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension.out b/src/test/modules/test_slot_timelines/expected/load_extension.out
new file mode 100644
index 0000000..7c2ad9d
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/expected/load_extension.out
@@ -0,0 +1,19 @@
+CREATE EXTENSION test_slot_timelines;
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+ test_slot_timelines_create_logical_slot
+-----------------------------------------
+
+(1 row)
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+ test_slot_timelines_advance_logical_slot
+------------------------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('test_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension_1.out b/src/test/modules/test_slot_timelines/expected/load_extension_1.out
new file mode 100644
index 0000000..0db21e4
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/expected/load_extension_1.out
@@ -0,0 +1,7 @@
+CREATE EXTENSION test_slot_timelines;
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+ERROR: replication slots can only be used if max_replication_slots > 0
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+ERROR: replication slots can only be used if max_replication_slots > 0
+SELECT pg_drop_replication_slot('test_slot');
+ERROR: replication slots can only be used if max_replication_slots > 0
diff --git a/src/test/modules/test_slot_timelines/sql/load_extension.sql b/src/test/modules/test_slot_timelines/sql/load_extension.sql
new file mode 100644
index 0000000..2440355
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/sql/load_extension.sql
@@ -0,0 +1,7 @@
+CREATE EXTENSION test_slot_timelines;
+
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+
+SELECT pg_drop_replication_slot('test_slot');
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
new file mode 100644
index 0000000..a1886f7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
@@ -0,0 +1,16 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text)
+RETURNS void
+STRICT LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text)
+IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin xid, new_catalog_xmin xid, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
+RETURNS void
+STRICT LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, xid, xid, pg_lsn, pg_lsn)
+IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.c b/src/test/modules/test_slot_timelines/test_slot_timelines.c
new file mode 100644
index 0000000..1f07488
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.c
@@ -0,0 +1,133 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_slot_timelines.c
+ * Test harness code for slot timeline following
+ *
+ * Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_slot_timelines/test_slot_timelines.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot);
+PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot);
+
+static void clear_slot_transient_state(void);
+
+/*
+ * Create a new logical slot, with invalid LSN and xid, directly. This does not
+ * use the snapshot builder or logical decoding machinery. It's only intended
+ * for creating a slot on a replica that mirrors the state of a slot on an
+ * upstream master.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS)
+{
+ char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+ char *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+ CheckSlotRequirements();
+
+ ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+
+ /* register the plugin name with the slot */
+ StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
+
+ /*
+ * Initialize persistent state to placeholders to be set by
+ * test_slot_timelines_advance_logical_slot .
+ */
+ MyReplicationSlot->data.xmin = InvalidTransactionId;
+ MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
+ MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
+
+ clear_slot_transient_state();
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Set the state of a slot.
+ *
+ * This doesn't maintain the non-persistent state at all,
+ * but since the slot isn't in use that's OK.
+ *
+ * There's intentionally no check to prevent slots going backwards
+ * because they can actually go backwards if the master crashes when
+ * it hasn't yet flushed slot state to disk then we copy the older
+ * slot state after recovery.
+ *
+ * There's no checking done for xmin or catalog xmin either, since
+ * we can't really do anything useful that accounts for xid wrap-around.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS)
+{
+ char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+ TransactionId new_xmin = DatumGetTransactionId(PG_GETARG_DATUM(1));
+ TransactionId new_catalog_xmin = DatumGetTransactionId(PG_GETARG_DATUM(2));
+ XLogRecPtr restart_lsn = PG_GETARG_LSN(3);
+ XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4);
+
+ CheckSlotRequirements();
+
+ ReplicationSlotAcquire(slotname);
+
+ if (MyReplicationSlot->data.database != MyDatabaseId)
+ elog(ERROR, "trying to update a slot on a different database");
+
+ MyReplicationSlot->data.xmin = new_xmin;
+ MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
+ MyReplicationSlot->data.restart_lsn = restart_lsn;
+ MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
+
+ clear_slot_transient_state();
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ ReplicationSlotRelease();
+
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+
+ PG_RETURN_VOID();
+}
+
+static void
+clear_slot_transient_state(void)
+{
+ Assert(MyReplicationSlot != NULL);
+
+ /*
+ * Make sure the slot state is the same as if it were newly loaded from
+ * disk on recovery.
+ */
+ MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
+ MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+
+ MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
+ MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
+}
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.conf b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
new file mode 100644
index 0000000..56b46d7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
@@ -0,0 +1,2 @@
+max_replication_slots=2
+wal_level=logical
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.control b/src/test/modules/test_slot_timelines/test_slot_timelines.control
new file mode 100644
index 0000000..dcee1a7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.control
@@ -0,0 +1,5 @@
+# test_slot_timelines extension
+comment = 'Test utility for slot timeline following and logical decoding'
+default_version = '1.0'
+module_pathname = '$libdir/test_slot_timelines'
+relocatable = true
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 9290719..78570dd 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
#
#-------------------------------------------------------------------------
+EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines
+
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
new file mode 100644
index 0000000..a3a4b61
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -0,0 +1,307 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+# excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+# advancing it as we go using the low level APIs. It can't be done
+# from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+ 'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+ $stderr,
+ qr/replication slot "after_basebackup" does not exist/,
+ 'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+
+# OK, time to try the same thing again, but this time we'll be using slot
+# mirroring on the standby and a pg_basebackup of the master.
+
+diag "Testing logical timeline following with test_slot_timelines module";
+
+$node_master->start();
+
+# Clean up after the last test
+$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
+is( $node_master->psql(
+ 'postgres',
+'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
+ 0,
+ 'dropping slots succeeds via pg_drop_replication_slot');
+
+# Same as before, we'll make one slot before basebackup, one after. This time
+# the basebackup will be with pg_basebackup so it'll omit both slots, then
+# we'll use SQL functions provided by the test_slot_timelines test module to sync
+# them to the replica, do some work, sync them and fail over then test again.
+# This time we should have both the before- and after-basebackup slots working.
+
+is( $node_master->psql(
+ 'postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+ ),
+ 0,
+ 'creating slot before_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+$backup_name = 'b2';
+$node_master->backup($backup_name);
+
+is( $node_master->psql(
+ 'postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+ ),
+ 0,
+ 'creating slot after_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+$node_replica = get_new_node('replica2');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+
+$node_replica->start;
+
+# Verify the slots are both absent on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, '', 'No slots exist on the replica');
+
+# Now do our magic to sync the slot states across. Normally
+# this would be being done continuously by a bgworker but
+# we're just doing it by hand for this test. This is exposing
+# postgres innards to SQL so it's unsafe except for testing.
+$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;');
+
+my $slotinfo = $node_master->safe_psql(
+ 'postgres',
+ qq{SELECT slot_name, plugin,
+ COALESCE(xmin, '0'), catalog_xmin,
+ restart_lsn, confirmed_flush_lsn
+ FROM pg_replication_slots ORDER BY slot_name}
+);
+diag "Copying slots to replica";
+open my $fh, '<', \$slotinfo or die $!;
+while (<$fh>)
+{
+ print $_;
+ chomp $_;
+ my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
+ $confirmed_flush_lsn)
+ = map { "'$_'" } split qr/\|/, $_;
+
+ print
+"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
+ $node_replica->safe_psql('postgres',
+ "SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);"
+ );
+ $node_replica->safe_psql('postgres',
+"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
+ );
+}
+close $fh or die $!;
+
+# Now both slots are present on the replica and exactly match the master
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is( $stdout,
+ "after_basebackup\nbefore_basebackup",
+ 'both slots now exist on replica');
+
+$stdout = $node_replica->safe_psql(
+ 'postgres',
+ qq{SELECT slot_name, plugin, COALESCE(xmin, '0'), catalog_xmin,
+ restart_lsn, confirmed_flush_lsn
+ FROM pg_replication_slots
+ ORDER BY slot_name});
+is($stdout, $slotinfo,
+ "slot data read back from replica matches slot data on master");
+
+# We now have to copy some extra WAL to satisfy the requirements of the oldest
+# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
+# so we have to help out. We know the WAL is still retained on the master
+# because we haven't advanced the slots there.
+#
+# Figure out what the oldest segment we need is by looking at the restart_lsn
+# of the oldest slot.
+#
+# It only makes sense to do this once the slots are created on the replica,
+# otherwise it might just delete the segments again.
+
+my $oldest_needed_segment = $node_master->safe_psql(
+ 'postgres',
+ qq{SELECT pg_xlogfile_name((
+ SELECT restart_lsn
+ FROM pg_replication_slots
+ ORDER BY restart_lsn ASC
+ LIMIT 1
+ ));}
+);
+
+diag "oldest needed xlog seg is $oldest_needed_segment ";
+
+# WAL segment names sort lexically so we can just grab everything > than this
+# segment.
+opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
+while (my $seg = readdir $pg_xlog)
+{
+ next if $seg eq '.' or $seg eq '..';
+ next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
+ diag "copying xlog seg $seg";
+ copy(
+ $node_master->data_dir . "/pg_xlog/" . $seg,
+ $node_replica->data_dir . "/pg_xlog/" . $seg
+ ) or die "copy of xlog seg $seg failed: $!";
+}
+closedir $pg_xlog;
+
+# Boom, crash the master
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# This time we can read from both slots
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot after_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot after_basebackup');
+is($stderr, '', 'replay from slot after_basebackup produces no stderr');
+
+# Should be able to read from slot created before base backup
+#
+# This would fail with an error about missing WAL segments if we hadn't
+# copied extra WAL earlier.
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+ 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
+is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot');
+is($stderr, '', 'dropping slots produces no stderr output');
+
+1;
--
2.5.5
On Thu, Sep 1, 2016 at 1:08 PM, Craig Ringer <craig@2ndquadrant.com> wrote:
Attached is a rebased and updated logical decoding timeline following
patch for 10.0.
Moved to next CF, nothing has happened since submission.
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hi Craig,
On 01/09/16 06:08, Craig Ringer wrote:
Hi all
Attached is a rebased and updated logical decoding timeline following
patch for 10.0.This is a pre-requisite for the pending work on logical decoding on
standby servers and simplified failover of logical decoding.
I went over this and it looks fine to me, I only rebased the patch on
top of current version (we renamed pg_xlog which broke the tests) and
split the test harness to separate patch. Otherwise I would consider
this to be ready for committer.
I think this should go in early so that there is enough time in the
cycle to uncover potential issues if there are any, even though it looks
all correct to me.
The test harness code will become unnecessary when proper support for
logical failover or logical decoding on standby is added, so I'm not
really sure it should be committed.
Yeah as I said above I split out the test harness for this reason. The
good thing is that when the followup patches get in the test harness
should be easy to removed as the changes are very localized.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Follow-timeline-switches-in-logical-decoding.patchtext/plain; charset=UTF-8; name=0001-Follow-timeline-switches-in-logical-decoding.patchDownload
From 3c775ee99820ea3e915e1a859fa399651e170ffc Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Mon, 24 Oct 2016 17:40:40 +0200
Subject: [PATCH 1/2] Follow timeline switches in logical decoding
When decoding from a logical slot, it's necessary for xlog reading to
be able to read xlog from historical (i.e. not current) timelines.
Otherwise decoding fails after failover to a physical replica because
the oldest still-needed archives are in the historical timeline.
Supporting logical decoding timeline following is a pre-requisite for
logical decoding on physical standby servers. It also makes it
possible to promote a replica with logical slots to a master and
replay from those slots, allowing logical decoding applications to
follow physical failover.
Logical slots cannot actually be created on a replica without use of
the low-level C slot management APIs so this is mostly foundation work
for subsequent changes to enable logical decoding on standbys.
This commit includes a module in src/test/modules with functions to
manipulate the slots (which is not otherwise possible in SQL code) in
order to enable testing, and a new test in src/test/recovery to ensure
that the behavior is as expected.
Note that an earlier version of logical decoding timeline following
was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5
feature freeze when issues were discovered too late to safely fix them
in the 9.5 release cycle.
The prior approach failed to consider that a record could be split
across pages that are on different segments, where the new segment
contains the start of a new timeline. In that case the old segment
might be missing or renamed with a .partial suffix.
This patch reworks the logic to be page-based and in the process
simplify how the last timeline for a segment is looked up.
Slot timeline following only works in a backend. Frontend support can
be aded separately, where it could be useful for pg_xlogdump etc once
support for timeline.c, List, etc is added for frontend code.
---
src/backend/access/transam/xlogutils.c | 207 +++++++++++++++++++++++--
src/backend/replication/logical/logicalfuncs.c | 12 +-
src/include/access/xlogreader.h | 11 ++
3 files changed, 209 insertions(+), 21 deletions(-)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 51a8e8d..014978f 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,6 +19,7 @@
#include <unistd.h>
+#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
@@ -660,6 +661,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
/* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
+ static TimeLineID sendTLI = 0;
static uint32 sendOff = 0;
p = buf;
@@ -675,7 +677,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
startoff = recptr % XLogSegSize;
/* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+ sendTLI != tli)
{
char path[MAXPGPATH];
@@ -702,6 +705,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
path)));
}
sendOff = 0;
+ sendTLI = tli;
}
/* Need to seek in the file? */
@@ -750,6 +754,142 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
}
/*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's state->currTLI to that timeline ID.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch. The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends and we have to switch to a new one.
+ * Even in the middle of reading a page we could have to dump the cached page
+ * and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current redo pointer
+ * so it doesn't fail to notice that the current timeline became historical.
+ */
+static void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+ const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+ elog(DEBUG4, "Determining timeline for read at %X/%X+%X",
+ (uint32)(wantPage>>32), (uint32)wantPage, wantLength);
+
+ Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+ Assert(wantLength <= XLOG_BLCKSZ);
+ Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+ /*
+ * If the desired page is currently read in and valid, we have nothing to do.
+ *
+ * The caller should've ensured that it didn't previously advance readOff
+ * past the valid limit of this timeline, so it doesn't matter if the current
+ * TLI has since become historical.
+ */
+ if (lastReadPage == wantPage &&
+ state->readLen != 0 &&
+ lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+ {
+ elog(DEBUG4, "Wanted data already valid"); //XXX
+ return;
+ }
+
+ /*
+ * If we're reading from the current timeline, it hasn't become historical
+ * and the page we're reading is after the last page read, we can again
+ * just carry on. (Seeking backwards requires a check to make sure the older
+ * page isn't on a prior timeline).
+ */
+ if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+ {
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+ elog(DEBUG4, "On current timeline");
+ return;
+ }
+
+ /*
+ * If we're just reading pages from a previously validated historical
+ * timeline and the timeline we're reading from is valid until the
+ * end of the current segment we can just keep reading.
+ */
+ if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+ state->currTLI != ThisTimeLineID &&
+ state->currTLI != 0 &&
+ (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+ {
+ elog(DEBUG4, "Still on historical timeline %u until %X/%X",
+ state->currTLI,
+ (uint32)(state->currTLIValidUntil >> 32),
+ (uint32)(state->currTLIValidUntil));
+ return;
+ }
+
+ /*
+ * If we reach this point we're either looking up a page for random access,
+ * the current timeline just became historical, or we're reading from a new
+ * segment containing a timeline switch. In all cases we need to determine
+ * the newest timeline on the segment.
+ *
+ * If it's the current timeline we can just keep reading from here unless
+ * we detect a timeline switch that makes the current timeline historical.
+ * If it's a historical timeline we can read all the segment on the newest
+ * timeline because it contains all the old timelines' data too. So only
+ * one switch check is required.
+ */
+ {
+ /*
+ * We need to re-read the timeline history in case it's been changed
+ * by a promotion or replay from a cascaded replica.
+ */
+ List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+ XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+ Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+ /* Find the timeline of the last LSN on the segment containing wantPage. */
+ state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+ state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory, NULL);
+
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+ wantPage + wantLength < state->currTLIValidUntil);
+
+ list_free_deep(timelineHistory);
+
+ elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+ state->currTLI,
+ (uint32)(state->currTLIValidUntil >> 32),
+ (uint32)(state->currTLIValidUntil));
+ }
+
+ elog(DEBUG3, "page read ptr %X/%X (for record %X/%X) is on segment with TLI %u valid until %X/%X, server current TLI is %u",
+ (uint32) (wantPage >> 32),
+ (uint32) wantPage,
+ (uint32) (state->currRecPtr >> 32),
+ (uint32) state->currRecPtr,
+ state->currTLI,
+ (uint32) (state->currTLIValidUntil >> 32),
+ (uint32) (state->currTLIValidUntil),
+ ThisTimeLineID);
+}
+
+/*
* read_page callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
@@ -770,28 +910,65 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int count;
loc = targetPagePtr + reqLen;
+
+ /* Make sure enough xlog is available... */
while (1)
{
/*
- * TODO: we're going to have to do something more intelligent about
- * timelines on standbys. Use readTimeLineHistory() and
- * tliOfPointInHistory() to get the proper LSN? For now we'll catch
- * that case earlier, but the code and TODO is left in here for when
- * that changes.
+ * Check which timeline to get the record from.
+ *
+ * We have to do it each time through the loop because if we're in
+ * recovery as a cascading standby, the current timeline might've
+ * become historical.
*/
- if (!RecoveryInProgress())
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+ if (state->currTLI == ThisTimeLineID)
{
- *pageTLI = ThisTimeLineID;
- read_upto = GetFlushRecPtr();
+ /*
+ * We're reading from the current timeline so we might have to
+ * wait for the desired record to be generated (or, for a standby,
+ * received & replayed)
+ */
+ if (!RecoveryInProgress())
+ {
+ *pageTLI = ThisTimeLineID;
+ read_upto = GetFlushRecPtr();
+ }
+ else
+ read_upto = GetXLogReplayRecPtr(pageTLI);
+
+ if (loc <= read_upto)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
}
else
- read_upto = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= read_upto)
+ {
+ /*
+ * We're on a historical timeline, so limit reading to the switch
+ * point where we moved to the next timeline.
+ *
+ * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+ * about the new timeline, so we must've received past the end of
+ * it.
+ */
+ read_upto = state->currTLIValidUntil;
+
+ /*
+ * Setting pageTLI to our wanted record's TLI is slightly wrong;
+ * the page might begin on an older timeline if it contains a
+ * timeline switch, since its xlog segment will have been copied
+ * from the prior timeline. This is pretty harmless though, as
+ * nothing cares so long as the timeline doesn't go backwards. We
+ * should read the page header instead; FIXME someday.
+ */
+ *pageTLI = state->currTLI;
+
+ /* No need to wait on a historical timeline */
break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
+ }
}
if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 318726e..4315fb3 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -234,12 +234,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
- /* compute the current end-of-wal */
- if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
- else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
@@ -279,6 +273,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
+ if (!RecoveryInProgress())
+ end_of_wal = GetFlushRecPtr();
+ else
+ end_of_wal = GetXLogReplayRecPtr(NULL);
+
+ /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..caff9a6 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -27,6 +27,10 @@
#include "access/xlogrecord.h"
+#ifndef FRONTEND
+#include "nodes/pg_list.h"
+#endif
+
typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */
@@ -160,6 +164,13 @@ struct XLogReaderState
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
+ /* timeline to read it from, 0 if a lookup is required */
+ TimeLineID currTLI;
+ /*
+ * Safe point to read to in currTLI if current TLI is historical
+ * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+ */
+ XLogRecPtr currTLIValidUntil;
/* Buffer for current ReadRecord result (expandable) */
char *readRecordBuf;
--
2.7.4
0002-Test-harness-for-timeline-switches-following-in-logi.patchtext/plain; charset=UTF-8; name=0002-Test-harness-for-timeline-switches-following-in-logi.patchDownload
From b38c0df97fbab36fa347d201cd83624ed6a7f576 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Mon, 24 Oct 2016 17:41:12 +0200
Subject: [PATCH 2/2] Test harness for timeline switches following in logical
decoding
---
src/test/modules/Makefile | 1 +
src/test/modules/test_slot_timelines/.gitignore | 3 +
src/test/modules/test_slot_timelines/Makefile | 22 ++
src/test/modules/test_slot_timelines/README | 19 ++
.../expected/load_extension.out | 19 ++
.../expected/load_extension_1.out | 7 +
.../test_slot_timelines/sql/load_extension.sql | 7 +
.../test_slot_timelines--1.0.sql | 16 ++
.../test_slot_timelines/test_slot_timelines.c | 133 +++++++++
.../test_slot_timelines/test_slot_timelines.conf | 2 +
.../test_slot_timelines.control | 5 +
src/test/recovery/Makefile | 4 +-
.../recovery/t/006_logical_decoding_timelines.pl | 307 +++++++++++++++++++++
13 files changed, 543 insertions(+), 2 deletions(-)
create mode 100644 src/test/modules/test_slot_timelines/.gitignore
create mode 100644 src/test/modules/test_slot_timelines/Makefile
create mode 100644 src/test/modules/test_slot_timelines/README
create mode 100644 src/test/modules/test_slot_timelines/expected/load_extension.out
create mode 100644 src/test/modules/test_slot_timelines/expected/load_extension_1.out
create mode 100644 src/test/modules/test_slot_timelines/sql/load_extension.sql
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.c
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.conf
create mode 100644 src/test/modules/test_slot_timelines/test_slot_timelines.control
create mode 100644 src/test/recovery/t/006_logical_decoding_timelines.pl
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 3ce9904..106b816 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
test_pg_dump \
test_rls_hooks \
test_shm_mq \
+ test_slot_timelines \
worker_spi
all: submake-generated-headers
diff --git a/src/test/modules/test_slot_timelines/.gitignore b/src/test/modules/test_slot_timelines/.gitignore
new file mode 100644
index 0000000..543c50d
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/.gitignore
@@ -0,0 +1,3 @@
+results/
+tmp_check/
+log/
diff --git a/src/test/modules/test_slot_timelines/Makefile b/src/test/modules/test_slot_timelines/Makefile
new file mode 100644
index 0000000..21757c5
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/test_slot_timelines/Makefile
+
+MODULES = test_slot_timelines
+PGFILEDESC = "test_slot_timelines - test utility for slot timeline following"
+
+EXTENSION = test_slot_timelines
+DATA = test_slot_timelines--1.0.sql
+
+EXTRA_INSTALL=contrib/test_decoding
+REGRESS=load_extension
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_slot_timelines
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README
new file mode 100644
index 0000000..585f02f
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/README
@@ -0,0 +1,19 @@
+A test module for logical decoding failover and timeline following.
+
+This module provides a minimal way to maintain logical slots on replicas
+that mirror the state on the master. It doesn't make decoding possible,
+just tracking slot state so that a decoding client that's using the master
+can follow a physical failover to the standby. The master doesn't know
+about the slots on the standby, they're synced by a client that connects
+to both.
+
+This is intentionally not part of the test_decoding module because that's meant
+to serve as example code, where this module exercises internal server features
+by unsafely exposing internal state to SQL. It's not the right way to do
+failover, it's just a simple way to test it from the perl TAP framework to
+prove the feature works.
+
+In a practical implementation of this approach a bgworker on the master would
+monitor slot positions and relay them to a bgworker on the standby that applies
+the position updates without exposing slot internals to SQL. That's too complex
+for this test framework though.
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension.out b/src/test/modules/test_slot_timelines/expected/load_extension.out
new file mode 100644
index 0000000..7c2ad9d
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/expected/load_extension.out
@@ -0,0 +1,19 @@
+CREATE EXTENSION test_slot_timelines;
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+ test_slot_timelines_create_logical_slot
+-----------------------------------------
+
+(1 row)
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+ test_slot_timelines_advance_logical_slot
+------------------------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('test_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension_1.out b/src/test/modules/test_slot_timelines/expected/load_extension_1.out
new file mode 100644
index 0000000..0db21e4
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/expected/load_extension_1.out
@@ -0,0 +1,7 @@
+CREATE EXTENSION test_slot_timelines;
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+ERROR: replication slots can only be used if max_replication_slots > 0
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+ERROR: replication slots can only be used if max_replication_slots > 0
+SELECT pg_drop_replication_slot('test_slot');
+ERROR: replication slots can only be used if max_replication_slots > 0
diff --git a/src/test/modules/test_slot_timelines/sql/load_extension.sql b/src/test/modules/test_slot_timelines/sql/load_extension.sql
new file mode 100644
index 0000000..2440355
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/sql/load_extension.sql
@@ -0,0 +1,7 @@
+CREATE EXTENSION test_slot_timelines;
+
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
+
+SELECT pg_drop_replication_slot('test_slot');
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
new file mode 100644
index 0000000..a1886f7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
@@ -0,0 +1,16 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text)
+RETURNS void
+STRICT LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text)
+IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin xid, new_catalog_xmin xid, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
+RETURNS void
+STRICT LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, xid, xid, pg_lsn, pg_lsn)
+IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.c b/src/test/modules/test_slot_timelines/test_slot_timelines.c
new file mode 100644
index 0000000..1f07488
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.c
@@ -0,0 +1,133 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_slot_timelines.c
+ * Test harness code for slot timeline following
+ *
+ * Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_slot_timelines/test_slot_timelines.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot);
+PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot);
+
+static void clear_slot_transient_state(void);
+
+/*
+ * Create a new logical slot, with invalid LSN and xid, directly. This does not
+ * use the snapshot builder or logical decoding machinery. It's only intended
+ * for creating a slot on a replica that mirrors the state of a slot on an
+ * upstream master.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS)
+{
+ char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+ char *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+ CheckSlotRequirements();
+
+ ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+
+ /* register the plugin name with the slot */
+ StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
+
+ /*
+ * Initialize persistent state to placeholders to be set by
+ * test_slot_timelines_advance_logical_slot .
+ */
+ MyReplicationSlot->data.xmin = InvalidTransactionId;
+ MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
+ MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
+
+ clear_slot_transient_state();
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Set the state of a slot.
+ *
+ * This doesn't maintain the non-persistent state at all,
+ * but since the slot isn't in use that's OK.
+ *
+ * There's intentionally no check to prevent slots going backwards
+ * because they can actually go backwards if the master crashes when
+ * it hasn't yet flushed slot state to disk then we copy the older
+ * slot state after recovery.
+ *
+ * There's no checking done for xmin or catalog xmin either, since
+ * we can't really do anything useful that accounts for xid wrap-around.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS)
+{
+ char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+ TransactionId new_xmin = DatumGetTransactionId(PG_GETARG_DATUM(1));
+ TransactionId new_catalog_xmin = DatumGetTransactionId(PG_GETARG_DATUM(2));
+ XLogRecPtr restart_lsn = PG_GETARG_LSN(3);
+ XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4);
+
+ CheckSlotRequirements();
+
+ ReplicationSlotAcquire(slotname);
+
+ if (MyReplicationSlot->data.database != MyDatabaseId)
+ elog(ERROR, "trying to update a slot on a different database");
+
+ MyReplicationSlot->data.xmin = new_xmin;
+ MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
+ MyReplicationSlot->data.restart_lsn = restart_lsn;
+ MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
+
+ clear_slot_transient_state();
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ ReplicationSlotRelease();
+
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+
+ PG_RETURN_VOID();
+}
+
+static void
+clear_slot_transient_state(void)
+{
+ Assert(MyReplicationSlot != NULL);
+
+ /*
+ * Make sure the slot state is the same as if it were newly loaded from
+ * disk on recovery.
+ */
+ MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
+ MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+
+ MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
+ MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
+}
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.conf b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
new file mode 100644
index 0000000..56b46d7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
@@ -0,0 +1,2 @@
+max_replication_slots=2
+wal_level=logical
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.control b/src/test/modules/test_slot_timelines/test_slot_timelines.control
new file mode 100644
index 0000000..dcee1a7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.control
@@ -0,0 +1,5 @@
+# test_slot_timelines extension
+comment = 'Test utility for slot timeline following and logical decoding'
+default_version = '1.0'
+module_pathname = '$libdir/test_slot_timelines'
+relocatable = true
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index a847952..78570dd 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
#
#-------------------------------------------------------------------------
+EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines
+
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
@@ -18,5 +20,3 @@ check:
clean distclean maintainer-clean:
rm -rf tmp_check
-
-EXTRA_INSTALL = contrib/test_decoding
diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
new file mode 100644
index 0000000..4ef9f5f
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -0,0 +1,307 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+# excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+# advancing it as we go using the low level APIs. It can't be done
+# from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+ 'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+ $stderr,
+ qr/replication slot "after_basebackup" does not exist/,
+ 'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+
+# OK, time to try the same thing again, but this time we'll be using slot
+# mirroring on the standby and a pg_basebackup of the master.
+
+diag "Testing logical timeline following with test_slot_timelines module";
+
+$node_master->start();
+
+# Clean up after the last test
+$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
+is( $node_master->psql(
+ 'postgres',
+'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
+ 0,
+ 'dropping slots succeeds via pg_drop_replication_slot');
+
+# Same as before, we'll make one slot before basebackup, one after. This time
+# the basebackup will be with pg_basebackup so it'll omit both slots, then
+# we'll use SQL functions provided by the test_slot_timelines test module to sync
+# them to the replica, do some work, sync them and fail over then test again.
+# This time we should have both the before- and after-basebackup slots working.
+
+is( $node_master->psql(
+ 'postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+ ),
+ 0,
+ 'creating slot before_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+$backup_name = 'b2';
+$node_master->backup($backup_name);
+
+is( $node_master->psql(
+ 'postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+ ),
+ 0,
+ 'creating slot after_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+$node_replica = get_new_node('replica2');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+
+$node_replica->start;
+
+# Verify the slots are both absent on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, '', 'No slots exist on the replica');
+
+# Now do our magic to sync the slot states across. Normally
+# this would be being done continuously by a bgworker but
+# we're just doing it by hand for this test. This is exposing
+# postgres innards to SQL so it's unsafe except for testing.
+$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;');
+
+my $slotinfo = $node_master->safe_psql(
+ 'postgres',
+ qq{SELECT slot_name, plugin,
+ COALESCE(xmin, '0'), catalog_xmin,
+ restart_lsn, confirmed_flush_lsn
+ FROM pg_replication_slots ORDER BY slot_name}
+);
+diag "Copying slots to replica";
+open my $fh, '<', \$slotinfo or die $!;
+while (<$fh>)
+{
+ print $_;
+ chomp $_;
+ my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
+ $confirmed_flush_lsn)
+ = map { "'$_'" } split qr/\|/, $_;
+
+ print
+"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
+ $node_replica->safe_psql('postgres',
+ "SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);"
+ );
+ $node_replica->safe_psql('postgres',
+"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
+ );
+}
+close $fh or die $!;
+
+# Now both slots are present on the replica and exactly match the master
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is( $stdout,
+ "after_basebackup\nbefore_basebackup",
+ 'both slots now exist on replica');
+
+$stdout = $node_replica->safe_psql(
+ 'postgres',
+ qq{SELECT slot_name, plugin, COALESCE(xmin, '0'), catalog_xmin,
+ restart_lsn, confirmed_flush_lsn
+ FROM pg_replication_slots
+ ORDER BY slot_name});
+is($stdout, $slotinfo,
+ "slot data read back from replica matches slot data on master");
+
+# We now have to copy some extra WAL to satisfy the requirements of the oldest
+# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
+# so we have to help out. We know the WAL is still retained on the master
+# because we haven't advanced the slots there.
+#
+# Figure out what the oldest segment we need is by looking at the restart_lsn
+# of the oldest slot.
+#
+# It only makes sense to do this once the slots are created on the replica,
+# otherwise it might just delete the segments again.
+
+my $oldest_needed_segment = $node_master->safe_psql(
+ 'postgres',
+ qq{SELECT pg_xlogfile_name((
+ SELECT restart_lsn
+ FROM pg_replication_slots
+ ORDER BY restart_lsn ASC
+ LIMIT 1
+ ));}
+);
+
+diag "oldest needed xlog seg is $oldest_needed_segment ";
+
+# WAL segment names sort lexically so we can just grab everything > than this
+# segment.
+opendir(my $pg_wal, $node_master->data_dir . "/pg_wal") or die $!;
+while (my $seg = readdir $pg_wal)
+{
+ next if $seg eq '.' or $seg eq '..';
+ next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
+ diag "copying xlog seg $seg";
+ copy(
+ $node_master->data_dir . "/pg_wal/" . $seg,
+ $node_replica->data_dir . "/pg_wal/" . $seg
+ ) or die "copy of xlog seg $seg failed: $!";
+}
+closedir $pg_wal;
+
+# Boom, crash the master
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# This time we can read from both slots
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot after_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot after_basebackup');
+is($stderr, '', 'replay from slot after_basebackup produces no stderr');
+
+# Should be able to read from slot created before base backup
+#
+# This would fail with an error about missing WAL segments if we hadn't
+# copied extra WAL earlier.
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+ 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
+is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot');
+is($stderr, '', 'dropping slots produces no stderr output');
+
+1;
--
2.7.4
On 24 October 2016 at 23:49, Petr Jelinek <petr@2ndquadrant.com> wrote:
Hi Craig,
On 01/09/16 06:08, Craig Ringer wrote:
Hi all
Attached is a rebased and updated logical decoding timeline following
patch for 10.0.This is a pre-requisite for the pending work on logical decoding on
standby servers and simplified failover of logical decoding.I went over this and it looks fine to me, I only rebased the patch on
top of current version (we renamed pg_xlog which broke the tests) and
split the test harness to separate patch. Otherwise I would consider
this to be ready for committer.I think this should go in early so that there is enough time in the
cycle to uncover potential issues if there are any, even though it looks
all correct to me.
Thanks for the review and update.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2016-10-24 17:49:13 +0200, Petr Jelinek wrote:
+ * Determine which timeline to read an xlog page from and set the + * XLogReaderState's state->currTLI to that timeline ID.
"XLogReaderState's state->currTLI" - the state's a bit redundant.
+ * The caller must also make sure it doesn't read past the current redo pointer + * so it doesn't fail to notice that the current timeline became historical. + */
Not sure what that means? The redo pointer usually menas the "logical
start" of the last checkpoint, but I don't see how thta could be meant
here?
+static void +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) +{ + const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff; + + elog(DEBUG4, "Determining timeline for read at %X/%X+%X", + (uint32)(wantPage>>32), (uint32)wantPage, wantLength);
Doing this on every single read from a page seems quite verbose. It's
also (like most or all the following debug elogs) violating the casing
prescribed by the message style guidelines.
+ /* + * If the desired page is currently read in and valid, we have nothing to do. + * + * The caller should've ensured that it didn't previously advance readOff + * past the valid limit of this timeline, so it doesn't matter if the current + * TLI has since become historical. + */ + if (lastReadPage == wantPage && + state->readLen != 0 && + lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1)) + { + elog(DEBUG4, "Wanted data already valid"); //XXX + return; + }
With that kind of comment/XXX present, this surely can't be ready for
committer?
+ /* + * If we're reading from the current timeline, it hasn't become historical + * and the page we're reading is after the last page read, we can again + * just carry on. (Seeking backwards requires a check to make sure the older + * page isn't on a prior timeline). + */
How can ThisTimeLineID become historical?
+ if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage) + { + Assert(state->currTLIValidUntil == InvalidXLogRecPtr); + elog(DEBUG4, "On current timeline"); + return; + }
Also, is it actually ok to rely on ThisTimeLineID here? That's IIRC not
maintained outside of the startup process, until recovery ended (cf it
being set in InitXLOGAccess() called via RecoveryInProgress()).
/* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. */
I guess you mean cascading as "replicating logically from a physical
standby"? I don't think it's good to use cascading here, because that's
usually thought to mean doing physical SR from a standby...
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 318726e..4315fb3 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -234,12 +234,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc;- /* compute the current end-of-wal */
- if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
- else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
ReplicationSlotAcquire(NameStr(*name));PG_TRY();
@@ -279,6 +273,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();+ if (!RecoveryInProgress()) + end_of_wal = GetFlushRecPtr(); + else + end_of_wal = GetXLogReplayRecPtr(NULL); + + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) {
That seems like a pretty random change?
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index deaa7f5..caff9a6 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -27,6 +27,10 @@#include "access/xlogrecord.h"
+#ifndef FRONTEND +#include "nodes/pg_list.h" +#endif
Why is that needed?
diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README new file mode 100644 index 0000000..585f02f --- /dev/null +++ b/src/test/modules/test_slot_timelines/README @@ -0,0 +1,19 @@ +A test module for logical decoding failover and timeline following. + +This module provides a minimal way to maintain logical slots on replicas +that mirror the state on the master. It doesn't make decoding possible, +just tracking slot state so that a decoding client that's using the master +can follow a physical failover to the standby. The master doesn't know +about the slots on the standby, they're synced by a client that connects +to both. + +This is intentionally not part of the test_decoding module because that's meant +to serve as example code, where this module exercises internal server features +by unsafely exposing internal state to SQL. It's not the right way to do +failover, it's just a simple way to test it from the perl TAP framework to +prove the feature works. + +In a practical implementation of this approach a bgworker on the master would +monitor slot positions and relay them to a bgworker on the standby that applies +the position updates without exposing slot internals to SQL. That's too complex +for this test framework though.
I still don't want to have this code in postgres, it's just an example
for doing something bad. Lets get proper feedback control in, and go
from there.
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension_1.out b/src/test/modules/test_slot_timelines/expected/load_extension_1.out new file mode 100644 index 0000000..0db21e4 --- /dev/null +++ b/src/test/modules/test_slot_timelines/expected/load_extension_1.out @@ -0,0 +1,7 @@ +CREATE EXTENSION test_slot_timelines; +SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding'); +ERROR: replication slots can only be used if max_replication_slots > 0 +SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location()); +ERROR: replication slots can only be used if max_replication_slots > 0 +SELECT pg_drop_replication_slot('test_slot'); +ERROR: replication slots can only be used if max_replication_slots > 0 diff --git a/src/test/modules/test_slot_timelines/sql/load_extension.sql b/src/test/modules/test_slot_timelines/sql/load_extension.sql new file mode 100644 index 0000000..2440355
It doesn't seem worthwhlie to maintain a test for this - why do we run
the tests with those settings in the first place?
Greetings,
Andres Freund
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12 November 2016 at 23:07, Andres Freund <andres@anarazel.de> wrote
On 2016-10-24 17:49:13 +0200, Petr Jelinek wrote:
+ * Determine which timeline to read an xlog page from and set the + * XLogReaderState's state->currTLI to that timeline ID."XLogReaderState's state->currTLI" - the state's a bit redundant.
Thanks for taking a look at the patch.
Agreed re above, fixed.
You know, having returned to this work after a long break doing other
things, it's clear that so much of the same work is being done in
XLogSendPhysical(...) and walsender.c's XLogRead(...) that maybe it is
worth facing the required refactoring so that we can use that in
logical decoding from both the walsender and the SQL interface.
The approach I originally took focused above all else on being
minimally intrusive, rather than clean and clear. Maybe it's better to
tidy this up instead.
Instead of coupling timeline following in logical decoding to the
XLogReader struct and having effectively duplicated logic to that for
physical walsender, move the walsender.c globals
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
into new strut in timeline.h and move the logic into timeline.c . So
we stop relying on so many magic globals in the walsender. Don't keep
this state as a global, instead init it in StartReplication and pass
it as a param to WalSndLoop. For logical decoding from walsender,
store the walsender's copy in the XLogReaderState. For logical
decoding from SQL, set it up in pg_logical_slot_get_changes_guts(...)
and again store it in XLogReaderState.
In the process, finally unify the two XLogRead(...) functions in
xlogutils.c and walsender.c and merge walsender's
logical_read_xlog_page(...) with xlogutils'
logical_read_xlog_page(...) . Sure, we can't rely on a normal
backend's latch being set when wal is written like we can for the
walsender, but do we have to duplicate everything else? Can always add
a miscadmin.h var like IsWALSender and test that to see if we can
expect to have our latch set when new WAL comes in and adjust our
latch wait timeout interval appropriately.
The downside is of course that it touches physical replication, unlike
the current approach which avoids touching anything that could affect
physical replication at the cost of increasing the duplication between
physical and logical replication logic.
+ * The caller must also make sure it doesn't read past the current redo pointer + * so it doesn't fail to notice that the current timeline became historical. + */Not sure what that means? The redo pointer usually menas the "logical
start" of the last checkpoint, but I don't see how thta could be meant
here?
What I was trying to say was the current replay position on a standby. Amended.
+static void +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) +{ + const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff; + + elog(DEBUG4, "Determining timeline for read at %X/%X+%X", + (uint32)(wantPage>>32), (uint32)wantPage, wantLength);Doing this on every single read from a page seems quite verbose. It's
also (like most or all the following debug elogs) violating the casing
prescribed by the message style guidelines.
Agreed. It's unnecessary now. It's the sort of thing I'd want to keep
to have if Pg had fine grained module level log control, but we don't.
+ /* + * If we're reading from the current timeline, it hasn't become historical + * and the page we're reading is after the last page read, we can again + * just carry on. (Seeking backwards requires a check to make sure the older + * page isn't on a prior timeline). + */How can ThisTimeLineID become historical?
It can't, right now. Though I admit I didn't realise that at the time.
Presently ThisTimeLineID is only set in the walsender by
GetStandbyFlushRecPtr as called by IdentifySystem, and in
StartReplication, but not afterwards for logical replication, so
logical rep won't notice timeline transitions when on a standby unless
a decoding session is restarted. We don't support decoding sessions in
recovery, so it can't happen yet.
It will, when we're on a cascading standby and the upstream is
promoted, once we support logical decoding on standby. As part of that
we'll need to maintain the timeline in the walsender in logical
decoding like we do in physical, and limit logical decoding to the
currently replayed position with something like walsender's
GetStandbyFlushRecPtr(). But usable form the SQL interface too, of
course.
I'm currently implementing logical decoding on standby on top of this
and the catalog_xmin feedback patch, and in the process will be adding
tests for logical decoding on a physical replica where its upstream is
promoted from cascading standby to master, so I'll nail down any
issues for sure in that process.
I think it makes sense to consider this patch as part of the same set
as catalog_xmin feedback and decoding on standby, really. Without
those it's hard to come up with good real world tests and it's not
very useful. As you've correctly noted, the bundled tests are
contrived in the extreme.
Despite that, I've attached a revised version of the current approach
incorporating fixes for the issues you mentioned. It's preceded by the
patch to add an --endpos option to pg_recvlogical so that we can
properly test the walsender interface too.
+ if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage) + { + Assert(state->currTLIValidUntil == InvalidXLogRecPtr); + elog(DEBUG4, "On current timeline"); + return; + }Also, is it actually ok to rely on ThisTimeLineID here? That's IIRC not
maintained outside of the startup process, until recovery ended (cf it
being set in InitXLOGAccess() called via RecoveryInProgress()).
We don't yet allow decoding in recovery, so yes.
We can be reading xlog from a prior timeline to the server's current
timeline, since we're replaying xlog generated on the old master
before promotion. That's when this test fails and we carry on. Amended
for clarity.
Added this to func header comment, hopefully helps:
* It's necessary to care about timelines in xlogreader and logical decoding
* when we might be reading xlog generated prior to a promotion, either if
* we're currently a standby in recovery or if we're a promoted master reading
* xlogs generated by the old master before our promotion. Notably, logical
* decoding on a standby needs to be able to replay any remaining pending data
* from the old timeline when the standby or one of its upstreams being
* promoted.
/* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. */I guess you mean cascading as "replicating logically from a physical
standby"? I don't think it's good to use cascading here, because that's
usually thought to mean doing physical SR from a standby...
I actually do mean cascading physical standby here.
If we're a physical standby and get promoted. IsInRecovery() becomes
false. Fine. But if the timeline can also change while we remain in
recovery, if a configuration like
A => B => C -> L
exists, where => is a physical replication link and -> is a logical
decoding session.
If A dies and B is promoted, C's timeline will change.
Comment now explains this.
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 318726e..4315fb3 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -234,12 +234,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc;- /* compute the current end-of-wal */
- if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
- else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
ReplicationSlotAcquire(NameStr(*name));PG_TRY();
@@ -279,6 +273,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();+ if (!RecoveryInProgress()) + end_of_wal = GetFlushRecPtr(); + else + end_of_wal = GetXLogReplayRecPtr(NULL); + + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) {That seems like a pretty random change?
You're right. It's a change from when the timeline determination logic
happened inside pg_logical_slot_get_changes_guts that I didn't realise
had become irrelevant now.
Reverted, and in the process:
- end_of_wal = GetXLogReplayRecPtr(NULL);
+ end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
so we maintain the current timeline when called in recovery, where
it's not inherited from the startup process.
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index deaa7f5..caff9a6 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -27,6 +27,10 @@#include "access/xlogrecord.h"
+#ifndef FRONTEND +#include "nodes/pg_list.h" +#endifWhy is that needed?
It isn't anymore. Thanks for the catch. It was used when the timeline
history had to be looked up more frequently, so it was kept in the
xlogreader state. Since pg_xlogdump uses the xlogreader this meant it
had to be conditional on backend use. Now that the timeline history
only has to be looked up when first examining the segment in which the
timeline transition occurs there's no point caching it there anymore.
(I'd still like to teach pg_xlogdump to follow timelines but that's a "later").
diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README
I still don't want to have this code in postgres, it's just an example
for doing something bad. Lets get proper feedback control in, and go
from there.
100% agree, and I'm going to cut it from the patch since I'm
submitting updated feedback control now and have a PoC of decoding on
standby too.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Add-an-optional-endpos-LSN-argument-to-pg_recvlogica.patchtext/x-patch; charset=UTF-8; name=0001-Add-an-optional-endpos-LSN-argument-to-pg_recvlogica.patchDownload
From b1266ad1adba8619bf43d4297d1ed6392e302198 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 1 Sep 2016 12:37:40 +0800
Subject: [PATCH 1/3] Add an optional --endpos LSN argument to pg_recvlogical
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
pg_recvlogical usually just runs until cancelled or until the upstream
server disconnects. For some purposes, especially testing, it's useful
to have the ability to stop receive at a specified LSN without having
to parse the output and deal with buffering issues, etc.
Add a --endpos parameter that takes the LSN at which no further
messages should be written and receive should stop.
Craig Ringer, Álvaro Herrera
---
doc/src/sgml/ref/pg_recvlogical.sgml | 34 ++++++++
src/bin/pg_basebackup/pg_recvlogical.c | 145 +++++++++++++++++++++++++++++----
2 files changed, 164 insertions(+), 15 deletions(-)
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index b35881f..d066ce8 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -38,6 +38,14 @@ PostgreSQL documentation
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
replication (see <xref linkend="logicaldecoding">).
</para>
+
+ <para>
+ <command>pg_recvlogical</> has no equivalent to the logical decoding
+ SQL interface's peek and get modes. It sends replay confirmations for
+ data lazily as it receives it and on clean exit. To examine pending data on
+ a slot without consuming it, use
+ <link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
+ </para>
</refsect1>
<refsect1>
@@ -155,6 +163,32 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>-E <replaceable>lsn</replaceable></option></term>
+ <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+ <listitem>
+ <para>
+ In <option>--start</option> mode, automatically stop replication
+ and exit with normal exit status 0 when receiving reaches the
+ specified LSN. If specified when not in <option>--start</option>
+ mode, an error is raised.
+ </para>
+
+ <para>
+ If there's a record with LSN exactly equal to <replaceable>lsn</>,
+ the record will be output.
+ </para>
+
+ <para>
+ The <option>--endpos</option> option is not aware of transaction
+ boundaries and may truncate output partway through a transaction.
+ Any partially output transaction will not be consumed and will be
+ replayed again when the slot is next read from. Individual messages
+ are never truncated.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>--if-not-exists</option></term>
<listitem>
<para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..c700edf 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -40,6 +40,7 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_start_slot = false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
static void usage(void);
static void StreamLogicalLog(void);
static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+ bool keepalive, XLogRecPtr lsn);
static void
usage(void)
@@ -81,6 +85,7 @@ usage(void)
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
+ printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -o, --option=NAME[=VALUE]\n"
" pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
int bytes_written;
int64 now;
int hdr_len;
+ XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL)
{
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
int pos;
bool replyRequested;
XLogRecPtr walEnd;
+ bool endposReached = false;
/*
* Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
}
replyRequested = copybuf[pos];
- /* If the server requested an immediate reply, send one. */
- if (replyRequested)
+ if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
{
- /* fsync data, so we send a recent flush pointer */
- if (!OutputFsync(now))
- goto error;
+ /*
+ * If there's nothing to read on the socket until a keepalive
+ * we know that the server has nothing to send us; and if
+ * walEnd has passed endpos, we know nothing else can have
+ * committed before endpos. So we can bail out now.
+ */
+ endposReached = true;
+ }
- now = feGetCurrentTimestamp();
- if (!sendFeedback(conn, now, true, false))
+ /* Send a reply, if necessary */
+ if (replyRequested || endposReached)
+ {
+ if (!flushAndSendFeedback(conn, &now))
goto error;
last_status = now;
}
+
+ if (endposReached)
+ {
+ prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+ time_to_abort = true;
+ break;
+ }
+
continue;
}
else if (copybuf[0] != 'w')
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
goto error;
}
-
/*
* Read the header of the XLogData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
}
/* Extract WAL location for this block */
- {
- XLogRecPtr temp = fe_recvint64(©buf[1]);
+ cur_record_lsn = fe_recvint64(©buf[1]);
- output_written_lsn = Max(temp, output_written_lsn);
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+ {
+ /*
+ * We've read past our endpoint, so prepare to go away being
+ * cautious about what happens to our output data.
+ */
+ if (!flushAndSendFeedback(conn, &now))
+ goto error;
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ time_to_abort = true;
+ break;
}
+ output_written_lsn = Max(cur_record_lsn, output_written_lsn);
+
bytes_left = r - hdr_len;
bytes_written = 0;
@@ -557,10 +588,29 @@ StreamLogicalLog(void)
strerror(errno));
goto error;
}
+
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
+ {
+ /* endpos was exactly the record we just processed, we're done */
+ if (!flushAndSendFeedback(conn, &now))
+ goto error;
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ time_to_abort = true;
+ break;
+ }
}
res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res) == PGRES_COPY_OUT)
+ {
+ /*
+ * We're doing a client-initiated clean exit and have sent CopyDone to
+ * the server. We've already sent replay confirmation and fsync'd so
+ * we can just clean up the connection now.
+ */
+ goto error;
+ }
+ else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
{"password", no_argument, NULL, 'W'},
/* replication options */
{"startpos", required_argument, NULL, 'I'},
+ {"endpos", required_argument, NULL, 'E'},
{"option", required_argument, NULL, 'o'},
{"plugin", required_argument, NULL, 'P'},
{"status-interval", required_argument, NULL, 's'},
@@ -673,7 +724,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+ while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -733,6 +784,16 @@ main(int argc, char **argv)
}
startpos = ((uint64) hi) << 32 | lo;
break;
+ case 'E':
+ if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse end position \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ endpos = ((uint64) hi) << 32 | lo;
+ break;
case 'o':
{
char *data = pg_strdup(optarg);
@@ -857,6 +918,16 @@ main(int argc, char **argv)
exit(1);
}
+ if (endpos != InvalidXLogRecPtr && !do_start_slot)
+ {
+ fprintf(stderr,
+ _("%s: cannot use --create-slot or --drop-slot together with --endpos\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
@@ -923,8 +994,8 @@ main(int argc, char **argv)
if (time_to_abort)
{
/*
- * We've been Ctrl-C'ed. That's not an error, so exit without an
- * errorcode.
+ * We've been Ctrl-C'ed or reached an exit limit condition. That's
+ * not an error, so exit without an errorcode.
*/
disconnect_and_exit(0);
}
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
}
}
}
+
+/*
+ * Fsync our output data, and send a feedback message to the server. Returns
+ * true if successful, false otherwise.
+ *
+ * If successful, *now is updated to the current timestamp just before sending
+ * feedback.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+ /* flush data to disk, so that we send a recent flush pointer */
+ if (!OutputFsync(*now))
+ return false;
+ *now = feGetCurrentTimestamp();
+ if (!sendFeedback(conn, *now, true, false))
+ return false;
+
+ return true;
+}
+
+/*
+ * Try to inform the server about of upcoming demise, but don't wait around or
+ * retry on failure.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+ (void) PQputCopyEnd(conn, NULL);
+ (void) PQflush(conn);
+
+ if (verbose)
+ {
+ if (keepalive)
+ fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
+ progname,
+ (uint32) (endpos >> 32), (uint32) endpos);
+ else
+ fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
+ progname, (uint32) (endpos >> 32), (uint32) (endpos),
+ (uint32) (lsn >> 32), (uint32) lsn);
+
+ }
+}
--
2.5.5
0002-Add-a-pg_recvlogical-wrapper-to-PostgresNode.patchtext/x-patch; charset=US-ASCII; name=0002-Add-a-pg_recvlogical-wrapper-to-PostgresNode.patchDownload
From 8a4b4788940e2b72383e2e0195e7df7f1cf4061c Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 15 Nov 2016 16:06:16 +0800
Subject: [PATCH 2/3] Add a pg_recvlogical wrapper to PostgresNode
---
src/test/perl/PostgresNode.pm | 75 ++++++++++++++++++++++++++++-
src/test/recovery/t/006_logical_decoding.pl | 31 +++++++++++-
2 files changed, 104 insertions(+), 2 deletions(-)
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index c1b16ca..b2e4813 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1125,7 +1125,7 @@ sub psql
# IPC::Run::run threw an exception. re-throw unless it's a
# timeout, which we'll handle by testing is_expired
die $exc_save
- if (blessed($exc_save) || $exc_save ne $timeout_exception);
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
$ret = undef;
@@ -1325,6 +1325,79 @@ sub run_log
TestLib::run_log(@_);
}
+=pod $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...)
+
+Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which
+corresponds to pg_recvlogical --endpos. Gives up after timeout (if nonzero).
+
+Disallows pg_recvlogial from internally retrying on error by passing --no-loop.
+
+Plugin options are passed as additional keyword arguments.
+
+If called in scalar context, returns stdout, and die()s on timeout or nonzero return.
+
+If called in array context, returns a tuple of (retval, stdout, stderr, timeout).
+timeout is the IPC::Run::Timeout object whose is_expired method can be tested
+to check for timeout. retval is undef on timeout.
+
+=cut
+
+sub pg_recvlogical_upto
+{
+ my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_;
+ my ($stdout, $stderr);
+
+ my $timeout_exception = 'pg_recvlogical timed out';
+
+ my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname));
+ push @cmd, '--endpos', $endpos if ($endpos);
+ push @cmd, '-f', '-', '--no-loop', '--start';
+
+ while (my ($k, $v) = each %plugin_options)
+ {
+ die "= is not permitted to appear in replication option name" if ($k =~ qr/=/);
+ push @cmd, "-o", "$k=$v";
+ }
+
+ my $timeout;
+ $timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception ) if $timeout_secs;
+ my $ret = 0;
+
+ do {
+ local $@;
+ eval {
+ IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
+ $ret = $?;
+ };
+ my $exc_save = $@;
+ if ($exc_save)
+ {
+ # IPC::Run::run threw an exception. re-throw unless it's a
+ # timeout, which we'll handle by testing is_expired
+ die $exc_save
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
+
+ $ret = undef;
+
+ die "Got timeout exception '$exc_save' but timer not expired?!"
+ unless $timeout->is_expired;
+
+ die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'"
+ unless wantarray;
+ }
+ };
+
+ if (wantarray)
+ {
+ return ($ret, $stdout, $stderr, $timeout);
+ }
+ else
+ {
+ die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret;
+ return $stdout;
+ }
+}
+
=pod
=back
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index b80a9a9..d8cc8d3 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -1,9 +1,13 @@
# Testing of logical decoding using SQL interface and/or pg_recvlogical
+#
+# Most logical decoding tests are in contrib/test_decoding. This module
+# is for work that doesn't fit well there, like where server restarts
+# are required.
use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 5;
# Initialize master node
my $node_master = get_new_node('master');
@@ -36,5 +40,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan
chomp($result);
is($result, '', 'Decoding after fast restart repeats no rows');
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+diag "waiting to replay $endpos";
+
+my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session');
+
+$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
+
# done with the node
$node_master->stop;
--
2.5.5
0003-Follow-timeline-switches-in-logical-decoding.patchtext/x-patch; charset=US-ASCII; name=0003-Follow-timeline-switches-in-logical-decoding.patchDownload
From f09dd3160a913c2a900f6ffef61fa6e94821ed42 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 1 Sep 2016 10:16:55 +0800
Subject: [PATCH 3/3] Follow timeline switches in logical decoding
When decoding from a logical slot, it's necessary for xlog reading to
be able to read xlog from historical (i.e. not current) timelines.
Otherwise decoding fails after failover to a physical replica because
the oldest still-needed archives are in the historical timeline.
Supporting logical decoding timeline following is a pre-requisite for
logical decoding on physical standby servers. It also makes it
possible to promote a replica with logical slots to a master and
replay from those slots, allowing logical decoding applications to
follow physical failover.
Logical slots cannot actually be created on a replica without use of
the low-level C slot management APIs so this is mostly foundation work
for subsequent changes to enable logical decoding on standbys.
Tests are included to exercise the functionality using a cold disk-level copy
of the master that's started up as a replica with slots intact, but the
intended use of the functionality is with logical decoding on a standby.
Note that an earlier version of logical decoding timeline following
was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5
feature freeze when issues were discovered too late to safely fix them
in the 9.5 release cycle.
The prior approach failed to consider that a record could be split
across pages that are on different segments, where the new segment
contains the start of a new timeline. In that case the old segment
might be missing or renamed with a .partial suffix.
This patch reworks the logic to be page-based and in the process
simplify how the last timeline for a segment is looked up.
---
src/backend/access/transam/xlogutils.c | 200 +++++++++++++++++++--
src/backend/replication/logical/logicalfuncs.c | 7 +-
src/backend/replication/walsender.c | 11 +-
src/include/access/xlogreader.h | 16 ++
src/include/access/xlogutils.h | 3 +
src/test/recovery/Makefile | 2 +
.../recovery/t/009_logical_decoding_timelines.pl | 185 +++++++++++++++++++
7 files changed, 402 insertions(+), 22 deletions(-)
create mode 100644 src/test/recovery/t/009_logical_decoding_timelines.pl
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 51a8e8d..ab15cf3 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,6 +19,7 @@
#include <unistd.h>
+#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
@@ -660,6 +661,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
/* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
+ static TimeLineID sendTLI = 0;
static uint32 sendOff = 0;
p = buf;
@@ -675,7 +677,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
startoff = recptr % XLogSegSize;
/* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+ sendTLI != tli)
{
char path[MAXPGPATH];
@@ -702,6 +705,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
path)));
}
sendOff = 0;
+ sendTLI = tli;
}
/* Need to seek in the file? */
@@ -750,6 +754,129 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
}
/*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's currTLI to that timeline ID.
+ *
+ * It's necessary to care about timelines in xlogreader and logical decoding
+ * when we might be reading xlog generated prior to a promotion, either if
+ * we're currently a standby in recovery or if we're a promoted master reading
+ * xlogs generated by the old master before our promotion. Notably, logical
+ * decoding on a standby needs to be able to replay any remaining pending data
+ * from the old timeline when the standby or one of its upstreams being
+ * promoted.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch. The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends and we have to switch to a new one.
+ * Even in the middle of reading a page we could have to dump the cached page
+ * and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current replay
+ * position if executing in recovery, so it doesn't fail to notice that the
+ * current timeline became historical.
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+ const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+ Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+ Assert(wantLength <= XLOG_BLCKSZ);
+ Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+ /*
+ * If the desired page is currently read in and valid, we have nothing to do.
+ *
+ * The caller should've ensured that it didn't previously advance readOff
+ * past the valid limit of this timeline, so it doesn't matter if the current
+ * TLI has since become historical.
+ */
+ if (lastReadPage == wantPage &&
+ state->readLen != 0 &&
+ lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+ return;
+
+ /*
+ * If we're reading from the current timeline, it hasn't become historical
+ * and the page we're reading is after the last page read, we can again
+ * just carry on. (Seeking backwards requires a check to make sure the older
+ * page isn't on a prior timeline).
+ */
+ if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+ {
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+ return;
+ }
+
+ /*
+ * If we're just reading pages from a previously validated historical
+ * timeline and the timeline we're reading from is valid until the
+ * end of the current segment we can just keep reading.
+ */
+ if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+ state->currTLI != ThisTimeLineID &&
+ state->currTLI != 0 &&
+ (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+ return;
+
+ /*
+ * If we reach this point we're either looking up a page for random access,
+ * the current timeline just became historical, or we're reading from a new
+ * segment containing a timeline switch. In all cases we need to determine
+ * the newest timeline on the segment.
+ *
+ * If it's the current timeline we can just keep reading from here unless
+ * we detect a timeline switch that makes the current timeline historical.
+ * If it's a historical timeline we can read all the segment on the newest
+ * timeline because it contains all the old timelines' data too. So only
+ * one switch check is required.
+ */
+ {
+ /*
+ * We need to re-read the timeline history in case it's been changed
+ * by a promotion or replay from a cascaded replica.
+ */
+ List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+ XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+ Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+ /* Find the timeline of the last LSN on the segment containing wantPage. */
+ state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+ state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
+ &state->nextTLI);
+
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+ wantPage + wantLength < state->currTLIValidUntil);
+
+ list_free_deep(timelineHistory);
+
+ elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+ state->currTLI,
+ (uint32)(state->currTLIValidUntil >> 32),
+ (uint32)(state->currTLIValidUntil));
+ }
+}
+
+/*
* read_page callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
@@ -770,28 +897,71 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int count;
loc = targetPagePtr + reqLen;
+
+ /* Make sure enough xlog is available... */
while (1)
{
/*
- * TODO: we're going to have to do something more intelligent about
- * timelines on standbys. Use readTimeLineHistory() and
- * tliOfPointInHistory() to get the proper LSN? For now we'll catch
- * that case earlier, but the code and TODO is left in here for when
- * that changes.
+ * Check which timeline to get the record from.
+ *
+ * We have to do it each time through the loop because if we're in
+ * recovery as a cascading standby, the current timeline might've
+ * become historical. We can't rely on RecoveryInProgress() because
+ * in a standby configuration like
+ *
+ * A => B => C
+ *
+ * if we're a logical decoding on C, and B gets promoted, our timeline
+ * will change while we remain in recovery.
*/
- if (!RecoveryInProgress())
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+ if (state->currTLI == ThisTimeLineID)
{
- *pageTLI = ThisTimeLineID;
- read_upto = GetFlushRecPtr();
+ /*
+ * We're reading from the current timeline so we might have to
+ * wait for the desired record to be generated (or, for a standby,
+ * received & replayed)
+ */
+ if (!RecoveryInProgress())
+ {
+ *pageTLI = ThisTimeLineID;
+ read_upto = GetFlushRecPtr();
+ }
+ else
+ read_upto = GetXLogReplayRecPtr(pageTLI);
+
+ if (loc <= read_upto)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
}
else
- read_upto = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= read_upto)
+ {
+ /*
+ * We're on a historical timeline, so limit reading to the switch
+ * point where we moved to the next timeline.
+ *
+ * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+ * about the new timeline, so we must've received past the end of
+ * it.
+ */
+ read_upto = state->currTLIValidUntil;
+
+ /*
+ * Setting pageTLI to our wanted record's TLI is slightly wrong;
+ * the page might begin on an older timeline if it contains a
+ * timeline switch, since its xlog segment will have been copied
+ * from the prior timeline. This is pretty harmless though, as
+ * nothing cares so long as the timeline doesn't go backwards. We
+ * should read the page header instead; FIXME someday.
+ */
+ *pageTLI = state->currTLI;
+
+ /* No need to wait on a historical timeline */
break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
+ }
}
if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 318726e..a8f7b76 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -234,13 +234,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
+ ReplicationSlotAcquire(NameStr(*name));
+
/* compute the current end-of-wal */
if (!RecoveryInProgress())
end_of_wal = GetFlushRecPtr();
else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
- ReplicationSlotAcquire(NameStr(*name));
+ end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
PG_TRY();
{
@@ -279,6 +279,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
+ /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e508..ef8ba80 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -47,6 +47,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -756,6 +757,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
XLogRecPtr flushptr;
int count;
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+ sendTimeLineIsHistoric = state->currTLI == ThisTimeLineID;
+ sendTimeLine = state->currTLI;
+ sendTimeLineValidUpto = state->currTLIValidUntil;
+ sendTimeLineNextTLI = state->nextTLI;
+
/* make sure we have enough WAL available */
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
@@ -984,10 +991,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();
- /* setup state for XLogReadPage */
- sendTimeLineIsHistoric = false;
- sendTimeLine = ThisTimeLineID;
-
/*
* Initialize position to the last ack'ed one, then the xlog records begin
* to be shipped from that position.
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..8f96728 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -160,6 +160,22 @@ struct XLogReaderState
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
+ /* timeline to read it from, 0 if a lookup is required */
+ TimeLineID currTLI;
+ /*
+ * Safe point to read to in currTLI if current TLI is historical
+ * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+ *
+ * Actually set to the start of the segment containing the timeline
+ * switch that ends currTLI's validity, not the LSN of the switch
+ * its self, since we can't assume the old segment will be present.
+ */
+ XLogRecPtr currTLIValidUntil;
+ /*
+ * If currTLI is not the most recent known timeline, the next timeline to
+ * read from when currTLIValidUntil is reached.
+ */
+ TimeLineID nextTLI;
/* Buffer for current ReadRecord result (expandable) */
char *readRecordBuf;
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index d027ea1..f0ee352 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetRecPtr, char *cur_page,
TimeLineID *pageTLI);
+extern void XLogReadDetermineTimeline(XLogReaderState *state,
+ XLogRecPtr wantPage, uint32 wantLength);
+
#endif
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index a847952..d2ff1e9 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
#
#-------------------------------------------------------------------------
+EXTRA_INSTALL=contrib/test_decoding
+
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/009_logical_decoding_timelines.pl b/src/test/recovery/t/009_logical_decoding_timelines.pl
new file mode 100644
index 0000000..caa395e
--- /dev/null
+++ b/src/test/recovery/t/009_logical_decoding_timelines.pl
@@ -0,0 +1,185 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+# excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+# advancing it as we go using the low level APIs. It can't be done
+# from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use RecursiveCopy;
+use File::Copy;
+use IPC::Run ();
+use Scalar::Util qw(blessed);
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+ 'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+ $stderr,
+ qr/replication slot "after_basebackup" does not exist/,
+ 'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+
+my $final_expected_output_bb = q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT);
+is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+sub pg_recvlogical_upto
+{
+ my ($node, $dbname, $slot, $endpos, $plugin_options, %kwargs) = @_;
+ my ($stdout, $stderr);
+
+ my $timeout_exception = 'pg_recvlogical timed out';
+
+ my @cmd = ('pg_recvlogical', '-S', $slot, '--dbname', $node->connstr($dbname), '--endpos', $endpos, '-f', '-', '--no-loop', '--start');
+
+ while (my ($k, $v) = each %$plugin_options)
+ {
+ die "= is not permitted to appear in replication option name" if ($k =~ qr/=/);
+ push @cmd, "-o", "$k=$v";
+ }
+
+ my $timeout;
+ $timeout = IPC::Run::timeout( $kwargs{'timeout'}, exception => $timeout_exception ) if $kwargs{'timeout'};
+ my $ret = 0;
+
+ do {
+ local $@;
+ eval {
+ IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
+ $ret = $?;
+ };
+ my $exc_save = $@;
+ if ($exc_save)
+ {
+ # IPC::Run::run threw an exception. re-throw unless it's a
+ # timeout, which we'll handle by testing is_expired
+ die $exc_save
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
+
+ $ret = undef;
+
+ die "Got timeout exception '$exc_save' but timer not expired?!"
+ unless $timeout->is_expired;
+
+ die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'";
+ }
+ };
+
+ if (wantarray)
+ {
+ return ($ret, $stdout, $stderr, $timeout);
+ }
+ else
+ {
+ die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret;
+ return $stdout;
+ }
+}
+
+# So far we've peeked the slots, so when we fetch the same info over
+# pg_recvlogical we should get complete results. First, find out the commit lsn
+# of the last transaction. There's no max(pg_lsn), so:
+
+my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+
+diag "replaying up to $endpos";
+
+# now use the walsender protocol to peek the slot changes and make sure we see
+# the same results.
+
+$stdout = pg_recvlogical_upto($node_replica, 'postgres', 'before_basebackup',
+ $endpos, {'include-xids' => '0', 'skip-empty-xacts' => '1'}, 'timeout' => '30');
+
+# walsender likes to add a newline
+chomp($stdout);
+is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
--
2.5.5
On 16 November 2016 at 12:44, Craig Ringer <craig@2ndquadrant.com> wrote:
Despite that, I've attached a revised version of the current approach
incorporating fixes for the issues you mentioned. It's preceded by the
patch to add an --endpos option to pg_recvlogical so that we can
properly test the walsender interface too.
I didn't rebase the patch that made the timeline following tests use
the recvlogical support in PostgreNode.pm. Now attached.
Even if timeline following isn't accepted as-is, I'd greatly
appreciate inclusion of the first two patches as they add basic
coverage of pg_recvlogical and a helper to make using it in tests
simple and reliable.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Add-an-optional-endpos-LSN-argument-to-pg_recvlogica.patchtext/x-patch; charset=UTF-8; name=0001-Add-an-optional-endpos-LSN-argument-to-pg_recvlogica.patchDownload
From b1266ad1adba8619bf43d4297d1ed6392e302198 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 1 Sep 2016 12:37:40 +0800
Subject: [PATCH 1/3] Add an optional --endpos LSN argument to pg_recvlogical
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
pg_recvlogical usually just runs until cancelled or until the upstream
server disconnects. For some purposes, especially testing, it's useful
to have the ability to stop receive at a specified LSN without having
to parse the output and deal with buffering issues, etc.
Add a --endpos parameter that takes the LSN at which no further
messages should be written and receive should stop.
Craig Ringer, Álvaro Herrera
---
doc/src/sgml/ref/pg_recvlogical.sgml | 34 ++++++++
src/bin/pg_basebackup/pg_recvlogical.c | 145 +++++++++++++++++++++++++++++----
2 files changed, 164 insertions(+), 15 deletions(-)
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index b35881f..d066ce8 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -38,6 +38,14 @@ PostgreSQL documentation
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
replication (see <xref linkend="logicaldecoding">).
</para>
+
+ <para>
+ <command>pg_recvlogical</> has no equivalent to the logical decoding
+ SQL interface's peek and get modes. It sends replay confirmations for
+ data lazily as it receives it and on clean exit. To examine pending data on
+ a slot without consuming it, use
+ <link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
+ </para>
</refsect1>
<refsect1>
@@ -155,6 +163,32 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>-E <replaceable>lsn</replaceable></option></term>
+ <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+ <listitem>
+ <para>
+ In <option>--start</option> mode, automatically stop replication
+ and exit with normal exit status 0 when receiving reaches the
+ specified LSN. If specified when not in <option>--start</option>
+ mode, an error is raised.
+ </para>
+
+ <para>
+ If there's a record with LSN exactly equal to <replaceable>lsn</>,
+ the record will be output.
+ </para>
+
+ <para>
+ The <option>--endpos</option> option is not aware of transaction
+ boundaries and may truncate output partway through a transaction.
+ Any partially output transaction will not be consumed and will be
+ replayed again when the slot is next read from. Individual messages
+ are never truncated.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>--if-not-exists</option></term>
<listitem>
<para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..c700edf 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -40,6 +40,7 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_start_slot = false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
static void usage(void);
static void StreamLogicalLog(void);
static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+ bool keepalive, XLogRecPtr lsn);
static void
usage(void)
@@ -81,6 +85,7 @@ usage(void)
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
+ printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -o, --option=NAME[=VALUE]\n"
" pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
int bytes_written;
int64 now;
int hdr_len;
+ XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL)
{
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
int pos;
bool replyRequested;
XLogRecPtr walEnd;
+ bool endposReached = false;
/*
* Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
}
replyRequested = copybuf[pos];
- /* If the server requested an immediate reply, send one. */
- if (replyRequested)
+ if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
{
- /* fsync data, so we send a recent flush pointer */
- if (!OutputFsync(now))
- goto error;
+ /*
+ * If there's nothing to read on the socket until a keepalive
+ * we know that the server has nothing to send us; and if
+ * walEnd has passed endpos, we know nothing else can have
+ * committed before endpos. So we can bail out now.
+ */
+ endposReached = true;
+ }
- now = feGetCurrentTimestamp();
- if (!sendFeedback(conn, now, true, false))
+ /* Send a reply, if necessary */
+ if (replyRequested || endposReached)
+ {
+ if (!flushAndSendFeedback(conn, &now))
goto error;
last_status = now;
}
+
+ if (endposReached)
+ {
+ prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+ time_to_abort = true;
+ break;
+ }
+
continue;
}
else if (copybuf[0] != 'w')
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
goto error;
}
-
/*
* Read the header of the XLogData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
}
/* Extract WAL location for this block */
- {
- XLogRecPtr temp = fe_recvint64(©buf[1]);
+ cur_record_lsn = fe_recvint64(©buf[1]);
- output_written_lsn = Max(temp, output_written_lsn);
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+ {
+ /*
+ * We've read past our endpoint, so prepare to go away being
+ * cautious about what happens to our output data.
+ */
+ if (!flushAndSendFeedback(conn, &now))
+ goto error;
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ time_to_abort = true;
+ break;
}
+ output_written_lsn = Max(cur_record_lsn, output_written_lsn);
+
bytes_left = r - hdr_len;
bytes_written = 0;
@@ -557,10 +588,29 @@ StreamLogicalLog(void)
strerror(errno));
goto error;
}
+
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
+ {
+ /* endpos was exactly the record we just processed, we're done */
+ if (!flushAndSendFeedback(conn, &now))
+ goto error;
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ time_to_abort = true;
+ break;
+ }
}
res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res) == PGRES_COPY_OUT)
+ {
+ /*
+ * We're doing a client-initiated clean exit and have sent CopyDone to
+ * the server. We've already sent replay confirmation and fsync'd so
+ * we can just clean up the connection now.
+ */
+ goto error;
+ }
+ else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
{"password", no_argument, NULL, 'W'},
/* replication options */
{"startpos", required_argument, NULL, 'I'},
+ {"endpos", required_argument, NULL, 'E'},
{"option", required_argument, NULL, 'o'},
{"plugin", required_argument, NULL, 'P'},
{"status-interval", required_argument, NULL, 's'},
@@ -673,7 +724,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+ while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -733,6 +784,16 @@ main(int argc, char **argv)
}
startpos = ((uint64) hi) << 32 | lo;
break;
+ case 'E':
+ if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse end position \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ endpos = ((uint64) hi) << 32 | lo;
+ break;
case 'o':
{
char *data = pg_strdup(optarg);
@@ -857,6 +918,16 @@ main(int argc, char **argv)
exit(1);
}
+ if (endpos != InvalidXLogRecPtr && !do_start_slot)
+ {
+ fprintf(stderr,
+ _("%s: cannot use --create-slot or --drop-slot together with --endpos\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
@@ -923,8 +994,8 @@ main(int argc, char **argv)
if (time_to_abort)
{
/*
- * We've been Ctrl-C'ed. That's not an error, so exit without an
- * errorcode.
+ * We've been Ctrl-C'ed or reached an exit limit condition. That's
+ * not an error, so exit without an errorcode.
*/
disconnect_and_exit(0);
}
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
}
}
}
+
+/*
+ * Fsync our output data, and send a feedback message to the server. Returns
+ * true if successful, false otherwise.
+ *
+ * If successful, *now is updated to the current timestamp just before sending
+ * feedback.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+ /* flush data to disk, so that we send a recent flush pointer */
+ if (!OutputFsync(*now))
+ return false;
+ *now = feGetCurrentTimestamp();
+ if (!sendFeedback(conn, *now, true, false))
+ return false;
+
+ return true;
+}
+
+/*
+ * Try to inform the server about of upcoming demise, but don't wait around or
+ * retry on failure.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+ (void) PQputCopyEnd(conn, NULL);
+ (void) PQflush(conn);
+
+ if (verbose)
+ {
+ if (keepalive)
+ fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
+ progname,
+ (uint32) (endpos >> 32), (uint32) endpos);
+ else
+ fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
+ progname, (uint32) (endpos >> 32), (uint32) (endpos),
+ (uint32) (lsn >> 32), (uint32) lsn);
+
+ }
+}
--
2.5.5
0002-Add-a-pg_recvlogical-wrapper-to-PostgresNode.patchtext/x-patch; charset=US-ASCII; name=0002-Add-a-pg_recvlogical-wrapper-to-PostgresNode.patchDownload
From 8a4b4788940e2b72383e2e0195e7df7f1cf4061c Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 15 Nov 2016 16:06:16 +0800
Subject: [PATCH 2/3] Add a pg_recvlogical wrapper to PostgresNode
---
src/test/perl/PostgresNode.pm | 75 ++++++++++++++++++++++++++++-
src/test/recovery/t/006_logical_decoding.pl | 31 +++++++++++-
2 files changed, 104 insertions(+), 2 deletions(-)
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index c1b16ca..b2e4813 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1125,7 +1125,7 @@ sub psql
# IPC::Run::run threw an exception. re-throw unless it's a
# timeout, which we'll handle by testing is_expired
die $exc_save
- if (blessed($exc_save) || $exc_save ne $timeout_exception);
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
$ret = undef;
@@ -1325,6 +1325,79 @@ sub run_log
TestLib::run_log(@_);
}
+=pod $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...)
+
+Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which
+corresponds to pg_recvlogical --endpos. Gives up after timeout (if nonzero).
+
+Disallows pg_recvlogial from internally retrying on error by passing --no-loop.
+
+Plugin options are passed as additional keyword arguments.
+
+If called in scalar context, returns stdout, and die()s on timeout or nonzero return.
+
+If called in array context, returns a tuple of (retval, stdout, stderr, timeout).
+timeout is the IPC::Run::Timeout object whose is_expired method can be tested
+to check for timeout. retval is undef on timeout.
+
+=cut
+
+sub pg_recvlogical_upto
+{
+ my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_;
+ my ($stdout, $stderr);
+
+ my $timeout_exception = 'pg_recvlogical timed out';
+
+ my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname));
+ push @cmd, '--endpos', $endpos if ($endpos);
+ push @cmd, '-f', '-', '--no-loop', '--start';
+
+ while (my ($k, $v) = each %plugin_options)
+ {
+ die "= is not permitted to appear in replication option name" if ($k =~ qr/=/);
+ push @cmd, "-o", "$k=$v";
+ }
+
+ my $timeout;
+ $timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception ) if $timeout_secs;
+ my $ret = 0;
+
+ do {
+ local $@;
+ eval {
+ IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
+ $ret = $?;
+ };
+ my $exc_save = $@;
+ if ($exc_save)
+ {
+ # IPC::Run::run threw an exception. re-throw unless it's a
+ # timeout, which we'll handle by testing is_expired
+ die $exc_save
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
+
+ $ret = undef;
+
+ die "Got timeout exception '$exc_save' but timer not expired?!"
+ unless $timeout->is_expired;
+
+ die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'"
+ unless wantarray;
+ }
+ };
+
+ if (wantarray)
+ {
+ return ($ret, $stdout, $stderr, $timeout);
+ }
+ else
+ {
+ die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret;
+ return $stdout;
+ }
+}
+
=pod
=back
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index b80a9a9..d8cc8d3 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -1,9 +1,13 @@
# Testing of logical decoding using SQL interface and/or pg_recvlogical
+#
+# Most logical decoding tests are in contrib/test_decoding. This module
+# is for work that doesn't fit well there, like where server restarts
+# are required.
use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 5;
# Initialize master node
my $node_master = get_new_node('master');
@@ -36,5 +40,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan
chomp($result);
is($result, '', 'Decoding after fast restart repeats no rows');
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+diag "waiting to replay $endpos";
+
+my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session');
+
+$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
+
# done with the node
$node_master->stop;
--
2.5.5
0003-Follow-timeline-switches-in-logical-decoding.patchtext/x-patch; charset=US-ASCII; name=0003-Follow-timeline-switches-in-logical-decoding.patchDownload
From 161f99eb79573202c244b561821f56d1b3168158 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 1 Sep 2016 10:16:55 +0800
Subject: [PATCH 3/3] Follow timeline switches in logical decoding
When decoding from a logical slot, it's necessary for xlog reading to
be able to read xlog from historical (i.e. not current) timelines.
Otherwise decoding fails after failover to a physical replica because
the oldest still-needed archives are in the historical timeline.
Supporting logical decoding timeline following is a pre-requisite for
logical decoding on physical standby servers. It also makes it
possible to promote a replica with logical slots to a master and
replay from those slots, allowing logical decoding applications to
follow physical failover.
Logical slots cannot actually be created on a replica without use of
the low-level C slot management APIs so this is mostly foundation work
for subsequent changes to enable logical decoding on standbys.
Tests are included to exercise the functionality using a cold disk-level copy
of the master that's started up as a replica with slots intact, but the
intended use of the functionality is with logical decoding on a standby.
Note that an earlier version of logical decoding timeline following
was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5
feature freeze when issues were discovered too late to safely fix them
in the 9.5 release cycle.
The prior approach failed to consider that a record could be split
across pages that are on different segments, where the new segment
contains the start of a new timeline. In that case the old segment
might be missing or renamed with a .partial suffix.
This patch reworks the logic to be page-based and in the process
simplify how the last timeline for a segment is looked up.
---
src/backend/access/transam/xlogutils.c | 200 +++++++++++++++++++--
src/backend/replication/logical/logicalfuncs.c | 7 +-
src/backend/replication/walsender.c | 11 +-
src/include/access/xlogreader.h | 16 ++
src/include/access/xlogutils.h | 3 +
src/test/recovery/Makefile | 2 +
.../recovery/t/009_logical_decoding_timelines.pl | 130 ++++++++++++++
7 files changed, 347 insertions(+), 22 deletions(-)
create mode 100644 src/test/recovery/t/009_logical_decoding_timelines.pl
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 51a8e8d..ab15cf3 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,6 +19,7 @@
#include <unistd.h>
+#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
@@ -660,6 +661,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
/* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
+ static TimeLineID sendTLI = 0;
static uint32 sendOff = 0;
p = buf;
@@ -675,7 +677,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
startoff = recptr % XLogSegSize;
/* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+ sendTLI != tli)
{
char path[MAXPGPATH];
@@ -702,6 +705,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
path)));
}
sendOff = 0;
+ sendTLI = tli;
}
/* Need to seek in the file? */
@@ -750,6 +754,129 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
}
/*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's currTLI to that timeline ID.
+ *
+ * It's necessary to care about timelines in xlogreader and logical decoding
+ * when we might be reading xlog generated prior to a promotion, either if
+ * we're currently a standby in recovery or if we're a promoted master reading
+ * xlogs generated by the old master before our promotion. Notably, logical
+ * decoding on a standby needs to be able to replay any remaining pending data
+ * from the old timeline when the standby or one of its upstreams being
+ * promoted.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch. The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends and we have to switch to a new one.
+ * Even in the middle of reading a page we could have to dump the cached page
+ * and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current replay
+ * position if executing in recovery, so it doesn't fail to notice that the
+ * current timeline became historical.
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+ const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+ Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+ Assert(wantLength <= XLOG_BLCKSZ);
+ Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+ /*
+ * If the desired page is currently read in and valid, we have nothing to do.
+ *
+ * The caller should've ensured that it didn't previously advance readOff
+ * past the valid limit of this timeline, so it doesn't matter if the current
+ * TLI has since become historical.
+ */
+ if (lastReadPage == wantPage &&
+ state->readLen != 0 &&
+ lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+ return;
+
+ /*
+ * If we're reading from the current timeline, it hasn't become historical
+ * and the page we're reading is after the last page read, we can again
+ * just carry on. (Seeking backwards requires a check to make sure the older
+ * page isn't on a prior timeline).
+ */
+ if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+ {
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+ return;
+ }
+
+ /*
+ * If we're just reading pages from a previously validated historical
+ * timeline and the timeline we're reading from is valid until the
+ * end of the current segment we can just keep reading.
+ */
+ if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+ state->currTLI != ThisTimeLineID &&
+ state->currTLI != 0 &&
+ (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+ return;
+
+ /*
+ * If we reach this point we're either looking up a page for random access,
+ * the current timeline just became historical, or we're reading from a new
+ * segment containing a timeline switch. In all cases we need to determine
+ * the newest timeline on the segment.
+ *
+ * If it's the current timeline we can just keep reading from here unless
+ * we detect a timeline switch that makes the current timeline historical.
+ * If it's a historical timeline we can read all the segment on the newest
+ * timeline because it contains all the old timelines' data too. So only
+ * one switch check is required.
+ */
+ {
+ /*
+ * We need to re-read the timeline history in case it's been changed
+ * by a promotion or replay from a cascaded replica.
+ */
+ List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+ XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+ Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+ /* Find the timeline of the last LSN on the segment containing wantPage. */
+ state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+ state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
+ &state->nextTLI);
+
+ Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+ wantPage + wantLength < state->currTLIValidUntil);
+
+ list_free_deep(timelineHistory);
+
+ elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+ state->currTLI,
+ (uint32)(state->currTLIValidUntil >> 32),
+ (uint32)(state->currTLIValidUntil));
+ }
+}
+
+/*
* read_page callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
@@ -770,28 +897,71 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int count;
loc = targetPagePtr + reqLen;
+
+ /* Make sure enough xlog is available... */
while (1)
{
/*
- * TODO: we're going to have to do something more intelligent about
- * timelines on standbys. Use readTimeLineHistory() and
- * tliOfPointInHistory() to get the proper LSN? For now we'll catch
- * that case earlier, but the code and TODO is left in here for when
- * that changes.
+ * Check which timeline to get the record from.
+ *
+ * We have to do it each time through the loop because if we're in
+ * recovery as a cascading standby, the current timeline might've
+ * become historical. We can't rely on RecoveryInProgress() because
+ * in a standby configuration like
+ *
+ * A => B => C
+ *
+ * if we're a logical decoding on C, and B gets promoted, our timeline
+ * will change while we remain in recovery.
*/
- if (!RecoveryInProgress())
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+ if (state->currTLI == ThisTimeLineID)
{
- *pageTLI = ThisTimeLineID;
- read_upto = GetFlushRecPtr();
+ /*
+ * We're reading from the current timeline so we might have to
+ * wait for the desired record to be generated (or, for a standby,
+ * received & replayed)
+ */
+ if (!RecoveryInProgress())
+ {
+ *pageTLI = ThisTimeLineID;
+ read_upto = GetFlushRecPtr();
+ }
+ else
+ read_upto = GetXLogReplayRecPtr(pageTLI);
+
+ if (loc <= read_upto)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
}
else
- read_upto = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= read_upto)
+ {
+ /*
+ * We're on a historical timeline, so limit reading to the switch
+ * point where we moved to the next timeline.
+ *
+ * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+ * about the new timeline, so we must've received past the end of
+ * it.
+ */
+ read_upto = state->currTLIValidUntil;
+
+ /*
+ * Setting pageTLI to our wanted record's TLI is slightly wrong;
+ * the page might begin on an older timeline if it contains a
+ * timeline switch, since its xlog segment will have been copied
+ * from the prior timeline. This is pretty harmless though, as
+ * nothing cares so long as the timeline doesn't go backwards. We
+ * should read the page header instead; FIXME someday.
+ */
+ *pageTLI = state->currTLI;
+
+ /* No need to wait on a historical timeline */
break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
+ }
}
if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 318726e..a8f7b76 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -234,13 +234,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
+ ReplicationSlotAcquire(NameStr(*name));
+
/* compute the current end-of-wal */
if (!RecoveryInProgress())
end_of_wal = GetFlushRecPtr();
else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
- ReplicationSlotAcquire(NameStr(*name));
+ end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
PG_TRY();
{
@@ -279,6 +279,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
+ /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e508..ef8ba80 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -47,6 +47,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -756,6 +757,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
XLogRecPtr flushptr;
int count;
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+ sendTimeLineIsHistoric = state->currTLI == ThisTimeLineID;
+ sendTimeLine = state->currTLI;
+ sendTimeLineValidUpto = state->currTLIValidUntil;
+ sendTimeLineNextTLI = state->nextTLI;
+
/* make sure we have enough WAL available */
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
@@ -984,10 +991,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();
- /* setup state for XLogReadPage */
- sendTimeLineIsHistoric = false;
- sendTimeLine = ThisTimeLineID;
-
/*
* Initialize position to the last ack'ed one, then the xlog records begin
* to be shipped from that position.
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..8f96728 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -160,6 +160,22 @@ struct XLogReaderState
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
+ /* timeline to read it from, 0 if a lookup is required */
+ TimeLineID currTLI;
+ /*
+ * Safe point to read to in currTLI if current TLI is historical
+ * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+ *
+ * Actually set to the start of the segment containing the timeline
+ * switch that ends currTLI's validity, not the LSN of the switch
+ * its self, since we can't assume the old segment will be present.
+ */
+ XLogRecPtr currTLIValidUntil;
+ /*
+ * If currTLI is not the most recent known timeline, the next timeline to
+ * read from when currTLIValidUntil is reached.
+ */
+ TimeLineID nextTLI;
/* Buffer for current ReadRecord result (expandable) */
char *readRecordBuf;
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index d027ea1..f0ee352 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetRecPtr, char *cur_page,
TimeLineID *pageTLI);
+extern void XLogReadDetermineTimeline(XLogReaderState *state,
+ XLogRecPtr wantPage, uint32 wantLength);
+
#endif
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index a847952..d2ff1e9 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
#
#-------------------------------------------------------------------------
+EXTRA_INSTALL=contrib/test_decoding
+
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/009_logical_decoding_timelines.pl b/src/test/recovery/t/009_logical_decoding_timelines.pl
new file mode 100644
index 0000000..09830dc
--- /dev/null
+++ b/src/test/recovery/t/009_logical_decoding_timelines.pl
@@ -0,0 +1,130 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+# excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+# advancing it as we go using the low level APIs. It can't be done
+# from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use RecursiveCopy;
+use File::Copy;
+use IPC::Run ();
+use Scalar::Util qw(blessed);
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+ 'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+ $stderr,
+ qr/replication slot "after_basebackup" does not exist/,
+ 'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+
+my $final_expected_output_bb = q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT);
+is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# So far we've peeked the slots, so when we fetch the same info over
+# pg_recvlogical we should get complete results. First, find out the commit lsn
+# of the last transaction. There's no max(pg_lsn), so:
+
+my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+
+# now use the walsender protocol to peek the slot changes and make sure we see
+# the same results.
+
+$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
+ $endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+
+# walsender likes to add a newline
+chomp($stdout);
+is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
--
2.5.5
It's unlikely this will get in as a standalone patch, so I'm closing
the CF entry for it as RwF
https://commitfest.postgresql.org/11/779/
It is now being tracked as part of logical decoding on standby at
https://commitfest.postgresql.org/12/788/
in thread "Logical decoding on standby", which began as
/messages/by-id/CAMsr+YFi-LV7S8ehnwUiZnb=1h_14PwQ25d-vyUNq-f5S5r=Zg@mail.gmail.com
.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers