>From 83b6821ceb9be613f2bb377f3a1d9dc459b3b4b4 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 22 Feb 2013 17:43:27 +0100
Subject: [PATCH] Introduce "replication_identifiers" to keep track of remote
 nodes.

Replication identifiers can be used to track and look up remote nodes identified
via (sysid, tlid, remote_dbid, local_dbid, name) and map that tuple to a local
uint16.

Keyed by that replication identifier, the progress of replication from
that system is tracked in a crash-safe manner.

Support for tracking that via output plugins is added as well.

Needs a catversion bump.
---
 src/backend/access/rmgrdesc/xactdesc.c             |  16 +-
 src/backend/access/transam/xact.c                  |  71 ++-
 src/backend/access/transam/xlog.c                  |  10 +
 src/backend/catalog/Makefile                       |   2 +-
 src/backend/catalog/catalog.c                      |   8 +-
 src/backend/replication/logical/Makefile           |   3 +-
 src/backend/replication/logical/decode.c           |   6 +-
 src/backend/replication/logical/logical.c          |   9 +-
 src/backend/replication/logical/reorderbuffer.c    |   3 +-
 .../replication/logical/replication_identifier.c   | 647 +++++++++++++++++++++
 src/backend/storage/ipc/ipci.c                     |   3 +
 src/backend/utils/cache/syscache.c                 |  23 +
 src/backend/utils/misc/guc.c                       |  47 +-
 src/backend/utils/misc/postgresql.conf.sample      |   2 +-
 src/bin/initdb/initdb.c                            |   1 +
 src/bin/pg_resetxlog/pg_resetxlog.c                |   2 +
 src/include/access/xact.h                          |   9 +-
 src/include/access/xlog.h                          |   2 +
 src/include/access/xlogdefs.h                      |   6 +
 src/include/catalog/indexing.h                     |   6 +
 src/include/catalog/pg_replication_identifier.h    |  92 +++
 src/include/replication/logical.h                  |   7 +
 src/include/replication/reorderbuffer.h            |  10 +-
 src/include/replication/replication_identifier.h   |  42 ++
 src/include/utils/syscache.h                       |   2 +
 src/test/regress/expected/sanity_check.out         |   1 +
 26 files changed, 1006 insertions(+), 24 deletions(-)
 create mode 100644 src/backend/replication/logical/replication_identifier.c
 create mode 100644 src/include/catalog/pg_replication_identifier.h
 create mode 100644 src/include/replication/replication_identifier.h

diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 2caf5a0..2fa7e78 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -26,9 +26,12 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 {
 	int			i;
 	TransactionId *subxacts;
+	SharedInvalidationMessage *msgs;
 
 	subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
 
+	msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
+
 	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
 
 	if (xlrec->nrels > 0)
@@ -50,9 +53,6 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 	}
 	if (xlrec->nmsgs > 0)
 	{
-		SharedInvalidationMessage *msgs;
-
-		msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
 
 		if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
 			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
@@ -81,6 +81,16 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 				appendStringInfo(buf, " unknown id %d", msg->id);
 		}
 	}
+	if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+	{
+		xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]);
+
+		appendStringInfo(buf, " origin %u, lsn %X/%X",
+						 origin->origin_node_id,
+						 (uint32)(origin->origin_lsn >> 32),
+						 (uint32)origin->origin_lsn);
+	}
+
 }
 
 static void
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index ef9d42d..c05526d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -37,8 +37,10 @@
 #include "libpq/be-fsstubs.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/logical.h"
 #include "replication/walsender.h"
 #include "replication/syncrep.h"
+#include "replication/replication_identifier.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -1072,11 +1074,13 @@ RecordTransactionCommit(void)
 		/*
 		 * Do we need the long commit record? If not, use the compact format.
 		 */
-		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval ||
+			forceSyncCommit || guc_replication_origin_id != InvalidRepNodeId)
 		{
-			XLogRecData rdata[4];
+			XLogRecData rdata[5];
 			int			lastrdata = 0;
 			xl_xact_commit xlrec;
+			xl_xact_origin origin;
 
 			/*
 			 * Set flags required for recovery processing of commits.
@@ -1124,6 +1128,21 @@ RecordTransactionCommit(void)
 				rdata[3].buffer = InvalidBuffer;
 				lastrdata = 3;
 			}
+			/* dump transaction origin information */
+			if (guc_replication_origin_id != InvalidRepNodeId)
+			{
+				Assert(replication_origin_lsn != InvalidXLogRecPtr);
+				xlrec.xinfo |= XACT_CONTAINS_ORIGIN;
+				origin.origin_node_id = guc_replication_origin_id;
+				origin.origin_lsn = replication_origin_lsn;
+				origin.origin_timestamp = replication_origin_timestamp;
+
+				rdata[lastrdata].next = &(rdata[4]);
+				rdata[4].data = (char *) &origin;
+				rdata[4].len = sizeof(xl_xact_origin);
+				rdata[4].buffer = InvalidBuffer;
+				lastrdata = 4;
+			}
 			rdata[lastrdata].next = NULL;
 
 			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
@@ -1154,13 +1173,19 @@ RecordTransactionCommit(void)
 		}
 	}
 
+	/* record plain commit ts if not replaying remote actions */
+	if (guc_replication_origin_id == InvalidRepNodeId)
+		replication_origin_timestamp = xactStopTimestamp;
+
 	/*
 	 * We don't need to log the commit timestamp separately since the commit
 	 * record logged above has all the necessary action to set the timestamp
 	 * again.
 	 */
 	TransactionTreeSetCommitTimestamp(xid, nchildren, children,
-									  xactStopTimestamp, 0, false);
+									  replication_origin_timestamp,
+									  guc_replication_origin_id,
+									  false);
 
 	/*
 	 * Check if we want to commit asynchronously.  We can allow the XLOG flush
@@ -1242,9 +1267,11 @@ RecordTransactionCommit(void)
 	if (wrote_xlog)
 		SyncRepWaitForLSN(XactLastRecEnd);
 
+	/* remember end of last commit record */
+	XactLastCommitEnd = XactLastRecEnd;
+
 	/* Reset XactLastRecEnd until the next transaction writes something */
 	XactLastRecEnd = 0;
