From b92deb54d10ca0de8b70498bc4898ed0873e36bf Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 2 Nov 2015 19:34:21 +0800
Subject: [PATCH] Add contrib/pglogical_output, a logical decoding plugin

---
 contrib/Makefile                                   |   1 +
 contrib/pglogical_output/.gitignore                |   6 +
 contrib/pglogical_output/DESIGN.md                 | 124 +++++
 contrib/pglogical_output/Makefile                  |  51 +++
 contrib/pglogical_output/README.md                 | 364 +++++++++++++++
 contrib/pglogical_output/examples/hooks/.gitignore |   1 +
 contrib/pglogical_output/examples/hooks/Makefile   |  20 +
 .../examples/hooks/README.pglogical_output_plhooks | 160 +++++++
 .../hooks/pglogical_output_plhooks--1.0.sql        |  89 ++++
 .../examples/hooks/pglogical_output_plhooks.c      | 414 +++++++++++++++++
 .../hooks/pglogical_output_plhooks.control         |   4 +
 contrib/pglogical_output/expected/basic.out        |  60 +++
 contrib/pglogical_output/expected/hooks.out        |  95 ++++
 contrib/pglogical_output/expected/init.out         |   0
 contrib/pglogical_output/expected/params.out       |  94 ++++
 contrib/pglogical_output/expected/pre-clean.out    |   0
 contrib/pglogical_output/pglogical_config.c        | 498 +++++++++++++++++++++
 contrib/pglogical_output/pglogical_config.h        |  56 +++
 contrib/pglogical_output/pglogical_hooks.c         | 234 ++++++++++
 contrib/pglogical_output/pglogical_hooks.h         |  22 +
 contrib/pglogical_output/pglogical_output.c        | 463 +++++++++++++++++++
 contrib/pglogical_output/pglogical_output.h        |  98 ++++
 contrib/pglogical_output/pglogical_output/README   |   7 +
 contrib/pglogical_output/pglogical_output/compat.h |  19 +
 contrib/pglogical_output/pglogical_output/hooks.h  |  74 +++
 contrib/pglogical_output/pglogical_proto.c         | 484 ++++++++++++++++++++
 contrib/pglogical_output/pglogical_proto.h         |  36 ++
 contrib/pglogical_output/regression.conf           |   2 +
 contrib/pglogical_output/sql/basic.sql             |  49 ++
 contrib/pglogical_output/sql/hooks.sql             |  86 ++++
 contrib/pglogical_output/sql/params.sql            |  77 ++++
 contrib/pglogical_output/test/Makefile             |   6 +
 contrib/pglogical_output/test/README.md            |  91 ++++
 contrib/pglogical_output/test/base.py              | 285 ++++++++++++
 contrib/pglogical_output/test/pglogical_proto.py   | 240 ++++++++++
 .../pglogical_output/test/pglogical_protoreader.py | 112 +++++
 contrib/pglogical_output/test/test_basic.py        |  89 ++++
 contrib/pglogical_output/test/test_binary_mode.py  | 172 +++++++
 contrib/pglogical_output/test/test_filter.py       | 182 ++++++++
 contrib/pglogical_output/test/test_parameters.py   |  80 ++++
 .../test/test_replication_origin.py                | 327 ++++++++++++++
 contrib/pglogical_output/test/test_tuple_fields.py | 159 +++++++
 42 files changed, 5431 insertions(+)
 create mode 100644 contrib/pglogical_output/.gitignore
 create mode 100644 contrib/pglogical_output/DESIGN.md
 create mode 100644 contrib/pglogical_output/Makefile
 create mode 100644 contrib/pglogical_output/README.md
 create mode 100644 contrib/pglogical_output/examples/hooks/.gitignore
 create mode 100644 contrib/pglogical_output/examples/hooks/Makefile
 create mode 100644 contrib/pglogical_output/examples/hooks/README.pglogical_output_plhooks
 create mode 100644 contrib/pglogical_output/examples/hooks/pglogical_output_plhooks--1.0.sql
 create mode 100644 contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.c
 create mode 100644 contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.control
 create mode 100644 contrib/pglogical_output/expected/basic.out
 create mode 100644 contrib/pglogical_output/expected/hooks.out
 create mode 100644 contrib/pglogical_output/expected/init.out
 create mode 100644 contrib/pglogical_output/expected/params.out
 create mode 100644 contrib/pglogical_output/expected/pre-clean.out
 create mode 100644 contrib/pglogical_output/pglogical_config.c
 create mode 100644 contrib/pglogical_output/pglogical_config.h
 create mode 100644 contrib/pglogical_output/pglogical_hooks.c
 create mode 100644 contrib/pglogical_output/pglogical_hooks.h
 create mode 100644 contrib/pglogical_output/pglogical_output.c
 create mode 100644 contrib/pglogical_output/pglogical_output.h
 create mode 100644 contrib/pglogical_output/pglogical_output/README
 create mode 100644 contrib/pglogical_output/pglogical_output/compat.h
 create mode 100644 contrib/pglogical_output/pglogical_output/hooks.h
 create mode 100644 contrib/pglogical_output/pglogical_proto.c
 create mode 100644 contrib/pglogical_output/pglogical_proto.h
 create mode 100644 contrib/pglogical_output/regression.conf
 create mode 100644 contrib/pglogical_output/sql/basic.sql
 create mode 100644 contrib/pglogical_output/sql/hooks.sql
 create mode 100644 contrib/pglogical_output/sql/params.sql
 create mode 100644 contrib/pglogical_output/test/Makefile
 create mode 100644 contrib/pglogical_output/test/README.md
 create mode 100644 contrib/pglogical_output/test/base.py
 create mode 100644 contrib/pglogical_output/test/pglogical_proto.py
 create mode 100644 contrib/pglogical_output/test/pglogical_protoreader.py
 create mode 100644 contrib/pglogical_output/test/test_basic.py
 create mode 100644 contrib/pglogical_output/test/test_binary_mode.py
 create mode 100644 contrib/pglogical_output/test/test_filter.py
 create mode 100644 contrib/pglogical_output/test/test_parameters.py
 create mode 100644 contrib/pglogical_output/test/test_replication_origin.py
 create mode 100644 contrib/pglogical_output/test/test_tuple_fields.py

diff --git a/contrib/Makefile b/contrib/Makefile
index bd251f6..c1e37b2 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -35,6 +35,7 @@ SUBDIRS = \
 		pg_stat_statements \
 		pg_trgm		\
 		pgcrypto	\
+		pglogical_output \
 		pgrowlocks	\
 		pgstattuple	\
 		postgres_fdw	\
