Replication Node Identifiers and crashsafe Apply Progress
Hello,
As you know, the reason we are working on changeset extraction is that we
want to build logical unidirectional and bidirectional replication
on top. To use changeset extraction effectively, I think one set of
related features on top is very useful:
When extracting changes using the changeset extraction patchset (latest
version at [1]) the START_LOGICAL_REPLICATION command is used to stream
changes from a source system. When started it will continue to send
changes as long as the connection is up or it is aborted. For obvious
performance reasons it will *not* wait for an ACK for each transaction
commit it streams out.
Instead it relies on the receiver, exactly as in physical replication,
sending feedback messages containing the LSN up to which data has safely
been received.
That means frequently something like:
walsender: => COMMIT 0/10000000
walsender: => COMMIT 0/10000200
walsender: => COMMIT 0/10000400
walsender: => COMMIT 0/10000600
receiver: <= ACKNOWLEDGE 0/10000270
walsender: => COMMIT 0/10000800
is possible and important for performance. I.e. the server has streamed
out more changes than it got confirmation for.
So, when the replication connection goes down, e.g. because the
receiving side has crashed, we need to tell the server from where to
start. Every position between the last ACKed and the end of WAL is
legal.
The receiver then can ask the source to start replication from the last
replayed commit using START_LOGICAL_REPLICATION 'slot_name'
'0/10000600' which would then re-stream all the changes in the
transaction that committed at 0/10000600 and all that follow.
But for that the receiving side needs to know up to where changes have
been applied. One relatively easy solution for that is that the
receiving side does something like:
UPDATE replication_progress SET lsn = '0/10000600' WHERE source_id = ...;
before the end of every replayed transaction. But that obviously will
quickly cause bloat.
Our solution to that is that a replaying process can tell the backend
that it is currently doing so and set up three variables for every
transaction:
1) an identifier for the source database
2) the LSN at which the replayed transaction has committed remotely
3) the time at which the replayed transaction has committed remotely
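For illustration, a replay process using the attached patch would do
something like the following before committing each replayed transaction
(a minimal sketch; the remote_* values are placeholders coming from the
apply stream, and in practice the origin id is set via the
replication_origin_id GUC rather than assigned directly):

/* 1) identify the node this transaction originally came from */
guc_replication_origin_id = origin_node_id;
/* 2) the LSN of the remote commit, e.g. 0/10000600 from the stream */
replication_origin_lsn = remote_commit_lsn;
/* 3) the timestamp of the remote commit */
replication_origin_timestamp = remote_commit_time;
/* ... apply the transaction's changes, then commit as usual ... */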
When the transaction then commits the commit record will set the
XACT_CONTAINS_ORIGIN flag in ->xinfo and will add that data to the end
of the commit record. During crash recovery the startup process will
remember the newest LSN for each remote database in shared memory.
This way, after a crash, restart or disconnect, the replay process can look
into shared memory, check how far it has already replayed, and restart
seamlessly, with minimal effort.
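In code, the restart dance could look roughly like this (a sketch against
the attached patch; the connection handling and slot name are assumed):

/* after reconnecting, ask shared memory how far we already replayed */
XLogRecPtr start_lsn;

SetupCachedReplicationIdentifier(origin_node_id);
start_lsn = RemoteCommitFromCachedReplicationIdentifier();
/* then request streaming from there, e.g.
 * START_LOGICAL_REPLICATION 'slot_name' '0/10000600'
 */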
We previously discussed the topic and some were quite averse to using
any sort of numeric node identifiers across systems and suggested that
those should only be used internally. So what the attached patch does is
to add a new shared system catalog called 'pg_replication_identifier'
(suggestions for a better name welcome) which translates a number of
identifying traits into a numeric identifier.
The set of identifying traits currently is:
* the sysid of the remote system, combined with the remote TLI
* the oid of the local database
* the oid of the remote database
* an optional name
but that's just what we needed in our multimaster prototype, and not
what I necessarily think is correct.
The added API (which surely needs some work, I am not particularly happy
with the naming of functions for one) basically consists of two parts:
1) functions to query/create replication identifiers:
* GetReplicationIdentifier(identifying traits) - search for a numeric replication identifier
* CreateReplicationIdentifier(identifying traits) - creates a numeric replication identifier
* GetReplicationInfoByIdentifier(numeric identifier) - returns identifying traits
2) functions to query/manipulate replication progress:
* AdvanceReplicationIdentifier(node, local_lsn, remote_lsn)
* XLogRecPtr RemoteCommitFromReplicationIdentifier(node)
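Tying the two parts together, usage might look roughly like this (a
sketch; error handling is omitted and the remote_* traits are
placeholders):

/* map identifying traits to a local, numeric node id, creating it if new */
RepNodeId node = GetReplicationIdentifier(remotesysid, remotetli,
                                          remotedb, riname, rilocaldb);
if (node == InvalidRepNodeId)
    node = CreateReplicationIdentifier(remotesysid, remotetli,
                                       remotedb, riname, rilocaldb);

/* after replaying a remote commit, persist the apply progress */
AdvanceReplicationIdentifier(node, remote_commit_lsn, XactLastCommitEnd);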
Internally the code also maintains some on-disk data which is updated
during checkpoints to store the replication progress, otherwise it'd
vanish if we shut down gracefully ;).
The attached code also integrates with the "commit timestamp" module
that Alvaro submitted ([2]). Every time a remote transaction is committed
we store a) the remote commit's timestamp, b) the origin node id in it.
That makes it relatively easy to build multimaster systems with conflict
resolution on top, since whenever there's a conflict the originating
node and originating commit timestamp for a row can be queried
efficiently.
Having information about the origin of a change/transaction makes it
possible to implement complex replication topologies since the
information is available to changeset extraction output plugins.
It allows writing output plugins that (see the sketch after this list):
* decode all changes, independent of the system they were originally
executed on by the user
* decode changes generated locally, but none from remote systems
* pick and choose between those, say only decode those the receiving
system isn't replicating from itself
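As a rough illustration, the filtering decision in an output plugin could
be as simple as this (a sketch; only change->origin_id and
InvalidRepNodeId come from the attached patches, the rest is assumed
glue):

static bool
interesting_for_peer(ReorderBufferChange *change, RepNodeId peer_id)
{
    /*
     * Changes generated locally carry InvalidRepNodeId, anything else
     * names the node the change was originally replayed from.
     */
    return change->origin_id != peer_id;
}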
Questions are:
* Which identifying traits do we want to use to identify nodes?
* How do we want to manipulate replication identifiers? Currently they
can only be manipulated by using C functions, which is fine for some users,
but probably not for others.
* Do we want to allow setting (remote_lsn, remote_timestamp,
remote_node_id) via SQL? Currently the remote_node_id can be set as a
GUC, but the others can't. They probably should be settable via a
function instead of GUCs?
* Suggestions for better names!
* Would slony et al need something on top to use this?
Todo:
* cleanup/naming
* Set-returning function to see the replication progress
* remove old checkpoint files
Note that this only applies on top of a) the changeset extraction code and
b) the commit timestamp code. The 'replication-identifiers' git branch
([3]) contains all of it integrated together.
Comments welcome!
Greetings,
Andres Freund
[1]: http://archives.postgresql.org/message-id/20131114134647.GA26172%40alap2.anarazel.de
[2]: http://archives.postgresql.org/message-id/20131022221600.GE4987%40eldon.alvh.no-ip.org
[3]: http://git.postgresql.org/gitweb/?p=users/andresfreund/postgres.git;a=shortlog;h=refs/heads/replication-identifiers
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachment: 0001-Introduce-replication_identifiers-to-keep-track-of-r.patch (text/x-patch)
From 83b6821ceb9be613f2bb377f3a1d9dc459b3b4b4 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 22 Feb 2013 17:43:27 +0100
Subject: [PATCH] Introduce "replication_identifiers" to keep track of remote
nodes.
Replication identifiers can be used to track & lookup remote nodes identified
via (sysid, tlid, remote_dbid, local_dbid, name) and map that tuple to a local
uint16.
Keyed by that replication identifier the progress of replication from
that system is tracked in a crashsafe manner.
Support for tracking that via output plugins is added as well.
Needs a catversion bump.
---
src/backend/access/rmgrdesc/xactdesc.c | 16 +-
src/backend/access/transam/xact.c | 71 ++-
src/backend/access/transam/xlog.c | 10 +
src/backend/catalog/Makefile | 2 +-
src/backend/catalog/catalog.c | 8 +-
src/backend/replication/logical/Makefile | 3 +-
src/backend/replication/logical/decode.c | 6 +-
src/backend/replication/logical/logical.c | 9 +-
src/backend/replication/logical/reorderbuffer.c | 3 +-
.../replication/logical/replication_identifier.c | 647 +++++++++++++++++++++
src/backend/storage/ipc/ipci.c | 3 +
src/backend/utils/cache/syscache.c | 23 +
src/backend/utils/misc/guc.c | 47 +-
src/backend/utils/misc/postgresql.conf.sample | 2 +-
src/bin/initdb/initdb.c | 1 +
src/bin/pg_resetxlog/pg_resetxlog.c | 2 +
src/include/access/xact.h | 9 +-
src/include/access/xlog.h | 2 +
src/include/access/xlogdefs.h | 6 +
src/include/catalog/indexing.h | 6 +
src/include/catalog/pg_replication_identifier.h | 92 +++
src/include/replication/logical.h | 7 +
src/include/replication/reorderbuffer.h | 10 +-
src/include/replication/replication_identifier.h | 42 ++
src/include/utils/syscache.h | 2 +
src/test/regress/expected/sanity_check.out | 1 +
26 files changed, 1006 insertions(+), 24 deletions(-)
create mode 100644 src/backend/replication/logical/replication_identifier.c
create mode 100644 src/include/catalog/pg_replication_identifier.h
create mode 100644 src/include/replication/replication_identifier.h
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 2caf5a0..2fa7e78 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -26,9 +26,12 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{
int i;
TransactionId *subxacts;
+ SharedInvalidationMessage *msgs;
subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
+
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
if (xlrec->nrels > 0)
@@ -50,9 +53,6 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
}
if (xlrec->nmsgs > 0)
{
- SharedInvalidationMessage *msgs;
-
- msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
@@ -81,6 +81,16 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
appendStringInfo(buf, " unknown id %d", msg->id);
}
}
+ if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]);
+
+ appendStringInfo(buf, " origin %u, lsn %X/%X",
+ origin->origin_node_id,
+ (uint32)(origin->origin_lsn >> 32),
+ (uint32)origin->origin_lsn);
+ }
+
}
static void
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index ef9d42d..c05526d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -37,8 +37,10 @@
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
+#include "replication/replication_identifier.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -1072,11 +1074,13 @@ RecordTransactionCommit(void)
/*
* Do we need the long commit record? If not, use the compact format.
*/
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval ||
+ forceSyncCommit || guc_replication_origin_id != InvalidRepNodeId)
{
- XLogRecData rdata[4];
+ XLogRecData rdata[5];
int lastrdata = 0;
xl_xact_commit xlrec;
+ xl_xact_origin origin;
/*
* Set flags required for recovery processing of commits.
@@ -1124,6 +1128,21 @@ RecordTransactionCommit(void)
rdata[3].buffer = InvalidBuffer;
lastrdata = 3;
}
+ /* dump transaction origin information */
+ if (guc_replication_origin_id != InvalidRepNodeId)
+ {
+ Assert(replication_origin_lsn != InvalidXLogRecPtr);
+ xlrec.xinfo |= XACT_CONTAINS_ORIGIN;
+ origin.origin_node_id = guc_replication_origin_id;
+ origin.origin_lsn = replication_origin_lsn;
+ origin.origin_timestamp = replication_origin_timestamp;
+
+ rdata[lastrdata].next = &(rdata[4]);
+ rdata[4].data = (char *) &origin;
+ rdata[4].len = sizeof(xl_xact_origin);
+ rdata[4].buffer = InvalidBuffer;
+ lastrdata = 4;
+ }
rdata[lastrdata].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
@@ -1154,13 +1173,19 @@ RecordTransactionCommit(void)
}
}
+ /* record plain commit ts if not replaying remote actions */
+ if (guc_replication_origin_id == InvalidRepNodeId)
+ replication_origin_timestamp = xactStopTimestamp;
+
/*
* We don't need to log the commit timestamp separately since the commit
* record logged above has all the necessary action to set the timestamp
* again.
*/
TransactionTreeSetCommitTimestamp(xid, nchildren, children,
- xactStopTimestamp, 0, false);
+ replication_origin_timestamp,
+ guc_replication_origin_id,
+ false);
/*
* Check if we want to commit asynchronously. We can allow the XLOG flush
@@ -1242,9 +1267,11 @@ RecordTransactionCommit(void)
if (wrote_xlog)
SyncRepWaitForLSN(XactLastRecEnd);
+ /* remember end of last commit record */
+ XactLastCommitEnd = XactLastRecEnd;
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
-
cleanup:
/* Clean up local data */
if (rels)
@@ -4614,10 +4641,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
SharedInvalidationMessage *inval_msgs, int nmsgs,
RelFileNode *xnodes, int nrels,
Oid dbId, Oid tsId,
- uint32 xinfo)
+ uint32 xinfo,
+ xl_xact_origin *origin)
{
TransactionId max_xid;
int i;
+ RepNodeId origin_node_id = InvalidRepNodeId;
max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
@@ -4637,9 +4666,26 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
LWLockRelease(XidGenLock);
}
+ Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL));
+
+ if (xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ origin_node_id = origin->origin_node_id;
+ commit_time = origin->origin_timestamp;
+ }
+
/* Set the transaction commit time */
TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids,
- commit_time, 0, false);
+ commit_time,
+ origin_node_id, false);
+
+ if (xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ /* recover apply progress */
+ AdvanceReplicationIdentifier(origin->origin_node_id,
+ origin->origin_lsn,
+ lsn);
+ }
if (standbyState == STANDBY_DISABLED)
{
@@ -4754,19 +4800,25 @@ xact_redo_commit(xl_xact_commit *xlrec,
{
TransactionId *subxacts;
SharedInvalidationMessage *inval_msgs;
-
+ xl_xact_origin *origin = NULL;
/* subxid array follows relfilenodes */
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
+ if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]);
+ }
+
xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
subxacts, xlrec->nsubxacts,
inval_msgs, xlrec->nmsgs,
xlrec->xnodes, xlrec->nrels,
xlrec->dbId,
xlrec->tsId,
- xlrec->xinfo);
+ xlrec->xinfo,
+ origin);
}
/*
@@ -4782,7 +4834,8 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
NULL, 0, /* relfilenodes */
InvalidOid, /* dbId */
InvalidOid, /* tsId */
- 0); /* xinfo */
+ 0, /* xinfo */
+ NULL /* origin */);
}
/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 585fe67..c900940 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
#include "replication/logical.h"
+#include "replication/replication_identifier.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@@ -280,6 +281,8 @@ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
+XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
+
/*
* RedoRecPtr is this backend's local copy of the REDO record pointer
* (which is almost but not quite the same as a pointer to the most recent
@@ -1055,6 +1058,7 @@ begin:;
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
+ rechdr->xl_origin_id = guc_replication_origin_id;
rechdr->xl_prev = InvalidXLogRecPtr;
COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
@@ -6353,6 +6357,11 @@ StartupXLOG(void)
StartupLogicalDecoding(checkPoint.redo);
/*
+ * Recover knowledge about replay progress of known replication partners.
+ */
+ StartupReplicationIdentifier(checkPoint.redo);
+
+ /*
* Initialize unlogged LSN. On a clean shutdown, it's restored from the
* control file. On recovery, all unlogged relations are blown away, so
* the unlogged LSN counter can be reset too.
@@ -8428,6 +8437,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointSnapBuild();
CheckpointLogicalRewriteHeap();
CheckPointBuffers(flags); /* performs all required fsyncs */
+ CheckPointReplicationIdentifier(checkPointRedo);
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
}
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index a974bd5..3a20672 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
pg_ts_parser.h pg_ts_template.h pg_extension.h \
pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
- pg_foreign_table.h \
+ pg_foreign_table.h pg_replication_identifier.h \
pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
toasting.h indexing.h \
)
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 885ba27..d9ecf8a 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -32,6 +32,7 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_pltemplate.h"
#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_identifier.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
@@ -278,7 +279,8 @@ IsSharedRelation(Oid relationId)
relationId == SharedDependRelationId ||
relationId == SharedSecLabelRelationId ||
relationId == TableSpaceRelationId ||
- relationId == DbRoleSettingRelationId)
+ relationId == DbRoleSettingRelationId ||
+ relationId == ReplicationIdentifierRelationId)
return true;
/* These are their indexes (see indexing.h) */
if (relationId == AuthIdRolnameIndexId ||
@@ -294,7 +296,9 @@ IsSharedRelation(Oid relationId)
relationId == SharedSecLabelObjectIndexId ||
relationId == TablespaceOidIndexId ||
relationId == TablespaceNameIndexId ||
- relationId == DbRoleSettingDatidRolidIndexId)
+ relationId == DbRoleSettingDatidRolidIndexId ||
+ relationId == ReplicationLocalIdIndex ||
+ relationId == ReplicationRemoteIndex)
return true;
/* These are their toast tables and toast indexes (see toasting.h) */
if (relationId == PgShdescriptionToastTable ||
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 6fae278..f24dbbe 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,7 +14,8 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \
+ snapbuild.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 655feca..50ccef9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -496,7 +496,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time);
+ commit_time, buf->record.xl_origin_id);
}
/*
@@ -540,6 +540,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -580,6 +581,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
data = (char *) &xlhdr->header;
@@ -634,6 +636,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
@@ -689,6 +692,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->node, sizeof(RelFileNode));
/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 10041cd..6a028c1 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -68,8 +68,12 @@ LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
/* My slot for logical rep in the shared memory array */
LogicalDecodingSlot *MyLogicalDecodingSlot = NULL;
-/* user settable parameters */
int max_logical_slots = 0; /* the maximum number of logical slots */
+RepNodeId guc_replication_node_id = InvalidRepNodeId; /* local node id */
+RepNodeId guc_replication_origin_id = InvalidRepNodeId; /* assumed identity */
+
+XLogRecPtr replication_origin_lsn;
+TimestampTz replication_origin_timestamp;
static void LogicalSlotKill(int code, Datum arg);
@@ -809,7 +813,8 @@ StartupLogicalDecoding(XLogRecPtr checkPointRedo)
/* one of our own directories */
if (strcmp(logical_de->d_name, "snapshots") == 0 ||
- strcmp(logical_de->d_name, "mappings") == 0)
+ strcmp(logical_de->d_name, "mappings") == 0 ||
+ strcmp(logical_de->d_name, "checkpoints") == 0)
continue;
/* we crashed while a slot was being setup or deleted, clean up */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 05a5020..979fa23 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1261,7 +1261,7 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time)
+ TimestampTz commit_time, RepNodeId origin)
{
ReorderBufferTXN *txn;
ReorderBufferIterTXNState *iterstate = NULL;
@@ -1282,6 +1282,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
+ txn->origin_id = origin;
/* serialize the last bunch of changes if we need start earlier anyway */
if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c
new file mode 100644
index 0000000..d455935
--- /dev/null
+++ b/src/backend/replication/logical/replication_identifier.c
@@ -0,0 +1,647 @@
+/*-------------------------------------------------------------------------
+ *
+ * replication_identifier.c
+ * Logical Replication Node Identifier and progress persistency support.
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/replication_identifier.c
+ *
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/indexing.h"
+#include "replication/replication_identifier.h"
+#include "replication/logical.h"
+#include "storage/fd.h"
+#include "storage/copydir.h"
+#include "utils/syscache.h"
+#include "utils/rel.h"
+
+/*
+ * Replay progress of a single remote node.
+ */
+typedef struct ReplicationState
+{
+ /*
+ * Local identifier for the remote node.
+ */
+ RepNodeId local_identifier;
+
+ /*
+ * Location of the latest commit from the remote side.
+ */
+ XLogRecPtr remote_lsn;
+
+ /*
+ * Remember the local lsn of the commit record so we can XLogFlush() to it
+ * during a checkpoint so we know the commit record actually is safe on
+ * disk.
+ */
+ XLogRecPtr local_lsn;
+} ReplicationState;
+
+/*
+ * Base address into a shared memory array of replication states of size
+ * max_logical_slots.
+ * XXX: Should we size this with a separate variable instead of max_logical_slots?
+ */
+static ReplicationState *ReplicationStates;
+
+/*
+ * Backend-local, cached element from ReplicationStates for use in a backend
+ * replaying remote commits, so we don't have to search ReplicationStates for
+ * the backend's current RepNodeId.
+ */
+static ReplicationState *local_replication_state = NULL;
+
+/* Magic for on disk files. */
+#define REPLICATION_STATE_MAGIC (uint32)0x1257DADE
+
+/* XXX: move to c.h? */
+#ifndef UINT16_MAX
+#define UINT16_MAX ((1<<16) - 1)
+#else
+#if UINT16_MAX != ((1<<16) - 1)
+#error "uh, wrong UINT16_MAX?"
+#endif
+#endif
+
+/*
+ * Check for a persistent replication identifier identified by remotesysid,
+ * remotetli, remotedb, riname, rilocaldb.
+ *
+ * Returns InvalidOid if the node isn't known yet.
+ */
+RepNodeId
+GetReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+ Name riname, Oid rilocaldb)
+{
+ Oid riident = InvalidOid;
+ HeapTuple tuple;
+ Form_pg_replication_identifier ident;
+ NameData sysid;
+
+ sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+ tuple = SearchSysCache4(REPLIDREMOTE,
+ NameGetDatum(&sysid),
+ ObjectIdGetDatum(remotedb),
+ ObjectIdGetDatum(rilocaldb),
+ NameGetDatum(riname));
+ if (HeapTupleIsValid(tuple))
+ {
+ ident = (Form_pg_replication_identifier)GETSTRUCT(tuple);
+ riident = ident->riident;
+ ReleaseSysCache(tuple);
+ }
+ return riident;
+}
+
+/*
+ * Create a persistent replication identifier.
+ *
+ * Needs to be called in a transaction.
+ */
+RepNodeId
+CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+ Name riname, Oid rilocaldb)
+{
+ Oid riident;
+ HeapTuple tuple = NULL;
+ NameData sysid;
+ Relation rel;
+
+ Assert(IsTransactionState());
+
+ sprintf(NameStr(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+ /*
+ * We need the numeric replication identifiers to be 16bit wide, so we
+ * cannot rely on the normal oid allocation.
+ *
+ * Lock table against modifications. This is quite heavyweight, but
+ * shouldn't happen frequently.
+ */
+ rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+ for (riident = InvalidOid + 1; riident <= UINT16_MAX; riident++)
+ {
+ bool nulls[Natts_pg_replication_identifier];
+ Datum values[Natts_pg_replication_identifier];
+
+ /*
+ * FIXME: do an index lookup using SnapshotDirty instead. This
+ * pollutes the cache if there are lots of nodes.
+ */
+ tuple = GetReplicationInfoByIdentifier(riident);
+
+ if (tuple != NULL)
+ {
+ ReleaseSysCache(tuple);
+ continue;
+ }
+
+ /* ok, found an unused riident */
+ memset(&nulls, 0, sizeof(nulls));
+
+ values[Anum_pg_replication_riident -1] = ObjectIdGetDatum(riident);
+ values[Anum_pg_replication_riremotesysid - 1] = NameGetDatum(&sysid);
+ values[Anum_pg_replication_rilocaldb - 1] = ObjectIdGetDatum(rilocaldb);
+ values[Anum_pg_replication_riremotedb - 1] = ObjectIdGetDatum(remotedb);
+ values[Anum_pg_replication_riname - 1] = NameGetDatum(riname);
+
+ tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+ simple_heap_insert(rel, tuple);
+ CatalogUpdateIndexes(rel, tuple);
+ CommandCounterIncrement();
+ break;
+ }
+
+ /*
+ * only release at end of transaction, so we don't have to worry about
+ * race conditions with other transactions trying to insert a new
+ * identifier. Acquiring a new identifier should be a fairly infrequent
+ * thing, so this seems fine.
+ * FIXME: If we use SnapshotDirty above, this isn't necessary anymore.
+ */
+ heap_close(rel, NoLock);
+
+ if (tuple == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("no free replication id could be found")));
+
+ return riident;
+}
+
+/*
+ * Lookup pg_replication_identifier tuple via its riident.
+ *
+ * The result needs to be ReleaseSysCache'ed
+ */
+HeapTuple
+GetReplicationInfoByIdentifier(RepNodeId riident)
+{
+ HeapTuple tuple;
+
+ Assert(OidIsValid((Oid) riident));
+ Assert(riident < UINT16_MAX);
+ tuple = SearchSysCache1(REPLIDIDENT,
+ ObjectIdGetDatum((Oid) riident));
+ return tuple;
+}
+
+Size
+ReplicationIdentifierShmemSize(void)
+{
+ Size size = 0;
+
+ /*
+ * FIXME: max_logical_slots is the wrong thing to use here, here we keep
+ * the replay state of *remote* transactions.
+ */
+ if (max_logical_slots == 0)
+ return size;
+
+ size = add_size(size,
+ mul_size(max_logical_slots, sizeof(ReplicationState)));
+ return size;
+}
+
+void
+ReplicationIdentifierShmemInit(void)
+{
+ bool found;
+
+ if (max_logical_slots == 0)
+ return;
+
+ ReplicationStates = (ReplicationState *)
+ ShmemInitStruct("ReplicationIdentifierState",
+ ReplicationIdentifierShmemSize(),
+ &found);
+
+ if (!found)
+ {
+ MemSet(ReplicationStates, 0, ReplicationIdentifierShmemSize());
+ }
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of replication identifier's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+-------------------------+-------------------------+-----+
+ * | MAGIC | struct ReplicationState | struct ReplicationState | ... | EOF
+ * +-------+-------------------------+-------------------------+-----+
+ *
+ * So it's just the magic, followed by the statically sized
+ * ReplicationStates. Note that the maximum number of ReplicationStates is
+ * determined by max_logical_slots.
+ *
+ * FIXME: Add a CRC32 to the end.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationIdentifier(XLogRecPtr ckpt)
+{
+ char tmppath[MAXPGPATH];
+ char path[MAXPGPATH];
+ int fd;
+ int tmpfd;
+ int i;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+
+ if (max_logical_slots == 0)
+ return;
+
+ /*
+ * Include the LSN of the checkpoint's REDO pointer in the filename, so we
+ * can deal with the checkpoint failing after
+ * CheckPointReplicationIdentifier() has finished.
+ */
+ sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+ sprintf(tmppath, "pg_llog/checkpoints/%X-%X.ckpt.tmp",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+
+ /* check whether file already exists */
+ fd = OpenTransientFile(path,
+ O_RDONLY | PG_BINARY,
+ 0);
+
+ /* usual case, no checkpoint performed yet */
+ if (fd < 0 && errno == ENOENT)
+ ;
+ else if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not check replication state checkpoint \"%s\": %m",
+ path)));
+ /* already checkpointed before crash during a checkpoint or so */
+ else
+ {
+ CloseTransientFile(fd);
+ return;
+ }
+
+ /* make sure no old temp file is remaining */
+ if (unlink(tmppath) < 0 && errno != ENOENT)
+ ereport(PANIC, (errmsg("failed while unlinking %s", path)));
+
+ /*
+ * no other backend can perform this at the same time, we're protected by
+ * CheckpointLock.
+ */
+ tmpfd = OpenTransientFile(tmppath,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (tmpfd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not create replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+
+ /* write magic */
+ if ((write(tmpfd, &magic, sizeof(magic))) !=
+ sizeof(magic))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+ }
+
+ /* write actual data */
+ for (i = 0; i < max_logical_slots; i++)
+ {
+ ReplicationState local_state;
+
+ if (ReplicationStates[i].local_identifier == InvalidRepNodeId)
+ continue;
+
+ local_state.local_identifier = ReplicationStates[i].local_identifier;
+ local_state.remote_lsn = ReplicationStates[i].remote_lsn;
+ local_state.local_lsn = InvalidXLogRecPtr;
+
+ /* make sure we only write out a commit that's persistent */
+ XLogFlush(ReplicationStates[i].local_lsn);
+
+ if ((write(tmpfd, &local_state, sizeof(ReplicationState))) !=
+ sizeof(ReplicationState))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+ }
+ }
+
+ /* fsync the file */
+ if (pg_fsync(tmpfd) != 0)
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync replication identifier checkpoint \"%s\": %m",
+ tmppath)));
+ }
+
+ CloseTransientFile(tmpfd);
+
+ /* rename to permanent file, fsync file and directory */
+ if (rename(tmppath, path) != 0)
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ }
+
+ fsync_fname("pg_llog/checkpoints", true);
+ fsync_fname(path, false);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationIdentifier.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationIdentifier(XLogRecPtr ckpt)
+{
+ char path[MAXPGPATH];
+ int fd;
+ int readBytes;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+ int last_state = 0;
+
+ /* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+ static bool already_started = false;
+ Assert(!already_started);
+ already_started = true;
+#endif
+
+ if (max_logical_slots == 0)
+ return;
+
+ elog(LOG, "starting up replication identifier with ckpt at %X/%X",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+
+ sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+ (uint32)(ckpt >> 32), (uint32)ckpt);
+
+ fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+ /*
+ * might have had max_logical_slots == 0 last run, or we just brought up a
+ * standby.
+ */
+ if (fd < 0 && errno == ENOENT)
+ return;
+ else if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open replication state checkpoint \"%s\": %m",
+ path)));
+
+ /* verify magic; that's written even if nothing was active */
+ readBytes = read(fd, &magic, sizeof(magic));
+ if (readBytes != sizeof(magic))
+ ereport(PANIC,
+ (errmsg("could not read replication state checkpoint magic \"%s\": %m",
+ path)));
+
+ if (magic != REPLICATION_STATE_MAGIC)
+ ereport(PANIC,
+ (errmsg("replication checkpoint has wrong magic %u instead of %u",
+ magic, REPLICATION_STATE_MAGIC)));
+
+ /* recover individual states, until there are no more to be found */
+ while (true)
+ {
+ ReplicationState local_state;
+ readBytes = read(fd, &local_state, sizeof(local_state));
+
+ /* no further data */
+ if (readBytes == 0)
+ break;
+
+ if (readBytes < 0)
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read replication checkpoint file \"%s\": %m",
+ path)));
+ }
+
+ if (readBytes != sizeof(local_state))
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read replication checkpoint file \"%s\": read %d of %zu",
+ path, readBytes, sizeof(local_state))));
+ }
+
+ if (last_state == max_logical_slots)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found, increase max_logical_slots")));
+
+ /* copy data to shared memory */
+ ReplicationStates[last_state++] = local_state;
+
+ elog(LOG, "recovered replication state of node %u to %X/%X",
+ local_state.local_identifier,
+ (uint32)(local_state.remote_lsn >> 32),
+ (uint32)local_state.remote_lsn);
+ }
+
+ CloseTransientFile(fd);
+}
+
+/*
+ * Tell the replication identifier machinery that a commit from 'node' that
+ * originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replication_origin_lsn and guc_replication_origin_id that
+ * ensures we won't lose knowledge about that after a crash if the
+ * transaction had a persistent effect (think of asynchronous commits).
+ *
+ * local_commit needs to be a local LSN of the commit so that we can make sure
+ * upon a checkpoint that enough WAL has been persisted to disk.
+ */
+void
+AdvanceReplicationIdentifier(RepNodeId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit)
+{
+ int i;
+ int free_slot = -1;
+ ReplicationState *replication_state = NULL;
+
+ /*
+ * XXX: should we restore into a hashtable and dump into shmem only after
+ * recovery finished?
+ */
+
+ /* check whether slot already exists */
+ for (i = 0; i < max_logical_slots; i++)
+ {
+ /* remember where to insert if necessary */
+ if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+ free_slot == -1)
+ {
+ free_slot = i;
+ continue;
+ }
+
+ /* not our slot */
+ if (ReplicationStates[i].local_identifier != node)
+ continue;
+
+ /* ok, found slot */
+ replication_state = &ReplicationStates[i];
+ break;
+ }
+
+ if (replication_state == NULL && free_slot == -1)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found for %u, increase max_logical_slots",
+ node)));
+ /* initialize new slot */
+ else if (replication_state == NULL)
+ {
+ replication_state = &ReplicationStates[free_slot];
+ Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+ replication_state->local_identifier = node;
+ }
+
+ /*
+ * Due to - harmless - race conditions during a checkpoint we could see
+ * values here that are older than the ones we already have in
+ * memory. Don't overwrite those.
+ */
+ if (replication_state->remote_lsn < remote_commit)
+ replication_state->remote_lsn = remote_commit;
+ if (replication_state->local_lsn < local_commit)
+ replication_state->local_lsn = local_commit;
+}
+
+
+/*
+ * Set up a replication identifier in the shared memory struct if it doesn't
+ * already exist and cache access to the specific ReplicationState so the
+ * array doesn't have to be searched when calling
+ * AdvanceCachedReplicationIdentifier().
+ *
+ * Obviously only one such cached identifier can exist per process and the
+ * current cached value can only be set again after the previous value is torn
+ * down with TeardownCachedReplicationIdentifier.
+ */
+void
+SetupCachedReplicationIdentifier(RepNodeId node)
+{
+ int i;
+ int free_slot = -1;
+
+ Assert(max_logical_slots != 0);
+ Assert(local_replication_state == NULL);
+
+ /*
+ * Search for either an existing slot for that identifier or a free one we
+ * can use.
+ */
+ for (i = 0; i < max_logical_slots; i++)
+ {
+ /* remember where to insert if necessary */
+ if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+ free_slot == -1)
+ {
+ free_slot = i;
+ continue;
+ }
+
+ /* not our slot */
+ if (ReplicationStates[i].local_identifier != node)
+ continue;
+
+ local_replication_state = &ReplicationStates[i];
+ }
+
+
+ if (local_replication_state == NULL && free_slot == -1)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found for %u, increase max_logical_slots",
+ node)));
+ else if (local_replication_state == NULL)
+ {
+ local_replication_state = &ReplicationStates[free_slot];
+ local_replication_state->local_identifier = node;
+ Assert(local_replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(local_replication_state->local_lsn == InvalidXLogRecPtr);
+ }
+}
+
+/*
+ * Make currently cached replication identifier unavailable so a new one can
+ * be setup with SetupCachedReplicationIdentifier().
+ *
+ * This function may only be called if a previous identifier was setup with
+ * SetupCachedReplicationIdentifier().
+ */
+void
+TeardownCachedReplicationIdentifier(RepNodeId node)
+{
+ Assert(max_logical_slots != 0);
+ Assert(local_replication_state != NULL);
+
+ local_replication_state = NULL;
+}
+
+/*
+ * Do the same work AdvanceReplicationIdentifier() does, just on a pre-cached
+ * identifier. This is noticeably cheaper if you only ever work on a single
+ * replication identifier.
+ */
+void
+AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit)
+{
+ Assert(local_replication_state != NULL);
+ if (local_replication_state->local_lsn < local_commit)
+ local_replication_state->local_lsn = local_commit;
+ if (local_replication_state->remote_lsn < remote_commit)
+ local_replication_state->remote_lsn = remote_commit;
+}
+
+/*
+ * Ask the machinery about the point up to which we successfully replayed
+ * changes from an already set-up & cached replication identifier.
+ */
+XLogRecPtr
+RemoteCommitFromCachedReplicationIdentifier(void)
+{
+ Assert(local_replication_state != NULL);
+ return local_replication_state->remote_lsn;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index ed2d69f..868ebb4 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
#include "replication/logical.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/replication_identifier.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
@@ -129,6 +130,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, LogicalDecodingShmemSize());
+ size = add_size(size, ReplicationIdentifierShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
@@ -237,6 +239,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
CheckpointerShmemInit();
AutoVacuumShmemInit();
LogicalDecodingShmemInit();
+ ReplicationIdentifierShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index e9bdfea..a4ebdd5 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -54,6 +54,7 @@
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
+#include "catalog/pg_replication_identifier.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_ts_config.h"
@@ -620,6 +621,28 @@ static const struct cachedesc cacheinfo[] = {
},
128
},
+ {ReplicationIdentifierRelationId, /* REPLIDIDENT */
+ ReplicationLocalIdIndex,
+ 1,
+ {
+ Anum_pg_replication_riident,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
+ {ReplicationIdentifierRelationId, /* REPLIDREMOTE */
+ ReplicationRemoteIndex,
+ 4,
+ {
+ Anum_pg_replication_riremotesysid,
+ Anum_pg_replication_riremotedb,
+ Anum_pg_replication_rilocaldb,
+ Anum_pg_replication_riname
+ },
+ 16
+ },
{RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId,
2,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 15286e7..7326800 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -201,6 +201,8 @@ static bool check_application_name(char **newval, void **extra, GucSource source
static void assign_application_name(const char *newval, void *extra);
static const char *show_unix_socket_permissions(void);
static const char *show_log_file_mode(void);
+static void assign_replication_node_id(int newval, void *extra);
+static void assign_replication_origin_id(int newval, void *extra);
static char *config_enum_get_options(struct config_enum * record,
const char *prefix, const char *suffix,
@@ -473,7 +475,8 @@ static bool data_checksums;
static int wal_segment_size;
static bool integer_datetimes;
static int effective_io_concurrency;
-
+static int phony_replication_node_id;
+static int phony_replication_origin_id;
/* should be static, but commands/variable.c needs to get at this */
char *role_string;
@@ -2118,6 +2121,26 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"replication_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("node id for replication."),
+ NULL
+ },
+ &phony_replication_node_id,
+ InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+ NULL, assign_replication_node_id, NULL
+ },
+
+ {
+ {"replication_origin_id", PGC_USERSET, REPLICATION_MASTER,
+ gettext_noop("current node id for replication."),
+ NULL
+ },
+ &phony_replication_origin_id,
+ InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+ NULL, assign_replication_origin_id, NULL
+ },
+
+ {
{"commit_siblings", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the minimum concurrent open transactions before performing "
"commit_delay."),
@@ -8918,4 +8941,26 @@ show_log_file_mode(void)
return buf;
}
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+ guc_replication_node_id = newval;
+ /* set default to local node */
+ guc_replication_origin_id = newval;
+ phony_replication_origin_id = newval;
+}
+
+
+static void
+assign_replication_origin_id(int newval, void *extra)
+{
+ /*
+ * FIXME: add error checking hook that check wal_level and
+ * replication_node_id.
+ */
+ guc_replication_origin_id = newval;
+}
+
+
+
#include "guc-file.c"
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index d66ec83..d93e863 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -258,7 +258,7 @@
#wal_receiver_timeout = 60s # time that receiver waits for
# communication from master
# in milliseconds; 0 disables
-
+#replication_node_id = 0 #invalid node id
#------------------------------------------------------------------------------
# QUERY TUNING
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index b7424e7..f1e991a 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -202,6 +202,7 @@ const char *subdirs[] = {
"pg_stat",
"pg_stat_tmp",
"pg_llog",
+ "pg_llog/checkpoints",
"pg_llog/snapshots",
"pg_llog/mappings"
};
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index f1b5d6d..dbc5b74 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -55,6 +55,7 @@
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "common/fe_memutils.h"
+#include "replication/logical.h"
extern int optind;
extern char *optarg;
@@ -970,6 +971,7 @@ WriteEmptyXLOG(void)
record->xl_len = sizeof(CheckPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
+ record->xl_origin_id = InvalidRepNodeId;
memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy,
sizeof(CheckPoint));
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 96502ce..84ff5ca 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,6 +146,13 @@ typedef struct xl_xact_commit
/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
} xl_xact_commit;
+typedef struct xl_xact_origin
+{
+ XLogRecPtr origin_lsn;
+ RepNodeId origin_node_id;
+ TimestampTz origin_timestamp;
+} xl_xact_origin;
+
#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
/*
@@ -158,7 +165,7 @@ typedef struct xl_xact_commit
*/
#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
-
+#define XACT_CONTAINS_ORIGIN 0x04
/* Access macros for above flags */
#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7415a26..8b03846 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -48,6 +48,7 @@ typedef struct XLogRecord
/* 2 bytes of padding here, initialize to zero */
XLogRecPtr xl_prev; /* ptr to previous record in log */
pg_crc32 xl_crc; /* CRC for this record */
+ RepNodeId xl_origin_id; /* node that originally caused this record to be written */
/* If MAXALIGN==8, there are 4 wasted bytes here */
@@ -177,6 +178,7 @@ typedef enum
} RecoveryTargetType;
extern XLogRecPtr XactLastRecEnd;
+extern XLogRecPtr XactLastCommitEnd;
extern bool reachedConsistency;
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index bca166e..60fcc31 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -45,6 +45,12 @@ typedef uint64 XLogSegNo;
typedef uint32 TimeLineID;
/*
+ * Denotes the node on which the action causing a WAL record to be logged
+ * originated.
+ */
+typedef uint16 RepNodeId;
+
+/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
* it is a win to use it in all cases where we sync on each write(). We could
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 4860e98..3fdc2b5 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -313,6 +313,12 @@ DECLARE_UNIQUE_INDEX(pg_extension_name_index, 3081, on pg_extension using btree(
DECLARE_UNIQUE_INDEX(pg_range_rngtypid_index, 3542, on pg_range using btree(rngtypid oid_ops));
#define RangeTypidIndexId 3542
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 3195, on pg_replication_identifier using btree(riident oid_ops));
+#define ReplicationLocalIdIndex 3195
+
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_remote_index, 3196, on pg_replication_identifier using btree(riremotesysid name_ops, riremotedb oid_ops, rilocaldb oid_ops, riname name_ops));
+#define ReplicationRemoteIndex 3196
+
/* last step of initialization script: build the indexes declared above */
BUILD_INDICES
diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h
new file mode 100644
index 0000000..3d55fd1
--- /dev/null
+++ b/src/include/catalog/pg_replication_identifier.h
@@ -0,0 +1,92 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_identifier.h
+ * Persistent Replication Node Identifiers
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_identifier.h
+ *
+ * NOTES
+ * the genbki.pl script reads this file and generates .bki
+ * information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_IDENTIFIER_H
+#define PG_REPLICATION_IDENTIFIER_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ * pg_replication_identifier. cpp turns this into
+ * typedef struct FormData_pg_replication_identifier
+ * ----------------
+ */
+#define ReplicationIdentifierRelationId 3465
+
+CATALOG(pg_replication_identifier,3465) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+ /*
+ * locally known identifier that gets included into wal.
+ *
+ * This should never leave the system.
+ *
+ * Needs to fit into a uint16, so we don't waste too much space. For this
+ * reason we don't use a normal Oid column here, since we need to handle
+ * allocation of new values manually.
+ */
+ Oid riident;
+
+ /* ----
+ * remote system identifier, including tli, separated by a -.
+ *
+ * They are packed together for two reasons:
+ * a) we can't represent sysids as uint64 because there's no such type on
+ * sql level, so we need a fixed width string anyway. And a name
+ * already has enough space for that.
+ * b) syscaches can only have 4 keys, and were already at that with
+ * combined keys
+ * ----
+ */
+ NameData riremotesysid;
+
+ /* local database */
+ Oid rilocaldb;
+
+ /* remote database */
+ Oid riremotedb;
+
+ /* optional name, zero length string */
+ NameData riname;
+#ifdef CATALOG_VARLEN /* variable-length fields start here */
+#endif
+} FormData_pg_replication_identifier;
+
+/* ----------------
+ * Form_pg_replication_identifier corresponds to a pointer to a tuple with
+ * the format of the pg_replication_identifier relation.
+ * ----------------
+ */
+typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
+
+/* ----------------
+ * compiler constants for pg_replication_identifier
+ * ----------------
+ */
+
+#define Natts_pg_replication_identifier 5
+#define Anum_pg_replication_riident 1
+#define Anum_pg_replication_riremotesysid 2
+#define Anum_pg_replication_rilocaldb 3
+#define Anum_pg_replication_riremotedb 4
+#define Anum_pg_replication_riname 5
+
+/* ----------------
+ * pg_replication_identifier has no initial contents
+ * ----------------
+ */
+
+#endif /* PG_REPLICATION_IDENTIFIER_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 273b98f..d22fc6c 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -160,6 +160,13 @@ typedef struct LogicalDecodingContext
/* GUCs */
extern PGDLLIMPORT int max_logical_slots;
+#define InvalidRepNodeId 0
+extern PGDLLIMPORT RepNodeId guc_replication_node_id;
+extern PGDLLIMPORT RepNodeId guc_replication_origin_id;
+extern PGDLLIMPORT XLogRecPtr replication_origin_lsn;
+extern PGDLLIMPORT TimestampTz replication_origin_timestamp;
+
+
extern Size LogicalDecodingShmemSize(void);
extern void LogicalDecodingShmemInit(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4181ccf..ff5d29c 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -60,6 +60,8 @@ typedef struct ReorderBufferChange
int action_internal;
};
+ RepNodeId origin_id;
+
/*
* Context data for the change, which part of the union is valid depends
* on action/action_internal.
@@ -148,6 +150,12 @@ typedef struct ReorderBufferTXN
*/
XLogRecPtr restart_decoding_lsn;
+ /* origin of the change that caused this transaction */
+ RepNodeId origin_id;
+
+ /* did the TX have catalog changes */
+ bool does_timetravel;
+
/*
* Commit time, only known when we read the actual commit record.
*/
@@ -320,7 +328,7 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time);
+ TimestampTz commit_time, RepNodeId origin_id);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h
new file mode 100644
index 0000000..3c47038
--- /dev/null
+++ b/src/include/replication/replication_identifier.h
@@ -0,0 +1,42 @@
+/*-------------------------------------------------------------------------
+ * replication_identifier.h
+ * XXX
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPLICATION_IDENTIFIER_H
+#define REPLICATION_IDENTIFIER_H
+
+#include "catalog/pg_replication_identifier.h"
+#include "replication/logical.h"
+
+/* API for querying & manipulating replication identifiers */
+extern RepNodeId GetReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+ Oid remotedb, Name riname,
+ Oid rilocaldb);
+extern RepNodeId CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+ Oid remotedb, Name riname,
+ Oid rilocaldb);
+extern HeapTuple GetReplicationInfoByIdentifier(RepNodeId riident);
+
+
+extern void AdvanceReplicationIdentifier(RepNodeId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+extern void SetupCachedReplicationIdentifier(RepNodeId node);
+extern void TeardownCachedReplicationIdentifier(RepNodeId node);
+extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void);
+
+/* crash recovery support */
+extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt);
+extern void StartupReplicationIdentifier(XLogRecPtr ckpt);
+
+/* internals */
+extern Size ReplicationIdentifierShmemSize(void);
+extern void ReplicationIdentifierShmemInit(void);
+
+#endif   /* REPLICATION_IDENTIFIER_H */
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index e41b3d2..357f344 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -77,6 +77,8 @@ enum SysCacheIdentifier
RANGETYPE,
RELNAMENSP,
RELOID,
+ REPLIDIDENT,
+ REPLIDREMOTE,
RULERELNAME,
STATRELATTINH,
TABLESPACEOID,
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index a62a3e3..d5e40d6 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -120,6 +120,7 @@ pg_opfamily|t
pg_pltemplate|t
pg_proc|t
pg_range|t
+pg_replication_identifier|t
pg_rewrite|t
pg_seclabel|t
pg_shdepend|t
--
1.8.3.251.g1462b67
On Thu, Nov 14, 2013 at 12:26 PM, Andres Freund <andres@2ndquadrant.com> wrote:
> [...]
It would be much less invasive for the replication apply code to fsync
its own state on the apply side. Obviously, that means doubling the
fsync rate, which is not appealing, but I think that's still a useful
way to think about what you're aiming to accomplish here: avoid
doubling the fsync rate when applying remote transactions in a
crash-safe manner.
Although I agree that we need a way to do that, I don't have a
particularly warm and fuzzy feeling about this particular proposal:
there are too many bits of it that feel like entirely arbitrary design
decisions. If we're going to build a full-fledged logical replication
solution into core, attempting to obsolete Slony and Bucardo and
Londiste and everything that's out there, then I think we have a great
deal of design work that we have to do before we start committing
things, or even finalizing designs. If we're going to continue with
the philosophy of building a toolkit that can serve as a building
block for multiple solutions, then color me unconvinced that this will
do the job.
If we made the xlog system truly extensible, that seems like it'd
punch your ticket here. I'm not sure how practical that is, though.
> [...]
The fact that you've included both local and remote database OIDs
seems wrong; shouldn't the replication identifier only serve to
identify the source node, not the replication stream? What if you
want to replicate from table A to table B within the same database?
(e.g. so that you can lock them for DDL changes in alternation) What
if you want to replicate tables A and B in one database into a
database on another node, but making the two of them independent
replication streams - e.g. because we know that transactions on table
A will be large but not latency-sensitive, but transactions on B will
be small but highly sensitive to replication delay? What if we want
to use changeset extraction to produce delta relations on the base
tables for a materialized view, feed them through a bunch of
relational algebra, and apply the resulting changes to the MV, keeping
track of how far we got?
We need some kind of pretty flexible system here, if we're not to box
ourselves into a corner.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Thu, Nov 14, 2013 at 5:26 PM, Andres Freund <andres@2ndquadrant.com> wrote:
> [...]
This is a pretty massive design decision to hinge on such a minor
implementation detail of table bloat (which I don't think would actually be
an issue anyway -- isn't that what we have HOT for?)
Fundamentally the question here is where to keep all the book-keeping state
about replicas, in a central repository in the master or locally in each
replica. At first blush it seems obvious to me that locally in each replica
is the more flexible choice.
Replication systems become complex when you start restoring from old
backups and not every node has the same view of the topology as every other
node. I fear what will happen to a central repository when you fail over
the master and it's out of sync with where the slaves have actually
restored up to. Or where you fail over a slave to a standby of the slave
and it needs to redo some of the logical replication to catch up. Or where
you restore all your nodes, both master and slaves from backups taken at
different points in time (presumably with the master ahead of the slaves).
Having a central repository makes the whole system simpler but it also
makes it much more fragile. It's nice to have a single place to go to find
out what the state of every replica is but it should do that by actually
asking the replicas, not by maintaining state that might be out of sync.
--
greg
Hi,
On 2013-11-19 07:40:30 -0500, Robert Haas wrote:
> > [...]
> It would be much less invasive for the replication apply code to fsync
> its own state on the apply side. Obviously, that means doubling the
> fsync rate, which is not appealing, but I think that's still a useful
> way to think about what you're aiming to accomplish here: avoid
> doubling the fsync rate when applying remote transactions in a
> crash-safe manner.
Exactly.
> Although I agree that we need a way to do that, I don't have a
> particularly warm and fuzzy feeling about this particular proposal:
> there are too many bits of it that feel like entirely arbitrary design
> decisions. [...] If we're going to continue with the philosophy of
> building a toolkit that can serve as a building block for multiple
> solutions, then color me unconvinced that this will do the job.
Imo we actually want and need both; wanting something built in doesn't
preclude important use cases that need to be served by other solutions.
I think - while the API certainly needs work - the general idea
integrates pretty well with the fairly generic changeset extraction
mechanism and with possible solutions for replication between postgres
servers.
Note that this really is a draft of what I think is needed, written
after developing a solution to this problem in a specific replication
system and talking to some people implementing replication solutions.
Maybe somebody has a far better idea to implement this: I am all ears!
> If we made the xlog system truly extensible, that seems like it'd
> punch your ticket here. I'm not sure how practical that is, though.
I don't think it is.
> > [...]
> The fact that you've included both local and remote database OIDs
> seems wrong; shouldn't the replication identifier only serve to
> identify the source node, not the replication stream? What if you
> want to replicate from table A to table B within the same database?
The reason I chose those parameters is that they avoid the need for a
human to assign identifiers in many situations, since they are already
unique. For the cases where they aren't, I've included the "name" to
distinguish several streams.
The reason both source and target database are included is that it
avoids manual work if you want to replicate between two databases in
both directions.
> We need some kind of pretty flexible system here, if we're not to box
> ourselves into a corner.
Agreed. As an alternative we could just have a single - probably longer
than NAMEDATALEN - string to identify replication progress and rely on
the users of the facility to build the identifier automatically
themselves using components that are helpful in their system.
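For illustration, a sketch - nothing the patch provides, and the format
and function name are made up - of how a solution could assemble such a
string from the traits the catalog currently encodes:

/*
 * Sketch only: compose a free-form progress identifier from the traits
 * pg_replication_identifier currently stores.
 */
#include <stdio.h>
#include <stdint.h>

static void
build_progress_identifier(char *buf, size_t buflen,
                          uint64_t remote_sysid, uint32_t remote_tli,
                          uint32_t remote_dboid, uint32_t local_dboid,
                          const char *name)
{
    /* yields e.g. "6002362842449031-1-16384-16385-queue1" */
    snprintf(buf, buflen, "%llu-%u-%u-%u-%s",
             (unsigned long long) remote_sysid, remote_tli,
             remote_dboid, local_dboid, name ? name : "");
}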
Thanks,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Tue, Nov 19, 2013 at 11:57 AM, Andres Freund <andres@2ndquadrant.com> wrote:
> Agreed. As an alternative we could just have a single - probably longer
> than NAMEDATALEN - string to identify replication progress and rely on
> the users of the facility to build the identifier automatically
> themselves using components that are helpful in their system.
I tend to feel like a generic identifier would be better. I'm not
sure why something like a UUID wouldn't be enough, though.
Arbitrary-length identifiers will be bad for performance, and 128 bits
ought to be enough for anyone.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 2013-11-19 12:47:29 -0500, Robert Haas wrote:
> On Tue, Nov 19, 2013 at 11:57 AM, Andres Freund <andres@2ndquadrant.com> wrote:
> > [...]
> I tend to feel like a generic identifier would be better. I'm not
> sure why something like a UUID wouldn't be enough, though.
> Arbitrary-length identifiers will be bad for performance, and 128 bits
> ought to be enough for anyone.
That's what I had suggested to some people originally and the response
was, well, somewhat unenthusiastic. It's not that easy to assign them in
a meaningful automated manner. How do you automatically assign a pg
cluster an id?
But yes, maybe the answer is to balk on that part, let the users figure
out what's best, and then only later implement more policy based on that
experience.
WRT performance: I agree that fixed-width identifiers are more
performant - that's why I went for them - but I am not sure it's that
important. The performance-sensitive parts should all be done using the
internal id the identifier maps to, not the public one.
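To make that concrete, a sketch of the intended usage under the API from
the patch upthread (apply_startup() and its arguments are made-up names):
the trait lookup happens once at apply-process startup, and everything
per-transaction then only touches the cached internal id.

#include "postgres.h"
#include "replication/replication_identifier.h"

static RepNodeId apply_node = InvalidRepNodeId;

static void
apply_startup(uint64 sysid, Oid tli, Oid remotedb, Name riname, Oid localdb)
{
    /* infrequent: resolve the public traits to the small internal id */
    apply_node = GetReplicationIdentifier(sysid, tli, remotedb,
                                          riname, localdb);
    if (apply_node == InvalidRepNodeId)
        apply_node = CreateReplicationIdentifier(sysid, tli, remotedb,
                                                 riname, localdb);

    /* cache it for this backend; later progress updates need no lookup */
    SetupCachedReplicationIdentifier(apply_node);
}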
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 11/14/2013 12:26 PM, Andres Freund wrote:
> [...]
I don't see how this is going to cause any more bloat than what
trigger-based slony does today with sl_confirm, and I don't hear a lot
of complaints about that being a big problem. This might be because
slony doesn't do a commit on the replica for every transaction but
groups the transactions together; logical slony will behave the same
way, where we would only commit on SYNC transactions.
> [...] The added API [...] basically consists of two parts:
> 1) functions to query/create replication identifiers:
>    * GetReplicationIdentifier(identifying traits) - search for a
>      numeric replication identifier
>    * CreateReplicationIdentifier(identifying traits) - creates a
>      numeric replication identifier
>    * GetReplicationInfoByIdentifier(numeric identifier) - returns
>      identifying traits
> 2) functions to query/manipulate replication progress:
>    * AdvanceReplicationIdentifier(node, remote_commit, local_commit)
>    * XLogRecPtr RemoteCommitFromReplicationIdentifier(node)
> [...]
> Questions are:
> * Which identifying traits do we want to use to identify nodes?
> * How do we want to manipulate replication identifiers? Currently they
>   can only be manipulated by using C functions, which is fine for some
>   users, but probably not for others?
> * Do we want to allow setting (remote_lsn, remote_timestamp,
>   remote_node_id) via SQL? Currently the remote_node_id can be set as a
>   GUC, but the others can't. They should probably be settable via a
>   function instead of via GUCs?

A way of advancing the replication pointer via SQL would be nice;
otherwise I'll just have to write my own C function that I will invoke
via SQL (which isn't hard, but everyone would need to do the same).

> * Suggestions for better names!
> * Would slony et al need something ontop to use this?

In the slony world we identify nodes with a 32 bit integer. I think
the idea is that I'm going to have to pass arguments into
+extern RepNodeId GetReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+ Oid remotedb, Name riname,
+ Oid rilocaldb);
to map my slony concept of a node id to a 16 bit "node id" that is only
useful on the local system. In addition to a slony 32 bit node id I have
a conninfo that I can use to contact that node. I think the slon would
need to connect to the remote node with that conninfo (which it does
anyway), get the remote OIDs, and then use the slony node_id converted
to a string as the "riname" value.
I would then have to invoke AdvanceReplicationIdentifier at some point
before I issue the commit.
What does building up a node_id key from (sysid, tlid, remote_dbid,
local_dbid, name) get us over just mapping from an arbitrary name field
to a 16 bit node_id?
I agree with the other comments on the thread that letting the
replication system figure out its own unique naming is better. If we
were going to come up with a scheme, then I am also not sure that using
the remote TLI as part of the node key is a good idea.
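Roughly, I'd expect to end up writing something like this sketch (the
function name and the "slony:%d" encoding are made up):

#include "postgres.h"
#include "replication/replication_identifier.h"

/*
 * Sketch: map a 32 bit slony node id onto the patch's 16 bit local
 * RepNodeId by encoding it into the "riname" trait.
 */
static RepNodeId
slony_node_to_replication_identifier(int32 slony_node_id,
                                     uint64 remote_sysid, Oid remote_tli,
                                     Oid remote_dboid, Oid local_dboid)
{
    NameData    riname;
    RepNodeId   node;

    /* the slony node id, converted to a string, as the name trait */
    memset(&riname, 0, sizeof(riname));
    snprintf(NameStr(riname), NAMEDATALEN, "slony:%d", slony_node_id);

    node = GetReplicationIdentifier(remote_sysid, remote_tli, remote_dboid,
                                    &riname, local_dboid);
    if (node == InvalidRepNodeId)
        node = CreateReplicationIdentifier(remote_sysid, remote_tli,
                                           remote_dboid, &riname,
                                           local_dboid);
    return node;
}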
> [...]
On Tue, Nov 19, 2013 at 1:20 PM, Andres Freund <andres@2ndquadrant.com> wrote:
> On 2013-11-19 12:47:29 -0500, Robert Haas wrote:
> > [...]
> That's what I had suggested to some people originally and the response
> was, well, somewhat unenthusiastic. It's not that easy to assign them
> in a meaningful automated manner. How do you automatically assign a pg
> cluster an id?
/dev/urandom
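i.e. something like this sketch - read 128 random bits once and print
them as hex; where the resulting id gets stored is left to the
replication solution:

#include <stdio.h>

int
main(void)
{
    unsigned char id[16];
    FILE       *f = fopen("/dev/urandom", "r");
    int         i;

    /* 128 random bits from the kernel as a cluster id */
    if (f == NULL || fread(id, 1, sizeof(id), f) != sizeof(id))
        return 1;
    fclose(f);

    for (i = 0; i < 16; i++)
        printf("%02x", id[i]);
    putchar('\n');
    return 0;
}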
> [...] The performance-sensitive parts should all be done using the
> internal id the identifier maps to, not the public one.
But I thought the internal identifier was exactly what we're creating.
I think we should also take note of Steve Singer's comments. Perhaps
this entire endeavor is premature.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Hi,
On 2013-11-19 18:49:27 -0500, Steve Singer wrote:
> > [...]
> I don't see how this is going to cause any more bloat than what
> trigger-based slony does today with sl_confirm and I don't hear a lot
> of complaints about that being a big problem.
FWIW, bloat on slony's tables (including sl_confirm) is one of the major
reasons I've seen people move away from slony for production, and use it
only for upgrades.
It's only really a problem if you have long-running transactions on the
standby, but that's a pretty major use-case of having replicas.
> This might be because slony doesn't do a commit on the replica for
> every transaction but groups the transactions together; logical slony
> will behave the same way where we would only commit on SYNC
> transactions.
But yes, the grouping of transactions certainly makes for a major
difference. I don't think we want to force solutions to commit
transactions in batches, not least because that obviously prohibits
using a standby as a synchronous replica.
> > [...]
> A way of advancing the replication pointer via SQL would be nice;
> otherwise I'll just have to write my own C function that I will invoke
> via SQL (which isn't hard, but everyone would need to do the same).
But don't you already essentially perform the actual inserts via C in
newer versions of slony? That's mainly the reason I wasn't sure it's
needed. But then, providing a function to do that setup isn't hard.
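Something like this sketch, say (not in the patch; the function name and
the text-LSN argument convention are invented here):

#include "postgres.h"
#include "fmgr.h"
#include "access/xlogdefs.h"
#include "replication/replication_identifier.h"
#include "utils/builtins.h"

PG_FUNCTION_INFO_V1(pg_advance_replication_identifier);

/*
 * Sketch of a SQL-callable wrapper around AdvanceReplicationIdentifier():
 * takes the internal node id and the remote commit LSN as "X/X" text.
 */
Datum
pg_advance_replication_identifier(PG_FUNCTION_ARGS)
{
    RepNodeId   node = (RepNodeId) PG_GETARG_INT32(0);
    char       *lsn_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
    uint32      hi;
    uint32      lo;

    if (sscanf(lsn_str, "%X/%X", &hi, &lo) != 2)
        elog(ERROR, "could not parse lsn \"%s\"", lsn_str);

    /* no meaningful local commit LSN when advanced manually */
    AdvanceReplicationIdentifier(node, ((uint64) hi) << 32 | lo,
                                 InvalidXLogRecPtr);

    PG_RETURN_VOID();
}

Plus the matching CREATE FUNCTION boilerplate, of course.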
> What does building up a node_id key from (sysid, tlid, remote_dbid,
> local_dbid, name) get us over just mapping from an arbitrary name
> field to a 16 bit node_id?
It avoids the need to manually assign ids to systems in many cases. I've
seen people complain about that a fair bit.
But it seems pretty clear that a more arbitrary identifier is preferred
so far, so I'll go for that.
Thanks for the comments,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 2013-11-19 14:16:04 +0000, Greg Stark wrote:
> On Thu, Nov 14, 2013 at 5:26 PM, Andres Freund <andres@2ndquadrant.com> wrote:
> > [...]
> This is a pretty massive design decision to hinge on such a minor
> implementation detail of table bloat (which I don't think would
> actually be an issue anyway -- isn't that what we have HOT for?)
I'm not sure what HOT is going to help with here. Even with HOT we can
only remove tuples that are invisible to everyone. If there are
long-running queries on the standby - and running analytics on standbys
is a rather frequent use-case - that won't be the case for a long, long
time.
> Fundamentally the question here is where to keep all the book-keeping
> state about replicas, in a central repository in the master or locally
> in each replica. At first blush it seems obvious to me that locally in
> each replica is the more flexible choice.
This really is about storing the state of apply on each replica
efficiently.
Imagine the standby has just received the data for a transaction x and
has replayed it locally. If it crashes at that moment, it needs to know
whether that transaction committed successfully or not. And that has to
work even if the commit succeeded internally but hasn't yet returned
success!
So, what this provides is a facility to say 'hey, this local transaction
was at X on the source Y' and a way to get the last X for each Y at the
end of crash recovery.
Then the replication solution can restart replication from X onwards for
each Y.
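In code, that flow would look roughly like the sketch below; it only
uses functions and variables from the patch, but the helper names are
made up and the transaction machinery is elided:

#include "postgres.h"
#include "replication/logical.h"
#include "replication/replication_identifier.h"
#include "utils/timestamp.h"

/*
 * Sketch: restart logical apply after a crash/reconnect for source Y
 * (source_node), then stamp each replayed transaction with its origin.
 */
static void
apply_restart(RepNodeId source_node)
{
    XLogRecPtr  start_from;

    SetupCachedReplicationIdentifier(source_node);

    /* the last remote commit X recovered for this Y */
    start_from = RemoteCommitFromCachedReplicationIdentifier();

    /* ... issue START_LOGICAL_REPLICATION from start_from ... */
}

static void
apply_one_transaction(RepNodeId source_node,
                      XLogRecPtr remote_commit_lsn,
                      TimestampTz remote_commit_time)
{
    /* ... StartTransactionCommand(), apply the changes ... */

    /* stamp the commit record; crash-safe via the commit's own WAL */
    guc_replication_origin_id = source_node;
    replication_origin_lsn = remote_commit_lsn;
    replication_origin_timestamp = remote_commit_time;

    /* ... CommitTransactionCommand() writes XACT_CONTAINS_ORIGIN ... */
}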
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 2013-11-20 15:05:17 -0500, Robert Haas wrote:
> > [...] How do you automatically assign a pg cluster an id?
> /dev/urandom
Well yes. But then you need a way to store and change that random id
for each cluster.
Anyway, the preference is clear, so I am going to go for that in v2. I
am not sure about the type of the public identifier yet; I'll think a
bit about it.
> > [...] The performance-sensitive parts should all be done using the
> > internal id the identifier maps to, not the public one.
> But I thought the internal identifier was exactly what we're creating.
Sure. But how often are we going to a) create such an identifier or b)
look it up? Hopefully both will be rather infrequent operations.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Thu, Nov 21, 2013 at 6:15 AM, Andres Freund <andres@2ndquadrant.com> wrote:
> [...] Sure. But how often are we going to a) create such an identifier
> or b) look it up?
Never. Make that the replication solution's problem. Make the core
support deal only with UUIDs or pairs of 64-bit integers or something
like that, and let the replication solution decide what they mean.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 2013-11-21 08:22:05 -0500, Robert Haas wrote:
> On Thu, Nov 21, 2013 at 6:15 AM, Andres Freund <andres@2ndquadrant.com> wrote:
> > [...]
> Never. Make that the replication solution's problem. Make the core
> support deal only with UUIDs or pairs of 64-bit integers or something
> like that, and let the replication solution decide what they mean.
I think we're misunderstanding each other. I was commenting on your fear
that strings longer than NAMEDATALEN or something would be bad for
performance - which I don't think is very relevant, because the lookups
from "public" to "internal" identifier shouldn't be in any
performance-critical path.
I personally would prefer a string because it'd allow me to build an
identifier using the criteria I'd originally outlined, outside of this
infrastructure.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 11/21/13, 6:15 AM, Andres Freund wrote:
> On 2013-11-20 15:05:17 -0500, Robert Haas wrote:
> > [...]
> Well yes. But then you need a way to store and change that random id
> for each cluster.
You can use a v3 UUID, which is globally reproducible.
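For example, along these lines (a sketch: a strict RFC 4122 v3 UUID
would hash a namespace UUID's bytes followed by the name, which is
omitted here for brevity; pg_md5_binary() is the backend's existing md5
helper):

#include "postgres.h"
#include "libpq/md5.h"

/*
 * Sketch: derive a name-based (version 3) UUID from a string of stable,
 * identifying traits, so any node that knows the traits computes the
 * same id without coordination.
 */
static void
node_uuid_v3(const char *traits, unsigned char uuid[16])
{
    if (!pg_md5_binary(traits, strlen(traits), uuid))
        elog(ERROR, "out of memory");

    uuid[6] = (uuid[6] & 0x0f) | 0x30;  /* version 3: name-based, MD5 */
    uuid[8] = (uuid[8] & 0x3f) | 0x80;  /* RFC 4122 variant */
}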
On Thu, Nov 21, 2013 at 8:26 AM, Andres Freund <andres@2ndquadrant.com> wrote:
> On 2013-11-21 08:22:05 -0500, Robert Haas wrote:
> > [...]
> I think we're misunderstanding each other. I was commenting on your
> fear that strings longer than NAMEDATALEN or something would be bad for
> performance - which I don't think is very relevant, because the lookups
> from "public" to "internal" identifier shouldn't be in any
> performance-critical path.
> I personally would prefer a string because it'd allow me to build an
> identifier using the criteria I'd originally outlined, outside of this
> infrastructure.
Yeah, there's some confusion here. I don't care at all about the
performance characteristics of long strings, because we shouldn't be
using them anywhere in the core code. What I do care about is making
sure that whatever core support we build is agnostic to how the
internal identifiers - relatively short bit strings - are generated.
The patch as proposed puts forward a particular way of doing that, and
I think that neither that method *nor any other* should be part of
core.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 2013-11-22 14:43:15 -0500, Robert Haas wrote:
> The patch as proposed puts forward a particular way of doing that, and
> I think that neither that method *nor any other* should be part of
> core.
Working on something like that; I've updated the patch state to "waiting
on author".
Thanks,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services