-
 cleanup:
 	/* Clean up local data */
 	if (rels)
@@ -4614,10 +4641,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 						  SharedInvalidationMessage *inval_msgs, int nmsgs,
 						  RelFileNode *xnodes, int nrels,
 						  Oid dbId, Oid tsId,
-						  uint32 xinfo)
+						  uint32 xinfo,
+						  xl_xact_origin *origin)
 {
 	TransactionId max_xid;
 	int			i;
+	RepNodeId	origin_node_id = InvalidRepNodeId;
 
 	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
 
@@ -4637,9 +4666,26 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		LWLockRelease(XidGenLock);
 	}
 
+	Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL));
+
+	if (xinfo & XACT_CONTAINS_ORIGIN)
+	{
+		origin_node_id = origin->origin_node_id;
+		commit_time = origin->origin_timestamp;
+	}
+
 	/* Set the transaction commit time */
 	TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids,
-									  commit_time, 0, false);
+									  commit_time,
+									  origin_node_id, false);
+
+	if (xinfo & XACT_CONTAINS_ORIGIN)
+	{
+		/* recover apply progress */
+		AdvanceReplicationIdentifier(origin->origin_node_id,
+									 origin->origin_lsn,
+									 lsn);
+	}
 
 	if (standbyState == STANDBY_DISABLED)
 	{
@@ -4754,19 +4800,25 @@ xact_redo_commit(xl_xact_commit *xlrec,
 {
 	TransactionId *subxacts;
 	SharedInvalidationMessage *inval_msgs;
-
+	xl_xact_origin *origin = NULL;
 	/* subxid array follows relfilenodes */
 	subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
 	/* invalidation messages array follows subxids */
 	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
+	if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+	{
+		origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]);
+	}
+
 	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
 							  subxacts, xlrec->nsubxacts,
 							  inval_msgs, xlrec->nmsgs,
 							  xlrec->xnodes, xlrec->nrels,
 							  xlrec->dbId,
 							  xlrec->tsId,
-							  xlrec->xinfo);
+							  xlrec->xinfo,
+							  origin);
 }
 
 /*
@@ -4782,7 +4834,8 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
 							  NULL, 0,	/* relfilenodes */
 							  InvalidOid,		/* dbId */
 							  InvalidOid,		/* tsId */
-							  0);		/* xinfo */
+							  0,		/* xinfo */
+							  NULL		/* origin */);
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 585fe67..c900940 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
 #include "replication/logical.h"
+#include "replication/replication_identifier.h"
 #include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -280,6 +281,8 @@ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
 
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
 
+XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;
+
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
  * (which is almost but not quite the same as a pointer to the most recent
@@ -1055,6 +1058,7 @@ begin:;
 	rechdr->xl_len = len;		/* doesn't include backup blocks */
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
+	rechdr->xl_origin_id = guc_replication_origin_id;
 	rechdr->xl_prev = InvalidXLogRecPtr;
 	COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
 
@@ -6353,6 +6357,11 @@ StartupXLOG(void)
 	StartupLogicalDecoding(checkPoint.redo);
 
 	/*
+	 * Recover knowledge about replay progress of known replication partners.
+	 */
+	StartupReplicationIdentifier(checkPoint.redo);
+
+	/*
 	 * Initialize unlogged LSN. On a clean shutdown, it's restored from the
 	 * control file. On recovery, all unlogged relations are blown away, so
 	 * the unlogged LSN counter can be reset too.
@@ -8428,6 +8437,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	CheckPointSnapBuild();
 	CheckpointLogicalRewriteHeap();
 	CheckPointBuffers(flags);	/* performs all required fsyncs */
+	CheckPointReplicationIdentifier(checkPointRedo);
 	/* We deliberately delay 2PC checkpointing as long as possible */
 	CheckPointTwoPhase(checkPointRedo);
 }
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index a974bd5..3a20672 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
 	pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
 	pg_ts_parser.h pg_ts_template.h pg_extension.h \
 	pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
-	pg_foreign_table.h \
+	pg_foreign_table.h pg_replication_identifier.h \
 	pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
 	toasting.h indexing.h \
     )
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 885ba27..d9ecf8a 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_pltemplate.h"
 #include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_shdepend.h"
 #include "catalog/pg_shdescription.h"
 #include "catalog/pg_shseclabel.h"
@@ -278,7 +279,8 @@ IsSharedRelation(Oid relationId)
 		relationId == SharedDependRelationId ||
 		relationId == SharedSecLabelRelationId ||
 		relationId == TableSpaceRelationId ||
-		relationId == DbRoleSettingRelationId)
+		relationId == DbRoleSettingRelationId ||
+		relationId == ReplicationIdentifierRelationId)
 		return true;
 	/* These are their indexes (see indexing.h) */
 	if (relationId == AuthIdRolnameIndexId ||
@@ -294,7 +296,9 @@ IsSharedRelation(Oid relationId)
 		relationId == SharedSecLabelObjectIndexId ||
 		relationId == TablespaceOidIndexId ||
 		relationId == TablespaceNameIndexId ||
-		relationId == DbRoleSettingDatidRolidIndexId)
+		relationId == DbRoleSettingDatidRolidIndexId ||
+		relationId ==  ReplicationLocalIdIndex ||
+		relationId ==  ReplicationRemoteIndex)
 		return true;
 	/* These are their toast tables and toast indexes (see toasting.h) */
 	if (relationId == PgShdescriptionToastTable ||
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 6fae278..f24dbbe 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,7 +14,8 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \
+	snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 655feca..50ccef9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -496,7 +496,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time);
+						commit_time, buf->record.xl_origin_id);
 }
 
 /*
@@ -540,6 +540,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_INSERT;
+	change->origin_id = r->xl_origin_id;
 	memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
 
 	if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -580,6 +581,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = r->xl_origin_id;
 	memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
 
 	data = (char *) &xlhdr->header;
@@ -634,6 +636,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = r->xl_origin_id;
 
 	memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
 
@@ -689,6 +692,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		change = ReorderBufferGetChange(ctx->reorder);
 		change->action = REORDER_BUFFER_CHANGE_INSERT;
+		change->origin_id = r->xl_origin_id;
 		memcpy(&change->tp.relnode, &xlrec->node, sizeof(RelFileNode));
 
 		/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 10041cd..6a028c1 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -68,8 +68,12 @@ LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
 /* My slot for logical rep in the shared memory array */
 LogicalDecodingSlot *MyLogicalDecodingSlot = NULL;
 
