diff --git a/contrib/lrep/Makefile b/contrib/lrep/Makefile
new file mode 100644
index 0000000..d3f7ba3
--- /dev/null
+++ b/contrib/lrep/Makefile
@@ -0,0 +1,20 @@
+# contrib/lrep/Makefile
+
+MODULE_big = lrep
+OBJS = lrep_utils.o lrep_output.o $(WIN32RES)
+PG_CPPFLAGS = -I$(libpq_srcdir)
+
+#EXTENSION = lrep
+#REGRESS = lrep
+#REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/lrep/logical.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/lrep
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/lrep/lrep.h b/contrib/lrep/lrep.h
new file mode 100644
index 0000000..e2910c4
--- /dev/null
+++ b/contrib/lrep/lrep.h
@@ -0,0 +1,108 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep.h
+ *		LREP public interfaces
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/lrep/lrep.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LREP_H
+#define LREP_H
+
+#include "libpq-fe.h"
+
+#include "nodes/parsenodes.h"
+
+#include "replication/logical.h"
+#include "replication/output_plugin.h"
+
+#include "storage/lock.h"
+
+#define LREP_PROTO_VERSION_NUM 1
+#define LREP_PROTO_MIN_REMOTE_VERSION_NUM 1
+
+typedef struct
+{
+	MemoryContext context;
+
+	bool allow_binary_protocol;
+	bool allow_sendrecv_protocol;
+	bool int_datetime_mismatch;
+	bool forward_changesets;
+
+	uint32 client_pg_version;
+	uint32 client_pg_catversion;
+	uint32 client_proto_version;
+	uint32 client_min_proto_version;
+	size_t client_sizeof_int;
+	size_t client_sizeof_long;
+	size_t client_sizeof_datum;
+	size_t client_maxalign;
+	bool client_bigendian;
+	bool client_float4_byval;
+	bool client_float8_byval;
+	bool client_int_datetime;
+	char *client_db_encoding;
+
+	void *extra;		/* Additional data */
+} LREPOutputData;
+
+
+typedef struct LREPTupleData
+{
+	Datum	values[MaxTupleAttributeNumber];
+	bool	nulls[MaxTupleAttributeNumber];
+	bool	changed[MaxTupleAttributeNumber];
+} LREPTupleData;
+
+
+extern void lrep_write_begin(StringInfo out, ReorderBufferTXN *txn, int flags,
+							 StringInfo extradata);
+extern void lrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn, int flags,
+							  StringInfo extradata);
+
+extern void lrep_write_insert(LREPOutputData *data, StringInfo out,
+							  Relation rel, HeapTuple newtuple);
+extern void lrep_write_update(LREPOutputData *data, StringInfo out,
+							  Relation rel, HeapTuple oldtuple,
+							  HeapTuple newtuple);
+extern void lrep_write_delete(LREPOutputData *data, StringInfo out,
+							  Relation rel, HeapTuple oldtuple);
+
+extern void lrep_write_rel(StringInfo out, Relation rel);
+extern void lrep_write_tuple(LREPOutputData *data, StringInfo out, Relation rel,
+						HeapTuple tuple);
+
+extern int lrep_read_begin(StringInfo in, XLogRecPtr *origlsn,
+						   TimestampTz *committime, TransactionId *remote_xid);
+extern int lrep_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
+							XLogRecPtr *end_lsn, TimestampTz *committime);
+extern Relation lrep_read_insert(StringInfo in, LOCKMODE lockmode,
+								 LREPTupleData *tuple);
+extern Relation lrep_read_update(StringInfo in, LOCKMODE lockmode,
+								 LREPTupleData *oldtuple,
+								 LREPTupleData *newtuple, bool *pkeysent);
+extern Relation lrep_read_delete(StringInfo in, LOCKMODE lockmode,
+								 LREPTupleData *tuple, bool *pkeysent);
+
+extern void lrep_read_tuple_parts(StringInfo s, TupleDesc desc,
+								  LREPTupleData *tuple);
+extern RangeVar *lrep_read_rel(StringInfo s);
+
+extern bool lrep_send_feedback(PGconn *conn, XLogRecPtr recvpos,
+							   XLogRecPtr writepos, XLogRecPtr flushpos,
+							   int64 now, bool force);
+
+extern void lrep_opt_parse_notnull(DefElem *elem, const char *paramtype);
+extern void lrep_opt_parse_uint32(DefElem *elem, uint32 *res);
+extern void lrep_opt_parse_size_t(DefElem *elem, size_t *res);
+extern void lrep_opt_parse_bool(DefElem *elem, bool *res);
+extern void lrep_opt_parse_identifier_list_arr(DefElem *elem, char ***list, int *len);
+extern void lrep_opt_required_error(const char *param);
+
+#endif /* LREP */
diff --git a/contrib/lrep/lrep_output.c b/contrib/lrep/lrep_output.c
new file mode 100644
index 0000000..7c4b614
--- /dev/null
+++ b/contrib/lrep/lrep_output.c
@@ -0,0 +1,430 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep_output.c
+ *		LREP output plugin
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/lrep/lrep_output.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "lrep_output.h"
+
+#include "access/sysattr.h"
+#include "access/tuptoaster.h"
+#include "access/xact.h"
+
+#include "catalog/catversion.h"
+#include "catalog/index.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "libpq/pqformat.h"
+
+#include "mb/pg_wchar.h"
+
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/typcache.h"
+
+PG_MODULE_MAGIC;
+
+extern void		_PG_output_plugin_init(OutputPluginCallbacks *cb);
+
+static LREPOutputCallbackData *LREPOutputCallbacks = NULL;
+
+#define startup_callback(ctx, data, options) \
+	if (LREPOutputCallbacks && LREPOutputCallbacks->startup_callback) \
+		LREPOutputCallbacks->startup_callback(ctx, data, options)
+
+#define shutdown_callback(ctx) \
+	if (LREPOutputCallbacks && LREPOutputCallbacks->shutdown_callback) \
+		LREPOutputCallbacks->shutdown_callback(ctx)
+
+#define should_forward_changeset(ctx, data, txn) \
+	(!LREPOutputCallbacks || \
+	 !LREPOutputCallbacks->should_forward_changeset || \
+	 LREPOutputCallbacks->should_forward_changeset(ctx, data, txn))
+
+#define should_forward_change(ctx, data, relation, action) \
+	(!LREPOutputCallbacks || \
+	 !LREPOutputCallbacks->should_forward_change || \
+	 LREPOutputCallbacks->should_forward_change(ctx, data, relation, action))
+
+
+#ifdef USE_FLOAT4_BYVAL
+	static bool server_float4byval = true;
+#else
+	static bool server_float4byval = false;
+#endif
+
+#ifdef USE_FLOAT8_BYVAL
+	static bool server_float8byval = true;
+#else
+	static bool server_float8byval = false;
+#endif
+
+#ifdef USE_INTEGER_DATETIMES
+	static bool server_int_datetime = true;
+#else
+	static bool server_int_datetime = false;
+#endif
+
+#ifdef server_bigendian
+	static bool server_bigendian = true;
+#else
+	static bool server_bigendian = false;
+#endif
+
+
+/* These must be available to pg_dlsym() */
+static void pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
+							  bool is_init);
+static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
+					ReorderBufferTXN *txn);
+static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_change(LogicalDecodingContext *ctx,
+				 ReorderBufferTXN *txn, Relation rel,
+				 ReorderBufferChange *change);
+static void pg_decode_shutdown(LogicalDecodingContext * ctx);
+
+void
+LREP_output_plugin_init(OutputPluginCallbacks *pgcb, LREPOutputCallbackData *plugincb)
+{
+	pgcb->startup_cb = pg_decode_startup;
+	pgcb->begin_cb = pg_decode_begin_txn;
+	pgcb->change_cb = pg_decode_change;
+	pgcb->commit_cb = pg_decode_commit_txn;
+	pgcb->shutdown_cb = pg_decode_shutdown;
+
+	LREPOutputCallbacks = plugincb;
+}
+
+/* specify output plugin callbacks */
+void
+_PG_output_plugin_init(OutputPluginCallbacks *cb)
+{
+	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
+
+	LREP_output_plugin_init(cb, NULL);
+}
+
+/* initialize this plugin */
+static void
+pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool is_init)
+{
+	List	   *additional_options = NIL;
+	ListCell   *option;
+	LREPOutputData *data;
+
+	data = palloc0(sizeof(LREPOutputData));
+	data->context = AllocSetContextCreate(TopMemoryContext,
+										  "lrep output plugin data context",
+										  ALLOCSET_DEFAULT_MINSIZE,
+										  ALLOCSET_DEFAULT_INITSIZE,
+										  ALLOCSET_DEFAULT_MAXSIZE);
+
+	ctx->output_plugin_private = data;
+
+	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+
+	/* parse options passed in by the client */
+
+	foreach(option, ctx->output_plugin_options)
+	{
+		DefElem    *elem = lfirst(option);
+
+		Assert(elem->arg == NULL || IsA(elem->arg, String));
+
+		if (strcmp(elem->defname, "pg_version") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_pg_version);
+		else if (strcmp(elem->defname, "pg_catversion") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_pg_catversion);
+		else if (strcmp(elem->defname, "proto_version") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_proto_version);
+		else if (strcmp(elem->defname, "min_proto_version") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_min_proto_version);
+		else if (strcmp(elem->defname, "sizeof_int") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_sizeof_int);
+		else if (strcmp(elem->defname, "sizeof_long") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_sizeof_long);
+		else if (strcmp(elem->defname, "sizeof_datum") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_sizeof_datum);
+		else if (strcmp(elem->defname, "maxalign") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_maxalign);
+		else if (strcmp(elem->defname, "bigendian") == 0)
+			lrep_opt_parse_bool(elem, &data->client_bigendian);
+		else if (strcmp(elem->defname, "float4_byval") == 0)
+			lrep_opt_parse_bool(elem, &data->client_float4_byval);
+		else if (strcmp(elem->defname, "float8_byval") == 0)
+			lrep_opt_parse_bool(elem, &data->client_float8_byval);
+		else if (strcmp(elem->defname, "integer_datetimes") == 0)
+			lrep_opt_parse_bool(elem, &data->client_int_datetime);
+		else if (strcmp(elem->defname, "db_encoding") == 0)
+			data->client_db_encoding = pstrdup(strVal(elem->arg));
+		else if (strcmp(elem->defname, "interactive") == 0)
+		{
+			/*
+			 * Set defaults for interactive mode
+			 *
+			 * This is used for examining the replication queue from SQL.
+			 */
+			data->client_pg_version = PG_VERSION_NUM;
+			data->client_pg_catversion = CATALOG_VERSION_NO;
+			data->client_proto_version = LREP_PROTO_VERSION_NUM;
+			data->client_min_proto_version = LREP_PROTO_VERSION_NUM;
+			data->client_sizeof_int = sizeof(int);
+			data->client_sizeof_long = sizeof(long);
+			data->client_sizeof_datum = sizeof(Datum);
+			data->client_maxalign = MAXIMUM_ALIGNOF;
+			data->client_bigendian = server_bigendian;
+			data->client_float4_byval = server_float4byval;
+			data->client_float8_byval = server_float8byval;
+			data->client_int_datetime = server_int_datetime;
+			data->client_db_encoding = pstrdup(GetDatabaseEncodingName());
+		}
+		else
+		{
+			additional_options = lappend(additional_options, elem);
+		}
+	}
+
+	/* no options are passed in during initialization, so don't complain there */
+	if (!is_init)
+	{
+		if (data->client_pg_version == 0)
+			lrep_opt_required_error("pg_version");
+		if (data->client_pg_catversion == 0)
+			lrep_opt_required_error("pg_catversion");
+		if (data->client_proto_version == 0)
+			lrep_opt_required_error("proto_version");
+		if (data->client_min_proto_version == 0)
+			lrep_opt_required_error("min_proto_version");
+		if (data->client_sizeof_int == 0)
+			lrep_opt_required_error("sizeof_int");
+		if (data->client_sizeof_long == 0)
+			lrep_opt_required_error("sizeof_long");
+		if (data->client_sizeof_datum == 0)
+			lrep_opt_required_error("sizeof_datum");
+		if (data->client_maxalign == 0)
+			lrep_opt_required_error("maxalign");
+		if (data->client_db_encoding == NULL)
+			lrep_opt_required_error("db_encoding");
+
+		/* check incompatibilities we cannot work around */
+		if (strcmp(data->client_db_encoding, GetDatabaseEncodingName()) != 0)
+			elog(ERROR, "mismatching encodings are not yet supported");
+
+		if (data->client_min_proto_version > LREP_PROTO_VERSION_NUM)
+			elog(ERROR, "incompatible lrep client and server versions, server too old");
+		if (data->client_proto_version < LREP_PROTO_MIN_REMOTE_VERSION_NUM)
+			elog(ERROR, "incompatible lrep client and server versions, client too old");
+
+		data->allow_binary_protocol = true;
+		data->allow_sendrecv_protocol = true;
+
+		/*
+		 * Now use the passed in information to determine how to encode the
+		 * data sent by the output plugin. We don't make datatype specific
+		 * decisions here, just generic decisions about using binary and/or
+		 * send/recv protocols.
+		 */
+
+		/*
+		 * Don't use the binary protocol if there are fundamental arch
+		 * differences.
+		 */
+		if (data->client_sizeof_int != sizeof(int) ||
+			data->client_sizeof_long != sizeof(long) ||
+			data->client_sizeof_datum != sizeof(Datum))
+		{
+			data->allow_binary_protocol = false;
+			elog(LOG, "disabling binary protocol because of sizeof differences");
+		}
+		else if (data->client_bigendian != server_bigendian)
+		{
+			data->allow_binary_protocol = false;
+			elog(LOG, "disabling binary protocol because of endianess difference");
+		}
+
+		/*
+		 * We also can't use the binary protocol if there are critical
+		 * differences in compile time settings.
+		 */
+		if (data->client_float4_byval != server_float4byval ||
+			data->client_float8_byval != server_float8byval)
+			data->allow_binary_protocol = false;
+
+		if (data->client_int_datetime != server_int_datetime)
+			data->int_datetime_mismatch = true;
+		else
+			data->int_datetime_mismatch = false;
+
+
+		/*
+		 * Don't use the send/recv protocol if there are version
+		 * differences. There currently isn't any guarantee for cross version
+		 * compatibility of the send/recv representations. But there actually
+		 * *is* a compat. guarantee for architecture differences...
+		 *
+		 * XXX: We could easily do better by doing per datatype considerations
+		 * if there are known incompatibilities.
+		 */
+		if (data->client_pg_version / 100 != PG_VERSION_NUM / 100)
+			data->allow_sendrecv_protocol = false;
+
+		/*
+		 * Make sure it's safe to begin playing changes to the remote end.
+		 * This'll ERROR out if we're not ready.
+		 */
+
+		startup_callback(ctx, data, additional_options);
+	}
+}
+
+/*
+ * BEGIN callback
+ *
+ * If you change this you must also change the corresponding code in
+ * the apply code also. Make sure that any flags are in sync.
+ */
+void
+pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+	LREPOutputData *data = ctx->output_plugin_private;
+	int flags = 0;
+	StringInfoData extradata;
+
+	AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
+
+	if (!should_forward_changeset(ctx, data, txn))
+		return;
+
+	/* get flags and extra fields to send */
+	if (LREPOutputCallbacks && LREPOutputCallbacks->begin_txn_get_extrafields)
+	{
+		initStringInfo(&extradata);
+		flags = LREPOutputCallbacks->begin_txn_get_extrafields(ctx, txn, &extradata);
+	}
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	lrep_write_begin(ctx->out, txn, flags, &extradata);
+
+	OutputPluginWrite(ctx, true);
+	return;
+}
+
+/*
+ * COMMIT callback
+ *
+ * Send the LSN at the time of the commit, the commit time, and the end LSN.
+ *
+ * The presence of additional records is controlled by a flag field, with
+ * records that're present appearing strictly in the order they're listed
+ * here. There is no sub-record header or other structure beyond the flags
+ * field.
+ *
+ * If you change this, you'll need to change process_remote_commit(...)
+ * too. Make sure to keep any flags in sync.
+ */
+void
+pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					 XLogRecPtr commit_lsn)
+{
+	LREPOutputData *data = ctx->output_plugin_private;
+	int flags = 0;
+	StringInfoData extradata;
+
+	if (!should_forward_changeset(ctx, data, txn))
+		return;
+
+	/* get flags and extra fields to send */
+	if (LREPOutputCallbacks && LREPOutputCallbacks->commit_txn_get_extrafields)
+	{
+		initStringInfo(&extradata);
+		flags = LREPOutputCallbacks->commit_txn_get_extrafields(ctx, txn, &extradata);
+	}
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	lrep_write_commit(ctx->out, txn, commit_lsn, flags, &extradata);
+
+	OutputPluginWrite(ctx, true);
+}
+
+void
+pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 Relation relation, ReorderBufferChange *change)
+{
+	LREPOutputData *data;
+	MemoryContext old;
+
+	data = ctx->output_plugin_private;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	if (!should_forward_changeset(ctx, data, txn))
+		return;
+
+	if (!should_forward_change(ctx, data, relation, change->action))
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	switch (change->action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			lrep_write_insert(data, ctx->out, relation,
+							  &change->data.tp.newtuple->tuple);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			{
+				HeapTuple oldtuple = change->data.tp.oldtuple ?
+					&change->data.tp.oldtuple->tuple : NULL;
+
+				lrep_write_update(data, ctx->out, relation,
+								  oldtuple, &change->data.tp.newtuple->tuple);
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DELETE:
+			{
+				HeapTuple oldtuple = change->data.tp.oldtuple ?
+					&change->data.tp.oldtuple->tuple : NULL;
+
+				lrep_write_delete(data, ctx->out, relation, oldtuple);
+				break;
+			}
+		default:
+			Assert(false);
+	}
+	OutputPluginWrite(ctx, true);
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+}
+
+static void pg_decode_shutdown(LogicalDecodingContext * ctx)
+{
+	shutdown_callback(ctx);
+}
diff --git a/contrib/lrep/lrep_output.h b/contrib/lrep/lrep_output.h
new file mode 100644
index 0000000..bc19e12
--- /dev/null
+++ b/contrib/lrep/lrep_output.h
@@ -0,0 +1,49 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep_output.h
+ *		LREP output plugin interfaces
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/lrep/lrep.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LREP_OUTPUT_H
+#define LREP_OUTPUT_H
+
+#include "lrep.h"
+
+typedef void (*lrep_decode_startup_callback_t) (LogicalDecodingContext *ctx,
+												LREPOutputData *data,
+												List *options);
+typedef void (*lrep_decode_shutdown_callback_t) (LogicalDecodingContext * ctx);
+typedef bool (*lrep_should_forward_changeset_t) (LogicalDecodingContext *ctx,
+												 LREPOutputData *data,
+												 ReorderBufferTXN *txn);
+typedef bool (*lrep_should_forward_change_t) (LogicalDecodingContext *ctx,
+											  LREPOutputData *data,
+											  Relation relation,
+											  enum ReorderBufferChangeType action);
+typedef int	(*lrep_begin_txn_get_extrafields_t) (LogicalDecodingContext *ctx,
+												 ReorderBufferTXN *txn,
+												 StringInfo extrafields);
+typedef int	(*lrep_commit_txn_get_extrafields_t) (LogicalDecodingContext *ctx,
+												  ReorderBufferTXN *txn,
+												  StringInfo extrafields);
+
+typedef struct
+{
+	lrep_decode_startup_callback_t startup_callback;
+	lrep_decode_shutdown_callback_t shutdown_callback;
+	lrep_should_forward_changeset_t should_forward_changeset;
+	lrep_should_forward_change_t should_forward_change;
+	lrep_begin_txn_get_extrafields_t begin_txn_get_extrafields;
+	lrep_commit_txn_get_extrafields_t commit_txn_get_extrafields;
+} LREPOutputCallbackData;
+
+
+extern void LREP_output_plugin_init(OutputPluginCallbacks *pgcb, LREPOutputCallbackData *plugincb);
+
+#endif /* LREP_OUTPUT_H */
diff --git a/contrib/lrep/lrep_utils.c b/contrib/lrep/lrep_utils.c
new file mode 100644
index 0000000..eb56357
--- /dev/null
+++ b/contrib/lrep/lrep_utils.c
@@ -0,0 +1,725 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep_utils.c
+ *		LREP utility functions
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/lrep/lrep_util.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "lrep.h"
+
+#include "access/sysattr.h"
+#include "access/tuptoaster.h"
+#include "access/xact.h"
+
+#include "catalog/catversion.h"
+#include "catalog/index.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "libpq/pqformat.h"
+#include "libpq-fe.h"
+
+#include "mb/pg_wchar.h"
+
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/typcache.h"
+
+static void
+decide_datum_transfer(LREPOutputData *data,
+					  Form_pg_attribute att, Form_pg_type typclass,
+					  bool *use_binary, bool *use_sendrecv);
+
+/*
+ * Wire protocol functions
+ */
+
+/*
+ * Write BEGIN to the output stream.
+ */
+void
+lrep_write_begin(StringInfo out, ReorderBufferTXN *txn, int flags,
+				 StringInfo extradata)
+{
+	pq_sendbyte(out, 'B');		/* BEGIN */
+
+	/* send the flags field its self */
+	pq_sendint(out, flags, 4);
+
+	/* fixed fields */
+	pq_sendint64(out, txn->final_lsn);
+	pq_sendint64(out, txn->commit_time);
+	pq_sendint(out, txn->xid, 4);
+
+	/* if flags were set send the extradata too */
+	if (flags != 0 && extradata->len > 0)
+		pq_sendbytes(out, extradata->data, extradata->len);
+}
+
+/*
+ * Write COMMIT to the output stream.
+ */
+void
+lrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn,
+				  int flags, StringInfo extradata)
+{
+	pq_sendbyte(out, 'C');		/* sending COMMIT */
+
+	/* send the flags field its self */
+	pq_sendint(out, flags, 4);
+
+	/* send fixed fields */
+	pq_sendint64(out, commit_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+
+	/* if flags were set send the extradata too */
+	if (flags != 0 && extradata->len > 0)
+		pq_sendbytes(out, extradata->data, extradata->len);
+}
+
+/*
+ * Write INSERT to the output stream.
+ */
+void
+lrep_write_insert(LREPOutputData *data, StringInfo out, Relation rel,
+				  HeapTuple newtuple)
+{
+	pq_sendbyte(out, 'I');		/* action INSERT */
+	lrep_write_rel(out, rel);
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	lrep_write_tuple(data, out, rel, newtuple);
+}
+
+/*
+ * Write UPDATE to the output stream.
+ */
+void
+lrep_write_update(LREPOutputData *data, StringInfo out, Relation rel,
+				  HeapTuple oldtuple, HeapTuple newtuple)
+{
+	pq_sendbyte(out, 'U');		/* action UPDATE */
+	lrep_write_rel(out, rel);
+	if (oldtuple != NULL)
+	{
+		pq_sendbyte(out, 'K');	/* old key follows */
+		lrep_write_tuple(data, out, rel, oldtuple);
+	}
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	lrep_write_tuple(data, out, rel, newtuple);
+
+	pq_sendbyte(out, 'I');		/* action INSERT */
+	lrep_write_rel(out, rel);
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	lrep_write_tuple(data, out, rel, newtuple);
+}
+
+/*
+ * Write DELETE to the output stream.
+ */
+void
+lrep_write_delete(LREPOutputData *data, StringInfo out, Relation rel,
+				  HeapTuple oldtuple)
+{
+	pq_sendbyte(out, 'D');		/* action DELETE */
+	lrep_write_rel(out, rel);
+	if (oldtuple != NULL)
+	{
+		pq_sendbyte(out, 'K');	/* old key follows */
+		lrep_write_tuple(data, out, rel, oldtuple);
+	}
+	else
+		pq_sendbyte(out, 'E');	/* empty */
+}
+
+/*
+ * Write schema.relation to the output stream.
+ */
+void
+lrep_write_rel(StringInfo out, Relation rel)
+{
+	const char *nspname;
+	int64		nspnamelen;
+	const char *relname;
+	int64		relnamelen;
+
+	nspname = get_namespace_name(rel->rd_rel->relnamespace);
+	if (nspname == NULL)
+		elog(ERROR, "cache lookup failed for namespace %u",
+			 rel->rd_rel->relnamespace);
+	nspnamelen = strlen(nspname) + 1;
+
+	relname = NameStr(rel->rd_rel->relname);
+	relnamelen = strlen(relname) + 1;
+
+	pq_sendint(out, nspnamelen, 2);		/* schema name length */
+	appendBinaryStringInfo(out, nspname, nspnamelen);
+
+	pq_sendint(out, relnamelen, 2);		/* table name length */
+	appendBinaryStringInfo(out, relname, relnamelen);
+}
+
+/*
+ * Write a tuple to the outputstream, in the most efficient format possible.
+ */
+void
+lrep_write_tuple(LREPOutputData *data, StringInfo out, Relation rel,
+			HeapTuple tuple)
+{
+	TupleDesc	desc;
+	Datum		values[MaxTupleAttributeNumber];
+	bool		isnull[MaxTupleAttributeNumber];
+	int			i;
+
+	desc = RelationGetDescr(rel);
+
+	pq_sendbyte(out, 'T');			/* tuple follows */
+
+	pq_sendint(out, desc->natts, 4);		/* number of attributes */
+
+	/* try to allocate enough memory from the get go */
+	enlargeStringInfo(out, tuple->t_len +
+					  desc->natts * ( 1 + 4));
+
+	/*
+	 * XXX: should this prove to be a relevant bottleneck, it might be
+	 * interesting to inline heap_deform_tuple() here, we don't actually need
+	 * the information in the form we get from it.
+	 */
+	heap_deform_tuple(tuple, desc, values, isnull);
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		HeapTuple	typtup;
+		Form_pg_type typclass;
+
+		Form_pg_attribute att = desc->attrs[i];
+
+		bool use_binary = false;
+		bool use_sendrecv = false;
+
+		if (isnull[i] || att->attisdropped)
+		{
+			pq_sendbyte(out, 'n');	/* null column */
+			continue;
+		}
+		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
+		{
+			pq_sendbyte(out, 'u');	/* unchanged toast column */
+			continue;
+		}
+
+		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
+		if (!HeapTupleIsValid(typtup))
+			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
+		typclass = (Form_pg_type) GETSTRUCT(typtup);
+
+		decide_datum_transfer(data, att, typclass, &use_binary, &use_sendrecv);
+
+		if (use_binary)
+		{
+			pq_sendbyte(out, 'b');	/* binary data follows */
+
+			/* pass by value */
+			if (att->attbyval)
+			{
+				pq_sendint(out, att->attlen, 4); /* length */
+
+				enlargeStringInfo(out, att->attlen);
+				store_att_byval(out->data + out->len, values[i], att->attlen);
+				out->len += att->attlen;
+				out->data[out->len] = '\0';
+			}
+			/* fixed length non-varlena pass-by-reference type */
+			else if (att->attlen > 0)
+			{
+				pq_sendint(out, att->attlen, 4); /* length */
+
+				appendBinaryStringInfo(out, DatumGetPointer(values[i]),
+									   att->attlen);
+			}
+			/* varlena type */
+			else if (att->attlen == -1)
+			{
+				char *data = DatumGetPointer(values[i]);
+
+				/* send indirect datums inline */
+				if (VARATT_IS_EXTERNAL_INDIRECT(values[i]))
+				{
+					struct varatt_indirect redirect;
+					VARATT_EXTERNAL_GET_POINTER(redirect, data);
+					data = (char *) redirect.pointer;
+				}
+
+				Assert(!VARATT_IS_EXTERNAL(data));
+
+				pq_sendint(out, VARSIZE_ANY(data), 4); /* length */
+
+				appendBinaryStringInfo(out, data,
+									   VARSIZE_ANY(data));
+
+			}
+			else
+				elog(ERROR, "unsupported tuple type");
+		}
+		else if (use_sendrecv)
+		{
+			bytea	   *outputbytes;
+			int			len;
+
+			pq_sendbyte(out, 's');	/* 'send' data follows */
+
+			outputbytes =
+				OidSendFunctionCall(typclass->typsend, values[i]);
+
+			len = VARSIZE(outputbytes) - VARHDRSZ;
+			pq_sendint(out, len, 4); /* length */
+			pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
+			pfree(outputbytes);
+		}
+		else
+		{
+			char   	   *outputstr;
+			int			len;
+
+			pq_sendbyte(out, 't');	/* 'text' data follows */
+
+			outputstr =
+				OidOutputFunctionCall(typclass->typoutput, values[i]);
+			len = strlen(outputstr) + 1;
+			pq_sendint(out, len, 4); /* length */
+			appendBinaryStringInfo(out, outputstr, len); /* data */
+			pfree(outputstr);
+		}
+
+		ReleaseSysCache(typtup);
+	}
+}
+
+/*
+ * Read transaction BEGIN from the stream.
+ */
+int
+lrep_read_begin(StringInfo in, XLogRecPtr *origlsn, TimestampTz *committime,
+				TransactionId *remote_xid)
+{
+	int flags = pq_getmsgint(in, 4);
+
+	*origlsn = pq_getmsgint64(in);
+	Assert(*origlsn != InvalidXLogRecPtr);
+	*committime = pq_getmsgint64(in);
+	*remote_xid = pq_getmsgint(in, 4);
+
+	return flags;
+}
+
+/*
+ * Read transaction COMMIT from the stream.
+ */
+int
+lrep_read_commit(StringInfo in, XLogRecPtr *commit_lsn, XLogRecPtr *end_lsn,
+				 TimestampTz *committime)
+{
+	int flags = pq_getmsgint(in, 4);
+
+	*commit_lsn = pq_getmsgint64(in);
+	*end_lsn = pq_getmsgint64(in);
+	*committime = pq_getmsgint64(in);
+
+	return flags;
+}
+
+/*
+ * Read INSERT from stream.
+ *
+ * Returns open relation locked in LOCKMODE and a HeapTuple
+ */
+Relation
+lrep_read_insert(StringInfo in, LOCKMODE lockmode, LREPTupleData *tuple)
+{
+	char		action;
+	Oid			reloid;
+	Relation	rel;
+	TupleDesc	desc;
+
+	reloid = RangeVarGetRelid(lrep_read_rel(in), lockmode, false);
+
+	action = pq_getmsgbyte(in);
+	if (action != 'N')
+		elog(ERROR, "expected new tuple but got %d",
+			 action);
+
+	rel = heap_open(reloid, NoLock);
+	desc = RelationGetDescr(rel);
+
+	lrep_read_tuple_parts(in, desc, tuple);
+
+	return rel;
+}
+
+/*
+ * Read UPDATE from stream.
+ *
+ * Returns open relation locked in LOCKMODE and a old + new HeapTuples,
+ * old might be NULL.
+ */
+Relation
+lrep_read_update(StringInfo in, LOCKMODE lockmode, LREPTupleData *oldtuple,
+				 LREPTupleData *newtuple, bool *pkey_sent)
+{
+	char		action;
+	Oid			reloid;
+	Relation	rel;
+	TupleDesc	desc;
+
+	reloid = RangeVarGetRelid(lrep_read_rel(in), lockmode, false);
+
+	action = pq_getmsgbyte(in);
+
+	if (action != 'K' && action != 'N')
+		elog(ERROR, "expected action 'N' or 'K', got %c",
+			 action);
+
+	rel = heap_open(reloid, NoLock);
+	desc = RelationGetDescr(rel);
+
+	if (action == 'K')
+	{
+		lrep_read_tuple_parts(in, desc, oldtuple);
+		*pkey_sent = true;
+		action = pq_getmsgbyte(in);
+	}
+	else
+		*pkey_sent = false;
+
+	/* check for new  tuple */
+	if (action != 'N')
+		elog(ERROR, "expected action 'N', got %c",
+			 action);
+
+	lrep_read_tuple_parts(in, desc, newtuple);
+
+	return rel;
+}
+
+/*
+ * Read DELETE from stream.
+ *
+ * Returns open relation locked in LOCKMODE and a optionaly HeapTuple
+ */
+Relation
+lrep_read_delete(StringInfo in, LOCKMODE lockmode, LREPTupleData *tuple,
+				 bool *pkey_sent)
+{
+	char		action;
+	Oid			reloid;
+	Relation	rel;
+	TupleDesc	desc;
+
+	reloid = RangeVarGetRelid(lrep_read_rel(in), lockmode, false);
+
+	action = pq_getmsgbyte(in);
+	if (action != 'K' && action != 'E')
+		elog(ERROR, "expected action K or E got %c", action);
+
+	rel = heap_open(reloid, NoLock);
+	desc = RelationGetDescr(rel);
+
+	if (action == 'E')
+		*pkey_sent = false;
+	else
+	{
+		lrep_read_tuple_parts(in, desc, tuple);
+		*pkey_sent = true;
+	}
+
+	return rel;
+}
+
+
+/*
+ * Read tuple from stream
+ */
+void
+lrep_read_tuple_parts(StringInfo s, TupleDesc desc, LREPTupleData *tuple)
+{
+	int			i;
+	int			rnatts;
+	char		action;
+
+	action = pq_getmsgbyte(s);
+
+	if (action != 'T')
+		elog(ERROR, "expected TUPLE, got %c", action);
+
+	memset(tuple->nulls, true, sizeof(tuple->nulls));
+	memset(tuple->changed, true, sizeof(tuple->changed));
+
+	rnatts = pq_getmsgint(s, 4);
+
+	if (desc->natts != rnatts)
+		elog(ERROR, "tuple natts mismatch, %u vs %u", desc->natts, rnatts);
+
+	/* FIXME: unaligned data accesses */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = desc->attrs[i];
+		char		kind = pq_getmsgbyte(s);
+		const char *data;
+		int			len;
+
+		switch (kind)
+		{
+			case 'n': /* null */
+				/* already marked as null */
+				tuple->values[i] = 0xdeadbeef;
+				break;
+			case 'u': /* unchanged column */
+				tuple->nulls[i] = true;
+				tuple->changed[i] = false;
+				tuple->values[i] = 0xdeadbeef; /* make bad usage more obvious */
+
+				break;
+
+			case 'b': /* binary format */
+				tuple->nulls[i] = false;
+				len = pq_getmsgint(s, 4); /* read length */
+
+				data = pq_getmsgbytes(s, len);
+
+				/* and data */
+				if (att->attbyval)
+					tuple->values[i] = fetch_att(data, true, len);
+				else
+					tuple->values[i] = PointerGetDatum(data);
+				break;
+			case 's': /* send/recv format */
+				{
+					Oid typreceive;
+					Oid typioparam;
+					StringInfoData buf;
+
+					tuple->nulls[i] = false;
+					len = pq_getmsgint(s, 4); /* read length */
+
+					getTypeBinaryInputInfo(att->atttypid,
+										   &typreceive, &typioparam);
+
+					/* create StringInfo pointing into the bigger buffer */
+					initStringInfo(&buf);
+					/* and data */
+					buf.data = (char *) pq_getmsgbytes(s, len);
+					buf.len = len;
+					tuple->values[i] = OidReceiveFunctionCall(
+						typreceive, &buf, typioparam, att->atttypmod);
+
+					if (buf.len != buf.cursor)
+						ereport(ERROR,
+								(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+								 errmsg("incorrect binary data format")));
+					break;
+				}
+			case 't': /* text format */
+				{
+					Oid typinput;
+					Oid typioparam;
+
+					tuple->nulls[i] = false;
+					len = pq_getmsgint(s, 4); /* read length */
+
+					getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+					/* and data */
+					data = (char *) pq_getmsgbytes(s, len);
+					tuple->values[i] = OidInputFunctionCall(
+						typinput, (char *) data, typioparam, att->atttypmod);
+				}
+				break;
+			default:
+				elog(ERROR, "unknown column type '%c'", kind);
+		}
+
+		if (att->attisdropped && !tuple->nulls[i])
+			elog(ERROR, "data for dropped column");
+	}
+}
+
+/*
+ * Read schema.relation from stream and return as RangeVar.
+ */
+RangeVar *
+lrep_read_rel(StringInfo s)
+{
+	int			relnamelen;
+	int			nspnamelen;
+	RangeVar*	rv;
+
+	rv = makeNode(RangeVar);
+
+	nspnamelen = pq_getmsgint(s, 2);
+	rv->schemaname = (char *) pq_getmsgbytes(s, nspnamelen);
+
+	relnamelen = pq_getmsgint(s, 2);
+	rv->relname = (char *) pq_getmsgbytes(s, relnamelen);
+
+	return rv;
+}
+
+
+/*
+ * Make the executive decision about which protocol to use.
+ */
+static void
+decide_datum_transfer(LREPOutputData *data,
+					  Form_pg_attribute att, Form_pg_type typclass,
+					  bool *use_binary, bool *use_sendrecv)
+{
+	/* always disallow fancyness if there's type representation mismatches */
+	if (data->int_datetime_mismatch &&
+		(att->atttypid == TIMESTAMPOID || att->atttypid == TIMESTAMPTZOID ||
+		 att->atttypid == TIMEOID))
+	{
+		*use_binary = false;
+		*use_sendrecv = false;
+	}
+	/*
+	 * Use the binary protocol, if allowed, for builtin & plain datatypes.
+	 */
+	else if (data->allow_binary_protocol &&
+		typclass->typtype == 'b' &&
+		att->atttypid < FirstNormalObjectId &&
+		typclass->typelem == InvalidOid)
+	{
+		*use_binary = true;
+	}
+	/*
+	 * Use send/recv, if allowed, if the type is plain or builtin.
+	 *
+	 * XXX: we can't use send/recv for array or composite types for now due to
+	 * the embedded oids.
+	 */
+	else if (data->allow_sendrecv_protocol &&
+			 OidIsValid(typclass->typreceive) &&
+			 (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') &&
+			 (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid))
+	{
+		*use_sendrecv = true;
+	}
+}
+
+
+/*
+ * Option parsing helper functions
+ */
+
+/*
+ * Ensure parameter is non-null
+ */
+void
+lrep_opt_parse_notnull(DefElem *elem, const char *paramtype)
+{
+	if (elem->arg == NULL || strVal(elem->arg) == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("%s parameter \"%s\" had no value",
+				 paramtype, elem->defname)));
+}
+
+
+void
+lrep_opt_parse_uint32(DefElem *elem, uint32 *res)
+{
+	lrep_opt_parse_notnull(elem, "uint32");
+	errno = 0;
+	*res = strtoul(strVal(elem->arg), NULL, 0);
+
+	if (errno != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse uint32 value \"%s\" for parameter \"%s\": %m",
+						strVal(elem->arg), elem->defname)));
+}
+
+void
+lrep_opt_parse_size_t(DefElem *elem, size_t *res)
+{
+	lrep_opt_parse_notnull(elem, "size_t");
+	errno = 0;
+	*res = strtoull(strVal(elem->arg), NULL, 0);
+
+	if (errno != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse size_t value \"%s\" for parameter \"%s\": %m",
+						strVal(elem->arg), elem->defname)));
+}
+
+void
+lrep_opt_parse_bool(DefElem *elem, bool *res)
+{
+	lrep_opt_parse_notnull(elem, "bool");
+	if (!parse_bool(strVal(elem->arg), res))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse boolean value \"%s\" for parameter \"%s\": %m",
+						strVal(elem->arg), elem->defname)));
+}
+
+void
+lrep_opt_parse_identifier_list_arr(DefElem *elem, char ***list, int *len)
+{
+	List	   *namelist;
+	ListCell   *c;
+
+	lrep_opt_parse_notnull(elem, "list");
+
+	if (!SplitIdentifierString(pstrdup(strVal(elem->arg)),
+							  ',', &namelist))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not identifier list value \"%s\" for parameter \"%s\": %m",
+						strVal(elem->arg), elem->defname)));
+	}
+
+	*len = 0;
+	*list = palloc(list_length(namelist) * sizeof(char *));
+
+	foreach(c, namelist)
+	{
+		(*list)[(*len)++] = pstrdup(lfirst(c));
+	}
+	list_free(namelist);
+}
+
+/*
+ * Error reporting for required params.
+ */
+void
+lrep_opt_required_error(const char *param)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("missing value for for parameter \"%s\"",
+					param)));
+}