diff --git a/contrib/pglogical_output/.gitignore b/contrib/pglogical_output/.gitignore
new file mode 100644
index 0000000..2322e13
--- /dev/null
+++ b/contrib/pglogical_output/.gitignore
@@ -0,0 +1,6 @@
+pglogical_output.so
+results/
+regression.diffs
+tmp_install/
+tmp_check/
+log/
diff --git a/contrib/pglogical_output/DESIGN.md b/contrib/pglogical_output/DESIGN.md
new file mode 100644
index 0000000..05fb4d1
--- /dev/null
+++ b/contrib/pglogical_output/DESIGN.md
@@ -0,0 +1,124 @@
+# Design decisions
+
+Explanations of why things are done the way they are.
+
+## Why does pglogical_output exist when there's wal2json etc?
+
+`pglogical_output` does plenty more than convert logical decoding change
+messages to a wire format and send them to the client.
+
+It handles format negotiations, sender-side filtering using pluggable hooks
+(and the associated plugin handling), etc. The protocol itself is also
+important, and incorporates elements like binary datum transfer that can't be
+easily or efficiently achieved with json.
+
+## Custom binary protocol
+
+Why do we have a custom binary protocol inside the walsender / copy both protocol,
+rather than using a json message representation?
+
+Speed and compactness. It's expensive to create json, with lots of allocations.
+It's expensive to decode it too. You can't represent raw binary in json, and must
+encode it, which adds considerable overhead for some data types. Using the
+obvious, easy to decode json representations also makes it difficult to do
+later enhancements planned for the protocol and decoder, like caching row
+metadata.
+
+The protocol implementation is fairly well encapsulated, so in future it should
+be possible to emit json instead for clients that request it. Right now that's
+not the priority as tools like wal2json already exist for that.
+
+## Column metadata
+
+The output plugin sends metadata for columns - at minimum, the column names -
+before each row. It will soon be changed to send the data before each row from
+a new, different table, so that streams of inserts from COPY etc don't repeat
+the metadata each time. That's just a pending feature.
+
+The reason metadata must be sent is that the upstream and downstream table's
+attnos don't necessarily correspond. The column names might, and their ordering
+might even be the same, but any column drop or column type change will result
+in a dropped column on one side. So at the user level the tables look the same,
+but their attnos don't match, and if we rely on attno for replication we'll get
+the wrong data in the wrong columns. Not pretty.
+
+That could be avoided by requiring that the downstream table be strictly
+maintained by DDL replication, but:
+
+* We don't want to require DDL replication
+* That won't work with multiple upstreams feeding into a table
+* The initial table creation still won't be correct if the table has dropped
+  columns, unless we (ab)use `pg_dump`'s `--binary-upgrade` support to emit
+  tables with dropped columns, which we don't want to do.
+
+So despite the bandwidth cost, we need to send metadata.
+
+In future a client-negotiated cache is planned, so that clients can announce
+to the output plugin that they can cache metadata across change series, and
+metadata can only be sent when invalidated by relation changes or when a new
+relation is seen.
+
+Support for type metadata is penciled in to the protocol so that clients that
+don't have table definitions at all - like queueing engines - can decode the
+data. That'll also permit type validation sanity checking on the apply side
+with logical replication.
+
+## Hook entry point as a SQL function
+
+The hooks entry point is a SQL function that populates a passed `internal`
+struct with hook function pointers.
+
+The reason for this is that hooks are specified by a remote peer over the
+network. We can't just let the peer say "dlsym() this arbitrary function name
+and call it with these arguments" for fairly obvious security reasons. At bare
+minimum all replication using hooks would have to be superuser-only if we did
+that.
+
+The SQL entry point is only called once per decoding session and the rest of
+the calls are plain C function pointers.
+
+## The startup reply message
+
+The protocol design choices available to `pg_logical` are constrained by being
+contained in the copy-both protocol within the fe/be protocol, running as a
+logical decoding plugin. The plugin has no direct access to the network socket
+and can't send or receive messages whenever it wants, only under the control of
+the walsender and logical decoding framework.
+
+The only opportunity for the client to send data directly to the logical
+decoding plugin is in the `START_REPLICATION` parameters, and it can't send
+anything to the client before that point.
+
+This means there's no opportunity for a multi-way step negotiation between
+client and server. We have to do all the negotiation we're going to in a single
+exchange of messages - the setup parameters and then the replication start
+message. All the client can do if it doesn't like the offer the server makes is
+disconnect and try again with different parameters.
+
+That's what the startup message is for. It reports the plugin's capabilities
+and tells the client which requested options were honoured. This gives the
+client a chance to decide if it's happy with the output plugin's decision
+or if it wants to reconnect and try again with different options. Iterative
+negotiation, effectively.
+
+## Unrecognised parameters MUST be ignored by client and server
+
+To ensure upward and downward compatibility, the output plugin must ignore
+parameters set by the client if it doesn't recognise them, and the client
+must ignore parameters it doesn't recognise in the server's startup reply
+message.
+
+This ensures that older clients can talk to newer servers and vice versa.
+
+For this to work, the server must never enable new functionality such as
+protocol message types, row formats, etc without the client explicitly
+specifying via a startup parameter that it understands the new functionality.
+Everything must be negotiated.
+
+Similarly, a newer client talking to an older server may ask the server to
+enable functionality, but it can't assume the server will actually honour that
+request. It must check the server's startup reply message to see if the server
+confirmed that it enabled the requested functionality. It might choose to
+disconnect and report an error to the user if the server didn't do what it
+asked. This can be important, e.g. when a security-significant hook is
+specified.
diff --git a/contrib/pglogical_output/Makefile b/contrib/pglogical_output/Makefile
new file mode 100644
index 0000000..8c59eb0
--- /dev/null
+++ b/contrib/pglogical_output/Makefile
@@ -0,0 +1,51 @@
+MODULE_big = pglogical_output
+PGFILEDESC = "pglogical_output - logical replication output plugin"
+
+OBJS = pglogical_output.o pglogical_proto.o pglogical_config.o pglogical_hooks.o
+
+REGRESS = params basic hooks
+
+
+ifdef USE_PGXS
+
+# For regression checks
+# http://www.postgresql.org/message-id/CAB7nPqTsR5o3g-fBi6jbsVdhfPiLFWQ_0cGU5=94Rv_8W3qvFA@mail.gmail.com
+# this makes "make check" give a useful error
+abs_top_builddir = .
+NO_TEMP_INSTALL = yes
+# Usual recipe
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+
+# These don't do anything yet, since temp install is disabled
+EXTRA_INSTALL += ./examples/hooks
+REGRESS_OPTS += --temp-config=regression.conf
+
+plhooks:
+	make -C examples/hooks USE_PGXS=1 clean install
+
+installcheck: plhooks
+
+else
+
+subdir = contrib/pglogical_output
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+
+# 'make installcheck' disabled when building in-tree because these tests
+# require "wal_level=logical", which typical installcheck users do not have
+# (e.g. buildfarm clients).
+installcheck:
+	;
+
+EXTRA_INSTALL += $(subdir)/examples/hooks
+EXTRA_REGRESS_OPTS += --temp-config=./regression.conf
+
+endif
+
+install: all
+	$(MKDIR_P) '$(DESTDIR)$(includedir)'/pglogical_output
+	$(INSTALL_DATA) pglogical_output/compat.h '$(DESTDIR)$(includedir)'/pglogical_output
+	$(INSTALL_DATA) pglogical_output/hooks.h '$(DESTDIR)$(includedir)'/pglogical_output
diff --git a/contrib/pglogical_output/README.md b/contrib/pglogical_output/README.md
new file mode 100644
index 0000000..d547774
--- /dev/null
+++ b/contrib/pglogical_output/README.md
@@ -0,0 +1,364 @@
+# `pglogical` Output Plugin
+
+This is the [logical decoding](http://www.postgresql.org/docs/current/static/logicaldecoding.html)
+[output plugin](http://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html)
+for `pglogical`. Its purpose is to extract a change stream from a PostgreSQL
+database and send it to a client over a network connection using a
+well-defined, efficient protocol that multiple different applications can
+consume.
+
+The primary purpose of `pglogical_output` is to supply data to logical
+streaming replication solutions, but any application can potentially use its
+data stream. The output stream is designed to be compact and fast to decode,
+and the plugin supports upstream filtering of data so that only the required
+information is sent.
+
+No triggers are required to collect the change stream and no external ticker or
+other daemon is required. It's accumulated using
+[replication slots](http://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html#AEN66446),
+as supported in PostgreSQL 9.4 or newer, and sent on top of the
+[PostgreSQL streaming replication protocol](http://www.postgresql.org/docs/current/static/protocol-replication.html).
+
+Unlike block-level ("physical") streaming replication, the change stream from
+the `pglogical` output plugin is compatible across different PostgreSQL
+versions and can even be consumed by non-PostgreSQL clients.
+
+The use of a replication slot means that the change stream is reliable and
+crash-safe. If the client disconnects or crashes it can reconnect and resume
+replay from the last message that client processed. Server-side changes that
+occur while the client is disconnected are accumulated in the queue to be sent
+when the client reconnects. This reliability also means that server-side
+resources are consumed whether or not a client is connected.
+
+# Why another output plugin?
+
+See [`DESIGN.md`](DESIGN.md) for a discussion of why using one of the existing
+generic logical decoding output plugins like `wal2json` to drive a logical
+replication downstream isn't ideal. It's mostly about speed.
+
+# Architecture and high level interaction
+
+The output plugin is loaded by a PostgreSQL walsender process when a client
+connects to PostgreSQL using the PostgreSQL wire protocol with connection
+option `replication=database`, then uses
+[the `CREATE_REPLICATION_SLOT ... LOGICAL ...` or `START_REPLICATION SLOT ... LOGICAL ...` commands](http://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html) to start streaming changes. (It can also be used via
+[SQL level functions](http://www.postgresql.org/docs/current/static/logicaldecoding-sql.html)
+over a non-replication connection, but this is mainly for debugging purposes).
+
+The client supplies parameters to the `START_REPLICATION SLOT ... LOGICAL ...`
+command to specify the version of the `pglogical` protocol it supports,
+whether it wants binary format, etc.
+
+The output plugin processes the connection parameters and the connection enters
+streaming replication protocol mode, sometimes called "COPY BOTH" mode because
+it's based on the protocol used for the `COPY` command.  PostgreSQL then calls
+functions in this plugin to send it a stream of transactions to decode and
+translate into network messages. This stream of changes continues until the
+client disconnects.
+
+The only client-to-server interaction after startup is the sending of periodic
+feedback messages that allow the replication slot to discard no-longer-needed
+change history. The client *must* send feedback, otherwise `pg_xlog` on the
+server will eventually fill up and the server will stop working.
+
+
+# Usage
+
+The overall flow of client/server interaction is:
+
+* Client makes PostgreSQL fe/be protocol connection to server
+    * Connection options must include `replication=database` and `dbname=[...]` parameters
+    * The PostgreSQL client library can be `libpq` or anything else that supports the replication sub-protocol
+    * The same mechanisms are used for authentication and protocol encryption as for a normal non-replication connection
+* Client issues `IDENTIFY_SYSTEM`
+    * Server responds with a single row containing system identity info
+* Client issues `CREATE_REPLICATION_SLOT slotname LOGICAL 'pglogical'` if it's setting up for the first time
+    * Server responds with success info and a snapshot identifier
+    * Client may at this point use the snapshot identifier on other connections while leaving this one idle
+* Client issues `START_REPLICATION SLOT slotname LOGICAL 0/0 (...options...)` to start streaming, which loops:
+    * Server emits `pglogical` message block encapsulated in a replication protocol `CopyData` message
+    * Client receives and unwraps message, then decodes the `pglogical` message block
+    * Client intermittently sends a standby status update message to server to confirm replay
+* ... until client sends a graceful connection termination message on the fe/be protocol level or the connection is broken
+
+ The details of `IDENTIFY_SYSTEM`, `CREATE_REPLICATION_SLOT` and `START_REPLICATION` are discussed in the [replication protocol docs](http://www.postgresql.org/docs/current/static/protocol-replication.html) and will not be repeated here.
+
+## Make a replication connection
+
+To use the `pglogical` plugin you must first establish a PostgreSQL FE/BE
+protocol connection using the client library of your choice, passing
+`replication=database` as one of the connection parameters. `database` is a
+literal string and is not replaced with the database name; instead the database
+name is passed separately in the usual `dbname` parameter. Note that
+`replication` is not a GUC (configuration parameter) and may not be passed in
+the `options` parameter on the connection, it's a top-level parameter like
+`user` or `dbname`.
+
+Example connection string for `libpq`:
+
+    'user=postgres replication=database sslmode=verify-full dbname=mydb'
+
+The plug-in name to pass on logical slot creation is `'pglogical'`.
+
+Details are in the replication protocol docs.
+
+## Get system identity
+
+If required you can use the `IDENTIFY_SYSTEM` command, which reports system
+information:
+
+	  systemid       | timeline |  xlogpos  | dbname | dboid
+    ---------------------+----------+-----------+--------+-------
+     6153224364663410513 |        1 | 0/C429C48 | testd  | 16385
+    (1 row)
+
+Details in the replication protocol docs.
+
+## Create the slot if required
+
+If your application creates its own slots on first use and hasn't previously
+connected to this database on this system you'll need to create a replication
+slot. This keeps track of the client's replay state even while it's disconnected.
+
+The slot name may be anything your application wants up to a limit of 63
+characters in length. It's strongly advised that the slot name clearly identify
+the application and the host it runs on.
+
+Pass `pglogical` as the plugin name.
+
+e.g.
+
+    CREATE_REPLICATION_SLOT "reporting_host_42" LOGICAL "pglogical";
+
+`CREATE_REPLICATION_SLOT` returns a snapshot identifier that may be used with
+[`SET TRANSACTION SNAPSHOT`](http://www.postgresql.org/docs/current/static/sql-set-transaction.html)
+to see the database's state as of the moment of the slot's creation. The first
+change streamed from the slot will be the change immediately after this
+snapshot was taken. The snapshot is useful when cloning the initial state of a
+database being replicated. Applications that want to see the change stream
+going forward, but don't care about the initial state, can ignore this. The
+snapshot is only valid as long as the connection that issued the
+`CREATE_REPLICATION_SLOT` remains open and has not run another command.
+
+## Send replication parameters
+
+The client now sends:
+
+    START_REPLICATION SLOT "the_slot_name" LOGICAL (
+	'Expected_encoding', 'UTF8',
+	'Max_proto_major_version', '1',
+	'Min_proto_major_version', '1',
+	...moreparams...
+    );
+
+to start replication.
+
+The parameters are very important for ensuring that the plugin accepts
+the replication request and streams changes in the expected form. `pglogical`
+parameters are discussed in the separate `pglogical` protocol documentation.
+
+## Process the startup message
+
+`pglogical`'s output plugin will send a `CopyData` message containing its
+startup message as the first protocol message. This message contains a
+set of key/value entries describing the capabilities of the upstream output
+plugin, its version and the Pg version, the tuple format options selected,
+etc.
+
+The downstream client may choose to cleanly close the connection and disconnect
+at this point if it doesn't like the reply. It might then inform the user
+or reconnect with different parameters based on what it learned from the
+first connection's startup message.
+
+## Consume the change stream
+
+`pglogical`'s output plugin now sends a continuous series of `CopyData`
+protocol messages, each of which encapsulates a `pglogical` protocol message
+as documented in the separate protocol docs.
+
+These messages provide information about transaction boundaries, changed
+rows, etc.
+
+The stream continues until the client disconnects, the upstream server is
+restarted, the upstream walsender is terminated by admin action, there's
+a network issue, or the connection is otherwise broken.
+
+The client should send periodic feedback messages to the server to acknowledge
+that it's replayed to a given point and let the server release the resources
+it's holding in case that change stream has to be replayed again. See
+["Hot standby feedback message" in the replication protocol docs](http://www.postgresql.org/docs/current/static/protocol-replication.html)
+for details.
+
+## Disconnect gracefully
+
+Disconnection works just like any normal client; you use your client library's
+usual method for closing the connection. No special action is required before
+disconnection, though it's usually a good idea to send a final standby status
+message just before you disconnect.
+
+# Tests
+
+There are two sets of tests bundled with `pglogical_output`: the `pg_regress`
+regression tests and some custom Python tests for the protocol.
+
+The `pg_regress` tests check invalid parameter handling and basic
+functionality.  They're intended for use by the buildfarm using an in-tree
+`make check`, but may also be run with an out-of-tree PGXS build against an
+existing PostgreSQL install using `make USE_PGXS=1 clean installcheck`.
+
+The Python tests are more comprehensive, and examine the data sent by
+the extension at the protocol level, validating the protocol structure,
+order and contents. They can run using the SQL-level logical decoding
+interface or, with a psycopg2 containing https://github.com/psycopg/psycopg2/pull/322,
+with the walsender / streaming replication protocol. The Python-based tests
+exercise the internal binary format support, too. See `test/README.md` for
+details.
+
+The tests may fail on installations that are not utf-8 encoded because the
+payloads of the binary protocol output will have text in different encodings,
+which aren't visible to psql as text to be decoded. Avoiding anything except
+7-bit ascii in the tests *should* prevent the problem.
+
+# Hooks
+
+`pglogical_output` exposes a number of extension points where applications can
+modify or override its behaviour.
+
+All hooks are called in their own memory context, which lasts for the duration
+of the logical decoding session. They may switch to longer lived contexts if
+needed, but are then responsible for their own cleanup.
+
+## Hook setup function
+
+The downstream must specify the fully-qualified name of a SQL-callable function
+on the server as the value of the `hooks.setup_function` client parameter.
+The SQL signature of this function is
+
+    CREATE OR REPLACE FUNCTION funcname(hooks internal, memory_context internal)
+    RETURNS void STABLE
+    LANGUAGE c AS 'MODULE_PATHNAME';
+
+Permissions are checked. This function must be callable by the user that the
+output plugin is running as.
+
+The function receives a pointer to a newly allocated structure of hook function
+pointers to populate as its first argument. The function must not free the
+argument.
+
+If the hooks need a private data area to store information across calls, the
+setup function should get the `MemoryContext` pointer from the 2nd argument,
+then `MemoryContextAlloc` a struct for the data in that memory context and
+store the pointer to it in `hooks->hooks_private_data`. This will then be
+accessible on future calls to hook functions. It need not be manually freed, as
+the memory context used for logical decoding will free it when it's freed.
+Don't put anything in it that needs manual cleanup.
+
+Each hook has its own C signature (defined below) and the pointers must be
+directly to the functions. Hooks that the client does not wish to set must be
+left null.
+
+An example is provided in `examples/hooks` and the argument structs are defined
+in `pglogical_output/hooks.h`, which is installed into the PostgreSQL source
+tree when the extension is installed.
+
+## Startup hook
+
+The startup hook is called when logical decoding starts.
+
+This hook can inspect the parameters passed by the client to the output
+plugin as in_params. These parameters *must not* be modified.
+
+It can add new parameters to the set to be returned to the client in the
+startup parameters message, by appending to List out_params, which is
+initially NIL. Each element must be a `DefElem` with the param name
+as the `defname` and a `String` value as the arg, as created with
+`makeDefElem(...)`. It and its contents must be allocated in the
+logical decoding memory context.
+
+For walsender based decoding the startup hook is called only once, and
+cleanup might not be called at the end of the session.
+
+Multiple decoding sessions, and thus multiple startup hook calls, may happen
+in a session if the SQL interface for logical decoding is being used. In
+that case it's guaranteed that the cleanup hook will be called between each
+startup.
+
+When successfully enabled, the output parameter `hooks.startup_hook_enabled` is
+set to true in the startup reply message.
+
+## Transaction filter hook
+
+The transaction filter hook can exclude entire transactions from being decoded
+and replicated based on the node they originated from.
+
+It is passed a `const TxFilterHookArgs *` containing:
+
+* The hook argument supplied by the client, if any
+* The `RepOriginId` that this transaction originated from
+
+and must return boolean, where true retains the transaction for sending to the
+client and false discards it. (Note that this is the reverse sense of the low
+level logical decoding transaction filter hook).
+
+The hook function must *not* free the argument struct or modify its contents.
+
+The transaction filter hook is only called on PostgreSQL 9.5 and above. It
+is ignored on 9.4.
+
+Note that individual changes within a transaction may have different origins to
+the transaction as a whole; see "Origin filtering" for more details. If a
+transaction is filtered out, all changes are filtered out even if their origins
+differ from that of the transaction as a whole.
+
+When successfully enabled, the output parameter
+`hooks.transaction_filter_enabled` is set to true in the startup reply message.
+
+## Row filter hook
+
+The row filter hook is called for each row. It is passed information about the
+table, the transaction origin, and the row origin.
+
+It is passed a `const RowFilterHookArgs*` containing:
+
+* The hook argument supplied by the client, if any
+* The `Relation` the change affects
+* The change type - 'I'nsert, 'U'pdate or 'D'elete
+
+It can return true to retain this row change, sending it to the client, or
+false to discard it.
+
+The function *must not* free the argument struct or modify its contents.
+
+Note that it is more efficient to exclude whole transactions with the
+transaction filter hook rather than filtering out individual rows.
+
+When successfully enabled, the output parameter
+`hooks.row_filter_enabled` is set to true in the startup reply message.
+
+## Shutdown hook
+
+The shutdown hook is called when a decoding session ends. You can't rely on
+this hook being invoked reliably, since a replication-protocol walsender-based
+session might just terminate. It's mostly useful for cleanup to handle repeated
+invocations under the SQL interface to logical decoding.
+
+You don't need a hook to free memory you allocated, unless you explicitly
+switched to a longer lived memory context like TopMemoryContext. Memory allocated
+in the hook context will be automatically freed when the decoding session shuts down.
+
+## Hook example
+
+... TODO ...
+
+## Writing hooks in procedural languages
+
+You can write hooks in PL/PgSQL, etc, too.
+
+There's a default hook setup callback `pglogical_output_default_hooks` that
+returns a set of hook functions which call PostgreSQL PL functions and return
+the results. They act as C-to-PL wrappers. The PostgreSQL PL functions to call
+for each hook are found by <XXX how? we don't want to use the hook arg, since
+we want it free to use in the hooks themselves. a new param read by startup
+hook?>
+
+... TODO examples ....
diff --git a/contrib/pglogical_output/examples/hooks/.gitignore b/contrib/pglogical_output/examples/hooks/.gitignore
new file mode 100644
index 0000000..140f8cf
--- /dev/null
+++ b/contrib/pglogical_output/examples/hooks/.gitignore
@@ -0,0 +1 @@
+*.so
diff --git a/contrib/pglogical_output/examples/hooks/Makefile b/contrib/pglogical_output/examples/hooks/Makefile
new file mode 100644
index 0000000..501cb38
--- /dev/null
+++ b/contrib/pglogical_output/examples/hooks/Makefile
@@ -0,0 +1,20 @@
+# Build the pglogical_output_plhooks example module: a shared library plus
+# the extension script and docs.
+MODULES = pglogical_output_plhooks
+EXTENSION = pglogical_output_plhooks
+DATA = pglogical_output_plhooks--1.0.sql
+DOCS = README.pglogical_output_plhooks
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+# Let the module find the pglogical_output headers (pglogical_output/hooks.h)
+PG_CPPFLAGS = -I../..
+include $(PGXS)
+else
+subdir = contrib/pglogical_output/examples/hooks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+
+# Allow the hook plugin to see the pglogical_output headers
+# Necessary because !PGXS builds don't respect PG_CPPFLAGS
+override CPPFLAGS := $(CPPFLAGS) -I../..
+endif
diff --git a/contrib/pglogical_output/examples/hooks/README.pglogical_output_plhooks b/contrib/pglogical_output/examples/hooks/README.pglogical_output_plhooks
new file mode 100644
index 0000000..4f0ce81
--- /dev/null
+++ b/contrib/pglogical_output/examples/hooks/README.pglogical_output_plhooks
@@ -0,0 +1,160 @@
+pglogical_output_plhooks is an example module for pglogical_output, showing how
+hooks can be implemented.
+
+It provides C wrappers to allow hooks to be written in any supported PL,
+such as PL/PgSQL.
+
+No effort is made to be efficient. To avoid the need to set up cache
+invalidation handling, function calls are done via oid each time, with no
+FmgrInfo caching.  Also, memory contexts are reset rather freely. If you
+want efficiency, write your hook in C.
+
+(Catalog timetravel is another reason not to write hooks in PLs; see below).
+
+Simple pointless example
+===
+
+To compile and install, just "make USE_PGXS=1 install". Note that pglogical
+must already be installed so that its headers can be found. You might have
+to set the `PATH` so that `pg_config` can be found.
+
+To use it:
+
+    CREATE EXTENSION pglogical_output_plhooks SCHEMA public;
+
+in the target database.
+
+Then create at least one hook procedure, of the supported hooks listed below.
+For the sake of this example we'll use some of the toy examples provided in the
+extension:
+
+* startup function: pglo_plhooks_demo_startup
+* row filter: pglo_plhooks_demo_row_filter
+* txn filter: pglo_plhooks_demo_txn_filter
+* shutdown function: pglo_plhooks_demo_shutdown
+
+Now add some arguments to your pglogical_output client's logical decoding setup
+parameters to specify the hook setup function and to tell
+pglogical_output_plhooks about one or more of the hooks you wish it to run. For
+example you might add the following parameters:
+
+	hooks.setup_function, public.pglo_plhooks_setup_fn,
+	pglo_plhooks.startup_hook, pglo_plhooks_demo_startup,
+	pglo_plhooks.row_filter_hook, pglo_plhooks_demo_row_filter,
+	pglo_plhooks.txn_filter_hook, pglo_plhooks_demo_txn_filter,
+	pglo_plhooks.shutdown_hook, pglo_plhooks_demo_shutdown,
+	pglo_plhooks.client_hook_arg, 'whatever-you-want'
+
+to configure the extension to load its hooks, then configure all the demo hooks.
+
+Why the preference for C hooks?
+===
+
+Speed. The row filter hook is called for *every single row* replicated.
+
+If a hook raises an ERROR then replication will probably stop. You won't be
+able to fix it either, because when you change the hook definition the new
+definition won't be visible in the catalogs at the current replay position due
+to catalog time travel. The old definition that raises an error will keep being
+used. You'll need to remove the problem hook from your logical decoding startup
+parameters, which will disable use of the hook entirely, until replay proceeds
+past the point you fixed the problem with the hook function.
+
+Similarly, if you try to add use of a newly defined hook on an existing
+replication slot that hasn't replayed past the point you defined the hook yet,
+you'll get an error complaining that the hook function doesn't exist. Even
+though it clearly does when you look at it in psql. The reason is the same: in
+the time traveled catalogs it really doesn't exist. You have to replay past the
+point the hook was created then enable it. In this case the
+pglogical_output_plhooks startup hook will actually see your functions, but
+fail when it tries to call them during decoding since they'll appear to have
+vanished.
+
+If you write your hooks in C you can redefine them rather more easily, since
+the function definition is not subject to catalog timetravel. More importantly,
+it'll probably be a lot faster. The plhooks code has to do a lot of translation
+to pass information to the PL functions and more to get results back; it also
+has to do a lot of memory allocations and a memory context reset after each
+call. That all adds up.
+
+(You could actually write C functions to be called by this extension, but
+that'd be crazy.)
+
+Available hooks
+===
+
+The four hooks provided by pglogical_output are exposed by the module. See the
+pglogical_output documentation for details on what each hook does and when it
+runs.
+
+A function for each hook must have *exactly* the specified parameters and
+return value, or you'll get an error.
+
+None of the functions may return NULL. If they do you'll get an error.
+
+If you specified `pglo_plhooks.client_hook_arg` in the startup parameters it is
+passed as `client_hook_arg` to all hooks. If not specified the empty string is
+passed.
+
+You can find some toy examples in `pglogical_output_plhooks--1.0.sql`.
+
+
+
+Startup hook
+---
+
+Configured with `pglo_plhooks.startup_hook` startup parameter. Runs when
+logical decoding starts.
+
+Signature *must* be:
+
+    CREATE FUNCTION whatever_funcname(startup_params text[], client_hook_arg text)
+    RETURNS text[]
+
+startup_params is an array of the startup params passed to the pglogical output
+plugin, as alternating key/value elements in text representation.
+
+client_hook_arg is also passed.
+
+The return value is an array of alternating key/value elements forming a set
+of parameters you wish to add to the startup reply message sent by pglogical
+on decoding start. It must not be null; return `ARRAY[]::text[]` if you don't
+want to add any params.
+
+Transaction filter
+---
+
+Called only on 9.5+; ignored on 9.4.
+
+The arguments are the replication origin identifier and the client hook param.
+
+The return value is true to keep the transaction, false to discard it.
+
+Signature:
+
+	CREATE FUNCTION whatevername(origin_id int, client_hook_arg text)
+	RETURNS boolean
+
+Row filter
+--
+
+Called for each row. Return true to replicate the row, false to discard it.
+
+Arguments are the oid of the affected relation, and the change type: 'I'nsert,
+'U'pdate or 'D'elete. There is no way to access the change data - columns changed,
+new values, etc.
+
+Signature:
+
+	CREATE FUNCTION whatevername(affected_rel regclass, change_type "char", client_hook_arg text)
+	RETURNS boolean
+
+Shutdown hook
+--
+
+Pretty uninteresting, but included for completeness.
+
+Signature:
+
+	CREATE FUNCTION whatevername(client_hook_arg text)
+	RETURNS void
diff --git a/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks--1.0.sql b/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks--1.0.sql
new file mode 100644
index 0000000..cdd2af3
--- /dev/null
+++ b/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks--1.0.sql
@@ -0,0 +1,89 @@
+\echo Use "CREATE EXTENSION pglogical_output_plhooks" to load this file. \quit
+
+-- Use @extschema@ or leave search_path unchanged, don't use explicit schema
+
+-- C entry point named by the client in the hooks.setup_function startup
+-- parameter; it fills in the C hook callbacks (see pglogical_output_plhooks.c).
+CREATE FUNCTION pglo_plhooks_setup_fn(internal)
+RETURNS void
+STABLE
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION pglo_plhooks_setup_fn(internal)
+IS 'Register pglogical output pl hooks. See docs for how to specify functions';
+
+--
+-- Called as the startup hook.
+--
+-- There's no useful way to expose the private data segment, so you
+-- just don't get to use that from pl hooks at this point. The C
+-- wrapper will extract a startup param named pglo_plhooks.client_hook_arg
+-- for you and pass it as client_hook_arg to all callbacks, though.
+--
+-- For implementation convenience, a null client_hook_arg is passed
+-- as the empty string.
+--
+-- Must return the empty array, not NULL, if it has nothing to add.
+--
+CREATE FUNCTION pglo_plhooks_demo_startup(startup_params text[], client_hook_arg text)
+RETURNS text[]
+LANGUAGE plpgsql AS $$
+DECLARE
+	elem text;
+	paramname text;
+	paramvalue text;
+BEGIN
+	-- startup_params is a flat array of alternating key/value elements,
+	-- so consume it strictly in pairs. (The previous version waited for a
+	-- *third* element before reporting a pair and then discarded that
+	-- element, so every second key was silently lost.)
+	FOREACH elem IN ARRAY startup_params
+	LOOP
+		IF elem IS NULL THEN
+				RAISE EXCEPTION 'Startup params may not be null';
+		END IF;
+
+		IF paramname IS NULL THEN
+				paramname := elem;
+		ELSE
+				paramvalue := elem;
+				RAISE NOTICE 'got param: % = %', paramname, paramvalue;
+				paramname := NULL;
+				paramvalue := NULL;
+		END IF;
+	END LOOP;
+
+	-- Extra parameters to add to the startup reply message; must not be NULL.
+	RETURN ARRAY['pglo_plhooks_demo_startup_ran', 'true', 'otherparam', '42'];
+END;
+$$;
+
+-- Demo transaction filter: logs the replication origin id and keeps
+-- every transaction.
+CREATE FUNCTION pglo_plhooks_demo_txn_filter(origin_id int, client_hook_arg text)
+RETURNS boolean
+LANGUAGE plpgsql AS $$
+BEGIN
+		-- Not much to filter on really...
+		RAISE NOTICE 'Got tx with origin %',origin_id;
+		RETURN true;
+END;
+$$;
+
+-- Demo row filter: replicate a row only if the current user holds the
+-- privilege matching the change type on the affected relation.
+CREATE FUNCTION pglo_plhooks_demo_row_filter(affected_rel regclass, change_type "char", client_hook_arg text)
+RETURNS boolean
+LANGUAGE plpgsql AS $$
+BEGIN
+		-- This is a totally absurd test, since it checks if the upstream user
+		-- doing replication has rights to make modifications that have already
+		-- been committed and are being decoded for replication. Still, it shows
+		-- how the hook works...
+		--
+		-- Note the CASE has no ELSE: an unexpected change_type yields a NULL
+		-- privilege name, so the IF test is NULL and we RETURN false. The C
+		-- wrapper only passes 'I', 'U' or 'D'.
+		IF pg_catalog.has_table_privilege(current_user, affected_rel,
+				CASE change_type WHEN 'I' THEN 'INSERT' WHEN 'U' THEN 'UPDATE' WHEN 'D' THEN 'DELETE' END)
+		THEN
+				RETURN true;
+		ELSE
+				RETURN false;
+		END IF;
+END;
+$$;
+
+-- Demo shutdown hook: just log that decoding is shutting down.
+CREATE FUNCTION pglo_plhooks_demo_shutdown(client_hook_arg text)
+RETURNS void
+LANGUAGE plpgsql AS $$
+BEGIN
+		RAISE NOTICE 'Decoding shutdown';
+END;
+$$;
+-- Note: terminate the statement with ';' like every other statement in this
+-- script, so appending further statements later can't silently merge them.
diff --git a/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.c b/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.c
new file mode 100644
index 0000000..a5144f0
--- /dev/null
+++ b/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.c
@@ -0,0 +1,414 @@
+#include "postgres.h"
+
+#include "pglogical_output/hooks.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_type.h"
+
+#include "nodes/makefuncs.h"
+
+#include "parser/parse_func.h"
+
+#include "replication/reorderbuffer.h"
+
+#include "utils/acl.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+#include "fmgr.h"
+#include "miscadmin.h"
+
+PG_MODULE_MAGIC;
+
+PGDLLEXPORT extern Datum pglo_plhooks_setup_fn(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(pglo_plhooks_setup_fn);
+
+void pglo_plhooks_startup(struct PGLogicalStartupHookArgs *startup_args);
+void pglo_plhooks_shutdown(struct PGLogicalShutdownHookArgs *shutdown_args);
+bool pglo_plhooks_row_filter(struct PGLogicalRowFilterArgs *rowfilter_args);
+bool pglo_plhooks_txn_filter(struct PGLogicalTxnFilterArgs *txnfilter_args);
+
+/* Per-decoding-session state, stored as the hooks' private_data */
+typedef struct PLHPrivate
+{
+	const char *client_arg;			/* pglo_plhooks.client_hook_arg, or "" */
+	Oid startup_hook;				/* user PL functions; InvalidOid if unset */
+	Oid shutdown_hook;
+	Oid row_filter_hook;
+	Oid txn_filter_hook;
+	MemoryContext hook_call_context;	/* short-lived; reset after each hook call */
+} PLHPrivate;
+
+static void read_parameters(PLHPrivate *private, List *in_params);
+static Oid find_startup_hook(const char *proname);
+static Oid find_shutdown_hook(const char *proname);
+static Oid find_row_filter_hook(const char *proname);
+static Oid find_txn_filter_hook(const char *proname);
+static void exec_user_startup_hook(PLHPrivate *private, List *in_params, List **out_params);
+
+/*
+ * Startup hook entry point.
+ *
+ * Allocates the session-private state, reads the pglo_plhooks.* startup
+ * parameters (resolving any named hook functions to oids), creates a
+ * short-lived context for individual hook calls, then runs the user's PL
+ * startup hook, if one was configured, to collect extra startup-reply
+ * parameters.
+ */
+void
+pglo_plhooks_startup(struct PGLogicalStartupHookArgs *startup_args)
+{
+	PLHPrivate *private;
+
+	/* pglogical_output promises to call us in a tx */
+	Assert(IsTransactionState());
+
+	/* Allocated in hook memory context, scoped to the logical decoding session: */
+	startup_args->private_data = private = (PLHPrivate*)palloc(sizeof(PLHPrivate));
+
+	private->startup_hook = InvalidOid;
+	private->shutdown_hook = InvalidOid;
+	private->row_filter_hook = InvalidOid;
+	private->txn_filter_hook = InvalidOid;
+	/* client_arg is the empty string when not specified to simplify function calls */
+	private->client_arg = "";
+
+	read_parameters(private, startup_args->in_params);
+
+	/* Scratch context for each user hook invocation; reset after every call */
+	private->hook_call_context = AllocSetContextCreate(CurrentMemoryContext,
+			                    "pglogical_output plhooks hook call context",
+			                    ALLOCSET_SMALL_MINSIZE,
+			                    ALLOCSET_SMALL_INITSIZE,
+			                    ALLOCSET_SMALL_MAXSIZE);
+
+
+	if (private->startup_hook != InvalidOid)
+		exec_user_startup_hook(private, startup_args->in_params, &startup_args->out_params);
+}
+
+/*
+ * Shutdown hook: invoke the user's PL shutdown function, if configured.
+ * (Per the README, this hook is not guaranteed to run — a walsender
+ * session may simply terminate.)
+ */
+void
+pglo_plhooks_shutdown(struct PGLogicalShutdownHookArgs *shutdown_args)
+{
+	PLHPrivate *private = (PLHPrivate*)shutdown_args->private_data;
+	MemoryContext old_ctx;
+
+	Assert(private != NULL);
+
+	if (OidIsValid(private->shutdown_hook))
+	{
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+		elog(DEBUG3, "calling pglo shutdown hook with %s", private->client_arg);
+		(void) OidFunctionCall1(
+				private->shutdown_hook,
+				CStringGetTextDatum(private->client_arg));
+		elog(DEBUG3, "called pglo shutdown hook");
+		MemoryContextSwitchTo(old_ctx);
+		MemoryContextReset(private->hook_call_context);
+	}
+}
+
+/*
+ * Row filter hook: map the reorder buffer change type to a single
+ * character ('I'/'U'/'D') and ask the user's PL function whether to keep
+ * the row. Returns true (keep the row) when no PL row filter is set.
+ */
+bool
+pglo_plhooks_row_filter(struct PGLogicalRowFilterArgs *rowfilter_args)
+{
+	PLHPrivate *private = (PLHPrivate*)rowfilter_args->private_data;
+	bool ret = true;
+	MemoryContext old_ctx;
+
+	Assert(private != NULL);
+
+	if (OidIsValid(private->row_filter_hook))
+	{
+		char change_type;
+		switch (rowfilter_args->change_type)
+		{
+			case REORDER_BUFFER_CHANGE_INSERT:
+				change_type = 'I';
+				break;
+			case REORDER_BUFFER_CHANGE_UPDATE:
+				change_type = 'U';
+				break;
+			case REORDER_BUFFER_CHANGE_DELETE:
+				change_type = 'D';
+				break;
+			default:
+				elog(ERROR, "unknown change type %d", rowfilter_args->change_type);
+				change_type = '0';	/* silence compiler */
+		}
+
+		/* Run the PL function in the scratch context, reset afterwards */
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+		elog(DEBUG3, "calling pglo row filter hook with (%u,%c,%s)",
+				rowfilter_args->changed_rel->rd_id, change_type,
+				private->client_arg);
+		ret = DatumGetBool(OidFunctionCall3(
+				private->row_filter_hook,
+				ObjectIdGetDatum(rowfilter_args->changed_rel->rd_id),
+				CharGetDatum(change_type),
+				CStringGetTextDatum(private->client_arg)));
+		elog(DEBUG3, "called pglo row filter hook, returns %d", (int)ret);
+		MemoryContextSwitchTo(old_ctx);
+		MemoryContextReset(private->hook_call_context);
+	}
+
+	return ret;
+}
+
+/*
+ * Transaction filter hook: ask the user's PL function whether to
+ * replicate a transaction, based on its replication origin id. Returns
+ * true (keep the transaction) when no PL txn filter is configured.
+ */
+bool
+pglo_plhooks_txn_filter(struct PGLogicalTxnFilterArgs *txnfilter_args)
+{
+	PLHPrivate *private = (PLHPrivate*)txnfilter_args->private_data;
+	bool ret = true;
+	MemoryContext old_ctx;
+
+	Assert(private != NULL);
+
+	if (OidIsValid(private->txn_filter_hook))
+	{
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+
+		elog(DEBUG3, "calling pglo txn filter hook with (%hu,%s)",
+				txnfilter_args->origin_id, private->client_arg);
+		ret = DatumGetBool(OidFunctionCall2(
+					private->txn_filter_hook,
+					UInt16GetDatum(txnfilter_args->origin_id),
+					CStringGetTextDatum(private->client_arg)));
+		/* was "calling", a copy-paste slip; this reports the completed call */
+		elog(DEBUG3, "called pglo txn filter hook, returns %d", (int)ret);
+
+		MemoryContextSwitchTo(old_ctx);
+		MemoryContextReset(private->hook_call_context);
+	}
+
+	return ret;
+}
+
+/*
+ * SQL-callable hook setup function, named by the client via the
+ * hooks.setup_function startup parameter. Receives a PGLogicalHooks
+ * struct (as internal) and fills in the C hook pointers; private_data is
+ * populated later by the startup hook.
+ */
+Datum
+pglo_plhooks_setup_fn(PG_FUNCTION_ARGS)
+{
+	struct PGLogicalHooks *hooks = (struct PGLogicalHooks*) PG_GETARG_POINTER(0);
+
+	/* Your code doesn't need this, it's just for the tests: */
+	Assert(hooks != NULL);
+	Assert(hooks->hooks_private_data == NULL);
+	Assert(hooks->startup_hook == NULL);
+	Assert(hooks->shutdown_hook == NULL);
+	Assert(hooks->row_filter_hook == NULL);
+	Assert(hooks->txn_filter_hook == NULL);
+
+	/*
+	 * Just assign the hook pointers. We're not meant to do much
+	 * work here.
+	 *
+	 * Note that private_data is left untouched, to be set up by the
+	 * startup hook.
+	 */
+	hooks->startup_hook = pglo_plhooks_startup;
+	hooks->shutdown_hook = pglo_plhooks_shutdown;
+	hooks->row_filter_hook = pglo_plhooks_row_filter;
+	hooks->txn_filter_hook = pglo_plhooks_txn_filter;
+	elog(DEBUG3, "configured pglo hooks");
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Invoke the user's PL startup hook.
+ *
+ * Flattens the plugin's startup parameters (a List of DefElem) into a
+ * text[] of alternating key/value elements, calls the hook, then
+ * deconstructs its text[] return value into *out_params, a DefElem list
+ * of extra parameters for the startup reply message.
+ */
+static void
+exec_user_startup_hook(PLHPrivate *private, List *in_params, List **out_params)
+{
+		ArrayType *startup_params;
+		Datum ret;
+		ListCell *lc;
+		Datum *startup_params_elems;
+		bool  *startup_params_isnulls;
+		int   n_startup_params;
+		int   i;
+		MemoryContext old_ctx;
+
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+
+		/*
+		 * Build the input parameter array. NULL parameters are passed as the
+		 * empty string for the sake of convenience. Each param is two
+		 * elements, a key then a value element.
+		 */
+		n_startup_params = list_length(in_params) * 2;
+		startup_params_elems = (Datum*)palloc0(sizeof(Datum)*n_startup_params);
+
+		i = 0;
+		foreach (lc, in_params)
+		{
+			DefElem * elem = (DefElem*)lfirst(lc);
+			const char *val;
+
+			if (elem->arg == NULL || strVal(elem->arg) == NULL)
+				val = "";
+			else
+				val = strVal(elem->arg);
+
+			startup_params_elems[i++] = CStringGetTextDatum(elem->defname);
+			startup_params_elems[i++] = CStringGetTextDatum(val);
+		}
+		Assert(i == n_startup_params);
+
+		startup_params = construct_array(startup_params_elems, n_startup_params,
+				TEXTOID, -1, false, 'i');
+
+		ret = OidFunctionCall2(
+				private->startup_hook,
+				PointerGetDatum(startup_params),
+				CStringGetTextDatum(private->client_arg));
+
+		/*
+		 * Deconstruct the returned text[]. deconstruct_array() takes the
+		 * *element* type, so this must be TEXTOID, not TEXTARRAYOID.
+		 */
+		deconstruct_array(DatumGetArrayTypeP(ret), TEXTOID,
+				-1, false, 'i', &startup_params_elems, &startup_params_isnulls,
+				&n_startup_params);
+
+		/*
+		 * Build the out_params DefElem list in the caller's memory context,
+		 * NOT the hook call context: that context is reset below, and the
+		 * caller reads the list afterwards to build the startup reply.
+		 * Reading the deconstructed datums (still in the hook context) is
+		 * fine here; TextDatumGetCString copies into the current context.
+		 */
+		MemoryContextSwitchTo(old_ctx);
+
+		*out_params = NIL;
+		for (i = 0; i < n_startup_params; i = i + 2)
+		{
+			char *value;
+			DefElem *elem;
+
+			if (startup_params_isnulls[i])
+				elog(ERROR, "Array entry corresponding to a key was null at idx=%d", i);
+
+			/* NULL values are normalised to the empty string */
+			if (startup_params_isnulls[i+1])
+				value = "";
+			else
+				value = TextDatumGetCString(startup_params_elems[i+1]);
+
+			elem = makeDefElem(
+					TextDatumGetCString(startup_params_elems[i]),
+					(Node*)makeString(value));
+
+			*out_params = lcons(elem, *out_params);
+		}
+
+		MemoryContextReset(private->hook_call_context);
+}
+
+/*
+ * Scan the startup parameter list for the pglo_plhooks.* options,
+ * resolving each named hook function to its oid and capturing the
+ * client hook argument. Unrecognised options are ignored.
+ */
+static void
+read_parameters(PLHPrivate *private, List *in_params)
+{
+	ListCell *option;
+
+	foreach(option, in_params)
+	{
+		DefElem    *elem = lfirst(option);
+		const char *name = elem->defname;
+		bool		isnull = (elem->arg == NULL || strVal(elem->arg) == NULL);
+
+		/* Each option name can match at most one branch */
+		if (pg_strcasecmp("pglo_plhooks.client_hook_arg", name) == 0)
+		{
+			if (isnull)
+				elog(ERROR, "pglo_plhooks.client_hook_arg may not be NULL");
+			private->client_arg = pstrdup(strVal(elem->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.startup_hook", name) == 0)
+		{
+			if (isnull)
+				elog(ERROR, "pglo_plhooks.startup_hook may not be NULL");
+			private->startup_hook = find_startup_hook(strVal(elem->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.shutdown_hook", name) == 0)
+		{
+			if (isnull)
+				elog(ERROR, "pglo_plhooks.shutdown_hook may not be NULL");
+			private->shutdown_hook = find_shutdown_hook(strVal(elem->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.txn_filter_hook", name) == 0)
+		{
+			if (isnull)
+				elog(ERROR, "pglo_plhooks.txn_filter_hook may not be NULL");
+			private->txn_filter_hook = find_txn_filter_hook(strVal(elem->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.row_filter_hook", name) == 0)
+		{
+			if (isnull)
+				elog(ERROR, "pglo_plhooks.row_filter_hook may not be NULL");
+			private->row_filter_hook = find_row_filter_hook(strVal(elem->arg));
+		}
+	}
+}
+
+/*
+ * Resolve a possibly schema-qualified function name to its oid,
+ * verifying the argument types, return type and that the current user
+ * has EXECUTE permission on it. ERRORs out if any check fails.
+ */
+static Oid
+find_hook_fn(const char *funcname, Oid funcargtypes[], int nfuncargtypes, Oid returntype)
+{
+	Oid			funcid;
+	List	   *qname;
+
+	qname = stringToQualifiedNameList(funcname);
+
+	/* find the function; last arg false => error if it doesn't exist */
+	funcid = LookupFuncName(qname, nfuncargtypes, funcargtypes, false);
+
+	/* Check expected return type; report it by name, not as a raw oid */
+	if (get_func_rettype(funcid) != returntype)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s doesn't return expected type %s",
+						NameListToString(qname), format_type_be(returntype))));
+	}
+
+	if (pg_proc_aclcheck(funcid, GetUserId(), ACL_EXECUTE) != ACLCHECK_OK)
+	{
+		const char * username;
+#if PG_VERSION_NUM >= 90500
+		username = GetUserNameFromId(GetUserId(), false);
+#else
+		username = GetUserNameFromId(GetUserId());
+#endif
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("current user %s does not have permission to call function %s",
+					 username, NameListToString(qname))));
+	}
+
+	list_free_deep(qname);
+
+	return funcid;
+}
+
+/*
+ * Look up the user's startup hook by name. Its documented signature is
+ * (startup_params text[], client_hook_arg text) RETURNS text[], and
+ * exec_user_startup_hook deconstructs the return value as a text array,
+ * so the required return type is TEXTARRAYOID (not VOIDOID).
+ */
+static Oid
+find_startup_hook(const char *proname)
+{
+	Oid argtypes[2];
+
+	argtypes[0] = TEXTARRAYOID;
+	argtypes[1] = TEXTOID;
+
+	return find_hook_fn(proname, argtypes, 2, TEXTARRAYOID);
+}
+
+/* Resolve the shutdown hook: (client_hook_arg text) RETURNS void */
+static Oid
+find_shutdown_hook(const char *proname)
+{
+	Oid argtypes[1] = { TEXTOID };
+
+	return find_hook_fn(proname, argtypes, 1, VOIDOID);
+}
+
+/*
+ * Resolve the row filter hook:
+ * (affected_rel regclass, change_type "char", client_hook_arg text)
+ * RETURNS boolean
+ */
+static Oid
+find_row_filter_hook(const char *proname)
+{
+	Oid argtypes[3] = { REGCLASSOID, CHAROID, TEXTOID };
+
+	return find_hook_fn(proname, argtypes, 3, BOOLOID);
+}
+
+/* Resolve the txn filter hook: (origin_id int, client_hook_arg text) RETURNS boolean */
+static Oid
+find_txn_filter_hook(const char *proname)
+{
+	Oid argtypes[2] = { INT4OID, TEXTOID };
+
+	return find_hook_fn(proname, argtypes, 2, BOOLOID);
+}
diff --git a/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.control b/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.control
new file mode 100644
index 0000000..647b9ef
--- /dev/null
+++ b/contrib/pglogical_output/examples/hooks/pglogical_output_plhooks.control
@@ -0,0 +1,4 @@
+comment = 'pglogical_output pl hooks'
+default_version = '1.0'
+module_pathname = '$libdir/pglogical_output_plhooks'
+relocatable = false
diff --git a/contrib/pglogical_output/expected/basic.out b/contrib/pglogical_output/expected/basic.out
new file mode 100644
index 0000000..2c6a64e
--- /dev/null
+++ b/contrib/pglogical_output/expected/basic.out
@@ -0,0 +1,60 @@
+SET synchronous_commit = on;
+-- Schema setup
+CREATE TABLE demo (
+	seq serial primary key,
+	tx text,
+	ts timestamp,
+	jsb jsonb,
+	js json,
+	ba bytea
+);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Queue up some work to decode with a variety of types
+INSERT INTO demo(tx) VALUES ('textval');
+INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
+INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
+INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
+-- Simple decode with text-format tuples
+--
+-- It's still the logical decoding binary protocol and as such it has
+-- embedded timestamps, and pglogical its self has embedded LSNs, xids,
+-- etc. So all we can really do is say "yup, we got the expected number
+-- of messages".
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ count 
+-------
+    17
+(1 row)
+
+-- ... and send/recv binary format
+-- The main difference visible is that the bytea fields aren't encoded
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'binary.want_binary_basetypes', '1',
+	'binary.basetypes_major_version', (current_setting('server_version_num')::integer / 100)::text);
+ count 
+-------
+    17
+(1 row)
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE demo;
diff --git a/contrib/pglogical_output/expected/hooks.out b/contrib/pglogical_output/expected/hooks.out
new file mode 100644
index 0000000..69de857
--- /dev/null
+++ b/contrib/pglogical_output/expected/hooks.out
@@ -0,0 +1,95 @@
+CREATE EXTENSION pglogical_output_plhooks;
+CREATE FUNCTION test_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+	IF nodeid <> 'foo' THEN
+	    RAISE EXCEPTION 'Expected nodeid <foo>, got <%>',nodeid;
+	END IF;
+	RETURN relid::regclass::text NOT LIKE '%_filter%';
+END
+$$;
+CREATE FUNCTION test_action_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+    RETURN action NOT IN ('U', 'D');
+END
+$$;
+CREATE FUNCTION wrong_signature_fn(relid regclass)
+returns bool stable language plpgsql as $$
+BEGIN
+END;
+$$;
+CREATE TABLE test_filter(id integer);
+CREATE TABLE test_nofilt(id integer);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+INSERT INTO test_filter(id) SELECT generate_series(1,10);
+INSERT INTO test_nofilt(id) SELECT generate_series(1,10);
+DELETE FROM test_filter WHERE id % 2 = 0;
+DELETE FROM test_nofilt WHERE id % 2 = 0;
+UPDATE test_filter SET id = id*100 WHERE id = 5;
+UPDATE test_nofilt SET id = id*100 WHERE id = 5;
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_filter',
+	'pglo_plhooks.client_hook_arg', 'foo'
+	);
+ count 
+-------
+    40
+(1 row)
+
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_action_filter'
+	);
+ count 
+-------
+    53
+(1 row)
+
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.nosuchfunction'
+	);
+ERROR:  function public.nosuchfunction(regclass, "char", text) does not exist
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.wrong_signature_fn'
+	);
+ERROR:  function public.wrong_signature_fn(regclass, "char", text) does not exist
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE test_filter;
+DROP TABLE test_nofilt;
+DROP EXTENSION pglogical_output_plhooks;
diff --git a/contrib/pglogical_output/expected/init.out b/contrib/pglogical_output/expected/init.out
new file mode 100644
index 0000000..e69de29
diff --git a/contrib/pglogical_output/expected/params.out b/contrib/pglogical_output/expected/params.out
new file mode 100644
index 0000000..5289686
--- /dev/null
+++ b/contrib/pglogical_output/expected/params.out
@@ -0,0 +1,94 @@
+SET synchronous_commit = on;
+-- no need to CREATE EXTENSION as we intentionally don't have any catalog presence
+-- Instead, just create a slot.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Minimal invocation with no data
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+--
+-- Various invalid parameter combos:
+--
+-- Text mode is not supported
+SELECT data FROM pg_logical_slot_get_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  logical decoding output plugin "pglogical_output" produces binary output, but "pg_logical_slot_get_changes(name,pg_lsn,integer,text[])" expects textual data
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  client sent min_proto_version=2 but we only support protocol 1 or lower
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+ERROR:  client sent min_proto_version=2 but we only support protocol 1 or lower
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- error, unrecognised startup params format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '2');
+ERROR:  client sent startup parameters in format 2 but we only support format 1
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Should be OK and result in proto version 1 selection, though we won't
+-- see that here.
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+-- no such encoding / encoding mismatch
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'bork',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  only "UTF8" encoding is supported by this server, client requested bork
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Currently we're sensitive to the encoding name's format (TODO)
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF-8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  only "UTF8" encoding is supported by this server, client requested UTF-8
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
diff --git a/contrib/pglogical_output/expected/pre-clean.out b/contrib/pglogical_output/expected/pre-clean.out
new file mode 100644
index 0000000..e69de29
diff --git a/contrib/pglogical_output/pglogical_config.c b/contrib/pglogical_output/pglogical_config.c
new file mode 100644
index 0000000..276a9bc
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_config.c
@@ -0,0 +1,498 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_config.c
+ *		  Logical Replication output plugin
+ *
+ * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_config.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "pglogical_output/compat.h"
+#include "pglogical_config.h"
+#include "pglogical_output.h"
+
+#include "catalog/catversion.h"
+#include "catalog/namespace.h"
+
+#include "mb/pg_wchar.h"
+
+#include "utils/builtins.h"
+#include "utils/int8.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+typedef enum PGLogicalOutputParamType
+{
+	OUTPUT_PARAM_TYPE_BOOL,
+	OUTPUT_PARAM_TYPE_UINT32,
+	OUTPUT_PARAM_TYPE_STRING,
+	OUTPUT_PARAM_TYPE_QUALIFIED_NAME	/* e.g. "schema.function" */
+} PGLogicalOutputParamType;
+
+/* param parsing */
+static Datum get_param_value(DefElem *elem, bool null_ok,
+		PGLogicalOutputParamType type);
+
+static Datum get_param(List *options, const char *name, bool missing_ok,
+					   bool null_ok, PGLogicalOutputParamType type,
+					   bool *found);
+static bool parse_param_bool(DefElem *elem);
+static uint32 parse_param_uint32(DefElem *elem);
+
+static void
+process_parameters_v1(List *options, PGLogicalOutputData *data);
+
+enum {
+	PARAM_UNRECOGNISED,
+	PARAM_MAX_PROTOCOL_VERSION,
+	PARAM_MIN_PROTOCOL_VERSION,
+	PARAM_EXPECTED_ENCODING,
+	PARAM_BINARY_BIGENDIAN,
+	PARAM_BINARY_SIZEOF_DATUM,
+	PARAM_BINARY_SIZEOF_INT,
+	PARAM_BINARY_SIZEOF_LONG,
+	PARAM_BINARY_FLOAT4BYVAL,
+	PARAM_BINARY_FLOAT8BYVAL,
+	PARAM_BINARY_INTEGER_DATETIMES,
+	PARAM_BINARY_WANT_INTERNAL_BASETYPES,
+	PARAM_BINARY_WANT_BINARY_BASETYPES,
+	PARAM_BINARY_BASETYPES_MAJOR_VERSION,
+	PARAM_PG_VERSION,
+	PARAM_FORWARD_CHANGESETS,
+	PARAM_HOOKS_SETUP_FUNCTION,
+} OutputPluginParamKey;	/* dispatch keys for client-sent startup parameter names */
+
+typedef struct {
+	const char * const paramname;	/* parameter name as sent by the client */
+	int paramkey;	/* corresponding OutputPluginParamKey value */
+} OutputPluginParam;
+
+/* Oh, if only C had switch on strings */
+static OutputPluginParam param_lookup[] = {
+	{"max_proto_version", PARAM_MAX_PROTOCOL_VERSION},
+	{"min_proto_version", PARAM_MIN_PROTOCOL_VERSION},
+	{"expected_encoding", PARAM_EXPECTED_ENCODING},
+	{"binary.bigendian", PARAM_BINARY_BIGENDIAN},
+	{"binary.sizeof_datum", PARAM_BINARY_SIZEOF_DATUM},
+	{"binary.sizeof_int", PARAM_BINARY_SIZEOF_INT},
+	{"binary.sizeof_long", PARAM_BINARY_SIZEOF_LONG},
+	{"binary.float4_byval", PARAM_BINARY_FLOAT4BYVAL},
+	{"binary.float8_byval", PARAM_BINARY_FLOAT8BYVAL},
+	{"binary.integer_datetimes", PARAM_BINARY_INTEGER_DATETIMES},
+	{"binary.want_internal_basetypes", PARAM_BINARY_WANT_INTERNAL_BASETYPES},
+	{"binary.want_binary_basetypes", PARAM_BINARY_WANT_BINARY_BASETYPES},
+	{"binary.basetypes_major_version", PARAM_BINARY_BASETYPES_MAJOR_VERSION},
+	{"pg_version", PARAM_PG_VERSION},
+	{"forward_changesets", PARAM_FORWARD_CHANGESETS},
+	{"hooks.setup_function", PARAM_HOOKS_SETUP_FUNCTION},
+	{NULL, PARAM_UNRECOGNISED}	/* sentinel terminating the table */
+};
+
+/*
+ * Look up a param name to find the enum value for the
+ * param, or PARAM_UNRECOGNISED if not found.
+ */
+static int
+get_param_key(const char * const param_name)
+{
+	OutputPluginParam *param = &param_lookup[0];
+
+	do {
+		if (strcmp(param->paramname, param_name) == 0)
+			return param->paramkey;
+		param++;
+	} while (param->paramname != NULL);	/* {NULL, PARAM_UNRECOGNISED} sentinel ends the scan */
+
+	return PARAM_UNRECOGNISED;
+}
+
+
+static void
+process_parameters_v1(List *options, PGLogicalOutputData *data)
+{
+	Datum		val;
+	bool    	found;
+	ListCell	*lc;
+
+	/*
+	 * max_proto_version and min_proto_version are specified
+	 * as required, and must be parsed before anything else.
+	 *
+	 * TODO: We should still parse them as optional and
+	 * delay the ERROR until after the startup reply.
+	 */
+	val = get_param(options, "max_proto_version", false, false,
+					OUTPUT_PARAM_TYPE_UINT32, &found);
+	data->client_max_proto_version = DatumGetUInt32(val);
+
+	val = get_param(options, "min_proto_version", false, false,
+					OUTPUT_PARAM_TYPE_UINT32, &found);
+	data->client_min_proto_version = DatumGetUInt32(val);
+
+	/* Examine all the other params in the v1 message. */
+	foreach(lc, options)
+	{
+		DefElem    *elem = lfirst(lc);
+
+		Assert(elem->arg == NULL || IsA(elem->arg, String));
+
+		/* Check each param, whether or not we recognise it */
+		switch(get_param_key(elem->defname))
+		{
+			/* min/max_proto_version were already consumed above; no case needed */
+
+			case PARAM_BINARY_BIGENDIAN:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_bigendian_set = true;
+				data->client_binary_bigendian = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_SIZEOF_DATUM:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_sizeofdatum = DatumGetUInt32(val);
+				break;
+
+			case PARAM_BINARY_SIZEOF_INT:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_sizeofint = DatumGetUInt32(val);
+				break;
+
+			case PARAM_BINARY_SIZEOF_LONG:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_sizeoflong = DatumGetUInt32(val);
+				break;
+
+			case PARAM_BINARY_FLOAT4BYVAL:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_float4byval_set = true;
+				data->client_binary_float4byval = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_FLOAT8BYVAL:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_float8byval_set = true;
+				data->client_binary_float8byval = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_INTEGER_DATETIMES:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_intdatetimes_set = true;
+				data->client_binary_intdatetimes = DatumGetBool(val);
+				break;
+
+			case PARAM_EXPECTED_ENCODING:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_STRING);
+				data->client_expected_encoding = DatumGetCString(val);
+				break;
+
+			case PARAM_PG_VERSION:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_pg_version = DatumGetUInt32(val);
+				break;
+
+			case PARAM_FORWARD_CHANGESETS:
+				/*
+				 * Check to see if the client asked for changeset forwarding
+				 *
+				 * Note that we cannot support this on 9.4. We'll tell the client
+				 * in the startup reply message.
+				 */
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_forward_changesets_set = true;
+				data->client_forward_changesets = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_WANT_INTERNAL_BASETYPES:
+				/* check if we want to use internal data representation */
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_want_internal_basetypes_set = true;
+				data->client_want_internal_basetypes = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_WANT_BINARY_BASETYPES:
+				/* check if we want to use binary data representation */
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_want_binary_basetypes_set = true;
+				data->client_want_binary_basetypes = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_BASETYPES_MAJOR_VERSION:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_basetypes_major_version = DatumGetUInt32(val);
+				break;
+
+			case PARAM_HOOKS_SETUP_FUNCTION:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_QUALIFIED_NAME);
+				data->hooks_setup_funcname = (List*) PointerGetDatum(val);
+				break;
+
+			case PARAM_UNRECOGNISED:
+				ereport(DEBUG1,
+						(errmsg("Unrecognised pglogical parameter %s ignored", elem->defname)));
+				break;
+		}
+	}
+}
+
+/*
+ * Read parameters sent by client at startup and store recognised
+ * ones in the parameters PGLogicalOutputData.
+ *
+ * The PGLogicalOutputData must have all client-supplied parameter fields
+ * zeroed, such as by memset or palloc0, since values not supplied
+ * by the client are not set.
+ */
+int
+process_parameters(List *options, PGLogicalOutputData *data)
+{
+	Datum	val;
+	bool    found;
+	int		params_format;
+
+	val = get_param(options, "startup_params_format", false, false,
+					OUTPUT_PARAM_TYPE_UINT32, &found);	/* required; get_param ERRORs if absent */
+
+	params_format = DatumGetUInt32(val);
+
+	if (params_format == 1)
+	{
+		process_parameters_v1(options, data);
+	}
+
+	return params_format;	/* caller rejects any format other than 1 */
+}
+
+static Datum
+get_param_value(DefElem *elem, bool null_ok, PGLogicalOutputParamType type)
+{
+	/* Check for NULL value */
+	if (elem->arg == NULL || strVal(elem->arg) == NULL)
+	{
+		if (null_ok)
+			return (Datum) 0;
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("parameter \"%s\" cannot be NULL", elem->defname)));
+	}
+
+	switch (type)
+	{
+		case OUTPUT_PARAM_TYPE_UINT32:
+			return UInt32GetDatum(parse_param_uint32(elem));
+		case OUTPUT_PARAM_TYPE_BOOL:
+			return BoolGetDatum(parse_param_bool(elem));
+		case OUTPUT_PARAM_TYPE_STRING:
+			return PointerGetDatum(pstrdup(strVal(elem->arg)));
+		case OUTPUT_PARAM_TYPE_QUALIFIED_NAME:	/* "schema.name" -> List of name strings */
+			return PointerGetDatum(textToQualifiedNameList(cstring_to_text(pstrdup(strVal(elem->arg)))));
+		default:
+			elog(ERROR, "unknown parameter type %d", type);	/* all enum values handled above */
+	}
+}
+
+/*
+ * Param parsing
+ *
+ * This is not exactly fast but since it's only called on replication start
+ * we'll leave it for now.
+ */
+static Datum
+get_param(List *options, const char *name, bool missing_ok, bool null_ok,
+		  PGLogicalOutputParamType type, bool *found)
+{
+	ListCell	   *option;
+
+	*found = false;
+
+	foreach(option, options)
+	{
+		DefElem    *elem = lfirst(option);
+
+		Assert(elem->arg == NULL || IsA(elem->arg, String));
+
+		/* Search until matching parameter found */
+		if (pg_strcasecmp(name, elem->defname))	/* nonzero => names differ */
+			continue;
+
+		*found = true;
+
+		return get_param_value(elem, null_ok, type);
+	}
+
+	if (!missing_ok)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("missing required parameter \"%s\"", name)));
+
+	return (Datum) 0;	/* param absent and missing_ok; *found stays false */
+}
+
+static bool
+parse_param_bool(DefElem *elem)
+{
+	bool		res;
+
+	if (!parse_bool(strVal(elem->arg), &res))	/* accepts the usual Postgres boolean spellings */
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse boolean value \"%s\" for parameter \"%s\"",
+						strVal(elem->arg), elem->defname)));
+
+	return res;
+}
+
+static uint32
+parse_param_uint32(DefElem *elem)
+{
+	int64		res;
+
+	if (!scanint8(strVal(elem->arg), true, &res))	/* parse as int64 so we can range-check below */
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse integer value \"%s\" for parameter \"%s\"",
+						strVal(elem->arg), elem->defname)));
+
+	if (res > PG_UINT32_MAX || res < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("value \"%s\" out of range for parameter \"%s\"",
+						strVal(elem->arg), elem->defname)));
+
+	return (uint32) res;
+}
+
+static void
+append_startup_msg_key(StringInfo si, const char *key)
+{
+	appendStringInfoString(si, key);
+	appendStringInfoChar(si, '\0');	/* wire format: each key and value is null-terminated */
+}
+
+static void
+append_startup_msg_s(StringInfo si, const char *key, const char *val)
+{
+	append_startup_msg_key(si, key);
+	appendStringInfoString(si, val);	/* string value immediately follows its key */
+	appendStringInfoChar(si, '\0');
+}
+
+static void
+append_startup_msg_i(StringInfo si, const char *key, int val)
+{
+	append_startup_msg_key(si, key);
+	appendStringInfo(si, "%d", val);	/* integers are sent as decimal text */
+	appendStringInfoChar(si, '\0');
+}
+
+static void
+append_startup_msg_b(StringInfo si, const char *key, bool val)
+{
+	append_startup_msg_s(si, key, val ? "t" : "f");	/* booleans are sent as "t"/"f" */
+}
+
+/*
+ * This builds the protocol startup message, which is always the first
+ * message on the wire after the client sends START_REPLICATION.
+ *
+ * It confirms to the client that we could apply requested options, and
+ * tells the client our capabilities.
+ *
+ * The message is a series of null-terminated strings, alternating keys
+ * and values.
+ *
+ * See the protocol docs for details.
+ *
+ * Any additional parameters provided by the startup hook are also output
+ * now.
+ *
+ * The output param 'msg' is a null-terminated char* palloc'd in the current
+ * memory context and the length 'len' of that string that is valid. The caller
+ * should pfree the result after use.
+ *
+ * This is a bit less efficient than direct pq_sendblah calls, but
+ * separates config handling from the protocol implementation, and
+ * it's not like startup msg performance matters much.
+ */
+void
+prepare_startup_message(PGLogicalOutputData *data, char **msg, int *len)
+{
+	StringInfoData si;
+	ListCell *lc;
+
+	initStringInfo(&si);
+
+	append_startup_msg_s(&si, "max_proto_version", "1");
+	append_startup_msg_s(&si, "min_proto_version", "1");
+
+	/* We don't understand column types yet */
+	append_startup_msg_b(&si, "coltypes", false);
+
+	/* Info about our Pg host */
+	append_startup_msg_i(&si, "pg_version_num", PG_VERSION_NUM);
+	append_startup_msg_s(&si, "pg_version", PG_VERSION);
+	append_startup_msg_i(&si, "pg_catversion", CATALOG_VERSION_NO);
+
+	append_startup_msg_s(&si, "encoding", GetDatabaseEncodingName());
+
+	append_startup_msg_b(&si, "forward_changesets",
+			data->forward_changesets);
+	append_startup_msg_b(&si, "forward_changeset_origins",
+			data->forward_changeset_origins);
+
+	/* binary options enabled */
+	append_startup_msg_b(&si, "binary.internal_basetypes",
+			data->allow_internal_basetypes);
+	append_startup_msg_b(&si, "binary.binary_basetypes",
+			data->allow_binary_basetypes);
+
+	/* Binary format characteristics of server */
+	append_startup_msg_i(&si, "binary.basetypes_major_version", PG_VERSION_NUM/100);	/* e.g. 90405 -> 904 */
+	append_startup_msg_i(&si, "binary.sizeof_int", sizeof(int));
+	append_startup_msg_i(&si, "binary.sizeof_long", sizeof(long));
+	append_startup_msg_i(&si, "binary.sizeof_datum", sizeof(Datum));
+	append_startup_msg_i(&si, "binary.maxalign", MAXIMUM_ALIGNOF);
+	append_startup_msg_b(&si, "binary.bigendian", server_bigendian());
+	append_startup_msg_b(&si, "binary.float4_byval", server_float4_byval());
+	append_startup_msg_b(&si, "binary.float8_byval", server_float8_byval());
+	append_startup_msg_b(&si, "binary.integer_datetimes", server_integer_datetimes());
+	/* We don't know how to send in anything except our host's format */
+	append_startup_msg_i(&si, "binary.binary_pg_version",
+			PG_VERSION_NUM/100);
+
+
+	/*
+	 * Confirm that we've enabled any requested hook functions.
+	 */
+	append_startup_msg_b(&si, "hooks.startup_hook_enabled",
+			data->hooks.startup_hook != NULL);
+	append_startup_msg_b(&si, "hooks.shutdown_hook_enabled",
+			data->hooks.shutdown_hook != NULL);
+	append_startup_msg_b(&si, "hooks.row_filter_enabled",
+			data->hooks.row_filter_hook != NULL);
+	append_startup_msg_b(&si, "hooks.transaction_filter_enabled",
+			data->hooks.txn_filter_hook != NULL);
+
+	/*
+	 * Output any extra params supplied by a startup hook.
+	 */
+	foreach(lc, data->extra_startup_params)
+	{
+		DefElem *param = (DefElem*)lfirst(lc);
+		Assert(IsA(param->arg, String) && strVal(param->arg) != NULL);
+		append_startup_msg_s(&si, param->defname, strVal(param->arg));
+	}
+
+	*msg = si.data;	/* palloc'd in current context; caller pfrees */
+	*len = si.len;
+}
diff --git a/contrib/pglogical_output/pglogical_config.h b/contrib/pglogical_output/pglogical_config.h
new file mode 100644
index 0000000..fe07041
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_config.h
@@ -0,0 +1,56 @@
+#ifndef PG_LOGICAL_CONFIG_H
+#define PG_LOGICAL_CONFIG_H
+
+#ifndef PG_VERSION_NUM
+#error <postgres.h> must be included first
+#endif
+
+inline static bool
+server_float4_byval(void)	/* true iff server was built with USE_FLOAT4_BYVAL */
+{
+#ifdef USE_FLOAT4_BYVAL
+	return true;
+#else
+	return false;
+#endif
+}
+
+inline static bool
+server_float8_byval(void)	/* true iff server was built with USE_FLOAT8_BYVAL */
+{
+#ifdef USE_FLOAT8_BYVAL
+	return true;
+#else
+	return false;
+#endif
+}
+
+inline static bool
+server_integer_datetimes(void)	/* true iff server was built with USE_INTEGER_DATETIMES */
+{
+#ifdef USE_INTEGER_DATETIMES
+	return true;
+#else
+	return false;
+#endif
+}
+
+inline static bool
+server_bigendian(void)	/* true iff server was built for a big-endian target */
+{
+#ifdef WORDS_BIGENDIAN
+	return true;
+#else
+	return false;
+#endif
+}
+
+typedef struct List List;	/* forward declarations; full definitions not needed here */
+typedef struct PGLogicalOutputData PGLogicalOutputData;
+
+extern int process_parameters(List *options, PGLogicalOutputData *data);
+
+extern void prepare_startup_message(PGLogicalOutputData *data,
+		char **msg, int *length);
+
+#endif
diff --git a/contrib/pglogical_output/pglogical_hooks.c b/contrib/pglogical_output/pglogical_hooks.c
new file mode 100644
index 0000000..652d48f
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_hooks.c
@@ -0,0 +1,234 @@
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+
+#ifdef HAVE_REPLICATION_ORIGINS
+#include "replication/origin.h"
+#endif
+
+#include "parser/parse_func.h"
+
+#include "utils/acl.h"
+#include "utils/lsyscache.h"
+
+#include "miscadmin.h"
+
+#include "pglogical_hooks.h"
+#include "pglogical_output.h"
+
+/*
+ * Returns Oid of the hooks function specified in funcname.
+ *
+ * Error is thrown if function doesn't exist or doesn't return correct datatype
+ * or is volatile.
+ */
+static Oid
+get_hooks_function_oid(List *funcname)
+{
+	Oid			funcid;
+	Oid			funcargtypes[1];
+
+	funcargtypes[0] = INTERNALOID;	/* hook setup functions take a single "internal" arg */
+
+	/* find the function */
+	funcid = LookupFuncName(funcname, 1, funcargtypes, false);
+
+	/* Validate that the function returns void */
+	if (get_func_rettype(funcid) != VOIDOID)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s must return void",
+						NameListToString(funcname))));
+	}
+
+	if (func_volatile(funcid) == PROVOLATILE_VOLATILE)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s must not be VOLATILE",
+						NameListToString(funcname))));
+	}
+
+	if (pg_proc_aclcheck(funcid, GetUserId(), ACL_EXECUTE) != ACLCHECK_OK)
+	{
+		const char * username;
+#if PG_VERSION_NUM >= 90500
+		username = GetUserNameFromId(GetUserId(), false);	/* noerr arg added in 9.5 */
+#else
+		username = GetUserNameFromId(GetUserId());
+#endif
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("current user %s does not have permission to call function %s",
+					 username, NameListToString(funcname))));
+	}
+
+	return funcid;
+}
+
+/*
+ * If a hook setup function was specified in the startup parameters, look it up
+ * in the catalogs, check permissions, call it, and store the resulting hook
+ * info struct.
+ */
+void
+load_hooks(PGLogicalOutputData *data)
+{
+	Oid hooks_func;
+	MemoryContext old_ctxt;
+	bool txn_started = false;
+
+	if (!IsTransactionState())
+	{
+		txn_started = true;	/* we opened the txn, so we must commit it below */
+		StartTransactionCommand();	/* catalog lookups below need a transaction */
+	}
+
+	if (data->hooks_setup_funcname != NIL)
+	{
+		hooks_func = get_hooks_function_oid(data->hooks_setup_funcname);
+
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);	/* hook allocations go to hooks_mctxt */
+		(void) OidFunctionCall1(hooks_func, PointerGetDatum(&data->hooks));
+		MemoryContextSwitchTo(old_ctxt);
+
+		elog(DEBUG3, "pglogical_output: Loaded hooks from function %u. Hooks are: \n"
+				"\tstartup_hook: %p\n"
+				"\tshutdown_hook: %p\n"
+				"\trow_filter_hook: %p\n"
+				"\ttxn_filter_hook: %p\n"
+				"\thooks_private_data: %p\n",
+				hooks_func,
+				data->hooks.startup_hook,
+				data->hooks.shutdown_hook,
+				data->hooks.row_filter_hook,
+				data->hooks.txn_filter_hook,
+				data->hooks.hooks_private_data);
+	}
+
+	if (txn_started)
+		CommitTransactionCommand();
+}
+
+void
+call_startup_hook(PGLogicalOutputData *data, List *plugin_params)
+{
+	struct PGLogicalStartupHookArgs args;
+	MemoryContext old_ctxt;
+
+	if (data->hooks.startup_hook != NULL)
+	{
+		bool tx_started = false;
+
+		args.private_data = data->hooks.hooks_private_data;
+		args.in_params = plugin_params;
+		args.out_params = NIL;	/* hook may append extra startup-reply params here */
+
+		elog(DEBUG3, "calling pglogical startup hook");
+
+		if (!IsTransactionState())
+		{
+			tx_started = true;
+			StartTransactionCommand();
+		}
+
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);	/* hook allocations go to hooks_mctxt */
+		(void) (*data->hooks.startup_hook)(&args);
+		MemoryContextSwitchTo(old_ctxt);
+
+		if (tx_started)
+			CommitTransactionCommand();
+
+		data->extra_startup_params = args.out_params;
+		/* The startup hook might change the private data seg */
+		data->hooks.hooks_private_data = args.private_data;
+
+		elog(DEBUG3, "called pglogical startup hook");
+	}
+}
+
+void
+call_shutdown_hook(PGLogicalOutputData *data)
+{
+	struct PGLogicalShutdownHookArgs args;
+	MemoryContext old_ctxt;
+
+	if (data->hooks.shutdown_hook != NULL)
+	{
+		args.private_data = data->hooks.hooks_private_data;
+
+		elog(DEBUG3, "calling pglogical shutdown hook");
+
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+		(void) (*data->hooks.shutdown_hook)(&args);
+		MemoryContextSwitchTo(old_ctxt);
+
+		data->hooks.hooks_private_data = args.private_data;	/* hook may replace its private data */
+
+		elog(DEBUG3, "called pglogical shutdown hook");
+	}
+}
+
+/*
+ * Decide if the individual change should be filtered out by
+ * calling a client-provided hook.
+ */
+bool
+call_row_filter_hook(PGLogicalOutputData *data, ReorderBufferTXN *txn,
+		Relation rel, ReorderBufferChange *change)
+{
+	struct  PGLogicalRowFilterArgs hook_args;	/* NOTE(review): the txn arg is currently unused here */
+	MemoryContext old_ctxt;
+	bool ret = true;	/* default: keep the row when no hook is installed */
+
+	if (data->hooks.row_filter_hook != NULL)
+	{
+		hook_args.change_type = change->action;
+		hook_args.private_data = data->hooks.hooks_private_data;
+		hook_args.changed_rel = rel;
+
+		elog(DEBUG3, "calling pglogical row filter hook");
+
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+		ret = (*data->hooks.row_filter_hook)(&hook_args);
+		MemoryContextSwitchTo(old_ctxt);
+
+		/* Filter hooks shouldn't change the private data ptr */
+		Assert(data->hooks.hooks_private_data == hook_args.private_data);
+
+		elog(DEBUG3, "called pglogical row filter hook, returned %d", (int)ret);
+	}
+
+	return ret;
+}
+
+bool
+call_txn_filter_hook(PGLogicalOutputData *data, RepOriginId txn_origin)
+{
+	struct PGLogicalTxnFilterArgs hook_args;
+	bool ret = true;	/* default: replicate the txn when no hook is installed */
+	MemoryContext old_ctxt;
+
+	if (data->hooks.txn_filter_hook != NULL)
+	{
+		hook_args.private_data = data->hooks.hooks_private_data;
+		hook_args.origin_id = txn_origin;
+
+		elog(DEBUG3, "calling pglogical txn filter hook");
+
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+		ret = (*data->hooks.txn_filter_hook)(&hook_args);
+		MemoryContextSwitchTo(old_ctxt);
+
+		/* Filter hooks shouldn't change the private data ptr */
+		Assert(data->hooks.hooks_private_data == hook_args.private_data);
+
+		elog(DEBUG3, "called pglogical txn filter hook, returned %d", (int)ret);
+	}
+
+	return ret;
+}
diff --git a/contrib/pglogical_output/pglogical_hooks.h b/contrib/pglogical_output/pglogical_hooks.h
new file mode 100644
index 0000000..df661f3
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_hooks.h
@@ -0,0 +1,22 @@
+#ifndef PGLOGICAL_HOOKS_H
+#define PGLOGICAL_HOOKS_H
+
+#include "replication/reorderbuffer.h"
+
+/* public interface for hooks */
+#include "pglogical_output/hooks.h"
+
+extern void load_hooks(PGLogicalOutputData *data);
+
+extern void call_startup_hook(PGLogicalOutputData *data, List *plugin_params);
+
+extern void call_shutdown_hook(PGLogicalOutputData *data);
+
+extern bool call_row_filter_hook(PGLogicalOutputData *data,
+		ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change);
+
+extern bool call_txn_filter_hook(PGLogicalOutputData *data,
+		RepOriginId txn_origin);
+
+
+#endif
diff --git a/contrib/pglogical_output/pglogical_output.c b/contrib/pglogical_output/pglogical_output.c
new file mode 100644
index 0000000..3269878
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output.c
@@ -0,0 +1,463 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_output.c
+ *		  Logical Replication output plugin
+ *
+ * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_output.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "pglogical_output/compat.h"
+#include "pglogical_config.h"
+#include "pglogical_output.h"
+#include "pglogical_proto.h"
+#include "pglogical_hooks.h"
+
+#include "access/hash.h"
+#include "access/sysattr.h"
+#include "access/xact.h"
+
+#include "catalog/pg_class.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+
+#include "mb/pg_wchar.h"
+
+#include "nodes/parsenodes.h"
+
+#include "parser/parse_func.h"
+
+#include "replication/output_plugin.h"
+#include "replication/logical.h"
+#ifdef HAVE_REPLICATION_ORIGINS
+#include "replication/origin.h"
+#endif
+
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/int8.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+PG_MODULE_MAGIC;
+
+extern void		_PG_output_plugin_init(OutputPluginCallbacks *cb);
+
+/* These must be available to pg_dlsym() */
+static void pg_decode_startup(LogicalDecodingContext * ctx,
+							  OutputPluginOptions *opt, bool is_init);
+static void pg_decode_shutdown(LogicalDecodingContext * ctx);
+static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
+					ReorderBufferTXN *txn);
+static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_change(LogicalDecodingContext *ctx,
+				 ReorderBufferTXN *txn, Relation rel,
+				 ReorderBufferChange *change);
+
+#ifdef HAVE_REPLICATION_ORIGINS
+static bool pg_decode_origin_filter(LogicalDecodingContext *ctx,
+						RepOriginId origin_id);
+#endif
+
+static void send_startup_message(LogicalDecodingContext *ctx,
+		PGLogicalOutputData *data, bool last_message);
+
+static bool startup_message_sent = false;
+
+/* specify output plugin callbacks */
+void
+_PG_output_plugin_init(OutputPluginCallbacks *cb)
+{
+	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
+
+	cb->startup_cb = pg_decode_startup;
+	cb->begin_cb = pg_decode_begin_txn;
+	cb->change_cb = pg_decode_change;
+	cb->commit_cb = pg_decode_commit_txn;
+#ifdef HAVE_REPLICATION_ORIGINS
+	cb->filter_by_origin_cb = pg_decode_origin_filter;	/* only when replication origins are available */
+#endif
+	cb->shutdown_cb = pg_decode_shutdown;
+}
+
+static bool
+check_binary_compatibility(PGLogicalOutputData *data)
+{
+	if (data->client_binary_basetypes_major_version != PG_VERSION_NUM / 100)
+		return false;	/* client's binary base types are from a different major version */
+
+	if (data->client_binary_bigendian_set
+		&& data->client_binary_bigendian != server_bigendian())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client endianness mismatch");
+		return false;
+	}
+
+	if (data->client_binary_sizeofdatum != 0
+		&& data->client_binary_sizeofdatum != sizeof(Datum))
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client sizeof(Datum) mismatch");
+		return false;
+	}
+
+	if (data->client_binary_sizeofint != 0
+		&& data->client_binary_sizeofint != sizeof(int))
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client sizeof(int) mismatch");
+		return false;
+	}
+
+	if (data->client_binary_sizeoflong != 0
+		&& data->client_binary_sizeoflong != sizeof(long))
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client sizeof(long) mismatch");
+		return false;
+	}
+
+	if (data->client_binary_float4byval_set
+		&& data->client_binary_float4byval != server_float4_byval())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client float4byval mismatch");
+		return false;
+	}
+
+	if (data->client_binary_float8byval_set
+		&& data->client_binary_float8byval != server_float8_byval())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client float8byval mismatch");
+		return false;
+	}
+
+	if (data->client_binary_intdatetimes_set
+		&& data->client_binary_intdatetimes != server_integer_datetimes())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client integer datetimes mismatch");
+		return false;
+	}
+
+	return true;	/* every characteristic the client supplied matches this server */
+}
+
+/* initialize this plugin */
+static void
+pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
+				  bool is_init)
+{
+	PGLogicalOutputData  *data = palloc0(sizeof(PGLogicalOutputData));	/* palloc0: unset client params must read as zero */
+
+	data->context = AllocSetContextCreate(TopMemoryContext,
+										  "pglogical conversion context",
+										  ALLOCSET_DEFAULT_MINSIZE,
+										  ALLOCSET_DEFAULT_INITSIZE,
+										  ALLOCSET_DEFAULT_MAXSIZE);
+	data->allow_internal_basetypes = false;	/* until negotiated below */
+	data->allow_binary_basetypes = false;
+
+	ctx->output_plugin_private = data;
+
+	/*
+	 * Tell logical decoding that we will be doing binary output. This is
+	 * not the same thing as the selection of binary or text format for
+	 * output of individual fields.
+	 */
+	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+
+	/*
+	 * This is replication start and not slot initialization.
+	 *
+	 * Parse and validate options passed by the client.
+	 */
+	if (!is_init)
+	{
+		int		params_format;
+
+		startup_message_sent = false;	/* new decoding session; reply not yet sent */
+
+		/* Now parse the rest of the params and ERROR if we see any we don't recognise */
+		params_format = process_parameters(ctx->output_plugin_options, data);
+
+		/* TODO: delay until after sending startup reply */
+		if (params_format != 1)
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("client sent startup parameters in format %d but we only support format 1",
+					params_format)));
+
+		/* TODO: Should delay our ERROR until sending startup reply */
+		if (data->client_min_proto_version > PG_LOGICAL_PROTO_VERSION_NUM)
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("client sent min_proto_version=%d but we only support protocol %d or lower",
+					 data->client_min_proto_version, PG_LOGICAL_PROTO_VERSION_NUM)));
+
+		/* TODO: Should delay our ERROR until sending startup reply */
+		if (data->client_max_proto_version < PG_LOGICAL_PROTO_MIN_VERSION_NUM)
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("client sent max_proto_version=%d but we only support protocol %d or higher",
+				 	data->client_max_proto_version, PG_LOGICAL_PROTO_MIN_VERSION_NUM)));
+
+		/* check for encoding match if specific encoding demanded by client */
+		/* TODO: Should parse encoding name and compare properly */
+		if (data->client_expected_encoding != NULL
+				&& strlen(data->client_expected_encoding) != 0
+				&& strcmp(data->client_expected_encoding, GetDatabaseEncodingName()) != 0)
+		{
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("only \"%s\" encoding is supported by this server, client requested %s",
+				 	GetDatabaseEncodingName(), data->client_expected_encoding)));
+		}
+
+		if (data->client_want_internal_basetypes)
+		{
+			data->allow_internal_basetypes =
+				check_binary_compatibility(data);
+		}
+
+		if (data->client_want_binary_basetypes &&
+			data->client_binary_basetypes_major_version == PG_VERSION_NUM / 100)
+		{
+			data->allow_binary_basetypes = true;
+		}
+
+		/*
+		 * Will we forward changesets? We have to if we're on 9.4;
+		 * otherwise honour the client's request.
+		 */
+		if (PG_VERSION_NUM/100 == 904)
+		{
+			/*
+			 * 9.4 unconditionally forwards changesets due to lack of
+			 * replication origins, and it can't ever send origin info
+			 * for the same reason.
+			 */
+			data->forward_changesets = true;
+			data->forward_changeset_origins = false;
+
+			if (data->client_forward_changesets_set
+				&& !data->client_forward_changesets)
+			{
+				ereport(DEBUG1,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("Cannot disable changeset forwarding on PostgreSQL 9.4")));
+			}
+		}
+		else if (data->client_forward_changesets_set
+				 && data->client_forward_changesets)
+		{
+			/* Client explicitly asked for forwarding; forward csets and origins */
+			data->forward_changesets = true;
+			data->forward_changeset_origins = true;
+		}
+		else
+		{
+			/* Default to not forwarding or honour client's request not to fwd */
+			data->forward_changesets = false;
+			data->forward_changeset_origins = false;
+		}
+
+		if (data->hooks_setup_funcname != NIL)
+		{
+
+			data->hooks_mctxt = AllocSetContextCreate(ctx->context,
+					"pglogical_output hooks context",
+					ALLOCSET_SMALL_MINSIZE,
+					ALLOCSET_SMALL_INITSIZE,
+					ALLOCSET_SMALL_MAXSIZE);
+
+			load_hooks(data);
+			call_startup_hook(data, ctx->output_plugin_options);
+		}
+	}
+}
+
+/*
+ * BEGIN callback
+ *
+ * Emits the one-off startup message if it hasn't been sent yet, then the
+ * BEGIN message for this transaction, optionally followed by an ORIGIN
+ * message when changeset origin forwarding is enabled.
+ */
+void
+pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+	PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
+	bool send_replication_origin = data->forward_changeset_origins;
+
+	/* The startup message must precede everything else on the stream */
+	if (!startup_message_sent)
+		send_startup_message( ctx, data, false /* can't be last message */);
+
+#ifdef HAVE_REPLICATION_ORIGINS
+	/* If the record didn't originate locally, send origin info */
+	send_replication_origin &= txn->origin_id != InvalidRepOriginId;
+#endif
+
+	/* If an ORIGIN follows, BEGIN is not the last message of this write */
+	OutputPluginPrepareWrite(ctx, !send_replication_origin);
+	pglogical_write_begin(ctx->out, txn);
+
+#ifdef HAVE_REPLICATION_ORIGINS
+	if (send_replication_origin)
+	{
+		char *origin;
+
+		/* Message boundary */
+		OutputPluginWrite(ctx, false);
+		OutputPluginPrepareWrite(ctx, true);
+
+		/*
+		 * XXX: which behaviour we want here?
+		 *
+		 * Alternatives:
+		 *  - don't send origin message if origin name not found
+		 *    (that's what we do now)
+		 *  - throw error - that will break replication, not good
+		 *  - send some special "unknown" origin
+		 */
+		if (replorigin_by_oid(txn->origin_id, true, &origin))
+			pglogical_write_origin(ctx->out, origin, txn->origin_lsn);
+	}
+#endif
+
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT callback
+ *
+ * Writes the single COMMIT message that terminates this transaction's
+ * stream of change messages.
+ */
+void
+pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					 XLogRecPtr commit_lsn)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	pglogical_write_commit(ctx->out, txn, commit_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * CHANGE (row) callback
+ *
+ * Sends the relation metadata message followed by the row change message
+ * for an INSERT, UPDATE or DELETE, unless the row filter hook discards
+ * the change first. Per-row allocations are made in our private memory
+ * context, which is reset before returning.
+ */
+void
+pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 Relation relation, ReorderBufferChange *change)
+{
+	PGLogicalOutputData *data;
+	MemoryContext old;
+
+	data = ctx->output_plugin_private;
+
+	/* First check the table filter */
+	if (!call_row_filter_hook(data, txn, relation, change))
+		return;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	/* TODO: add caching (send only if changed) */
+	OutputPluginPrepareWrite(ctx, false);
+	pglogical_write_rel(ctx->out, relation);
+	OutputPluginWrite(ctx, false);
+
+	/* Send the data */
+	switch (change->action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			OutputPluginPrepareWrite(ctx, true);
+			pglogical_write_insert(ctx->out, data, relation,
+									&change->data.tp.newtuple->tuple);
+			OutputPluginWrite(ctx, true);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			{
+				/* oldtuple is only present when the replica identity changed */
+				HeapTuple oldtuple = change->data.tp.oldtuple ?
+					&change->data.tp.oldtuple->tuple : NULL;
+
+				OutputPluginPrepareWrite(ctx, true);
+				pglogical_write_update(ctx->out, data, relation, oldtuple,
+										&change->data.tp.newtuple->tuple);
+				OutputPluginWrite(ctx, true);
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (change->data.tp.oldtuple)
+			{
+				OutputPluginPrepareWrite(ctx, true);
+				pglogical_write_delete(ctx->out, data, relation,
+										&change->data.tp.oldtuple->tuple);
+				OutputPluginWrite(ctx, true);
+			}
+			else
+				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+			break;
+		default:
+			/* no other ReorderBufferChangeType is expected here */
+			Assert(false);
+	}
+
+	/* Cleanup */
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+}
+
+#ifdef HAVE_REPLICATION_ORIGINS
+/*
+ * Decide if the whole transaction with specific origin should be filtered out.
+ *
+ * Returns true to SKIP the transaction: either the client's txn filter
+ * hook rejected it, or the transaction has a non-local origin and
+ * changeset forwarding is disabled.
+ */
+static bool
+pg_decode_origin_filter(LogicalDecodingContext *ctx,
+						RepOriginId origin_id)
+{
+	PGLogicalOutputData *data = ctx->output_plugin_private;
+
+	/* Hook returning false means "filter this transaction out" */
+	if (!call_txn_filter_hook(data, origin_id))
+		return true;
+
+	/* Without forwarding, only locally-originated transactions pass */
+	if (!data->forward_changesets && origin_id != InvalidRepOriginId)
+		return true;
+
+	return false;
+}
+#endif
+
+/*
+ * Build and send the one-off startup message carrying the negotiated
+ * protocol parameters. May only be called once per decoding session;
+ * 'last_message' says whether more messages follow in the same write.
+ */
+static void
+send_startup_message(LogicalDecodingContext *ctx,
+		PGLogicalOutputData *data, bool last_message)
+{
+	char *msg;
+	int len;
+
+	Assert(!startup_message_sent);
+
+	/* Serialise the key/value pairs; see pglogical_config.c */
+	prepare_startup_message(data, &msg, &len);
+
+	/*
+	 * We could free the extra_startup_params DefElem list here, but it's
+	 * pretty harmless to just ignore it, since it's in the decoding memory
+	 * context anyway, and we don't know if it's safe to free the defnames or
+	 * not.
+	 */
+
+	OutputPluginPrepareWrite(ctx, last_message);
+	write_startup_message(ctx->out, msg, len);
+	OutputPluginWrite(ctx, last_message);
+
+	pfree(msg);
+
+	startup_message_sent = true;
+}
+
+/*
+ * Decoding shutdown callback: give the client's shutdown hook (if any)
+ * a chance to clean up, then release the hooks' memory context.
+ */
+static void pg_decode_shutdown(LogicalDecodingContext * ctx)
+{
+	PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
+
+	call_shutdown_hook(data);
+
+	/* hooks_mctxt is only created when a hooks setup function was configured */
+	if (data->hooks_mctxt != NULL)
+	{
+		MemoryContextDelete(data->hooks_mctxt);
+		data->hooks_mctxt = NULL;
+	}
+}
diff --git a/contrib/pglogical_output/pglogical_output.h b/contrib/pglogical_output/pglogical_output.h
new file mode 100644
index 0000000..e835dca
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output.h
@@ -0,0 +1,98 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_output.h
+ *		pglogical output plugin
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		pglogical_output.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_OUTPUT_H
+#define PG_LOGICAL_OUTPUT_H
+
+#include "nodes/parsenodes.h"
+
+#include "replication/logical.h"
+#include "replication/output_plugin.h"
+
+#include "storage/lock.h"
+
+#include "pglogical_output/hooks.h"
+
+#define PG_LOGICAL_PROTO_VERSION_NUM 1
+#define PG_LOGICAL_PROTO_MIN_VERSION_NUM 1
+
+/*
+ * The name of a hook function. This is used instead of the usual List*
+ * because it can serve as a hash key.
+ *
+ * Must be zeroed on allocation if used as a hash key since padding is
+ * *not* ignored on compare.
+ */
+typedef struct HookFuncName
+{
+	/* funcname is more likely to be unique, so goes first */
+	char    function[NAMEDATALEN];
+	char    schema[NAMEDATALEN];
+} HookFuncName;
+
+/*
+ * Per-decoding-session state for the output plugin, stored in
+ * LogicalDecodingContext->output_plugin_private.
+ */
+typedef struct PGLogicalOutputData
+{
+	/* Private working context; reset after each row change is sent */
+	MemoryContext context;
+
+	/* protocol: negotiated output format and forwarding behaviour */
+	bool	allow_internal_basetypes;
+	bool	allow_binary_basetypes;
+	bool	forward_changesets;
+	bool	forward_changeset_origins;
+
+	/*
+	 * client info
+	 *
+	 * TODO: Lots of this should move to a separate
+	 * shorter-lived struct used only during parameter
+	 * reading.
+	 */
+	uint32	client_pg_version;
+	uint32	client_max_proto_version;
+	uint32	client_min_proto_version;
+	const char *client_expected_encoding;
+	uint32  client_binary_basetypes_major_version;
+	/* the *_set flags record whether the client supplied the parameter */
+	bool	client_want_internal_basetypes_set;
+	bool	client_want_internal_basetypes;
+	bool	client_want_binary_basetypes_set;
+	bool	client_want_binary_basetypes;
+	bool	client_binary_bigendian_set;
+	bool	client_binary_bigendian;
+	uint32	client_binary_sizeofdatum;
+	uint32	client_binary_sizeofint;
+	uint32	client_binary_sizeoflong;
+	bool	client_binary_float4byval_set;
+	bool	client_binary_float4byval;
+	bool	client_binary_float8byval_set;
+	bool	client_binary_float8byval;
+	bool	client_binary_intdatetimes_set;
+	bool	client_binary_intdatetimes;
+	bool	client_forward_changesets_set;
+	bool	client_forward_changesets;
+
+	/* hooks */
+	List *hooks_setup_funcname;
+	struct PGLogicalHooks hooks;
+	/* Context for hook invocations; NULL unless a setup function is set */
+	MemoryContext hooks_mctxt;
+
+	/* DefElem<String> list populated by startup hook */
+	List *extra_startup_params;
+} PGLogicalOutputData;
+
+/* Deformed tuple scratch space, sized for the widest possible tuple */
+typedef struct PGLogicalTupleData
+{
+	Datum	values[MaxTupleAttributeNumber];
+	bool	nulls[MaxTupleAttributeNumber];
+	bool	changed[MaxTupleAttributeNumber];
+} PGLogicalTupleData;
+
+#endif /* PG_LOGICAL_OUTPUT_H */
diff --git a/contrib/pglogical_output/pglogical_output/README b/contrib/pglogical_output/pglogical_output/README
new file mode 100644
index 0000000..5480e5c
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output/README
@@ -0,0 +1,7 @@
+/*
+ * This directory contains the public header files for the pglogical_output
+ * extension. It is installed into the PostgreSQL source tree when the extension
+ * is installed.
+ *
+ * These headers are not part of the PostgreSQL project itself.
+ */
diff --git a/contrib/pglogical_output/pglogical_output/compat.h b/contrib/pglogical_output/pglogical_output/compat.h
new file mode 100644
index 0000000..6d0b778
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output/compat.h
@@ -0,0 +1,19 @@
+#ifndef PG_LOGICAL_COMPAT_H
+#define PG_LOGICAL_COMPAT_H
+
+/*
+ * Cross-version compatibility shims so that the plugin, and client hook
+ * implementations, can build against both 9.4 and 9.5+ servers.
+ */
+
+#include "pg_config.h"
+
+/* 9.4 lacks replication origins */
+#if PG_VERSION_NUM >= 90500
+#define HAVE_REPLICATION_ORIGINS
+#else
+/* To allow the same signature on hooks in 9.4 */
+typedef uint16 RepOriginId;
+#endif
+
+/* 9.4 lacks PG_UINT32_MAX */
+#ifndef PG_UINT32_MAX
+#define PG_UINT32_MAX UINT32_MAX
+#endif
+
+#endif
diff --git a/contrib/pglogical_output/pglogical_output/hooks.h b/contrib/pglogical_output/pglogical_output/hooks.h
new file mode 100644
index 0000000..139af44
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output/hooks.h
@@ -0,0 +1,74 @@
+#ifndef PGLOGICAL_OUTPUT_HOOKS_H
+#define PGLOGICAL_OUTPUT_HOOKS_H
+
+#include "access/xlogdefs.h"
+#include "nodes/pg_list.h"
+#include "utils/rel.h"
+#include "utils/palloc.h"
+#include "replication/reorderbuffer.h"
+
+#include "pglogical_output/compat.h"
+
+/* Forward declaration; the full definition lives in pglogical_output.h */
+struct PGLogicalOutputData;
+typedef struct PGLogicalOutputData PGLogicalOutputData;
+
+/*
+ * This header is to be included by extensions that implement pglogical output
+ * plugin callback hooks for transaction origin and row filtering, etc. It is
+ * installed as "pglogical_output/hooks.h"
+ *
+ * See the README.md and the example in examples/hooks/ for details on hooks.
+ */
+
+
+/*
+ * Startup hook arguments. private_data is carried through to every later
+ * hook call; out_params lets the hook add entries to the startup message.
+ */
+struct PGLogicalStartupHookArgs
+{
+	void	   *private_data;
+	List	   *in_params;
+	List	   *out_params;
+};
+
+typedef void (*pglogical_startup_hook_fn)(struct PGLogicalStartupHookArgs *args);
+
+
+/*
+ * Transaction filter arguments. Returning false causes the whole
+ * transaction with this origin to be filtered out.
+ */
+struct PGLogicalTxnFilterArgs
+{
+	void 	   *private_data;
+	RepOriginId	origin_id;
+};
+
+typedef bool (*pglogical_txn_filter_hook_fn)(struct PGLogicalTxnFilterArgs *args);
+
+
+/*
+ * Row filter arguments. Returning false causes the individual row
+ * change to be discarded.
+ */
+struct PGLogicalRowFilterArgs
+{
+	void 	   *private_data;
+	Relation	changed_rel;
+	enum ReorderBufferChangeType	change_type;
+};
+
+typedef bool (*pglogical_row_filter_hook_fn)(struct PGLogicalRowFilterArgs *args);
+
+
+/* Shutdown hook arguments; last chance to release private_data resources */
+struct PGLogicalShutdownHookArgs
+{
+	void	   *private_data;
+};
+
+typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *args);
+
+/*
+ * This struct is passed to the pglogical_get_hooks_fn as the first argument,
+ * typed 'internal', and is unwrapped with `DatumGetPointer`.
+ *
+ * Any hook pointer may be left NULL to skip that hook.
+ */
+struct PGLogicalHooks
+{
+	pglogical_startup_hook_fn startup_hook;
+	pglogical_shutdown_hook_fn shutdown_hook;
+	pglogical_txn_filter_hook_fn txn_filter_hook;
+	pglogical_row_filter_hook_fn row_filter_hook;
+	void *hooks_private_data;
+};
+
+
+#endif /* PGLOGICAL_OUTPUT_HOOKS_H */
diff --git a/contrib/pglogical_output/pglogical_proto.c b/contrib/pglogical_output/pglogical_proto.c
new file mode 100644
index 0000000..0ce3014
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto.c
@@ -0,0 +1,484 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto.c
+ * 		pglogical protocol functions
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "pglogical_output.h"
+#include "pglogical_proto.h"
+
+#include "access/sysattr.h"
+#include "access/tuptoaster.h"
+#include "access/xact.h"
+
+#include "catalog/catversion.h"
+#include "catalog/index.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "libpq/pqformat.h"
+
+#include "mb/pg_wchar.h"
+
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/typcache.h"
+
+#define IS_REPLICA_IDENTITY 1
+
+static void pglogical_write_attrs(StringInfo out, Relation rel);
+static void pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
+								   Relation rel, HeapTuple tuple);
+static char decide_datum_transfer(Form_pg_attribute att,
+								  Form_pg_type typclass,
+								  bool allow_internal_basetypes,
+								  bool allow_binary_basetypes);
+
+/*
+ * Write relation description to the output stream.
+ *
+ * Message layout: 'R', flags, relid (4 bytes), then length-prefixed
+ * NUL-terminated schema and table names, then the attribute block.
+ * Name lengths fit in a uint8 because names are NAMEDATALEN-bounded.
+ */
+void
+pglogical_write_rel(StringInfo out, Relation rel)
+{
+	const char *nspname;
+	uint8		nspnamelen;
+	const char *relname;
+	uint8		relnamelen;
+	uint8		flags = 0;
+
+	pq_sendbyte(out, 'R');		/* sending RELATION */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	nspname = get_namespace_name(rel->rd_rel->relnamespace);
+	if (nspname == NULL)
+		elog(ERROR, "cache lookup failed for namespace %u",
+			 rel->rd_rel->relnamespace);
+	nspnamelen = strlen(nspname) + 1;	/* length includes the NUL */
+
+	relname = NameStr(rel->rd_rel->relname);
+	relnamelen = strlen(relname) + 1;	/* length includes the NUL */
+
+	pq_sendbyte(out, nspnamelen);		/* schema name length */
+	pq_sendbytes(out, nspname, nspnamelen);
+
+	pq_sendbyte(out, relnamelen);		/* table name length */
+	pq_sendbytes(out, relname, relnamelen);
+
+	/* send the attribute info */
+	pglogical_write_attrs(out, rel);
+}
+
+/*
+ * Write relation attributes to the outputstream.
+ *
+ * Sends an 'A' block with the live (non-dropped) attribute count and one
+ * 'C' column entry per live attribute, flagging replica-identity columns.
+ */
+static void
+pglogical_write_attrs(StringInfo out, Relation rel)
+{
+	TupleDesc	desc;
+	int			i;
+	uint16		nliveatts = 0;
+	Bitmapset  *idattrs;
+
+	desc = RelationGetDescr(rel);
+
+	pq_sendbyte(out, 'A');			/* sending ATTRS */
+
+	/* send number of live attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		if (desc->attrs[i]->attisdropped)
+			continue;
+		nliveatts++;
+	}
+	pq_sendint(out, nliveatts, 2);
+
+	/* fetch bitmap of REPLICATION IDENTITY attributes */
+	idattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	/* send the attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = desc->attrs[i];
+		uint8			flags = 0;
+		uint16			len;
+		const char	   *attname;
+
+		if (att->attisdropped)
+			continue;
+
+		/* attnums are offset to make system attribute numbers non-negative */
+		if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						  idattrs))
+			flags |= IS_REPLICA_IDENTITY;
+
+		pq_sendbyte(out, 'C');		/* column definition follows */
+		pq_sendbyte(out, flags);
+
+		pq_sendbyte(out, 'N');		/* column name block follows */
+		attname = NameStr(att->attname);
+		len = strlen(attname) + 1;	/* length includes the NUL */
+		pq_sendint(out, len, 2);
+		pq_sendbytes(out, attname, len); /* data */
+	}
+}
+
+/*
+ * Write BEGIN to the output stream.
+ *
+ * Layout: 'B', flags, final_lsn (8), commit_time (8), xid (4).
+ */
+void
+pglogical_write_begin(StringInfo out, ReorderBufferTXN *txn)
+{
+	uint8	flags = 0;
+
+	pq_sendbyte(out, 'B');		/* BEGIN */
+
+	/* send the flags field itself */
+	pq_sendbyte(out, flags);
+
+	/* fixed fields */
+	pq_sendint64(out, txn->final_lsn);
+	pq_sendint64(out, txn->commit_time);
+	pq_sendint(out, txn->xid, 4);
+}
+
+/*
+ * Write COMMIT to the output stream.
+ *
+ * Layout: 'C', flags, commit_lsn (8), end_lsn (8), commit_time (8).
+ */
+void
+pglogical_write_commit(StringInfo out, ReorderBufferTXN *txn,
+						XLogRecPtr commit_lsn)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'C');		/* sending COMMIT */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fixed fields */
+	pq_sendint64(out, commit_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Write ORIGIN to the output stream.
+ *
+ * Layout: 'O', flags, origin_lsn (8), then the length-prefixed,
+ * NUL-terminated origin name. Names must be under 255 bytes so the
+ * length fits in the single length byte.
+ */
+void
+pglogical_write_origin(StringInfo out, const char *origin,
+						XLogRecPtr origin_lsn)
+{
+	uint8	flags = 0;
+	uint8	len;
+
+	Assert(strlen(origin) < 255);
+
+	pq_sendbyte(out, 'O');		/* ORIGIN */
+
+	/* send the flags field itself */
+	pq_sendbyte(out, flags);
+
+	/* fixed fields */
+	pq_sendint64(out, origin_lsn);
+
+	/* origin name, length including the terminating NUL */
+	len = strlen(origin) + 1;
+	pq_sendbyte(out, len);
+	pq_sendbytes(out, origin, len);
+}
+
+/*
+ * Write INSERT to the output stream.
+ *
+ * Layout: 'I', flags, relid (4), 'N', then the new tuple.
+ */
+void
+pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
+						Relation rel, HeapTuple newtuple)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'I');		/* action INSERT */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	pglogical_write_tuple(out, data, rel, newtuple);
+}
+
+/*
+ * Write UPDATE to the output stream.
+ *
+ * Layout: 'U', flags, relid (4), optionally 'K' + old key tuple, then
+ * 'N' + new tuple. The old key is only sent when the replica identity
+ * changed, i.e. when the caller passes a non-NULL oldtuple.
+ */
+void
+pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
+						Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'U');		/* action UPDATE */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	/* FIXME support whole tuple (O tuple type) */
+	if (oldtuple != NULL)
+	{
+		pq_sendbyte(out, 'K');	/* old key follows */
+		pglogical_write_tuple(out, data, rel, oldtuple);
+	}
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	pglogical_write_tuple(out, data, rel, newtuple);
+}
+
+/*
+ * Write DELETE to the output stream.
+ *
+ * Layout: 'D', flags, relid (4), 'K' + old key tuple. Callers must not
+ * pass a NULL oldtuple (see pg_decode_change, which skips such deletes).
+ */
+void
+pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
+						Relation rel, HeapTuple oldtuple)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'D');		/* action DELETE */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	/* FIXME support whole tuple (O tuple type) */
+	pq_sendbyte(out, 'K');	/* old key follows */
+	pglogical_write_tuple(out, data, rel, oldtuple);
+}
+
+/*
+ * Most of the brains for startup message creation lives in
+ * pglogical_config.c, so this presently just sends the set of key/value pairs.
+ *
+ * Layout: 'S', format version byte, then 'len' bytes of serialised params.
+ */
+void
+write_startup_message(StringInfo out, const char *msg, int len)
+{
+	pq_sendbyte(out, 'S');	/* message type field */
+	pq_sendbyte(out, 1); 	/* startup message version */
+	pq_sendbytes(out, msg, len);	/* null-terminated key/value pairs */
+}
+
+/*
+ * Write a tuple to the outputstream, in the most efficient format possible.
+ *
+ * Layout: 'T', live attribute count (2), then one entry per live column:
+ * 'n' (null), 'u' (unchanged toast), or 'i'/'b'/'t' followed by a 4-byte
+ * length and the datum in internal, send/recv or text format respectively
+ * (see decide_datum_transfer).
+ */
+static void
+pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
+					   Relation rel, HeapTuple tuple)
+{
+	TupleDesc	desc;
+	Datum		values[MaxTupleAttributeNumber];
+	bool		isnull[MaxTupleAttributeNumber];
+	int			i;
+	uint16		nliveatts = 0;
+
+	desc = RelationGetDescr(rel);
+
+	pq_sendbyte(out, 'T');			/* sending TUPLE */
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		if (desc->attrs[i]->attisdropped)
+			continue;
+		nliveatts++;
+	}
+	pq_sendint(out, nliveatts, 2);
+
+	/* try to allocate enough memory from the get go */
+	enlargeStringInfo(out, tuple->t_len +
+					  nliveatts * (1 + 4));
+
+	/*
+	 * XXX: should this prove to be a relevant bottleneck, it might be
+	 * interesting to inline heap_deform_tuple() here, we don't actually need
+	 * the information in the form we get from it.
+	 */
+	heap_deform_tuple(tuple, desc, values, isnull);
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		HeapTuple	typtup;
+		Form_pg_type typclass;
+		Form_pg_attribute att = desc->attrs[i];
+		char		transfer_type;
+
+		/* skip dropped columns */
+		if (att->attisdropped)
+			continue;
+
+		if (isnull[i])
+		{
+			pq_sendbyte(out, 'n');	/* null column */
+			continue;
+		}
+		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
+		{
+			pq_sendbyte(out, 'u');	/* unchanged toast column */
+			continue;
+		}
+
+		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
+		if (!HeapTupleIsValid(typtup))
+			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
+		typclass = (Form_pg_type) GETSTRUCT(typtup);
+
+		transfer_type = decide_datum_transfer(att, typclass,
+											  data->allow_internal_basetypes,
+											  data->allow_binary_basetypes);
+
+		switch (transfer_type)
+		{
+			case 'i':
+				pq_sendbyte(out, 'i');	/* internal-format binary data follows */
+
+				/* pass by value */
+				if (att->attbyval)
+				{
+					pq_sendint(out, att->attlen, 4); /* length */
+
+					/* append the raw Datum bytes directly to the buffer */
+					enlargeStringInfo(out, att->attlen);
+					store_att_byval(out->data + out->len, values[i],
+									att->attlen);
+					out->len += att->attlen;
+					out->data[out->len] = '\0';
+				}
+				/* fixed length non-varlena pass-by-reference type */
+				else if (att->attlen > 0)
+				{
+					pq_sendint(out, att->attlen, 4); /* length */
+
+					appendBinaryStringInfo(out, DatumGetPointer(values[i]),
+										   att->attlen);
+				}
+				/* varlena type */
+				else if (att->attlen == -1)
+				{
+					char *data = DatumGetPointer(values[i]);
+
+					/* send indirect datums inline */
+					if (VARATT_IS_EXTERNAL_INDIRECT(values[i]))
+					{
+						struct varatt_indirect redirect;
+						VARATT_EXTERNAL_GET_POINTER(redirect, data);
+						data = (char *) redirect.pointer;
+					}
+
+					/* on-disk toast was already handled above */
+					Assert(!VARATT_IS_EXTERNAL(data));
+
+					pq_sendint(out, VARSIZE_ANY(data), 4); /* length */
+
+					appendBinaryStringInfo(out, data, VARSIZE_ANY(data));
+				}
+				else
+					elog(ERROR, "unsupported tuple type");
+
+				break;
+
+			case 'b':
+				{
+					bytea	   *outputbytes;
+					int			len;
+
+					pq_sendbyte(out, 'b');	/* binary send/recv data follows */
+
+					outputbytes = OidSendFunctionCall(typclass->typsend,
+													  values[i]);
+
+					len = VARSIZE(outputbytes) - VARHDRSZ;
+					pq_sendint(out, len, 4); /* length */
+					pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
+					pfree(outputbytes);
+				}
+				break;
+
+			default:
+				{
+					char   	   *outputstr;
+					int			len;
+
+					pq_sendbyte(out, 't');	/* 'text' data follows */
+
+					outputstr =	OidOutputFunctionCall(typclass->typoutput,
+													  values[i]);
+					len = strlen(outputstr) + 1;	/* length includes the NUL */
+					pq_sendint(out, len, 4); /* length */
+					appendBinaryStringInfo(out, outputstr, len); /* data */
+					pfree(outputstr);
+				}
+		}
+
+		ReleaseSysCache(typtup);
+	}
+}
+
+/*
+ * Make the executive decision about which protocol to use.
+ *
+ * Returns 'i' for internal (raw) format, 'b' for send/recv binary format,
+ * or 't' for text output-function format. 'i' and 'b' are only chosen
+ * when the corresponding negotiated allow_* flag is set.
+ */
+static char
+decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
+					  bool allow_internal_basetypes,
+					  bool allow_binary_basetypes)
+{
+	/*
+	 * Use the binary protocol, if allowed, for builtin & plain datatypes.
+	 */
+	if (allow_internal_basetypes &&
+		typclass->typtype == 'b' &&
+		att->atttypid < FirstNormalObjectId &&
+		typclass->typelem == InvalidOid)
+	{
+		return 'i';
+	}
+	/*
+	 * Use send/recv, if allowed, if the type is plain or builtin.
+	 *
+	 * XXX: we can't use send/recv for array or composite types for now due to
+	 * the embedded oids.
+	 */
+	else if (allow_binary_basetypes &&
+			 OidIsValid(typclass->typreceive) &&
+			 (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') &&
+			 (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid))
+	{
+		return 'b';
+	}
+
+	/* Fall back to the type's text output function */
+	return 't';
+}
diff --git a/contrib/pglogical_output/pglogical_proto.h b/contrib/pglogical_output/pglogical_proto.h
new file mode 100644
index 0000000..77cff87
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto.h
@@ -0,0 +1,36 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto.h
+ *		pglogical protocol
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_PROTO_H
+#define PG_LOGICAL_PROTO_H
+
+
+/* Relation metadata message ('R'), including the attribute block */
+void pglogical_write_rel(StringInfo out, Relation rel);
+
+/* Transaction delimiter messages ('B'/'C') */
+void pglogical_write_begin(StringInfo out, ReorderBufferTXN *txn);
+void pglogical_write_commit(StringInfo out, ReorderBufferTXN *txn,
+							 XLogRecPtr commit_lsn);
+
+/* Replication origin message ('O'); used when forwarding origin info */
+void pglogical_write_origin(StringInfo out, const char *origin,
+							 XLogRecPtr origin_lsn);
+
+/* Row change messages ('I'/'U'/'D') */
+void pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
+							 Relation rel, HeapTuple newtuple);
+void pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
+							 Relation rel, HeapTuple oldtuple,
+							 HeapTuple newtuple);
+void pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
+							 Relation rel, HeapTuple oldtuple);
+
+/* One-off startup message ('S') carrying the negotiated parameters */
+void write_startup_message(StringInfo out, const char *msg, int len);
+
+#endif /* PG_LOGICAL_PROTO_H */
diff --git a/contrib/pglogical_output/regression.conf b/contrib/pglogical_output/regression.conf
new file mode 100644
index 0000000..367f706
--- /dev/null
+++ b/contrib/pglogical_output/regression.conf
@@ -0,0 +1,2 @@
+wal_level = logical
+max_replication_slots = 4
diff --git a/contrib/pglogical_output/sql/basic.sql b/contrib/pglogical_output/sql/basic.sql
new file mode 100644
index 0000000..8f08bdc
--- /dev/null
+++ b/contrib/pglogical_output/sql/basic.sql
@@ -0,0 +1,49 @@
+SET synchronous_commit = on;
+
+-- Schema setup
+
+CREATE TABLE demo (
+	seq serial primary key,
+	tx text,
+	ts timestamp,
+	jsb jsonb,
+	js json,
+	ba bytea
+);
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+
+-- Queue up some work to decode with a variety of types
+
+INSERT INTO demo(tx) VALUES ('textval');
+INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
+INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
+INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
+
+-- Simple decode with text-format tuples
+--
+-- It's still the logical decoding binary protocol and as such it has
+-- embedded timestamps, and pglogical its self has embedded LSNs, xids,
+-- etc. So all we can really do is say "yup, we got the expected number
+-- of messages".
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- ... and send/recv binary format
+-- The main difference visible is that the bytea fields aren't encoded
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'binary.want_binary_basetypes', '1',
+	'binary.basetypes_major_version', (current_setting('server_version_num')::integer / 100)::text);
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+
+DROP TABLE demo;
diff --git a/contrib/pglogical_output/sql/hooks.sql b/contrib/pglogical_output/sql/hooks.sql
new file mode 100644
index 0000000..adcdfb1
--- /dev/null
+++ b/contrib/pglogical_output/sql/hooks.sql
@@ -0,0 +1,86 @@
+CREATE EXTENSION pglogical_output_plhooks;
+
+CREATE FUNCTION test_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+	IF nodeid <> 'foo' THEN
+	    RAISE EXCEPTION 'Expected nodeid <foo>, got <%>',nodeid;
+	END IF;
+	RETURN relid::regclass::text NOT LIKE '%_filter%';
+END
+$$;
+
+CREATE FUNCTION test_action_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+    RETURN action NOT IN ('U', 'D');
+END
+$$;
+
+CREATE FUNCTION wrong_signature_fn(relid regclass)
+returns bool stable language plpgsql as $$
+BEGIN
+END;
+$$;
+
+CREATE TABLE test_filter(id integer);
+CREATE TABLE test_nofilt(id integer);
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+
+INSERT INTO test_filter(id) SELECT generate_series(1,10);
+INSERT INTO test_nofilt(id) SELECT generate_series(1,10);
+
+DELETE FROM test_filter WHERE id % 2 = 0;
+DELETE FROM test_nofilt WHERE id % 2 = 0;
+UPDATE test_filter SET id = id*100 WHERE id = 5;
+UPDATE test_nofilt SET id = id*100 WHERE id = 5;
+
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_filter',
+	'pglo_plhooks.client_hook_arg', 'foo'
+	);
+
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_action_filter'
+	);
+
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.nosuchfunction'
+	);
+
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.wrong_signature_fn'
+	);
+
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+
+DROP TABLE test_filter;
+DROP TABLE test_nofilt;
+
+DROP EXTENSION pglogical_output_plhooks;
diff --git a/contrib/pglogical_output/sql/params.sql b/contrib/pglogical_output/sql/params.sql
new file mode 100644
index 0000000..24ac347
--- /dev/null
+++ b/contrib/pglogical_output/sql/params.sql
@@ -0,0 +1,77 @@
+SET synchronous_commit = on;
+
+-- no need to CREATE EXTENSION as we intentionally don't have any catalog presence
+-- Instead, just create a slot.
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+
+-- Minimal invocation with no data
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+--
+-- Various invalid parameter combos:
+--
+
+-- Text mode is not supported
+SELECT data FROM pg_logical_slot_get_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+
+-- error, unrecognised startup params format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '2');
+
+-- Should be OK and result in proto version 1 selection, though we won't
+-- see that here.
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+
+-- no such encoding / encoding mismatch
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'bork',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- Currently we're sensitive to the encoding name's format (TODO)
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF-8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/pglogical_output/test/Makefile b/contrib/pglogical_output/test/Makefile
new file mode 100644
index 0000000..7a6c0ed
--- /dev/null
+++ b/contrib/pglogical_output/test/Makefile
@@ -0,0 +1,6 @@
+all: check
+
+check:
+	@python -m unittest discover
+
+.PHONY: check all
diff --git a/contrib/pglogical_output/test/README.md b/contrib/pglogical_output/test/README.md
new file mode 100644
index 0000000..9a91e1e
--- /dev/null
+++ b/contrib/pglogical_output/test/README.md
@@ -0,0 +1,91 @@
+What are these tests?
+---
+
+These tests exercise the pglogical protocol, parameter validation, hooks and
+filters, and the overall behaviour of the extension. They are *not* the tests
+run by `make check` or `make installcheck` on the top level source directory;
+those are the `pg_regress` tests discussed in the "tests" section of the
+top-level `README.md`.
+
+QUICK START
+---
+
+To run these tests:
+
+* Install the output plugin into your PostgreSQL instance, e.g.
+
+        make USE_PGXS=1 install
+
+  Use the same options, environment variables, etc as used for compiling,
+  most notably the `PATH` to ensure the same `pg_config` is used.
+
+* Create a temporary PostgreSQL datadir at any location of your choosing:
+
+        initdb -A trust -D tmp_install
+
+* Start the temporary PostgreSQL instance with:
+
+        PGPORT=5142 postgres -D tmp_install -c max_replication_slots=5 -c wal_level=logical -c max_wal_senders=10 -c track_commit_timestamp=on
+
+  (leave out `track_commit_timestamp=on` for 9.4)
+
+* In another session, in the test directory:
+
+        PGPORT=5142 make
+
+RUNNING JUST ONE TEST
+---
+
+To run just one test, specify the class-qualified method name.
+
+    PGPORT=5142 python test/test_filter.py FilterTest.test_filter
+
+WALSENDER VS SQL MODE
+---
+
+By default the tests use the SQL interface for logical decoding.
+
+You can instead use the walsender interface, i.e. the streaming replication
+protocol. However, this requires a patched psycopg2 at time of writing. You
+can get the branch from https://github.com/zalando/psycopg2/tree/feature/replication-protocol
+
+You should uninstall your existing `psycopg2` packages, then:
+
+    git clone https://github.com/zalando/psycopg2.git
+    git checkout feature/replication-protocol
+    PATH=/path/to/pg/bin:$PATH python setup.py build
+    sudo PATH=/path/to/pg/bin:$PATH python setup.py install
+
+Now run the tests with the extra environment variable PGLOGICALTEST_USEWALSENDER=1
+set, e.g.
+
+    PGLOGICALTEST_USEWALSENDER=1 PGPORT=5142 make
+
+At time of writing the walsender tests may not always be passing, as the
+SQL tests are the authoritative ones.
+
+DETAILED LOGGING
+---
+
+You can get more detailed info about what's being done by setting the env var
+`PGLOGICALTEST_LOGLEVEL=DEBUG`
+
+TROUBLESHOOTING
+---
+
+No module named psycopg2
+===
+
+If you get an error like:
+
+    ImportError: No module named psycopg2
+
+you need to install `psycopg2` for your local Python install. It'll be
+available as a package via the same channel you installed Python itself from.
+
+could not access file "pglogical_output": No such file or directory
+===
+
+You forgot to install the output plugin before running the tests, or
+the tests are connecting to a different PostgreSQL instance than the
+one you installed the plugin in.
diff --git a/contrib/pglogical_output/test/base.py b/contrib/pglogical_output/test/base.py
new file mode 100644
index 0000000..ab7fec0
--- /dev/null
+++ b/contrib/pglogical_output/test/base.py
@@ -0,0 +1,285 @@
+import unittest
+import psycopg2
+import psycopg2.extras;
+import cStringIO
+import logging
+import pprint
+import psycopg2.extensions
+import select
+import time
+import sys
+import os
+from pglogical_protoreader import ProtocolReader
+
+from pglogical_proto import ReplicationMessage
+
+SLOT_NAME = 'test'
+
+class BaseDecodingInterface(object):
+    """Helper for handling the different decoding interfaces"""
+
+    conn = None
+    cur = None
+
+    def __init__(self, connstring, logger):
+        # Establish base connection, which we use in walsender mode too
+        self.logger = logger
+        self.connstring = connstring
+        self.conn = psycopg2.connect(self.connstring)
+        self.logger.debug("Acquired connection with pid %s", self.conn.get_backend_pid())
+        self.conn.autocommit = True
+        self.cur = self.conn.cursor()
+
+    def slot_exists(self):
+        self.cur.execute("SELECT 1 FROM pg_replication_slots WHERE slot_name = %s", (SLOT_NAME,))
+        return self.cur.rowcount == 1
+
+    def drop_slot_when_inactive(self):
+        self.logger.debug("Dropping slot %s", SLOT_NAME)
+        try:
+            # We can't use the walsender protocol connection to drop
+            # the slot because we have no way to exit COPY BOTH mode
+            # so close the connection (above) and drop from SQL.
+            if self.cur is not None:
+                # There's a race between walsender disconnect and the slot becoming
+                # free. We should use a DO block, but this will do for now.
+                #
+                # this is only an issue in walsender mode, but might as well do
+                # it anyway.
+                self.cur.execute("""
+                DO
+                LANGUAGE plpgsql
+                $$
+                DECLARE
+                    timeleft float := 5.0;
+                    _slotname name := %s;
+                BEGIN
+                    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = _slotname)
+                    THEN
+                        RETURN;
+                    END IF;
+                    WHILE (SELECT active FROM pg_catalog.pg_replication_slots WHERE slot_name = _slotname) AND timeleft > 0
+                    LOOP
+                        PERFORM pg_sleep(0.1);
+                        timeleft := timeleft - 0.1;
+                    END LOOP;
+                    IF timeleft > 0 THEN
+                        PERFORM pg_drop_replication_slot(_slotname);
+                    ELSE
+                        RAISE EXCEPTION 'Timed out waiting for slot to become unused';
+                    END IF;
+                END;
+                $$
+                """, (SLOT_NAME,))
+        except psycopg2.ProgrammingError, ex:
+            self.logger.exception("Attempt to DROP slot %s failed", SLOT_NAME)
+        self.logger.debug("Dropped slot %s", SLOT_NAME)
+
+    def cleanup(self):
+        if self.cur is not None:
+            self.cur.close()
+        if self.conn is not None:
+            self.conn.close()
+
+    def _get_changes_params(self, kwargs):
+        params_dict = {
+                'expected_encoding': 'UTF8',
+                'min_proto_version': '1',
+                'max_proto_version': '1',
+                'startup_params_format': '1'
+                }
+        params_dict.update(kwargs)
+        return params_dict
+
+
+
+class SQLDecodingInterface(BaseDecodingInterface):
+    """Use the SQL level logical decoding interfaces"""
+
+    def __init__(self, connstring, parentlogger=logging.getLogger('base')):
+        BaseDecodingInterface.__init__(self, connstring, logger=parentlogger.getChild('sqldecoding:%s' % hex(id(self))))
+
+        # cleanup old slot
+        if self.slot_exists():
+            self.cur.execute("SELECT * FROM pg_drop_replication_slot(%s)", (SLOT_NAME,))
+
+        # Create slot to use in testing
+        self.cur.execute("SELECT * FROM pg_create_logical_replication_slot(%s, 'pglogical_output')", (SLOT_NAME,))
+
+    def cleanup(self):
+        self.logger.debug("Closing sql decoding connection")
+        self.drop_slot_when_inactive()
+        BaseDecodingInterface.cleanup(self)
+        self.logger.debug("Closed sql decoding connection")
+
+    def get_changes(self, kwargs = {}):
+        params_dict = self._get_changes_params(kwargs)
+
+        # Filter out entries with value None
+        params = [i for k, v in params_dict.items() for i in [k,str(v)] if v is not None]
+        # and create the slot
+        try:
+            self.cur.execute("SELECT * FROM pg_logical_slot_get_binary_changes(%s, NULL, NULL" + (", %s" * len(params)) + ")",
+                    [SLOT_NAME] + params);
+        finally:
+            self.conn.commit()
+
+        for row in self.cur:
+            yield ReplicationMessage(row)
+
+
+
+class WalsenderDecodingInterface(BaseDecodingInterface):
+    """Use the replication protocol interfaces"""
+
+    walcur = None
+    walconn = None
+    select_timeout = 1
+    replication_started = False
+
+    def __init__(self, connstring, parentlogger=logging.getLogger('base')):
+        BaseDecodingInterface.__init__(self, connstring, logger=parentlogger.getChild('waldecoding:%s' % hex(id(self))))
+
+        # Establish an async logical replication connection
+        self.walconn = psycopg2.connect(self.connstring,
+                connection_factory=psycopg2.extras.LogicalReplicationConnection)
+        self.logger.debug("Acquired replication connection with pid %s", self.walconn.get_backend_pid())
+        self.walcur = self.walconn.cursor()
+
+        # clean up old slot
+        if self.slot_exists():
+                self.walcur.drop_replication_slot(SLOT_NAME)
+
+        # Create slot to use in testing
+        self.walcur.create_replication_slot(SLOT_NAME, output_plugin='pglogical_output')
+        slotinfo = self.walcur.fetchone()
+        self.logger.debug("Got slot info %s", slotinfo)
+
+
+    def cleanup(self):
+        self.logger.debug("Closing walsender connection")
+
+        if self.walcur is not None:
+            self.walcur.close()
+        if self.walconn is not None:
+            self.walconn.close()
+
+        self.replication_started = False
+
+        self.drop_slot_when_inactive()
+        BaseDecodingInterface.cleanup(self)
+        self.logger.debug("Closed walsender connection")
+
+    def get_changes(self, kwargs = {}):
+        params_dict = self._get_changes_params(kwargs)
+
+        if not self.replication_started:
+            self.walcur.start_replication(slot_name=SLOT_NAME,
+                    options={k: v for k, v in params_dict.iteritems() if v is not None})
+            self.replication_started = True
+        try:
+            while True:
+                # There's never any "done" or "last message", so just keep
+                # reading as long as the caller asks. If select times out,
+                # a normal client would send feedback. We'll treat it as a
+                # failure instead, since the caller asked for a message we
+                # are apparently not going to receive.
+                message = self.walcur.read_replication_message(decode=False)
+                if message is None:
+                    self.logger.debug("No message pending, select()ing with timeout %s", self.select_timeout)
+                    sel = select.select([self.walcur], [], [], self.select_timeout)
+                    if not sel[0]:
+                        raise IOError("Server didn't send an expected message before timeout")
+                else:
+                    if len(message.payload) < 200:
+                        self.logger.debug("payload: %s", repr(message.payload))
+                    else:
+                        self.logger.debug("payload (truncated): %s...", repr(message.payload)[:200])
+                    yield ReplicationMessage((message.data_start, None, message.payload))
+        except psycopg2.InternalError, ex:
+            self.logger.debug("While retrieving a message: sqlstate=%s", ex.pgcode, exc_info=True)
+
+
+
+
+class PGLogicalOutputTest(unittest.TestCase):
+
+    connstring = "dbname=postgres host=localhost"
+    interface = None
+
+    def setUp(self):
+        # A counter we can increment each time we reconnet with decoding,
+        # for logging purposes.
+        self.decoding_generation = 0
+
+        # Set up our logger
+        self.logger = logging.getLogger(self.__class__.__name__)
+        self.loghandler = logging.StreamHandler()
+        for handler in self.logger.handlers:
+            self.logger.removeHandler(handler)
+        self.logger.addHandler(self.loghandler)
+        self.logger.setLevel(os.environ.get('PGLOGICALTEST_LOGLEVEL', 'INFO'))
+
+        # Get connections for test classes to use to run SQL
+        self.conn = psycopg2.connect(self.connstring, connection_factory=psycopg2.extras.LoggingConnection)
+        self.conn.initialize(self.logger.getChild('sql'))
+        self.cur = self.conn.cursor()
+
+        if hasattr(self, 'set_up'):
+            self.set_up()
+
+    def tearDown(self):
+        if hasattr(self, 'tear_down'):
+            self.tear_down()
+
+        if self.conn is not None:
+            self.conn.close()
+
+    def doCleanups(self):
+        if self.interface:
+            self.interface.cleanup()
+
+    def reconnect_decoding(self):
+        """
+        Close the logical decoding connection and re-establish it.
+
+        This is useful when we want to restart decoding with different parameters,
+        since in walsender mode there's no way to end a decoding session once
+        begun.
+        """
+        if self.interface is not None:
+            self.logger.debug("Disconnecting old decoding session and forcing reconnect")
+            self.interface.cleanup()
+
+        self.connect_decoding()
+
+    def connect_decoding(self):
+        """
+        Make a slot and establish a decoding connection.
+
+        Prior to this changes are not recorded, which is useful for setup.
+        """
+        self.decoding_generation += 1
+        fmt = logging.Formatter('%%(name)-50s w=%s %%(message)s' % (self.decoding_generation,))
+        self.loghandler.setFormatter(fmt)
+
+        if os.environ.get("PGLOGICALTEST_USEWALSENDER", None):
+            self.interface = WalsenderDecodingInterface(self.connstring, parentlogger=self.logger)
+        else:
+            self.interface = SQLDecodingInterface(self.connstring, parentlogger=self.logger)
+
+
+    def get_changes(self, kwargs = {}):
+        """
+        Get a stream of messages as a generator that may be read from
+        to fetch a new message each call. Messages are instances of
+        class ReplicationMessage .
+
+        The generator has helper methods for decoding particular
+        types of message, for validation, etc.
+        """
+        if self.interface is None:
+            raise ValueError("No logical decoding connection. Call connect_decoding()")
+
+        msg_gen = self.interface.get_changes(kwargs)
+        return ProtocolReader(msg_gen, tester=self, parentlogger=self.logger)
diff --git a/contrib/pglogical_output/test/pglogical_proto.py b/contrib/pglogical_output/test/pglogical_proto.py
new file mode 100644
index 0000000..e8642e3
--- /dev/null
+++ b/contrib/pglogical_output/test/pglogical_proto.py
@@ -0,0 +1,240 @@
+from StringIO import StringIO
+import struct
+import datetime
+
+class UnchangedField(object):
+    """Opaque placeholder object for a TOASTed field that didn't change"""
+    pass
+
+def readcstr(f):
+    buf = bytearray()
+    while True:
+        b = f.read(1)
+        if b is None or len(b) == 0:
+            if len(buf) == 0:
+                return None
+            else:
+                raise ValueError("non-terminated string at EOF")
+        elif b == '\0':  # must be '==': 'is' on a 1-char str relies on CPython interning
+            return str(buf)
+        else:
+            buf.append(b)
+
+class ReplicationMessage(object):
+    def __new__(cls, msg):
+        msgtype = msg[2][0]
+        if msgtype == "S":
+            cls = StartupMessage
+        elif msgtype == "B":
+            cls = BeginMessage
+        elif msgtype == "C":
+            cls = CommitMessage
+        elif msgtype == "O":
+            cls = OriginMessage
+        elif msgtype == "R":
+            cls = RelationMessage
+        elif msgtype == "I":
+            cls = InsertMessage
+        elif msgtype == "U":
+            cls = UpdateMessage
+        elif msgtype == "D":
+            cls = DeleteMessage
+        else:
+            raise Exception("Unknown message type %s", msgtype)
+
+        return super(ReplicationMessage, cls).__new__(cls)
+
+    def __init__(self, row):
+        self.lsn = row[0]
+        self.xid = row[1]
+        self.msg = row[2]
+
+    @property
+    def message_type(self):
+        return self.msg[0]
+
+    @property
+    def message(self):
+        return None
+
+    def __repr__(self):
+        return repr(self.message)
+
+    def parse_tuple(self, msg):
+        assert msg.read(1) == "T"
+        numcols = struct.unpack("!H", msg.read(2))[0]
+
+        cols = []
+        for i in xrange(0, numcols):
+            typ = msg.read(1)
+            if typ == 'n':
+                cols.append(None)
+            elif typ == 'u':
+                cols.append(UnchangedField())
+            else:
+                assert typ in ('i','b','t') #typ should be 'i'nternal-binary, 'b'inary, 't'ext
+                datalen = struct.unpack("!I", msg.read(4))[0]
+                cols.append(msg.read(datalen))
+
+        return cols
+
+class ChangeMessage(ReplicationMessage):
+    pass
+
+class TransactionMessage(ReplicationMessage):
+    pass
+
+class StartupMessage(ReplicationMessage):
+    @property
+    def message(self):
+        res = {"type": "S"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'S'
+        res['startup_msg_version'] = struct.unpack("b", msg.read(1))[0]
+        # Now split the null-terminated k/v strings
+        # and store as a dict, since we don't care about order.
+        params = {}
+        while True:
+            k = readcstr(msg)
+            if k is None:
+                break;
+            v = readcstr(msg)
+            if (v is None):
+                raise ValueError("Value for key %s missing, read key as last entry" % k)
+            params[k] = v
+        res['params'] = params
+
+        return res
+
+class BeginMessage(TransactionMessage):
+    @property
+    def message(self):
+        res = {"type": "B"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'B'
+        msg.read(1) # flags
+
+        lsn, time, xid = struct.unpack("!QQI", msg.read(20))
+        res['final_lsn'] = lsn
+        res['timestamp'] = datetime.datetime.fromtimestamp(time)
+        res['xid'] = xid
+
+        return res
+
+class OriginMessage(ReplicationMessage):
+    @property
+    def message(self):
+        res = {"type": "O"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'O'
+        msg.read(1) # flags
+
+        origin_lsn, namelen = struct.unpack("!QB", msg.read(9))
+        res['origin_lsn'] = origin_lsn
+        res['origin_name'] = msg.read(namelen)
+
+        return res
+
+class RelationMessage(ReplicationMessage):
+    @property
+    def message(self):
+        res = {"type": "R"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'R'
+        msg.read(1) # flags
+
+        relid, namelen = struct.unpack("!IB", msg.read(5))
+        res['relid'] = relid
+        res['namespace'] = msg.read(namelen)
+        namelen = struct.unpack("B", msg.read(1))[0]
+        res['relation'] = msg.read(namelen)
+
+        assert msg.read(1) == "A" # attributes
+        numcols = struct.unpack("!H", msg.read(2))[0]
+
+        cols = []
+        for i in xrange(0, numcols):
+            assert msg.read(1) == "C" # column
+            msg.read(1) # flags
+            assert msg.read(1) == "N" # name
+
+            namelen = struct.unpack("!H", msg.read(2))[0]
+            cols.append(msg.read(namelen))
+
+        res["columns"] = cols
+
+        return res
+
+class CommitMessage(TransactionMessage):
+    @property
+    def message(self):
+        res = {"type": "C"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'C'
+        msg.read(1) # flags
+
+        commit_lsn, end_lsn, time = struct.unpack("!QQQ", msg.read(24))
+        res['commit_lsn'] = commit_lsn
+        res['end_lsn'] = end_lsn
+        res['timestamp'] = datetime.datetime.fromtimestamp(time)
+
+        return res
+
+class InsertMessage(ChangeMessage):
+    @property
+    def message(self):
+        res = {"type": "I"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'I'
+        msg.read(1) # flags
+
+        res["relid"] = struct.unpack("!I", msg.read(4))[0]
+
+        assert msg.read(1) == "N"
+        res["newtup"] = self.parse_tuple(msg)
+
+        return res
+
+class UpdateMessage(ChangeMessage):
+    @property
+    def message(self):
+        res = {"type": "U"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'U'
+        msg.read(1) # flags
+
+        res["relid"] = struct.unpack("!I", msg.read(4))[0]
+
+        tuptyp = msg.read(1)
+        if tuptyp == "K":
+            res["keytup"] = self.parse_tuple(msg)
+            tuptyp = msg.read(1)
+
+        assert tuptyp == "N"  # was a no-op comparison; must assert like InsertMessage does
+        res["newtup"] = self.parse_tuple(msg)
+
+        return res
+
+class DeleteMessage(ChangeMessage):
+    @property
+    def message(self):
+        res = {"type": "D"}
+
+        msg = StringIO(self.msg)
+        msg.read(1) # 'D'
+        msg.read(1) # flags
+
+        res["relid"] = struct.unpack("!I", msg.read(4))[0]
+
+        assert msg.read(1) == "K"
+        res["keytup"] = self.parse_tuple(msg)
+
+        return res
+
diff --git a/contrib/pglogical_output/test/pglogical_protoreader.py b/contrib/pglogical_output/test/pglogical_protoreader.py
new file mode 100644
index 0000000..5d570e0
--- /dev/null
+++ b/contrib/pglogical_output/test/pglogical_protoreader.py
@@ -0,0 +1,112 @@
+import collections
+import logging
+
+class ProtocolReader(collections.Iterator):
+    """
+    A protocol generator wrapper that can validate a message before returning
+    it and has helpers for reading different message types.
+
+    The underlying message generator is stored as the message_generator
+    member, but you shouldn't consume any messages from it directly, since
+    that'll break validation if enabled.
+    """
+
+    startup_params = None
+
+    def __init__(self, message_generator, validator=None, tester=None, parentlogger=logging.getLogger('base')):
+        """
+        Build a protocol reader to wrap the passed message_generator, which
+        must return a ReplicationMessage instance when next() is called.
+
+        A validator class may be provided. If supplied, it must have
+        a validate(...) method taking a ReplicationMessage instance as
+        an argument. It should throw exceptions if it sees things it
+        doesn't like.
+
+        A tester class may be provided. This class should be an instance
+        of unittest.TestCase. If provided, unittest assertions are used
+        to check message types, etc.
+        """
+        self.logger = parentlogger.getChild(self.__class__.__name__)
+        self.message_generator = message_generator
+        self.validator = validator
+        self.tester = tester
+
+    def next(self):
+        """Validating for generator"""
+        msg = self.message_generator.next()
+        if self.validator:
+            self.validator.validate(msg)
+        return msg
+
+    def expect_msg(self, msgtype):
+        """Read a message and check it's type char is as specified"""
+        m = self.next()
+        # this is ugly, better suggestions welcome:
+        try:
+            if self.tester:
+                self.tester.assertEqual(m.message_type, msgtype)
+            elif m.message_type <> msgtype:
+                raise ValueError("Expected message %s but got %s", msgtype, m.message_type)
+        except Exception, ex:
+            self.logger.debug("Expecting %s msg, got %s, unexpected message was: %s", msgtype, m.message_type, m)
+            raise
+        return m
+
+    def expect_startup(self):
+        """Get startup message and return the message and params objects"""
+        m = self.expect_msg('S')
+        # this is ugly, better suggestions welcome:
+        if self.tester:
+            self.tester.assertEquals(m.message['startup_msg_version'], 1)
+        elif m.message['startup_msg_version'] <> 1:
+            raise ValueError("Expected startup_msg_version 1, got %s", m.message['startup_msg_version'])
+        self.startup_params = m.message['params']
+        return (m, self.startup_params)
+
+    def expect_begin(self):
+        """Read a message and ensure it's a begin"""
+        return self.expect_msg('B')
+
+    def expect_row_meta(self):
+        """Read a message and ensure it's a rowmeta message"""
+        return self.expect_msg('R')
+
+    def expect_commit(self):
+        """Read a message and ensure it's a commit"""
+        return self.expect_msg('C')
+
+    def expect_insert(self):
+        """Read a message and ensure it's an insert"""
+        return self.expect_msg('I')
+
+    def expect_update(self):
+        """Read a message and ensure it's an update"""
+        return self.expect_msg('U')
+
+    def expect_delete(self):
+        """Read a message and ensure it's a delete"""
+        return self.expect_msg('D')
+
+    def expect_origin(self):
+        """
+        Read a message and ensure it's a replication origin message.
+        """
+        return self.expect_msg('O')
+
+    def maybe_expect_origin(self):
+        """
+        If the upstream sends replication origins, read one, otherwise
+        do nothing and return None.
+
+        If the upstream is 9.4 then it'll always send replication origin
+        messages. For other upstreams they're sent only if enabled.
+
+        Requires that the startup message was read with expect_startup(..)
+        """
+        if self.startup_params is None:
+            raise ValueError("Startup message was not read with expect_startup()")
+        if self.startup_params['forward_changeset_origins'] == 't':
+            return self.expect_origin()
+        else:
+            return None
diff --git a/contrib/pglogical_output/test/test_basic.py b/contrib/pglogical_output/test/test_basic.py
new file mode 100644
index 0000000..f4e86c6
--- /dev/null
+++ b/contrib/pglogical_output/test/test_basic.py
@@ -0,0 +1,89 @@
+import random
+import string
+import unittest
+from base import PGLogicalOutputTest
+
+class BasicTest(PGLogicalOutputTest):
+    def rand_string(self, length):
+        return ''.join([random.choice(string.ascii_letters + string.digits) for n in xrange(length)])
+
+    def setUp(self):
+        PGLogicalOutputTest.setUp(self)
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS test_changes;")
+        cur.execute("CREATE TABLE test_changes (cola serial PRIMARY KEY, colb timestamptz default now(), colc text);")
+        self.conn.commit()
+        self.connect_decoding()
+
+    def tearDown(self):
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE test_changes;")
+        self.conn.commit()
+        PGLogicalOutputTest.tearDown(self)
+
+    def test_changes(self):
+        cur = self.conn.cursor()
+        cur.execute("INSERT INTO test_changes(colb, colc) VALUES(%s, %s)", ('2015-08-08', 'foobar'))
+        cur.execute("INSERT INTO test_changes(colb, colc) VALUES(%s, %s)", ('2015-08-08', 'bazbar'))
+        self.conn.commit()
+
+        cur.execute("DELETE FROM test_changes WHERE cola = 1")
+        cur.execute("UPDATE test_changes SET colc = 'foobar' WHERE cola = 2")
+        self.conn.commit()
+
+        messages = self.get_changes()
+
+        # Startup msg
+        (m, params) = messages.expect_startup()
+
+        self.assertEquals(params['max_proto_version'], '1')
+        self.assertEquals(params['min_proto_version'], '1')
+
+        if int(params['pg_version_num'])/100 == 904:
+            self.assertEquals(params['forward_changeset_origins'], 'f')
+            self.assertEquals(params['forward_changesets'], 't')
+        else:
+            self.assertEquals(params['forward_changeset_origins'], 'f')
+            self.assertEquals(params['forward_changesets'], 'f')
+
+        anybool = ['t', 'f']
+        self.assertIn(params['binary.bigendian'], anybool)
+        self.assertIn(params['binary.internal_basetypes'], anybool)
+        self.assertIn(params['binary.binary_basetypes'], anybool)
+        self.assertIn(params['binary.float4_byval'], anybool)
+        self.assertIn(params['binary.float8_byval'], anybool)
+        self.assertIn(params['binary.integer_datetimes'], anybool)
+        self.assertIn(params['binary.maxalign'], ['4', '8'])
+        self.assertIn(params['binary.sizeof_int'], ['4', '8'])
+        self.assertIn(params['binary.sizeof_long'], ['4', '8'])
+
+        self.assertIn("encoding", params)
+        self.assertEquals(params['coltypes'], 'f')
+
+        self.assertIn('pg_catversion', params)
+        self.assertIn('pg_version', params)
+        self.assertIn('pg_version_num', params)
+
+        # two inserts in one tx
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'foobar\0')
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'bazbar\0')
+        messages.expect_commit()
+
+        # delete and update in one tx
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_delete()
+        self.assertEqual(m.message['keytup'][0], '1\0')
+        messages.expect_row_meta()
+        m = messages.expect_update()
+        self.assertEqual(m.message['newtup'][0], '2\0')
+        self.assertEqual(m.message['newtup'][2], 'foobar\0')
+        messages.expect_commit()
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/contrib/pglogical_output/test/test_binary_mode.py b/contrib/pglogical_output/test/test_binary_mode.py
new file mode 100644
index 0000000..da3b11f
--- /dev/null
+++ b/contrib/pglogical_output/test/test_binary_mode.py
@@ -0,0 +1,172 @@
+import random
+import string
+import unittest
+from base import PGLogicalOutputTest
+
+class BinaryModeTest(PGLogicalOutputTest):
+
+    def setUp(self):
+        """Create the bytea/timestamp test table and open the decoding slot."""
+        PGLogicalOutputTest.setUp(self)
+        cur = self.conn.cursor()
+        # Drop first so a previously failed run can't leave stale state behind.
+        cur.execute("DROP TABLE IF EXISTS test_binary;")
+        cur.execute("CREATE TABLE test_binary (colv bytea, colts timestamp);")
+        self.conn.commit()
+        self.connect_decoding()
+
+    def tearDown(self):
+        """Drop the test table, then let the base class clean up the slot."""
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS test_binary;")
+        self.conn.commit()
+        PGLogicalOutputTest.tearDown(self)
+
+    def probe_for_server_params(self):
+        cur = self.conn.cursor()
+
+        # Execute a dummy transaction so we have something to decode
+        cur.execute("INSERT INTO test_binary values (decode('ff','hex'), NULL);")
+        self.conn.commit()
+
+        # Make a connection to decode the dummy tx. We're just doing this
+        # so we can capture the startup response from the server, then
+        # we'll disconnect and reconnect with the binary settings captured
+        # from the server to ensure we make a request that'll get binary
+        # mode enabled.
+        messages = self.get_changes()
+
+        (m, params) = messages.expect_startup()
+
+        # Check we got everything we expected from the startup params
+        expected_params = ['pg_version_num', 'binary.bigendian',
+                'binary.sizeof_datum', 'binary.sizeof_int',
+                'binary.sizeof_long', 'binary.float4_byval',
+                'binary.float8_byval', 'binary.integer_datetimes',
+                'binary.internal_basetypes', 'binary.binary_basetypes',
+                'binary.basetypes_major_version']
+        for pn in expected_params:
+            self.assertTrue(pn in params, msg="Expected startup msg param binary.basetypes_major_version absent")
+
+        self.assertEquals(int(params['pg_version_num'])/100,
+                int(params['binary.basetypes_major_version']),
+                msg="pg_version_num/100 <> binary.basetypes_major_version")
+
+        # We didn't ask for it, so binary and send/recv must be disabled
+        self.assertEquals(params['binary.internal_basetypes'], 'f')
+        self.assertEquals(params['binary.binary_basetypes'], 'f')
+
+        # Read and discard the fields of our dummy tx
+        messages.expect_begin()
+        messages.expect_row_meta()
+        messages.expect_insert()
+        messages.expect_commit()
+
+        cur.close()
+
+        # We have to disconnect and reconnect if using walsender so that
+        # we can start a new replication session
+        self.logger.debug("before: Interface is %s", self.interface)
+        self.reconnect_decoding()
+        self.logger.debug("after: Interface is %s", self.interface)
+
+        return params
+
+    def test_binary_mode(self):
+        """Request internal binary format with matching platform params.
+
+        Since the client claims the exact platform layout the server
+        reported, the server should enable binary.internal_basetypes.
+        """
+        params = self.probe_for_server_params()
+        # Python 2 integer division: 90405 -> 904 etc.
+        major_version = int(params['pg_version_num'])/100
+
+        cur = self.conn.cursor()
+
+        # Now that we know the server's parameters, do another transaction and
+        # decode it in binary mode using the format we know the server speaks.
+        cur.execute("INSERT INTO test_binary values (decode('aa','hex'), TIMESTAMP '2000-01-02 12:34:56');")
+        self.conn.commit()
+
+        messages = self.get_changes({
+            'binary.want_internal_basetypes': 't',
+            'binary.want_binary_basetypes': 't',
+            'binary.basetypes_major_version': str(major_version),
+            'binary.bigendian' : params['binary.bigendian'],
+            'binary.sizeof_datum' : params['binary.sizeof_datum'],
+            'binary.sizeof_int' : params['binary.sizeof_int'],
+            'binary.sizeof_long' : params['binary.sizeof_long'],
+            'binary.float4_byval' : params['binary.float4_byval'],
+            'binary.float8_byval' : params['binary.float8_byval'],
+            'binary.integer_datetimes' : params['binary.integer_datetimes']
+            })
+
+        # Decode the startup message
+        (m, params) = messages.expect_startup()
+        # Binary mode should be enabled since we sent the params the server wants
+        self.assertEquals(params['binary.internal_basetypes'], 't')
+        # send/recv mode is implied by binary mode
+        self.assertEquals(params['binary.binary_basetypes'], 't')
+
+        # Decode the transaction we sent
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+
+        # and verify that the message fields are in the expected binary representation
+        # (length byte then payload for the varlena bytea; assumes a
+        # little-endian test host -- see FIXME below)
+        self.assertEqual(m.message['newtup'][0], '\x05\xaa')
+        # FIXME this is probably wrong on bigendian
+        self.assertEqual(m.message['newtup'][1], '\x00\x7c\xb1\xa9\x1e\x00\x00\x00')
+
+        messages.expect_commit()
+
+    def test_sendrecv_mode(self):
+        params = self.probe_for_server_params()
+        major_version = int(params['pg_version_num'])/100
+
+        cur = self.conn.cursor()
+
+        # Now that we know the server's parameters, do another transaction and
+        # decode it in binary mode using the format we know the server speaks.
+        cur.execute("INSERT INTO test_binary values (decode('aa','hex'), TIMESTAMP '2000-01-02 12:34:56');")
+        self.conn.commit()
+
+        # Send options that don't match the server's binary mode, so it falls
+        # back to send/recv even though we requested binary too.
+        if int(params['binary.sizeof_long']) == 8:
+            want_sizeof_long = 4
+        elif int(params['binary.sizeof_long']) == 4:
+            want_sizeof_long = 8
+        else:
+            self.fail("What platform has sizeof(long) == %s anyway?" % params['binary.sizeof_long'])
+
+        messages = self.get_changes({
+            # Request binary even though we know we won't get it
+            'binary.want_internal_basetypes': 't',
+            # and expect to fall back to send/recv
+            'binary.want_binary_basetypes': 't',
+            'binary.basetypes_major_version': str(major_version),
+            'binary.bigendian' : params['binary.bigendian'],
+            'binary.sizeof_datum' : params['binary.sizeof_datum'],
+            'binary.sizeof_int' : params['binary.sizeof_int'],
+            'binary.sizeof_long' : want_sizeof_long,
+            'binary.float4_byval' : params['binary.float4_byval'],
+            'binary.float8_byval' : params['binary.float8_byval'],
+            'binary.integer_datetimes' : params['binary.integer_datetimes']
+            })
+
+        # Decode the startup message
+        (m, params) = messages.expect_startup()
+        # Binary mode should be disabled because we aren't compatible
+        self.assertEquals(params['binary.internal_basetypes'], 'f')
+        # send/recv mode should be on, since we're compatible with the same
+        # major version.
+        self.assertEquals(params['binary.binary_basetypes'], 't')
+
+        # Decode the transaction we sent
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+
+        # and verify that the message fields are in the expected send/recv representation
+        # The text field lacks the length prefix
+        self.assertEqual(m.message['newtup'][0], '\xaa')
+        # and the timestamp is in network byte order
+        self.assertEqual(m.message['newtup'][1], '\x00\x00\x00\x1e\xa9\xb1\x7c\x00')
+
+        messages.expect_commit()
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/contrib/pglogical_output/test/test_filter.py b/contrib/pglogical_output/test/test_filter.py
new file mode 100644
index 0000000..fce827b
--- /dev/null
+++ b/contrib/pglogical_output/test/test_filter.py
@@ -0,0 +1,182 @@
+import random
+import string
+import unittest
+import pprint
+from base import PGLogicalOutputTest
+
+class FilterTest(PGLogicalOutputTest):
+    def rand_string(self, length):
+        return ''.join([random.choice(string.ascii_letters + string.digits) for n in xrange(length)])
+
+    def set_up(self):
+        """Create test tables, plpgsql filter hooks and the decoding slot.
+
+        NOTE(review): set_up/tear_down are not the standard unittest hook
+        names (the other test classes here override setUp/tearDown and
+        chain to PGLogicalOutputTest), and this one does not call the base
+        class. Presumably the base class invokes these as its own hooks --
+        confirm, otherwise these fixtures never run automatically.
+        """
+        cur = self.conn.cursor()
+        cur.execute("DROP EXTENSION IF EXISTS pglogical_output_plhooks;")
+        cur.execute("DROP TABLE IF EXISTS test_changes, test_changes_filter;")
+        cur.execute("DROP FUNCTION IF EXISTS test_filter(regclass, \"char\", text)")
+        cur.execute("DROP FUNCTION IF EXISTS test_action_filter(regclass, \"char\", text)")
+        cur.execute("CREATE TABLE test_changes (cola serial PRIMARY KEY, colb timestamptz default now(), colc text);")
+        cur.execute("CREATE TABLE test_changes_filter (cola serial PRIMARY KEY, colb timestamptz default now(), colc text);")
+        cur.execute("CREATE EXTENSION pglogical_output_plhooks;")
+
+
+        # Filter function that filters out (removes) all changes
+        # in tables named *_filter*
+        cur.execute("""
+            CREATE FUNCTION test_filter(relid regclass, action "char", nodeid text)
+            returns bool stable language plpgsql AS $$
+            BEGIN
+                IF nodeid <> 'foo' THEN
+                    RAISE EXCEPTION 'Expected nodeid <foo>, got <%>',nodeid;
+                END IF;
+                RETURN relid::regclass::text NOT LIKE '%_filter%';
+            END
+            $$;
+            """)
+
+        # function to filter out Deletes and Updates - Only Inserts pass through
+        cur.execute("""
+            CREATE FUNCTION test_action_filter(relid regclass, action "char", nodeid text)
+            returns bool stable language plpgsql AS $$
+            BEGIN
+                RETURN action NOT IN ('U', 'D');
+            END
+            $$;
+            """)
+
+        self.conn.commit()
+        self.connect_decoding()
+
+    def tear_down(self):
+        """Drop everything set_up created (see naming NOTE on set_up)."""
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE test_changes, test_changes_filter;")
+        cur.execute("DROP FUNCTION test_filter(regclass, \"char\", text)");
+        cur.execute("DROP FUNCTION test_action_filter(regclass, \"char\", text)");
+        cur.execute("DROP EXTENSION pglogical_output_plhooks;")
+        self.conn.commit()
+
+    def exec_changes(self):
+        """Execute a stream of changes we can process via various filters"""
+        cur = self.conn.cursor()
+        # tx 1: two inserts into test_changes, one into test_changes_filter
+        cur.execute("INSERT INTO test_changes(colb, colc) VALUES(%s, %s)", ('2015-08-08', 'foobar'))
+        cur.execute("INSERT INTO test_changes_filter(colb, colc) VALUES(%s, %s)", ('2015-08-08', 'foobar'))
+        cur.execute("INSERT INTO test_changes(colb, colc) VALUES(%s, %s)", ('2015-08-08', 'bazbar'))
+        self.conn.commit()
+
+        # tx 2: a single insert, into the filtered table only
+        cur.execute("INSERT INTO test_changes_filter(colb, colc) VALUES(%s, %s)", ('2015-08-08', 'bazbar'))
+        self.conn.commit()
+
+        # tx 3: one update in each table
+        cur.execute("UPDATE test_changes set colc = 'oobar' where cola=1")
+        cur.execute("UPDATE test_changes_filter set colc = 'oobar' where cola=1")
+        self.conn.commit()
+
+        # tx 4: one delete in each table
+        cur.execute("DELETE FROM test_changes where cola=2")
+        cur.execute("DELETE FROM test_changes_filter where cola=2")
+        self.conn.commit()
+
+
+    def test_filter(self):
+        """Row filter by table name: *_filter* tables must produce no rows."""
+        self.exec_changes();
+
+        params = {
+                'hooks.setup_function': 'public.pglo_plhooks_setup_fn',
+                'pglo_plhooks.row_filter_hook': 'public.test_filter',
+                'pglo_plhooks.client_hook_arg': 'foo'
+                }
+
+        messages = self.get_changes(params)
+
+        (m, params) = messages.expect_startup()
+
+        self.assertIn('hooks.row_filter_enabled', m.message['params'])
+        self.assertEquals(m.message['params']['hooks.row_filter_enabled'], 't')
+
+        # two inserts into test_changes, the test_changes_filter insert is filtered out
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'foobar\0')
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'bazbar\0')
+        messages.expect_commit()
+
+        # just an empty tx as the test_changes_filter insert is filtered out
+        messages.expect_begin()
+        messages.expect_commit()
+
+        # 1 update each into test_changes and test_changes_filter
+        # update of test_changes_filter is filtered out
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_update()
+        self.assertEqual(m.message['newtup'][0], '1\0')
+        self.assertEqual(m.message['newtup'][2], 'oobar\0')
+        messages.expect_commit()
+
+        # 1 delete each into test_changes and test_changes_filter
+        # delete of test_changes_filter is filtered out
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_delete()
+        self.assertEqual(m.message['keytup'][0], '2\0')
+        messages.expect_commit()
+
+    def test_action_filter(self):
+        """Row filter by action: updates and deletes are suppressed."""
+        self.exec_changes();
+
+        params = {
+                'hooks.setup_function': 'public.pglo_plhooks_setup_fn',
+                'pglo_plhooks.row_filter_hook': 'public.test_action_filter'
+                }
+
+        messages = self.get_changes(params)
+
+        (m, params) = messages.expect_startup()
+
+        self.assertIn('hooks.row_filter_enabled', params)
+        self.assertEquals(params['hooks.row_filter_enabled'], 't')
+
+        # all three inserts of the first tx pass through, including the one
+        # into test_changes_filter: this hook only filters by action, and
+        # inserts are never filtered
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'foobar\0')
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'foobar\0')
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'bazbar\0')
+        messages.expect_commit()
+
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][2], 'bazbar\0')
+        messages.expect_commit()
+
+        # just empty tx as updates are filtered out
+        messages.expect_begin()
+        messages.expect_commit()
+
+        # just empty tx as deletes are filtered out
+        messages.expect_begin()
+        messages.expect_commit()
+
+#       def test_hooks(self):
+#           params = {
+#                   'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+#                   'pglo_plhooks.startup_hook', 'pglo_plhooks_demo_startup',
+#                   'pglo_plhooks.row_filter_hook', 'pglo_plhooks_demo_row_filter',
+#                   'pglo_plhooks.txn_filter_hook', 'pglo_plhooks_demo_txn_filter',
+#                   'pglo_plhooks.shutdown_hook', 'pglo_plhooks_demo_shutdown',
+#                   'pglo_plhooks.client_hook_arg', 'test data'
+#                   }
+#
+    def test_validation(self):
+        """A nonexistent hook function must make the walsender error out.
+
+        NOTE(review): the parameter used elsewhere in this class is
+        pglo_plhooks.row_filter_hook, not hooks.row_filter -- confirm this
+        fails for the intended reason and not just an unknown parameter.
+        """
+        with self.assertRaises(Exception):
+            self.get_changes({'hooks.row_filter': 'public.foobar'}).next()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/contrib/pglogical_output/test/test_parameters.py b/contrib/pglogical_output/test/test_parameters.py
new file mode 100644
index 0000000..9cfe4e1
--- /dev/null
+++ b/contrib/pglogical_output/test/test_parameters.py
@@ -0,0 +1,80 @@
+import random
+import string
+import unittest
+from base import PGLogicalOutputTest
+import psycopg2
+
+class ParametersTest(PGLogicalOutputTest):
+
+    def setUp(self):
+        """Create a scratch table and open the decoding slot.
+
+        Uses self.cur, which is presumably created by the base class's
+        setUp -- confirm against base.py.
+        """
+        PGLogicalOutputTest.setUp(self)
+        self.cur.execute("DROP TABLE IF EXISTS blah;")
+        self.cur.execute("CREATE TABLE blah(id integer);")
+        self.conn.commit()
+        self.connect_decoding()
+
+    def tearDown(self):
+        """Drop the scratch table, then let the base class clean up."""
+        self.cur.execute("DROP TABLE blah;")
+        self.conn.commit()
+        PGLogicalOutputTest.tearDown(self)
+
+    def test_protoversion(self):
+        """Invalid or unsupported protocol parameters must raise an error.
+
+        list() forces the lazy get_changes generator to actually connect
+        and read, so the server-side ERROR surfaces as a DatabaseError.
+        """
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'startup_params_format': 'borkbork'}))
+
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'startup_params_format': '2'}))
+
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'startup_params_format': None}))
+
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'max_proto_version': None}))
+
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'min_proto_version': None}))
+
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'min_proto_version': '2'}))
+
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'max_proto_version': '0'}))
+
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'max_proto_version': 'borkbork'}))
+
+    def test_unknown_params(self):
+        # Should get ignored
+        self.do_dummy_tx()
+        self.get_startup_msg(self.get_changes({'unknown_parameter': 'unknown'}))
+
+    def test_unknown_params(self):
+        # Should get ignored
+        self.do_dummy_tx()
+        self.get_startup_msg(self.get_changes({'unknown.some_param': 'unknown'}))
+
+    def test_encoding_missing(self):
+        """Omitting expected_encoding must be accepted by the server.
+
+        NOTE(review): `messages` is never consumed here, so nothing is
+        actually read from the server; consider reading the startup
+        message so the parameter is really processed -- TODO confirm.
+        """
+        # Should be ignored, server should send reply params
+        messages = self.get_changes({'expected_encoding': None})
+
+    def test_encoding_bogus(self):
+        """A nonexistent encoding name must raise an error."""
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'expected_encoding': 'gobblegobble'}))
+
+    def test_encoding_differs(self):
+        """An encoding that differs from the database's must be rejected."""
+        with self.assertRaises(psycopg2.DatabaseError):
+            list(self.get_changes({'expected_encoding': 'LATIN-1'}))
+
+    def do_dummy_tx(self):
+        """Force a dummy tx so there's something to decode."""
+        self.cur.execute("INSERT INTO blah(id) VALUES (1)")
+        self.conn.commit()
+
+    def get_startup_msg(self, messages):
+        """Read the next message, assert it is the startup ('S') message, return it."""
+        m = messages.next()
+        self.assertEqual(m.message_type, 'S')
+        return m
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/contrib/pglogical_output/test/test_replication_origin.py b/contrib/pglogical_output/test/test_replication_origin.py
new file mode 100644
index 0000000..d65b5d0
--- /dev/null
+++ b/contrib/pglogical_output/test/test_replication_origin.py
@@ -0,0 +1,327 @@
+import random
+import string
+import unittest
+from base import PGLogicalOutputTest
+
+class ReplicationOriginTest(PGLogicalOutputTest):
+    """
+    Tests for handling of changeset forwarding and replication origins.
+
+    These tests have to deal with a bit of a wrinkle: if the decoding plugin
+    is running in PostgreSQL 9.4 the lack of replication origins support means
+    that we cannot send replication origin information, and we always forward
+    transactions from other peer nodes. So it produces output you can't get
+    from 9.5+: all changesets forwarded, but without origin information.
+
+    9.4 also lacks the functions for setting up replication origins, so we
+    have to special-case that.
+    """
+
+    # Name of the replication origin created to impersonate an upstream node
+    fake_upstream_origin_name = "pglogical_test_fake_upstream";
+    # LSN and commit timestamp we pretend came from that fake upstream
+    fake_xact_lsn = '14/abcdef0'
+    fake_xact_timestamp = '2015-10-08 12:13:14.1234'
+
+    def setUp(self):
+        """Create the test table and, on 9.5+, the fake replication origin."""
+        PGLogicalOutputTest.setUp(self)
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS test_origin;")
+        # Drop any origin left over from a crashed run. Python 2 integer
+        # division; NOTE(review): `!= 904` also runs this on pre-9.4
+        # servers, where pg_replication_origin doesn't exist -- presumably
+        # only 9.4 vs 9.5+ is expected here, confirm.
+        if self.conn.server_version/100 != 904:
+            cur.execute("""
+                SELECT pg_replication_origin_drop(%s)
+                FROM pg_replication_origin
+                WHERE roname = %s;
+                """,
+                (self.fake_upstream_origin_name, self.fake_upstream_origin_name))
+        self.conn.commit()
+
+        cur.execute("CREATE TABLE test_origin (cola serial PRIMARY KEY, colb timestamptz default now(), colc text);")
+        self.conn.commit()
+
+        self.is95plus = self.conn.server_version/100 > 904
+
+        if self.is95plus:
+            # Create the replication origin for the fake remote node
+            cur.execute("SELECT pg_replication_origin_create(%s);", (self.fake_upstream_origin_name,))
+            self.conn.commit()
+
+            # Ensure that commit timestamps are enabled.
+            cur.execute("SHOW track_commit_timestamp");
+            if cur.fetchone()[0] != 'on':
+                raise ValueError("This test requires track_commit_timestamp to be on")
+            self.conn.commit()
+
+        self.connect_decoding()
+
+    def tearDown(self):
+        """Drop the table and the fake origin, then base-class cleanup."""
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS test_origin;")
+        self.conn.commit()
+        self.teardown_replication_session_origin(cur);
+        self.conn.commit()
+        PGLogicalOutputTest.tearDown(self)
+
+    def setup_replication_session_origin(self, cur):
+        """Sets session-level replication origin info. Ignored on 9.4."""
+        if self.conn.get_transaction_status() != 0:
+            raise ValueError("Transaction open or aborted, expected no open transaction")
+
+        if self.is95plus:
+            # Set our session up so it appears to be replaying from the nonexistent remote node
+            cur.execute("SELECT pg_replication_origin_session_setup(%s);", (self.fake_upstream_origin_name,))
+            self.conn.commit()
+
+    def setup_xact_origin(self, cur, origin_lsn, origin_commit_timestamp):
+        """Sets transaction-level replication origin info. Ignored on 9.4. Implicitly begins a tx."""
+        if self.conn.get_transaction_status() != 0:
+            raise ValueError("Transaction open or aborted, expected no open transaction")
+
+        if self.is95plus:
+            # Run transactions that seem to come from the remote node.
+            # No commit here: the caller's subsequent statements join the
+            # tx this implicitly opened.
+            cur.execute("SELECT pg_replication_origin_xact_setup(%s, %s);", (origin_lsn, origin_commit_timestamp))
+
+    def reset_replication_session_origin(self, cur):
+        """
+        Reset session's replication origin setup to defaults.
+
+        Always executes an empty transaction on 9.5+; does nothing
+        on 9.4.
+        """
+        if self.conn.get_transaction_status() != 0:
+            raise ValueError("Transaction open or aborted, expected no open transaction")
+        if self.is95plus:
+            cur.execute("SELECT pg_replication_origin_session_reset();")
+            self.conn.commit()
+
+    def teardown_replication_session_origin(self, cur):
+        if self.conn.get_transaction_status() != 0:
+            raise ValueError("Transaction open or aborted, expected no open transaction")
+        if self.is95plus:
+            cur.execute("SELECT pg_replication_origin_session_is_setup()")
+            if cur.fetchone()[0] == 't':
+                cur.execute("SELECT pg_replication_origin_session_reset();")
+            self.conn.commit()
+            cur.execute("SELECT pg_replication_origin_drop(%s);", (self.fake_upstream_origin_name,))
+            self.conn.commit()
+
+    def expect_origin_progress(self, cur, lsn):
+        """Assert recorded origin progress matches `lsn` (None = no progress).
+
+        No-op on 9.4. Commits only if it had to open a transaction itself.
+        """
+        if self.is95plus:
+            initialtxstate = self.conn.get_transaction_status()
+            if initialtxstate not in (0,2):
+                raise ValueError("Expected open valid tx or no tx")
+            cur.execute("SELECT local_id, external_id, remote_lsn FROM pg_show_replication_origin_status()")
+            if lsn is not None:
+                (local_id, external_id, remote_lsn) = cur.fetchone()
+                # assumes our fake origin is the only one, so it got id 1 --
+                # TODO confirm on clusters with pre-existing origins
+                self.assertEquals(local_id, 1)
+                self.assertEquals(external_id, self.fake_upstream_origin_name)
+                self.assertEquals(remote_lsn.lower(), lsn.lower())
+            self.assertIsNone(cur.fetchone(), msg="Expected only one replication origin to exist")
+            if initialtxstate == 0:
+                self.conn.commit()
+
+    def run_test_transactions(self, cur):
+        """
+        Run a set of transactions with and without a replication origin set.
+
+        This simulates a mix of local transactions and remotely-originated
+        transactions being applied by a pglogical downstream or some other
+        replication-origin aware replication agent.
+
+        On 9.5+ the with-origin transaction simulates what the apply side of a
+        logical replication downstream would do by setting replication origin
+        information on the session and transaction. So to the server it's just
+        like this transaction was forwarded from another node.
+
+        On 9.4 it runs like a locally originated tx because 9.4 lacks origins
+        support.
+
+        All the tests will decode the same series of transactions, but with
+        different connection settings.
+        """
+
+        self.expect_origin_progress(cur, None)
+
+        # tx 1 (local): some locally originated tx's for data we'll then modify
+        cur.execute("INSERT INTO test_origin(colb, colc) VALUES(%s, %s)", ('2015-10-08', 'foobar'))
+        self.assertEquals(cur.rowcount, 1)
+        cur.execute("INSERT INTO test_origin(colb, colc) VALUES(%s, %s)", ('2015-10-08', 'bazbar'))
+        self.assertEquals(cur.rowcount, 1)
+        self.conn.commit()
+
+        # Local tx's must not advance origin progress
+        self.expect_origin_progress(cur, None)
+
+        # tx 2 (fake remote): now the fake remotely-originated tx
+        self.setup_replication_session_origin(cur)
+        self.setup_xact_origin(cur, self.fake_xact_lsn, self.fake_xact_timestamp)
+        # Some remotely originated inserts
+        cur.execute("INSERT INTO test_origin(colb, colc) VALUES(%s, %s)", ('2016-10-08', 'fakeor'))
+        self.assertEquals(cur.rowcount, 1)
+        cur.execute("INSERT INTO test_origin(colb, colc) VALUES(%s, %s)", ('2016-10-08', 'igin'))
+        self.assertEquals(cur.rowcount, 1)
+        # Delete a tuple that was inserted locally
+        cur.execute("DELETE FROM test_origin WHERE colb = '2015-10-08' and colc = 'foobar'")
+        self.assertEquals(cur.rowcount, 1)
+        # modify a tuple that was inserted locally
+        cur.execute("UPDATE test_origin SET colb = '2016-10-08' where colc = 'bazbar'")
+        self.assertEquals(cur.rowcount, 1)
+        self.conn.commit()
+
+        self.expect_origin_progress(cur, self.fake_xact_lsn)
+
+        # Reset replication origin to return to locally originated tx's
+        self.reset_replication_session_origin(cur)
+
+        self.expect_origin_progress(cur, self.fake_xact_lsn)
+
+        # tx 3 (local again): use a local tx to modify remotely originated rows
+        # Delete and modify remotely originated tuples
+        cur.execute("DELETE FROM test_origin WHERE colc = 'fakeor'")
+        self.assertEquals(cur.rowcount, 1)
+        cur.execute("UPDATE test_origin SET colb = '2015-10-08' WHERE colc = 'igin'")
+        self.assertEquals(cur.rowcount, 1)
+        # and insert a new row mainly to verify that the origin reset was respected
+        cur.execute("INSERT INTO test_origin(colb, colc) VALUES (%s, %s)", ('2017-10-08', 'blahblah'))
+        self.assertEquals(cur.rowcount, 1)
+        self.conn.commit()
+
+        self.expect_origin_progress(cur, self.fake_xact_lsn)
+
+    def decode_test_transactions(self, messages, expect_origins, expect_forwarding):
+        """
+        Decode the transactions from run_test_transactions, varying the
+        expected output based on whether we've been told we should be getting
+        origin messages, and whether we should be getting forwarded transaction
+        data.
+
+        messages: the decoded-message reader returned by get_changes().
+        expect_origins: expect an origin ('O') message for the remote tx.
+        expect_forwarding: expect the remote tx's row data to be present.
+
+        This intentionally doesn't use maybe_expect_origin() to make sure it's
+        testing what the unit test specifies, not what the server sent in the
+        startup message.
+        """
+        # two inserts in one locally originated tx. No forwarding. Local tx's
+        # never get origins.
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        messages.expect_commit()
+
+        # The remotely originated transaction is still replayed when forwarding
+        # is off, but on 9.5+ the data from it is omitted.
+        #
+        # An origin message will be received only if on 9.5+.
+        if expect_forwarding:
+            messages.expect_begin()
+            if expect_origins:
+                messages.expect_origin()
+            # 9.4 forwards unconditionally
+            m = messages.expect_row_meta()
+            m = messages.expect_insert()
+            m = messages.expect_row_meta()
+            m = messages.expect_insert()
+            m = messages.expect_row_meta()
+            m = messages.expect_delete()
+            m = messages.expect_row_meta()
+            m = messages.expect_update()
+            messages.expect_commit()
+
+        # The second locally originated tx modifies the remotely
+        # originated tuples. It's locally originated so no origin
+        # message is sent.
+        messages.expect_begin()
+        m = messages.expect_row_meta()
+        m = messages.expect_delete()
+        m = messages.expect_row_meta()
+        m = messages.expect_update()
+        m = messages.expect_row_meta()
+        m = messages.expect_insert()
+        messages.expect_commit()
+
+    def test_forwarding_not_requested_95plus(self):
+        """
+        For this test case we don't ask for forwarding to be enabled.
+
+        For 9.5+ we should get only transactions originated locally, so any transaction
+        with an origin set will be ignored. No origin info messages will be sent.
+        """
+        cur = self.conn.cursor()
+
+        if not self.is95plus:
+            self.skipTest("Cannot run forwarding-off origins-off test on a PostgreSQL 9.4 server")
+
+        self.run_test_transactions(cur)
+
+        # Forwarding not requested.
+        messages = self.get_changes({'forward_changesets':'f'})
+
+        # Startup msg
+        (m, params) = messages.expect_startup()
+
+        # We should not get origins, ever
+        self.assertEquals(params['forward_changeset_origins'], 'f')
+        # Changeset forwarding off respected by 9.5+
+        self.assertEquals(params['forward_changesets'], 'f')
+
+        # decode, expecting no origins and no forwarded data
+        self.decode_test_transactions(messages, False, False)
+
+    # Upstream doesn't send origin correctly yet, so this case is
+    # skipped until the output plugin emits origin messages.
+    @unittest.skip("Doesn't work yet")
+    def test_forwarding_requested_95plus(self):
+        """
+        In this test we request that forwarding be enabled. We'll get
+        forwarded transactions and origin messages for them.
+        """
+        cur = self.conn.cursor()
+
+        if not self.is95plus:
+            self.skipTest("Cannot run forwarding-on with-origins test on a PostgreSQL 9.4 server")
+
+        self.run_test_transactions(cur)
+
+        # client requests to forward changesets
+        messages = self.get_changes({'forward_changesets': 't'})
+
+        # Startup msg
+        (m, params) = messages.expect_startup()
+
+        # Changeset forwarding is forced on by 9.4 and was requested
+        # for 9.5+ so should always be on.
+        self.assertEquals(params['forward_changesets'], 't')
+        # 9.5+ will always forward origins if cset forwarding is
+        # requested.
+        self.assertEquals(params['forward_changeset_origins'], 't')
+
+        # Decode, expecting forwarding, and expecting origins unless 9.4
+        self.decode_test_transactions(messages, self.is95plus, True)
+
+    def test_forwarding_not_requested_94(self):
+        """
+        For this test case we don't ask for forwarding to be enabled.
+
+        For 9.4, we should get all transactions, even those that were originated "remotely".
+        9.4 doesn't support replication identifiers so we couldn't tell the server that the
+        tx's were applied from a remote node, and it'd have no way to store that info anyway.
+        """
+        cur = self.conn.cursor()
+
+        if self.is95plus:
+            self.skipTest("9.4-specific test doesn't make sense on this server version")
+
+        self.run_test_transactions(cur)
+
+        # Forwarding not requested.
+        messages = self.get_changes({'forward_changesets':'f'})
+
+        # Startup msg
+        (m, params) = messages.expect_startup()
+
+        # We should not get origins, ever
+        self.assertEquals(params['forward_changeset_origins'], 'f')
+        # Changeset forwarding is forced on by 9.4
+        self.assertEquals(params['forward_changesets'], 't')
+
+        # decode, expecting no origins, and forwarding
+        self.decode_test_transactions(messages, False, True)
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/contrib/pglogical_output/test/test_tuple_fields.py b/contrib/pglogical_output/test/test_tuple_fields.py
new file mode 100644
index 0000000..eb37529
--- /dev/null
+++ b/contrib/pglogical_output/test/test_tuple_fields.py
@@ -0,0 +1,159 @@
+import random
+import string
+import unittest
+from pglogical_proto import UnchangedField
+from base import PGLogicalOutputTest
+
+class TupleFieldsTest(PGLogicalOutputTest):
+
+    def setUp(self):
+        PGLogicalOutputTest.setUp(self)
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS test_tuplen;")
+        # Drop leftovers from any previous failed run before recreating.
+        cur.execute("DROP TABLE IF EXISTS test_text;")
+        cur.execute("DROP TABLE IF EXISTS test_binary;")
+        cur.execute("DROP TABLE IF EXISTS toasttest;")
+
+        cur.execute("CREATE TABLE test_tuplen (cola serial PRIMARY KEY, colb timestamptz default now(), colc text);")
+        cur.execute("CREATE TABLE toasttest(descr text, cnt int DEFAULT 0, f1 text, f2 text);")
+        cur.execute("CREATE TABLE test_text (cola text, colb text);")
+        cur.execute("CREATE TABLE test_binary (colv bytea);")
+
+        self.conn.commit()
+        self.connect_decoding()
+
+    def tearDown(self):
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS test_tuplen;")
+        # Remove the tables created in setUp.
+        cur.execute("DROP TABLE IF EXISTS test_text;")
+        cur.execute("DROP TABLE IF EXISTS test_binary;")
+        cur.execute("DROP TABLE IF EXISTS toasttest;")
+        self.conn.commit()
+
+        PGLogicalOutputTest.tearDown(self)
+
+    def test_null_tuple_field(self):
+        """Make sure null in tuple fields is sent as a 'n' row-value message"""
+        cur = self.conn.cursor()
+
+        cur.execute("INSERT INTO test_tuplen(colb, colc) VALUES('2015-08-08', null)")
+        # testing 'n'ull fields
+        self.conn.commit()
+
+        messages = self.get_changes()
+
+        messages.expect_startup()
+
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        # 'n'ull is reported as None by the test tuple reader
+        self.assertEqual(m.message['newtup'][2], None)
+        messages.expect_commit()
+
+    def test_unchanged_toasted_tuple_field(self):
+        """
+        Large TOASTed fields are sent as 'u'nchanged if they're from an UPDATE
+        and the UPDATE didn't change the TOASTed field, just other fields in the
+        same tuple.
+        """
+
+        # TODO: A future version should let us force unpacking of TOASTed fields,
+        # see bug #19
+
+        cur = self.conn.cursor()
+
+        cur.execute("INSERT INTO toasttest(descr, f1, f2) VALUES('one-toasted', repeat('1234567890',30000), 'atext');")
+        self.conn.commit()
+
+        # testing 'u'nchanged tuples
+        cur.execute("UPDATE toasttest SET cnt = 2 WHERE descr = 'one-toasted'")
+        self.conn.commit()
+
+        # but make sure they're replicated when actually changed
+        cur.execute("UPDATE toasttest SET cnt = 3, f1 = repeat('0987654321',25000) WHERE descr = 'one-toasted';")
+        self.conn.commit()
+
+
+        messages = self.get_changes()
+
+        # Startup msg
+        messages.expect_startup()
+
+        # consume the insert
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][0], 'one-toasted\0')
+        self.assertEqual(m.message['newtup'][1], '0\0') # default of cnt field
+        self.assertEqual(m.message['newtup'][2], '1234567890'*30000 + '\0')
+        self.assertEqual(m.message['newtup'][3], 'atext\0')
+        messages.expect_commit()
+
+
+        # First UPDATE
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_update()
+        self.assertEqual(m.message['newtup'][0], 'one-toasted\0')
+        self.assertEqual(m.message['newtup'][1], '2\0')
+        # The big value is TOASTed, and since we didn't change it, it's sent
+        # as an unchanged field marker.
+        self.assertIsInstance(m.message['newtup'][2], UnchangedField)
+        # While unchanged, 'atext' is small enough that it's not stored out of line
+        # so it's written despite being unchanged.
+        self.assertEqual(m.message['newtup'][3], 'atext\0')
+        messages.expect_commit()
+
+
+
+        # Second UPDATE
+        messages.expect_begin()
+        messages.expect_row_meta()
+        m = messages.expect_update()
+        self.assertEqual(m.message['newtup'][0], 'one-toasted\0')
+        self.assertEqual(m.message['newtup'][1], '3\0')
+        # this time we changed the TOASTed field, so it's been sent
+        self.assertEqual(m.message['newtup'][2], '0987654321'*25000 + '\0')
+        self.assertEqual(m.message['newtup'][3], 'atext\0')
+        messages.expect_commit()
+
+
+
+    def test_default_modes(self):
+        cur = self.conn.cursor()
+
+        cur.execute("INSERT INTO test_text(cola, colb) VALUES('sample1', E'sam\\160le2\\n')")
+        cur.execute("INSERT INTO test_binary values (decode('ff','hex'));")
+        self.conn.commit()
+
+        messages = self.get_changes()
+
+        messages.expect_startup()
+
+        # consume the insert
+        messages.expect_begin()
+        messages.expect_row_meta()
+
+        # The values of the two text fields will be the original text,
+        # returned unchanged, but the escapes in the second one will
+        # have been decoded, so it'll have a literal newline in it
+        # and the octal escape decoded.
+        m = messages.expect_insert()
+        self.assertEqual(m.message['newtup'][0], 'sample1\0')
+        self.assertEqual(m.message['newtup'][1], 'sample2\n\0')
+
+        messages.expect_row_meta()
+        m = messages.expect_insert()
+
+        # While this is a bytea field, we didn't negotiate binary or send/recv
+        # mode with the server, so what we'll receive is the hex-encoded text
+        # representation of the bytea value as as text-format literal.
+        self.assertEqual(m.message['newtup'][0], '\\xff\0')
+
+        messages.expect_commit()
+
+if __name__ == '__main__':  # allow running this test module directly via the stock unittest runner
+    unittest.main()
-- 
2.1.0