-/* user settable parameters */
 int			max_logical_slots = 0;		/* the maximum number of logical slots */
+RepNodeId	guc_replication_node_id = InvalidRepNodeId; /* local node id */
+RepNodeId	guc_replication_origin_id = InvalidRepNodeId; /* assumed identity */
+
+XLogRecPtr	replication_origin_lsn;
+TimestampTz	replication_origin_timestamp;
 
 static void LogicalSlotKill(int code, Datum arg);
 
@@ -809,7 +813,8 @@ StartupLogicalDecoding(XLogRecPtr checkPointRedo)
 
 		/* one of our own directories */
 		if (strcmp(logical_de->d_name, "snapshots") == 0 ||
-			strcmp(logical_de->d_name, "mappings") == 0)
+			strcmp(logical_de->d_name, "mappings") == 0 ||
+			strcmp(logical_de->d_name, "checkpoints") == 0)
 			continue;
 
 		/* we crashed while a slot was being setup or deleted, clean up */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 05a5020..979fa23 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1261,7 +1261,7 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 void
 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time)
+					TimestampTz commit_time, RepNodeId origin)
 {
 	ReorderBufferTXN *txn;
 	ReorderBufferIterTXNState *iterstate = NULL;
@@ -1282,6 +1282,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
+	txn->origin_id = origin;
 
 	/* serialize the last bunch of changes if we need start earlier anyway */
 	if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c
new file mode 100644
index 0000000..d455935
--- /dev/null
+++ b/src/backend/replication/logical/replication_identifier.c
@@ -0,0 +1,647 @@
+/*-------------------------------------------------------------------------
+ *
+ * replication_identifier.c
+ *	  Logical Replication Node Identifier and progress persistency support.
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/replication_identifier.c
+ *
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/indexing.h"
+#include "replication/replication_identifier.h"
+#include "replication/logical.h"
+#include "storage/fd.h"
+#include "storage/copydir.h"
+#include "utils/syscache.h"
+#include "utils/rel.h"
+
+/*
+ * Replay progress of a single remote node; also the on-disk checkpoint record.
+ */
+typedef struct ReplicationState
+{
+	/*
+	 * Local identifier for the remote node; InvalidRepNodeId marks a free slot.
+	 */
+	RepNodeId	local_identifier;
+
+	/*
+	 * Location (remote LSN) of the latest replayed commit from the remote side.
+	 */
+	XLogRecPtr	remote_lsn;
+
+	/*
+	 * Remember the local lsn of the commit record so we can XLogFlush() to it
+	 * during a checkpoint so we know the commit record actually is safe on
+	 * disk. AdvanceReplicationIdentifier() never moves it backwards.
+	 */
+	XLogRecPtr	local_lsn;
+} ReplicationState;
+
+/*
+ * Base address of a shared memory array of max_logical_slots replication
+ * states; never set up when max_logical_slots is 0.
+ * XXX: Should we use a separate variable to size this than max_logical_slots?
+ */
+static ReplicationState *ReplicationStates;
+
+/*
+ * Backend-local, cached element from ReplicationStates for use in a backend
+ * replaying remote commits, so we don't have to search ReplicationStates for
+ * the backend's current RepNodeId. NULL until SetupCachedReplicationIdentifier.
+ */
+static ReplicationState *local_replication_state = NULL;
+
+/* Magic number at the start of on-disk replication state checkpoint files. */
+#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
+
+/* Fallback when UINT16_MAX isn't already defined. XXX: move to c.h? */
+#ifndef UINT16_MAX
+#define UINT16_MAX ((1<<16) - 1)
+#else
+#if UINT16_MAX != ((1<<16) - 1)
+#error "uh, wrong UINT16_MAX?"
+#endif
+#endif
+
+/*
+ * Look up the persistent replication identifier for the remote node given by
+ * (remotesysid, remotetli, remotedb, riname) as seen from local db rilocaldb.
+ *
+ * Returns InvalidOid (0) when no matching identifier exists yet.
+ */
+RepNodeId
+GetReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+						 Name riname, Oid rilocaldb)
+{
+	NameData	remote_key;
+	HeapTuple	cached;
+	Oid			found_ident;
+
+	/* the remote system is keyed by the string "<remotesysid>-<remotetli>" */
+	sprintf(NameStr(remote_key), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+	cached = SearchSysCache4(REPLIDREMOTE,
+							 NameGetDatum(&remote_key),
+							 ObjectIdGetDatum(remotedb),
+							 ObjectIdGetDatum(rilocaldb),
+							 NameGetDatum(riname));
+	if (!HeapTupleIsValid(cached))
+		return InvalidOid;
+
+	found_ident = ((Form_pg_replication_identifier) GETSTRUCT(cached))->riident;
+	ReleaseSysCache(cached);
+
+	return found_ident;
+}
+
+/*
+ * Create a persistent replication identifier and return its 16 bit id.
+ *
+ * Needs to be called in a transaction. ERRORs if every usable id is taken.
+ */
+RepNodeId
+CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli, Oid remotedb,
+							Name riname, Oid rilocaldb)
+{
+	Oid riident;
+	HeapTuple tuple = NULL;
+	NameData sysid = {{0}};		/* zeroed: the whole name goes into the tuple */
+	Relation rel;
+
+	Assert(IsTransactionState());
+
+	snprintf(NameStr(sysid), sizeof(sysid), UINT64_FORMAT "-%u", remotesysid, remotetli);
+
+	/*
+	 * We need the numeric replication identifiers to be 16bit wide, so we
+	 * cannot rely on the normal oid allocation; usable ids are 1..UINT16_MAX-1.
+	 *
+	 * Lock table against modifications. This is quite heavyweight, but
+	 * shouldn't happen frequently.
+	 */
+	rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+	for (riident = InvalidOid + 1; riident < UINT16_MAX; riident++)
+	{
+		bool		nulls[Natts_pg_replication_identifier];
+		Datum		values[Natts_pg_replication_identifier];
+
+		/*
+		 * FIXME: do an index lookup using SnapshotDirty instead. This
+		 * pollutes the cache if there are lots of nodes.
+		 */
+		tuple = GetReplicationInfoByIdentifier(riident);
+
+		if (tuple != NULL)
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		/* ok, found an unused riident */
+		memset(&nulls, 0, sizeof(nulls));
+
+		values[Anum_pg_replication_riident - 1] = ObjectIdGetDatum(riident);
+		values[Anum_pg_replication_riremotesysid - 1] = NameGetDatum(&sysid);
+		values[Anum_pg_replication_rilocaldb - 1] = ObjectIdGetDatum(rilocaldb);
+		values[Anum_pg_replication_riremotedb - 1] = ObjectIdGetDatum(remotedb);
+		values[Anum_pg_replication_riname - 1] = NameGetDatum(riname);
+
+		tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+		simple_heap_insert(rel, tuple);
+		CatalogUpdateIndexes(rel, tuple);
+		CommandCounterIncrement();
+		break;
+	}
+
+	/*
+	 * only release at end of transaction, so we don't have to worry about
+	 * race conditions with other transactions trying to insert a new
+	 * identifier. Acquiring a new identifier should be a fairly infrequent
+	 * thing, so this seems fine.
+	 * FIXME: If we use SnapshotDirty above, this isn't necessary anymore.
+	 */
+	heap_close(rel, NoLock);
+
+	if (riident >= UINT16_MAX)	/* loop ran out; tuple is stale, not NULL */
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("no free replication id could be found")));
+
+	return riident;
+}
+
+/*
+ * Fetch the pg_replication_identifier row whose riident matches.
+ *
+ * Returns NULL when no such row exists; otherwise the caller must hand the
+ * tuple back with ReleaseSysCache().
+ */
+HeapTuple
+GetReplicationInfoByIdentifier(RepNodeId riident)
+{
+	Oid			ident_oid = (Oid) riident;
+
+	/* InvalidOid (0) and UINT16_MAX are both rejected */
+	Assert(OidIsValid(ident_oid));
+	Assert(riident < UINT16_MAX);
+	return SearchSysCache1(REPLIDIDENT, ObjectIdGetDatum(ident_oid));
+}
+
+/*
+ * Report how much shared memory ReplicationIdentifierShmemInit() needs:
+ * one ReplicationState per max_logical_slots, or none if that is zero.
+ * FIXME: max_logical_slots is the wrong thing to use here, here we keep
+ * the replay state of *remote* transactions.
+ */
+Size
+ReplicationIdentifierShmemSize(void)
+{
+	Size		nbytes = 0;
+
+	if (max_logical_slots != 0)
+		nbytes = mul_size(max_logical_slots, sizeof(ReplicationState));
+
+	return nbytes;
+}
+
+/*
+ * Attach to the shared ReplicationStates array, zeroing it when not found.
+ */
+void
+ReplicationIdentifierShmemInit(void)
+{
+	Size		nbytes;
+	bool		found;
+
+	if (max_logical_slots == 0)
+		return;
+
+	nbytes = ReplicationIdentifierShmemSize();
+	ReplicationStates = (ReplicationState *)
+		ShmemInitStruct("ReplicationIdentifierState", nbytes, &found);
+	if (!found)
+		MemSet(ReplicationStates, 0, nbytes);
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of replication identifier's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+-------------------------+-------------------------+-----+
+ * | MAGIC | struct ReplicationState | struct ReplicationState | ... | EOF
+ * +-------+-------------------------+-------------------------+-----+
+ *
+ * So it's just the magic, followed by one ReplicationState per in-use slot
+ * (local_lsn written as invalid). Every failure is a PANIC; the temp file is
+ * not closed first, since close() could clobber errno before %m is expanded.
+ *
+ * FIXME: Add a CRC32 to the end.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationIdentifier(XLogRecPtr ckpt)
+{
+	char tmppath[MAXPGPATH];
+	char path[MAXPGPATH];
+	int fd;
+	int tmpfd;
+	int i;
+	uint32 magic = REPLICATION_STATE_MAGIC;
+
+	if (max_logical_slots == 0)
+		return;
+
+	/*
+	 * Write to a filename a LSN of the checkpoint's REDO pointer, so we can
+	 * deal with the checkpoint failing after
+	 * CheckPointReplicationIdentifier() finishing.
+	 */
+	sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+			(uint32)(ckpt >> 32), (uint32)ckpt);
+	sprintf(tmppath, "pg_llog/checkpoints/%X-%X.ckpt.tmp",
+			(uint32)(ckpt >> 32), (uint32)ckpt);
+
+	/* check whether file already exists */
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+	/* already checkpointed at this redo pointer, e.g. before a crash */
+	if (fd >= 0)
+	{
+		CloseTransientFile(fd);
+		return;
+	}
+
+	/* ENOENT is the usual case: no checkpoint performed yet */
+	if (errno != ENOENT)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not check replication state checkpoint \"%s\": %m",
+						path)));
+
+	/* make sure no old temp file is remaining */
+	if (unlink(tmppath) < 0 && errno != ENOENT)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not remove replication identifier checkpoint \"%s\": %m",
+						tmppath)));
+
+	/*
+	 * no other backend can perform this at the same time, we're protected by
+	 * CheckpointLock.
+	 */
+	tmpfd = OpenTransientFile(tmppath,
+							  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+							  S_IRUSR | S_IWUSR);
+	if (tmpfd < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not create replication identifier checkpoint \"%s\": %m",
+						tmppath)));
+
+	/* write magic; a short write() sets no errno, so assume ENOSPC then */
+	errno = 0;
+	if (write(tmpfd, &magic, sizeof(magic)) != sizeof(magic))
+	{
+		if (errno == 0)
+			errno = ENOSPC;
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not write replication identifier checkpoint \"%s\": %m",
+						tmppath)));
+	}
+
+	/* write actual data */
+	for (i = 0; i < max_logical_slots; i++)
+	{
+		ReplicationState local_state;
+
+		if (ReplicationStates[i].local_identifier == InvalidRepNodeId)
+			continue;
+
+		/* zero first so struct padding doesn't put stack garbage on disk */
+		memset(&local_state, 0, sizeof(local_state));
+		local_state.local_identifier = ReplicationStates[i].local_identifier;
+		local_state.remote_lsn = ReplicationStates[i].remote_lsn;
+		local_state.local_lsn = InvalidXLogRecPtr;
+
+		/* make sure we only write out a commit that's persistent */
+		XLogFlush(ReplicationStates[i].local_lsn);
+
+		errno = 0;
+		if (write(tmpfd, &local_state, sizeof(ReplicationState)) !=
+			sizeof(ReplicationState))
+		{
+			if (errno == 0)
+				errno = ENOSPC;
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not write replication identifier checkpoint \"%s\": %m",
+							tmppath)));
+		}
+	}
+
+	/* fsync the file */
+	if (pg_fsync(tmpfd) != 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not fsync replication identifier checkpoint \"%s\": %m",
+						tmppath)));
+
+	CloseTransientFile(tmpfd);
+
+	/* rename to permanent file, fsync file and directory */
+	if (rename(tmppath, path) != 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m",
+						tmppath, path)));
+
+	fsync_fname("pg_llog/checkpoints", true);
+	fsync_fname(path, false);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationIdentifier.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationIdentifier(XLogRecPtr ckpt)
+{
+	char path[MAXPGPATH];
+	int fd;
+	int readBytes;
+	uint32 magic = REPLICATION_STATE_MAGIC;
+	int last_state = 0;
+
+	/* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+	static bool already_started = false;
+	Assert(!already_started);
+	already_started = true;
+#endif
+
+	if (max_logical_slots == 0)
+		return;
+
+	elog(LOG, "starting up replication identifier with ckpt at %X/%X",
+		 (uint32)(ckpt >> 32), (uint32)ckpt);
+
+	sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+			(uint32)(ckpt >> 32), (uint32)ckpt);
+
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+	/*
+	 * might have had max_logical_slots == 0 last run, or we just brought up a
+	 * standby.
+	 */
+	if (fd < 0 && errno == ENOENT)
+		return;
+	else if (fd < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not open replication state checkpoint \"%s\": %m",
+						path)));
+
+	/* verify magic, that's written even if nothing was active */
+	readBytes = read(fd, &magic, sizeof(magic));
+	if (readBytes < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not read replication state checkpoint magic \"%s\": %m",
+						path)));
+	else if (readBytes != sizeof(magic))
+		ereport(PANIC,
+				(errmsg("could not read replication state checkpoint magic \"%s\": read %d of %zu",
+						path, readBytes, sizeof(magic))));
+
+	if (magic != REPLICATION_STATE_MAGIC)
+		ereport(PANIC,
+				(errmsg("replication checkpoint has wrong magic %u instead of %u",
+						magic, REPLICATION_STATE_MAGIC)));
+
+	/* recover individual states, until there are no more to be found */
+	while (true)
+	{
+		ReplicationState local_state;
+		readBytes = read(fd, &local_state, sizeof(local_state));
+
+		if (readBytes == 0)		/* no further data */
+			break;
+
+		if (readBytes < 0)
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not read replication checkpoint file \"%s\": %m",
+							path)));
+
+		if (readBytes != sizeof(local_state))
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not read replication checkpoint file \"%s\": read %d of %zu",
+							path, readBytes, sizeof(local_state))));
+
+		if (last_state == max_logical_slots)
+			ereport(PANIC,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("no free replication state could be found, increase max_logical_slots")));
+
+		/* copy data to shared memory */
+		ReplicationStates[last_state++] = local_state;
+
+		elog(LOG, "recovered replication state of node %u to %X/%X",
+			 local_state.local_identifier,
+			 (uint32)(local_state.remote_lsn >> 32),
+			 (uint32)local_state.remote_lsn);
+	}
+
+	CloseTransientFile(fd);
+}
+
+/*
+ * Tell the replication identifier machinery that a commit from 'node' that
+ * originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replication_origin_lsn and guc_replication_origin_id that
+ * ensures we won't lose knowledge about that after a crash if the
+ * transaction had a persistent effect (think of asynchronous commits).
+ *
+ * local_commit is the local LSN of the commit; checkpoints XLogFlush() to it.
+ * PANICs if 'node' has no slot yet and all max_logical_slots are in use.
+ */
+void
+AdvanceReplicationIdentifier(RepNodeId node,
+							 XLogRecPtr remote_commit,
+							 XLogRecPtr local_commit)
+{
+	int i;
+	int free_slot = -1;
+	ReplicationState *replication_state = NULL;
+
+	/*
+	 * XXX: should we restore into a hashtable and dump into shmem only after
+	 * recovery finished? NOTE(review): no lock guards ReplicationStates here.
+	 */
+
+	/* check whether slot already exists, remembering the first free one */
+	for (i = 0; i < max_logical_slots; i++)
+	{
+		/* remember where to insert if necessary */
+		if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+			free_slot == -1)
+		{
+			free_slot = i;
+			continue;
+		}
+
+		/* not our slot */
+		if (ReplicationStates[i].local_identifier != node)
+			continue;
+
+		/* ok, found slot */
+		replication_state = &ReplicationStates[i];
+		break;
+	}
+
+	if (replication_state == NULL && free_slot == -1)
+		ereport(PANIC,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("no free replication state could be found for %u, increase max_logical_slots",
+						node)));
+	/* initialize new slot */
+	else if (replication_state == NULL)
+	{
+		replication_state = &ReplicationStates[free_slot];
+		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+		replication_state->local_identifier = node;
+	}
+
+	/*
+	 * Due to - harmless - race conditions during a checkpoint we could see
+	 * values here that are older than the ones we already have in
+	 * memory. Don't overwrite those.
+	 */
+	if (replication_state->remote_lsn < remote_commit)
+		replication_state->remote_lsn = remote_commit;
+	if (replication_state->local_lsn < local_commit)
+		replication_state->local_lsn = local_commit;
+}
+
+
+/*
+ * Set up a replication identifier in the shared memory struct if it doesn't
+ * already exist and cache a pointer to its ReplicationState, so that
+ * AdvanceCachedReplicationIdentifier() doesn't have to search the array.
+ *
+ * Only one such cached identifier can exist per process; a new one can only
+ * be set up after the previous one has been torn down with
+ * TeardownCachedReplicationIdentifier().
+ */
+void
+SetupCachedReplicationIdentifier(RepNodeId node)
+{
+	int i;
+	int free_slot = -1;
+
+	Assert(max_logical_slots != 0);
+	Assert(local_replication_state == NULL);
+	Assert(node != InvalidRepNodeId);	/* that value marks unused slots */
+
+	/*
+	 * Search for either an existing slot for that identifier or a free one we
+	 * can use.
+	 */
+	for (i = 0; i < max_logical_slots; i++)
+	{
+		/* remember where to insert if necessary */
+		if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+			free_slot == -1)
+		{
+			free_slot = i;
+			continue;
+		}
+
+		/* found our slot, no need to look any further */
+		if (ReplicationStates[i].local_identifier == node)
+		{
+			local_replication_state = &ReplicationStates[i];
+			break;
+		}
+	}
+
+	if (local_replication_state == NULL && free_slot == -1)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("no free replication state could be found for %u, increase max_logical_slots",
+						node)));
+	else if (local_replication_state == NULL)
+	{
+		local_replication_state = &ReplicationStates[free_slot];
+		local_replication_state->local_identifier = node;
+		Assert(local_replication_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(local_replication_state->local_lsn == InvalidXLogRecPtr);
+	}
+}
+
+/*
+ * Forget the currently cached replication identifier, so that a new one can
+ * be set up with SetupCachedReplicationIdentifier(). Its state in shared
+ * memory is left untouched. May only be called while the identifier for
+ * 'node' is the one cached.
+ */
+void
+TeardownCachedReplicationIdentifier(RepNodeId node)
+{
+	Assert(max_logical_slots != 0);
+	Assert(local_replication_state != NULL);
+	Assert(local_replication_state->local_identifier == node);
+
+	local_replication_state = NULL;
+}
+
+/* Cheaper AdvanceReplicationIdentifier() for the cached identifier. */
+void
+AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+								   XLogRecPtr local_commit)
+{
+	ReplicationState *cached = local_replication_state;
+
+	/* requires a prior SetupCachedReplicationIdentifier() */
+	Assert(cached != NULL);
+	/* never move either LSN backwards */
+	if (cached->local_lsn < local_commit)
+		cached->local_lsn = local_commit;
+	if (cached->remote_lsn < remote_commit)
+		cached->remote_lsn = remote_commit;
+}
+
+/* Remote LSN up to which the cached identifier's changes were replayed. */
+XLogRecPtr
+RemoteCommitFromCachedReplicationIdentifier(void)
+{
+	ReplicationState *cached = local_replication_state;
+
+	/* SetupCachedReplicationIdentifier() must have been called first */
+	Assert(cached != NULL);
+	return cached->remote_lsn;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index ed2d69f..868ebb4 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
 #include "replication/logical.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/replication_identifier.h"
 #include "storage/bufmgr.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
@@ -129,6 +130,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, CheckpointerShmemSize());
 		size = add_size(size, AutoVacuumShmemSize());
 		size = add_size(size, LogicalDecodingShmemSize());
+		size = add_size(size, ReplicationIdentifierShmemSize());
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, BTreeShmemSize());
@@ -237,6 +239,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	CheckpointerShmemInit();
 	AutoVacuumShmemInit();
 	LogicalDecodingShmemInit();
+	ReplicationIdentifierShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
 
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index e9bdfea..a4ebdd5 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -54,6 +54,7 @@
 #include "catalog/pg_shdepend.h"
 #include "catalog/pg_shdescription.h"
 #include "catalog/pg_shseclabel.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_ts_config.h"
@@ -620,6 +621,28 @@ static const struct cachedesc cacheinfo[] = {
 		},
 		128
 	},
+	{ReplicationIdentifierRelationId,		/* REPLIDIDENT */
+		ReplicationLocalIdIndex,
+		1,
+		{
+			Anum_pg_replication_riident,
+			0,
+			0,
+			0
+		},
+		16
+	},
+	{ReplicationIdentifierRelationId,		/* REPLIDREMOTE */
+		ReplicationRemoteIndex,
+		4,
+		{
+			Anum_pg_replication_riremotesysid,
+			Anum_pg_replication_riremotedb,
+			Anum_pg_replication_rilocaldb,
+			Anum_pg_replication_riname
+		},
+		16
+	},
 	{RewriteRelationId,			/* RULERELNAME */
 		RewriteRelRulenameIndexId,
 		2,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 15286e7..7326800 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -201,6 +201,8 @@ static bool check_application_name(char **newval, void **extra, GucSource source
 static void assign_application_name(const char *newval, void *extra);
 static const char *show_unix_socket_permissions(void);
 static const char *show_log_file_mode(void);
+static void assign_replication_node_id(int newval, void *extra);
+static void assign_replication_origin_id(int newval, void *extra);
 
 static char *config_enum_get_options(struct config_enum * record,
 						const char *prefix, const char *suffix,
@@ -473,7 +475,8 @@ static bool	data_checksums;
 static int	wal_segment_size;
 static bool integer_datetimes;
 static int	effective_io_concurrency;
-
+static int	phony_replication_node_id;
+static int	phony_replication_origin_id;
 /* should be static, but commands/variable.c needs to get at this */
 char	   *role_string;
 
@@ -2118,6 +2121,26 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"replication_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+		 gettext_noop("node id for replication."),
+		 NULL
+		},
+		&phony_replication_node_id,
+		InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+		NULL, assign_replication_node_id, NULL
+	},
+
+	{
+		{"replication_origin_id", PGC_USERSET, REPLICATION_MASTER,
+		 gettext_noop("current node id for replication."),
+		 NULL
+		},
+		&phony_replication_origin_id,
+		InvalidRepNodeId, InvalidRepNodeId, INT_MAX,
+		NULL, assign_replication_origin_id, NULL
+	},
+
+	{
 		{"commit_siblings", PGC_USERSET, WAL_SETTINGS,
 			gettext_noop("Sets the minimum concurrent open transactions before performing "
 						 "commit_delay."),
@@ -8918,4 +8941,26 @@ show_log_file_mode(void)
 	return buf;
 }
 
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+	/* a new local node id also becomes the default origin of changes */
+	guc_replication_origin_id = (RepNodeId) newval;
+	phony_replication_origin_id = newval;
+	/* NB: RepNodeId is 16 bits wide but the GUC allows up to INT_MAX */
+	guc_replication_node_id = (RepNodeId) newval;
+}
+
+static void
+assign_replication_origin_id(int newval, void *extra)
+{
+	/*
+	 * FIXME: add a check hook verifying wal_level and replication_node_id.
+	 *
+	 * Without one any value up to INT_MAX is accepted here, and values that
+	 * don't fit are silently truncated to the 16 bit RepNodeId.
+	 */
+	guc_replication_origin_id = (RepNodeId) newval;
+}
+
 #include "guc-file.c"
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index d66ec83..d93e863 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -258,7 +258,7 @@
 #wal_receiver_timeout = 60s		# time that receiver waits for
 					# communication from master
 					# in milliseconds; 0 disables
-
+#replication_node_id = 0		# 0 = invalid node id; (change requires restart)
 
 #------------------------------------------------------------------------------
 # QUERY TUNING
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index b7424e7..f1e991a 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -202,6 +202,7 @@ const char *subdirs[] = {
 	"pg_stat",
 	"pg_stat_tmp",
 	"pg_llog",
+	"pg_llog/checkpoints",
 	"pg_llog/snapshots",
 	"pg_llog/mappings"
 };
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index f1b5d6d..dbc5b74 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -55,6 +55,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "common/fe_memutils.h"
+#include "replication/logical.h"
 
 extern int	optind;
 extern char *optarg;
@@ -970,6 +971,7 @@ WriteEmptyXLOG(void)
 	record->xl_len = sizeof(CheckPoint);
 	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
 	record->xl_rmid = RM_XLOG_ID;
+	record->xl_origin_id = InvalidRepNodeId;
 	memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy,
 		   sizeof(CheckPoint));
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 96502ce..84ff5ca 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,6 +146,13 @@ typedef struct xl_xact_commit
 	/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
 } xl_xact_commit;
 
+typedef struct xl_xact_origin
+{
+	XLogRecPtr	origin_lsn;		/* LSN on the originating node */
+	RepNodeId	origin_node_id;	/* local id of the originating node */
+	TimestampTz origin_timestamp;	/* timestamp from the originating node */
+} xl_xact_origin;
+
 #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
 
 /*
@@ -158,7 +165,7 @@ typedef struct xl_xact_commit
  */
 #define XACT_COMPLETION_UPDATE_RELCACHE_FILE	0x01
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT		0x02
-
+#define XACT_CONTAINS_ORIGIN					0x04
 /* Access macros for above flags */
 #define XactCompletionRelcacheInitFileInval(xinfo)	(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
 #define XactCompletionForceSyncCommit(xinfo)		(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7415a26..8b03846 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -48,6 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	XLogRecPtr	xl_prev;		/* ptr to previous record in log */
 	pg_crc32	xl_crc;			/* CRC for this record */
+	RepNodeId   xl_origin_id;   /* what node did originally cause this record to be written */
 
 	/* If MAXALIGN==8, there are 4 wasted bytes here */
 
@@ -177,6 +178,7 @@ typedef enum
 } RecoveryTargetType;
 
 extern XLogRecPtr XactLastRecEnd;
+extern XLogRecPtr XactLastCommitEnd;
 
 extern bool reachedConsistency;
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index bca166e..60fcc31 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -45,6 +45,12 @@ typedef uint64 XLogSegNo;
 typedef uint32 TimeLineID;
 
 /*
+ * Identifies the node on which the action that caused a WAL record to be
+ * logged originated.
+ */
+typedef uint16 RepNodeId;
+
+/*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
  *	it is a win to use it in all cases where we sync on each write().  We could
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 4860e98..3fdc2b5 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -313,6 +313,12 @@ DECLARE_UNIQUE_INDEX(pg_extension_name_index, 3081, on pg_extension using btree(
 DECLARE_UNIQUE_INDEX(pg_range_rngtypid_index, 3542, on pg_range using btree(rngtypid oid_ops));
 #define RangeTypidIndexId					3542
 
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 3195, on pg_replication_identifier using btree(riident oid_ops));
+#define ReplicationLocalIdIndex 3195
+
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_remote_index, 3196, on pg_replication_identifier using btree(riremotesysid name_ops, riremotedb oid_ops, rilocaldb oid_ops, riname name_ops));
+#define ReplicationRemoteIndex 3196
+
 /* last step of initialization script: build the indexes declared above */
 BUILD_INDICES
 
diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h
new file mode 100644
index 0000000..3d55fd1
--- /dev/null
+++ b/src/include/catalog/pg_replication_identifier.h
@@ -0,0 +1,92 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_identifier.h
+ *	  Persistent Replication Node Identifiers
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_identifier.h
+ *
+ * NOTES
+ *	  the genbki.pl script reads this file and generates .bki
+ *	  information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_IDENTIFIER_H
+#define PG_REPLICATION_IDENTIFIER_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ *		pg_replication_identifier.  cpp turns this into
+ *		typedef struct FormData_pg_replication_identifier
+ * ----------------
+ */
+#define ReplicationIdentifierRelationId 3465
+
+CATALOG(pg_replication_identifier,3465) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+	/*
+	 * Locally known identifier that gets included into WAL records.
+	 *
+	 * This should never leave the system.
+	 *
+	 * Needs to fit into a uint16 (RepNodeId) so it doesn't waste too much WAL
+	 * space. For this reason the catalog has no system oid column; new values
+	 * have to be allocated manually.
+	 */
+	Oid		riident;
+
+	/* ----
+	 * remote system identifier, including the tli, separated by a '-'.
+	 *
+	 * They are packed together for two reasons:
+	 * a) we can't represent sysids as uint64 because there's no such type on
+	 *    SQL level, so we need a fixed width string anyway. And a name
+	 *    already has enough space for that.
+	 * b) syscaches can only have 4 keys, and we're already at that limit with
+	 *    the combined key.
+	 * ----
+	 */
+	NameData riremotesysid;
+
+	/* local database */
+	Oid		rilocaldb;
+
+	/* remote database */
+	Oid		riremotedb;
+
+	/* optional name; zero length string if not set */
+	NameData riname;
+#ifdef CATALOG_VARLEN		/* variable-length fields start here */
+#endif
+} FormData_pg_replication_identifier;
+
+/* ----------------
+ *		Form_pg_replication_identifier corresponds to a pointer to a tuple
+ *		with the format of the pg_replication_identifier relation.
+ * ----------------
+ */
+typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
+
+/* ----------------
+ *		compiler constants for pg_replication_identifier
+ * ----------------
+ */
+
+#define Natts_pg_replication_identifier		5
+#define Anum_pg_replication_riident			1
+#define Anum_pg_replication_riremotesysid	2
+#define Anum_pg_replication_rilocaldb		3
+#define Anum_pg_replication_riremotedb		4
+#define Anum_pg_replication_riname			5
+
+/* ----------------
+ *		pg_replication_identifier has no initial contents
+ * ----------------
+ */
+
+#endif   /* PG_REPLICATION_IDENTIFIER_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 273b98f..d22fc6c 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -160,6 +160,13 @@ typedef struct LogicalDecodingContext
 /* GUCs */
 extern PGDLLIMPORT int max_logical_slots;
 
+#define InvalidRepNodeId 0
+extern PGDLLIMPORT RepNodeId guc_replication_node_id;
+extern PGDLLIMPORT RepNodeId guc_replication_origin_id;
+extern PGDLLIMPORT XLogRecPtr replication_origin_lsn;
+extern PGDLLIMPORT TimestampTz replication_origin_timestamp;
+
+
 extern Size LogicalDecodingShmemSize(void);
 extern void LogicalDecodingShmemInit(void);
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4181ccf..ff5d29c 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -60,6 +60,8 @@ typedef struct ReorderBufferChange
 		int			action_internal;
 	};
 
+	RepNodeId origin_id;
+
 	/*
 	 * Context data for the change, which part of the union is valid depends
 	 * on action/action_internal.
@@ -148,6 +150,12 @@ typedef struct ReorderBufferTXN
 	 */
 	XLogRecPtr	restart_decoding_lsn;
 
+	/* origin of the change that caused this transaction */
+	RepNodeId origin_id;
+
+	/* did the TX have catalog changes */
+	bool		does_timetravel;
+
 	/*
 	 * Commit time, only known when we read the actual commit record.
 	 */
@@ -320,7 +328,7 @@ void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-							TimestampTz commit_time);
+							TimestampTz commit_time, RepNodeId origin_id);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void		ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 									 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h
new file mode 100644
index 0000000..3c47038
--- /dev/null
+++ b/src/include/replication/replication_identifier.h
@@ -0,0 +1,42 @@
+/*-------------------------------------------------------------------------
+ * replication_identifier.h
+ *	  Exports from src/backend/replication/logical/replication_identifier.c
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPLICATION_IDENTIFIER_H
+#define REPLICATION_IDENTIFIER_H
+
+#include "catalog/pg_replication_identifier.h"
+#include "replication/logical.h"
+
+/* API for querying & manipulating replication identifiers */
+extern RepNodeId GetReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+										  Oid remotedb, Name riname,
+										  Oid rilocaldb);
+extern RepNodeId CreateReplicationIdentifier(uint64 remotesysid, Oid remotetli,
+											 Oid remotedb, Name riname,
+											 Oid rilocaldb);
+extern HeapTuple GetReplicationInfoByIdentifier(RepNodeId riident);
+
+
+extern void AdvanceReplicationIdentifier(RepNodeId node,
+										 XLogRecPtr remote_commit,
+										 XLogRecPtr local_commit);
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+											   XLogRecPtr local_commit);
+extern void SetupCachedReplicationIdentifier(RepNodeId node);
+extern void TeardownCachedReplicationIdentifier(RepNodeId node);
+extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void);
+
+/* crash recovery support */
+extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt);
+extern void StartupReplicationIdentifier(XLogRecPtr ckpt);
+
+/* internals */
+extern Size ReplicationIdentifierShmemSize(void);
+extern void ReplicationIdentifierShmemInit(void);
+
+#endif
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index e41b3d2..357f344 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -77,6 +77,8 @@ enum SysCacheIdentifier
 	RANGETYPE,
 	RELNAMENSP,
 	RELOID,
+	REPLIDIDENT,
+	REPLIDREMOTE,
 	RULERELNAME,
 	STATRELATTINH,
 	TABLESPACEOID,
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index a62a3e3..d5e40d6 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -120,6 +120,7 @@ pg_opfamily|t
 pg_pltemplate|t
 pg_proc|t
 pg_range|t
+pg_replication_identifier|t
 pg_rewrite|t
 pg_seclabel|t
 pg_shdepend|t
-- 
1.8.3.251.g1462b67

