From a3d21e70a9de7a1e5bb2093f592dde53bae707cb Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 2 Nov 2015 19:34:21 +0800
Subject: [PATCH] Add contrib/pglogical_output, a logical decoding plugin

---
 contrib/Makefile                                   |   2 +
 contrib/pglogical_output/.gitignore                |   6 +
 contrib/pglogical_output/Makefile                  |  27 +
 contrib/pglogical_output/README.md                 | 622 +++++++++++++++++++++
 contrib/pglogical_output/doc/.gitignore            |   1 +
 contrib/pglogical_output/doc/DESIGN.md             | 124 ++++
 contrib/pglogical_output/doc/protocol.txt          | 546 ++++++++++++++++++
 contrib/pglogical_output/expected/basic_json.out   | 139 +++++
 contrib/pglogical_output/expected/basic_json_1.out | 108 ++++
 contrib/pglogical_output/expected/basic_native.out |  99 ++++
 contrib/pglogical_output/expected/cleanup.out      |   4 +
 .../pglogical_output/expected/encoding_json.out    |  59 ++
 contrib/pglogical_output/expected/hooks_json.out   | 202 +++++++
 contrib/pglogical_output/expected/hooks_json_1.out | 139 +++++
 contrib/pglogical_output/expected/hooks_native.out | 104 ++++
 .../pglogical_output/expected/params_native.out    | 118 ++++
 .../pglogical_output/expected/params_native_1.out  | 118 ++++
 contrib/pglogical_output/expected/prep.out         |  26 +
 contrib/pglogical_output/pglogical_config.c        | 499 +++++++++++++++++
 contrib/pglogical_output/pglogical_config.h        |  55 ++
 contrib/pglogical_output/pglogical_hooks.c         | 232 ++++++++
 contrib/pglogical_output/pglogical_hooks.h         |  22 +
 contrib/pglogical_output/pglogical_output.c        | 537 ++++++++++++++++++
 contrib/pglogical_output/pglogical_output.h        | 105 ++++
 contrib/pglogical_output/pglogical_output/README   |   7 +
 contrib/pglogical_output/pglogical_output/hooks.h  |  72 +++
 contrib/pglogical_output/pglogical_proto.c         |  49 ++
 contrib/pglogical_output/pglogical_proto.h         |  57 ++
 contrib/pglogical_output/pglogical_proto_json.c    | 204 +++++++
 contrib/pglogical_output/pglogical_proto_json.h    |  32 ++
 contrib/pglogical_output/pglogical_proto_native.c  | 494 ++++++++++++++++
 contrib/pglogical_output/pglogical_proto_native.h  |  37 ++
 contrib/pglogical_output/regression.conf           |   2 +
 contrib/pglogical_output/sql/basic_json.sql        |  24 +
 contrib/pglogical_output/sql/basic_native.sql      |  27 +
 contrib/pglogical_output/sql/basic_setup.sql       |  62 ++
 contrib/pglogical_output/sql/basic_teardown.sql    |   4 +
 contrib/pglogical_output/sql/cleanup.sql           |   4 +
 contrib/pglogical_output/sql/encoding_json.sql     |  58 ++
 contrib/pglogical_output/sql/hooks_json.sql        |  49 ++
 contrib/pglogical_output/sql/hooks_native.sql      |  48 ++
 contrib/pglogical_output/sql/hooks_setup.sql       |  37 ++
 contrib/pglogical_output/sql/hooks_teardown.sql    |  10 +
 contrib/pglogical_output/sql/params_native.sql     |  95 ++++
 contrib/pglogical_output/sql/prep.sql              |  30 +
 contrib/pglogical_output_plhooks/.gitignore        |   1 +
 contrib/pglogical_output_plhooks/Makefile          |  13 +
 .../README.pglogical_output_plhooks                | 158 ++++++
 .../pglogical_output_plhooks--1.0.sql              |  89 +++
 .../pglogical_output_plhooks.c                     | 414 ++++++++++++++
 .../pglogical_output_plhooks.control               |   4 +
 51 files changed, 5975 insertions(+)
 create mode 100644 contrib/pglogical_output/.gitignore
 create mode 100644 contrib/pglogical_output/Makefile
 create mode 100644 contrib/pglogical_output/README.md
 create mode 100644 contrib/pglogical_output/doc/.gitignore
 create mode 100644 contrib/pglogical_output/doc/DESIGN.md
 create mode 100644 contrib/pglogical_output/doc/protocol.txt
 create mode 100644 contrib/pglogical_output/expected/basic_json.out
 create mode 100644 contrib/pglogical_output/expected/basic_json_1.out
 create mode 100644 contrib/pglogical_output/expected/basic_native.out
 create mode 100644 contrib/pglogical_output/expected/cleanup.out
 create mode 100644 contrib/pglogical_output/expected/encoding_json.out
 create mode 100644 contrib/pglogical_output/expected/hooks_json.out
 create mode 100644 contrib/pglogical_output/expected/hooks_json_1.out
 create mode 100644 contrib/pglogical_output/expected/hooks_native.out
 create mode 100644 contrib/pglogical_output/expected/params_native.out
 create mode 100644 contrib/pglogical_output/expected/params_native_1.out
 create mode 100644 contrib/pglogical_output/expected/prep.out
 create mode 100644 contrib/pglogical_output/pglogical_config.c
 create mode 100644 contrib/pglogical_output/pglogical_config.h
 create mode 100644 contrib/pglogical_output/pglogical_hooks.c
 create mode 100644 contrib/pglogical_output/pglogical_hooks.h
 create mode 100644 contrib/pglogical_output/pglogical_output.c
 create mode 100644 contrib/pglogical_output/pglogical_output.h
 create mode 100644 contrib/pglogical_output/pglogical_output/README
 create mode 100644 contrib/pglogical_output/pglogical_output/hooks.h
 create mode 100644 contrib/pglogical_output/pglogical_proto.c
 create mode 100644 contrib/pglogical_output/pglogical_proto.h
 create mode 100644 contrib/pglogical_output/pglogical_proto_json.c
 create mode 100644 contrib/pglogical_output/pglogical_proto_json.h
 create mode 100644 contrib/pglogical_output/pglogical_proto_native.c
 create mode 100644 contrib/pglogical_output/pglogical_proto_native.h
 create mode 100644 contrib/pglogical_output/regression.conf
 create mode 100644 contrib/pglogical_output/sql/basic_json.sql
 create mode 100644 contrib/pglogical_output/sql/basic_native.sql
 create mode 100644 contrib/pglogical_output/sql/basic_setup.sql
 create mode 100644 contrib/pglogical_output/sql/basic_teardown.sql
 create mode 100644 contrib/pglogical_output/sql/cleanup.sql
 create mode 100644 contrib/pglogical_output/sql/encoding_json.sql
 create mode 100644 contrib/pglogical_output/sql/hooks_json.sql
 create mode 100644 contrib/pglogical_output/sql/hooks_native.sql
 create mode 100644 contrib/pglogical_output/sql/hooks_setup.sql
 create mode 100644 contrib/pglogical_output/sql/hooks_teardown.sql
 create mode 100644 contrib/pglogical_output/sql/params_native.sql
 create mode 100644 contrib/pglogical_output/sql/prep.sql
 create mode 100644 contrib/pglogical_output_plhooks/.gitignore
 create mode 100644 contrib/pglogical_output_plhooks/Makefile
 create mode 100644 contrib/pglogical_output_plhooks/README.pglogical_output_plhooks
 create mode 100644 contrib/pglogical_output_plhooks/pglogical_output_plhooks--1.0.sql
 create mode 100644 contrib/pglogical_output_plhooks/pglogical_output_plhooks.c
 create mode 100644 contrib/pglogical_output_plhooks/pglogical_output_plhooks.control

diff --git a/contrib/Makefile b/contrib/Makefile
index bd251f6..028fd9a 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -35,6 +35,8 @@ SUBDIRS = \
 		pg_stat_statements \
 		pg_trgm		\
 		pgcrypto	\
+		pglogical_output \
+		pglogical_output_plhooks \
 		pgrowlocks	\
 		pgstattuple	\
 		postgres_fdw	\
diff --git a/contrib/pglogical_output/.gitignore b/contrib/pglogical_output/.gitignore
new file mode 100644
index 0000000..2322e13
--- /dev/null
+++ b/contrib/pglogical_output/.gitignore
@@ -0,0 +1,6 @@
+pglogical_output.so
+results/
+regression.diffs
+tmp_install/
+tmp_check/
+log/
diff --git a/contrib/pglogical_output/Makefile b/contrib/pglogical_output/Makefile
new file mode 100644
index 0000000..bc95140
--- /dev/null
+++ b/contrib/pglogical_output/Makefile
@@ -0,0 +1,27 @@
+MODULE_big = pglogical_output
+PGFILEDESC = "pglogical_output - logical replication output plugin"
+
+OBJS = pglogical_output.o pglogical_hooks.o pglogical_config.o \
+	   pglogical_proto.o pglogical_proto_native.o \
+	   pglogical_proto_json.o
+
+REGRESS = prep params_native basic_native hooks_native basic_json hooks_json encoding_json cleanup
+
+
+subdir = contrib/pglogical_output
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+
+# 'make installcheck' disabled when building in-tree because these tests
+# require "wal_level=logical", which typical installcheck users do not have
+# (e.g. buildfarm clients).
+installcheck:
+	;
+
+EXTRA_INSTALL += contrib/pglogical_output_plhooks
+EXTRA_REGRESS_OPTS += --temp-config=./regression.conf
+
+install: all
+	$(MKDIR_P) '$(DESTDIR)$(includedir)'/pglogical_output
+	$(INSTALL_DATA) pglogical_output/hooks.h '$(DESTDIR)$(includedir)'/pglogical_output
diff --git a/contrib/pglogical_output/README.md b/contrib/pglogical_output/README.md
new file mode 100644
index 0000000..23ff9c2
--- /dev/null
+++ b/contrib/pglogical_output/README.md
@@ -0,0 +1,622 @@
+# `pglogical` Output Plugin
+
+This is the [logical decoding](http://www.postgresql.org/docs/current/static/logicaldecoding.html)
+[output plugin](http://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html)
+for `pglogical`. Its purpose is to extract a change stream from a PostgreSQL
+database and send it to a client over a network connection using a
+well-defined, efficient protocol that multiple different applications can
+consume.
+
+The primary purpose of `pglogical_output` is to supply data to logical
+streaming replication solutions, but any application can potentially use its
+data stream. The output stream is designed to be compact and fast to decode,
+and the plugin supports upstream filtering of data so that only the required
+information is sent.
+
+Only one database is replicated, rather than the whole PostgreSQL install. A
+subset of that database may be selected for replication, currently based on
+table and on replication origin. Filtering by a WHERE clause can be supported
+easily in future.
+
+No triggers are required to collect the change stream and no external ticker or
+other daemon is required. It's accumulated using
+[replication slots](http://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html#AEN66446),
+as supported in PostgreSQL 9.4 or newer, and sent on top of the
+[PostgreSQL streaming replication protocol](http://www.postgresql.org/docs/current/static/protocol-replication.html).
+
+Unlike block-level ("physical") streaming replication, the change stream from
+the `pglogical` output plugin is compatible across different PostgreSQL
+versions and can even be consumed by non-PostgreSQL clients.
+
+Because logical decoding is used, only the changed rows are sent on the wire.
+There's no index change data, no vacuum activity, etc transmitted.
+
+The use of a replication slot means that the change stream is reliable and
+crash-safe. If the client disconnects or crashes it can reconnect and resume
+replay from the last message that client processed. Server-side changes that
+occur while the client is disconnected are accumulated in the queue to be sent
+when the client reconnects. This reliability also means that server-side
+resources are consumed whether or not a client is connected.
+
+# Why another output plugin?
+
+See [`DESIGN.md`](DESIGN.md) for a discussion of why using one of the existing
+generic logical decoding output plugins like `wal2json` to drive a logical
+replication downstream isn't ideal. It's mostly about speed.
+
+# Architecture and high level interaction
+
+The output plugin is loaded by a PostgreSQL walsender process when a client
+connects to PostgreSQL using the PostgreSQL wire protocol with connection
+option `replication=database`, then uses
+[the `CREATE_REPLICATION_SLOT ... LOGICAL ...` or `START_REPLICATION SLOT ... LOGICAL ...` commands](http://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html) to start streaming changes. (It can also be used via
+[SQL level functions](http://www.postgresql.org/docs/current/static/logicaldecoding-sql.html)
+over a non-replication connection, but this is mainly for debugging purposes).
+
+The client supplies parameters to the  `START_REPLICATION SLOT ... LOGICAL ...`
+command to specify the version of the `pglogical` protocol it supports,
+whether it wants binary format, etc.
+
+The output plugin processes the connection parameters and the connection enters
+streaming replication protocol mode, sometimes called "COPY BOTH" mode because
+it's based on the protocol used for the `COPY` command.  PostgreSQL then calls
+functions in this plugin to send it a stream of transactions to decode and
+translate into network messages. This stream of changes continues until the
+client disconnects.
+
+The only client-to-server interaction after startup is the sending of periodic
+feedback messages that allow the replication slot to discard no-longer-needed
+change history. The client *must* send feedback, otherwise `pg_xlog` on the
+server will eventually fill up and the server will stop working.
+
+
+# Usage
+
+The overall flow of client/server interaction is:
+
+* Client makes PostgreSQL fe/be protocol connection to server
+    * Connection options must include `replication=database` and `dbname=[...]` parameters
+    * The PostgreSQL client library can be `libpq` or anything else that supports the replication sub-protocol
+    * The same mechanisms are used for authentication and protocol encryption as for a normal non-replication connection
+* Client issues `IDENTIFY_SYSTEM`
+    * Server responds with a single row containing system identity info
+* Client issues `CREATE_REPLICATION_SLOT slotname LOGICAL 'pglogical'` if it's setting up for the first time
+    * Server responds with success info and a snapshot identifier
+    * Client may at this point use the snapshot identifier on other connections while leaving this one idle
+* Client issues `START_REPLICATION SLOT slotname LOGICAL 0/0 (...options...)` to start streaming, which loops:
+    * Server emits `pglogical` message block encapsulated in a replication protocol `CopyData` message
+    * Client receives and unwraps message, then decodes the `pglogical` message block
+    * Client intermittently sends a standby status update message to server to confirm replay
+* ... until client sends a graceful connection termination message on the fe/be protocol level or the connection is broken
+
+ The details of `IDENTIFY_SYSTEM`, `CREATE_REPLICATION_SLOT` and `START_REPLICATION` are discussed in the [replication protocol docs](http://www.postgresql.org/docs/current/static/protocol-replication.html) and will not be repeated here.
+
+## Make a replication connection
+
+To use the `pglogical` plugin you must first establish a PostgreSQL FE/BE
+protocol connection using the client library of your choice, passing
+`replication=database` as one of the connection parameters. `database` is a
+literal string and is not replaced with the database name; instead the database
+name is passed separately in the usual `dbname` parameter. Note that
+`replication` is not a GUC (configuration parameter) and may not be passed in
+the `options` parameter on the connection, it's a top-level parameter like
+`user` or `dbname`.
+
+Example connection string for `libpq`:
+
+    'user=postgres replication=database sslmode=verify-full dbname=mydb'
+
+The plug-in name to pass on logical slot creation is `'pglogical'`.
+
+Details are in the replication protocol docs.
+
+## Get system identity
+
+If required you can use the `IDENTIFY_SYSTEM` command, which reports system
+information:
+
+          systemid       | timeline |  xlogpos  | dbname | dboid
+    ---------------------+----------+-----------+--------+-------
+     6153224364663410513 |        1 | 0/C429C48 | testd  | 16385
+    (1 row)
+
+Details are in the replication protocol docs.
+
+## Create the slot if required
+
+If your application creates its own slots on first use and hasn't previously
+connected to this database on this system you'll need to create a replication
+slot. This keeps track of the client's replay state even while it's disconnected.
+
+The slot name may be anything your application wants up to a limit of 63
+characters in length. It's strongly advised that the slot name clearly identify
+the application and the host it runs on.
+
+Pass `pglogical` as the plugin name.
+
+e.g.
+
+    CREATE_REPLICATION_SLOT "reporting_host_42" LOGICAL "pglogical";
+
+`CREATE_REPLICATION_SLOT` returns a snapshot identifier that may be used with
+[`SET TRANSACTION SNAPSHOT`](http://www.postgresql.org/docs/current/static/sql-set-transaction.html)
+to see the database's state as of the moment of the slot's creation. The first
+change streamed from the slot will be the change immediately after this
+snapshot was taken. The snapshot is useful when cloning the initial state of a
+database being replicated. Applications that want to see the change stream
+going forward, but don't care about the initial state, can ignore this. The
+snapshot is only valid as long as the connection that issued the
+`CREATE_REPLICATION_SLOT` remains open and has not run another command.
+
+## Send replication parameters
+
+The client now sends:
+
+    START_REPLICATION SLOT "the_slot_name" LOGICAL (
+	'Expected_encoding', 'UTF8',
+	'Max_proto_major_version', '1',
+	'Min_proto_major_version', '1',
+	...moreparams...
+    );
+
+to start replication.
+
+The parameters are very important for ensuring that the plugin accepts
+the replication request and streams changes in the expected form. `pglogical`
+parameters are discussed in the separate `pglogical` protocol documentation.
+
+## Process the startup message
+
+`pglogical`'s output plugin will send a `CopyData` message containing its
+startup message as the first protocol message. This message contains a
+set of key/value entries describing the capabilities of the upstream output
+plugin, its version and the Pg version, the tuple format options selected,
+etc.
+
+The downstream client may choose to cleanly close the connection and disconnect
+at this point if it doesn't like the reply. It might then inform the user
+or reconnect with different parameters based on what it learned from the
+first connection's startup message.
+
+## Consume the change stream
+
+`pglogical`'s output plugin now sends a continuous series of `CopyData`
+protocol messages, each of which encapsulates a `pglogical` protocol message
+as documented in the separate protocol docs.
+
+These messages provide information about transaction boundaries, changed
+rows, etc.
+
+The stream continues until the client disconnects, the upstream server is
+restarted, the upstream walsender is terminated by admin action, there's
+a network issue, or the connection is otherwise broken.
+
+The client should send periodic feedback messages to the server to acknowledge
+that it's replayed to a given point and let the server release the resources
+it's holding in case that change stream has to be replayed again. See
+["Hot standby feedback message" in the replication protocol docs](http://www.postgresql.org/docs/current/static/protocol-replication.html)
+for details.
+
+## Disconnect gracefully
+
+Disconnection works just like any normal client; you use your client library's
+usual method for closing the connection. No special action is required before
+disconnection, though it's usually a good idea to send a final standby status
+message just before you disconnect.
+
+# Tests
+
+The `pg_regress` tests check invalid parameter handling and basic
+functionality.  They're intended for use by the buildfarm using an in-tree
+`make check`, but may also be run with an out-of-tree PGXS build against an
+existing PostgreSQL install using `make USE_PGXS=1 clean installcheck`.
+
+The tests may fail on installations that are not utf-8 encoded because the
+payloads of the binary protocol output will have text in different encodings,
+which aren't visible to psql as text to be decoded. Avoiding anything except
+7-bit ascii in the tests *should* prevent the problem.
+
+# Changeset forwarding
+
+It's possible to use `pglogical_output` to cascade replication between multiple
+PostgreSQL servers, in combination with an appropriate client to apply the
+changes to the downstreams.
+
+There are three forwarding modes:
+
+* Forward everything. Transactions are replicated whether they were made directly
+  on the immediate upstream or some other node upstream of it. All rows from all
+  transactions are sent.
+
+  Selected by setting `forward_changesets` to true (default) and not setting a
+  row or transaction filter hook.
+
+* No forwarding. Only transactions applied immediately on the upstream node are
+  forwarded. Transactions with any non-local origin are skipped. All rows from
+  locally originated transactions are sent.
+
+  Selected by setting `forward_changesets` to false. Remember to confirm by
+  checking the startup reply message.
+
+* Filtered forwarding. Transactions are replicated unless a client-supplied
+  transaction filter hook says to skip this transaction. Row changes are
+  replicated unless the client-supplied row filter hook (if provided) says to
+  skip that row.
+
+  Selected by setting `forward_changesets` to `true` and installing a
+  transaction and/or row filter hook (see "hooks").
+
+If the upstream server is 9.5 or newer and `forward_changesets` is enabled, the
+server will enable changeset origin information. It will set
+`forward_changeset_origins` to true in the startup reply message to indicate
+this. It will then send changeset origin messages after the `BEGIN` for each
+transaction, per the protocol documentation. Origin messages are omitted for
+transactions originating directly on the immediate upstream to save bandwidth.
+If `forward_changeset_origins` is true then transactions without an origin are
+always from the immediate upstream that’s running the decoding plugin.
+
+Note that changeset forwarding may be forced to on if not requested by some
+servers, so the client _should_ check the `forward_changesets` and
+`forward_changeset_origins` params in the startup reply message.
+
+Clients may use this facility to form arbitrarily complex topologies when
+combined with hooks to determine which transactions are forwarded. An obvious
+case is bi-directional (mutual) replication.
+
+# Selective replication
+
+By specifying a row filter hook it's possible to filter the replication stream
+server-side so that only a subset of changes is replicated.
+
+
+# Hooks
+
+`pglogical_output` exposes a number of extension points where applications can
+modify or override its behaviour.
+
+All hooks are called in their own memory context, which lasts for the duration
+of the logical decoding session. They may switch to longer lived contexts if
+needed, but are then responsible for their own cleanup.
+
+## Hook setup function
+
+The downstream must specify the fully-qualified name of a SQL-callable function
+on the server as the value of the `hooks.setup_function` client parameter.
+The SQL signature of this function is
+
+    CREATE OR REPLACE FUNCTION funcname(hooks internal, memory_context internal)
+    RETURNS void STABLE
+    LANGUAGE c AS 'MODULE_PATHNAME';
+
+Permissions are checked. This function must be callable by the user that the
+output plugin is running as. The function name *must* be schema-qualified and is
+parsed like any other qualified identifier.
+
+The function receives a pointer to a newly allocated structure of hook function
+pointers to populate as its first argument. The function must not free the
+argument.
+
+If the hooks need a private data area to store information across calls, the
+setup function should get the `MemoryContext` pointer from the 2nd argument,
+then `MemoryContextAlloc` a struct for the data in that memory context and
+store the pointer to it in `hooks->hooks_private_data`. This will then be
+accessible on future calls to hook functions. It need not be manually freed, as
+the memory context used for logical decoding will free it when it's freed.
+Don't put anything in it that needs manual cleanup.
+
+Each hook has its own C signature (defined below) and the pointers must be
+directly to the functions. Hooks that the client does not wish to set must be
+left null.
+
+An example is provided in `contrib/pglogical_output_plhooks` and the argument
+structs are defined in `pglogical_output/hooks.h`, which is installed into the
+PostgreSQL source tree when the extension is installed.
+
+Each hook that is enabled results in a new startup parameter being emitted in
+the startup reply message. Clients must check for these and must not assume a
+hook was successfully activated because no error is seen.
+
+Hook functions are called in the context of the backend doing logical decoding.
+Except for the startup hook, hooks see the catalog state as it was at the time
+the transaction or row change being examined was made. Access to non-catalog
+tables is unsafe unless they have the `user_catalog_table` reloption set.
+
+## Startup hook
+
+The startup hook is called when logical decoding starts.
+
+This hook can inspect the parameters passed by the client to the output
+plugin as in_params. These parameters *must not* be modified.
+
+It can add new parameters to the set to be returned to the client in the
+startup parameters message, by appending to List out_params, which is
+initially NIL. Each element must be a `DefElem` with the param name
+as the `defname` and a `String` value as the arg, as created with
+`makeDefElem(...)`. It and its contents must be allocated in the
+logical decoding memory context.
+
+For walsender based decoding the startup hook is called only once, and
+cleanup might not be called at the end of the session.
+
+Multiple decoding sessions, and thus multiple startup hook calls, may happen
+in a session if the SQL interface for logical decoding is being used. In
+that case it's guaranteed that the cleanup hook will be called between each
+startup.
+
+When successfully enabled, the output parameter `hooks.startup_hook_enabled` is
+set to true in the startup reply message.
+
+Unlike the other hooks, this hook sees a snapshot of the database's current
+state, not a time-traveled catalog state. It is safe to access all tables from
+this hook.
+
+## Transaction filter hook
+
+The transaction filter hook can exclude entire transactions from being decoded
+and replicated based on the node they originated from.
+
+It is passed a `const TxFilterHookArgs *` containing:
+
+* The hook argument supplied by the client, if any
+* The `RepOriginId` that this transaction originated from
+
+and must return boolean, where true retains the transaction for sending to the
+client and false discards it. (Note that this is the reverse sense of the low
+level logical decoding transaction filter hook).
+
+The hook function must *not* free the argument struct or modify its contents.
+
+Note that individual changes within a transaction may have different origins to
+the transaction as a whole; see "Origin filtering" for more details. If a
+transaction is filtered out, all changes are filtered out even if their origins
+differ from that of the transaction as a whole.
+
+When successfully enabled, the output parameter
+`hooks.transaction_filter_enabled` is set to true in the startup reply message.
+
+## Row filter hook
+
+The row filter hook is called for each row. It is passed information about the
+table, the transaction origin, and the row origin.
+
+It is passed a `const RowFilterHookArgs*` containing:
+
+* The hook argument supplied by the client, if any
+* The `Relation` the change affects
+* The change type - 'I'nsert, 'U'pdate or 'D'elete
+
+It can return true to retain this row change, sending it to the client, or
+false to discard it.
+
+The function *must not* free the argument struct or modify its contents.
+
+Note that it is more efficient to exclude whole transactions with the
+transaction filter hook rather than filtering out individual rows.
+
+When successfully enabled, the output parameter
+`hooks.row_filter_enabled` is set to true in the startup reply message.
+
+## Shutdown hook
+
+The shutdown hook is called when a decoding session ends. You can't rely on
+this hook being invoked reliably, since a replication-protocol walsender-based
+session might just terminate. It's mostly useful for cleanup to handle repeated
+invocations under the SQL interface to logical decoding.
+
+You don't need a hook to free memory you allocated, unless you explicitly
+switched to a longer lived memory context like TopMemoryContext. Memory allocated
+in the hook context will be automatically freed when the decoding session shuts down.
+
+## Writing hooks in procedural languages
+
+You can write hooks in PL/PgSQL, etc, too, via the `pglogical_output_plhooks`
+adapter extension in `contrib`. They won't perform very well though.
+
+# Limitations
+
+The advantages of logical decoding in general and `pglogical_output` in
+particular are discussed above. There are also some limitations that apply to
+`pglogical_output`, and to Pg's logical decoding in general.
+
+(TODO: move much of this to the main logical decoding docs)
+
+Notably:
+
+## Doesn't replicate DDL
+
+Logical decoding doesn't decode catalog changes directly. So the plugin can't
+just send a `CREATE TABLE` statement when a new table is added.
+
+If the data being decoded is being applied to another PostgreSQL database then
+its table definitions must be kept in sync via some means external to the logical
+decoding plugin itself, such as:
+
+* Event triggers using DDL deparse to capture DDL changes as they happen and write them to a table to be replicated and applied on the other end; or
+* doing DDL management via tools that synchronise DDL on all nodes
+
+## Doesn't replicate global objects/shared catalog changes
+
+PostgreSQL has a number of object types that exist across all databases, stored
+in *shared catalogs*. These include:
+
+* Roles (users/groups)
+* Security labels on users and databases
+
+Such objects cannot be replicated by `pglogical_output`. They're managed with DDL that
+can't be captured within a single database and isn't decoded anyway.
+
+DDL for global object changes must be synchronized via some external means.
+
+## Mostly one-way communication
+
+Per the protocol documentation, the downstream can't send anything except
+replay progress messages to the upstream after replication begins, and can't
+re-initialise replication without a disconnect.
+
+To achieve downstream-to-upstream communication, clients can use a regular
+libpq connection to the upstream then write to tables or call functions.
+Alternately, a separate replication connection in the opposite direction can be
+created by the application to carry information from downstream to upstream.
+
+See "Protocol flow" in the protocol documentation for more information.
+
+## Physical replica failover
+
+Logical decoding cannot follow a physical replication failover because
+replication slot state is not replicated to physical replicas. If you fail over
+to a streaming replica you have to manually reconnect your logical replication
+clients, creating new slots, etc. This is a core PostgreSQL limitation.
+
+Also, there's no built-in way to guarantee that the logical replication slot
+from the failed master hasn't replayed further than the physical streaming
+replica you failed over to. You could receive changes on your logical decoding
+stream from the old master that never made it to the physical streaming
+replica. This is true (albeit very unlikely) *even if the physical streaming
+replica is synchronous* because PostgreSQL sends the replication data anyway,
+then just delays the commit's visibility on the master. Support for strictly
+ordered standbys would be required in PostgreSQL to avoid this.
+
+To achieve failover with logical replication you cannot mix in physical
+standbys. The logical replication client has to take responsibility for
+maintaining slots on logical replicas intended as failover candidates
+and for ensuring that the furthest-ahead replica is promoted if there is
+more than one.
+
+## Can only replicate complete transactions
+
+Logical decoding can only replicate a transaction after it has committed. This
+usefully skips replication of rolled back transactions, but it also means that
+very large transactions must be completed upstream before they can begin on the
+downstream, adding to replication latency.
+
+## Replicates only one transaction at a time
+
+Logical decoding serializes transactions in commit order, so pglogical_output
+cannot replay interleaved concurrent transactions. This can lead to high latencies
+when big transactions are being replayed, since smaller transactions get queued
+up behind them.
+
+## Unique index required for inserts or updates
+
+To replicate `INSERT`s or `UPDATE`s it is necessary to have a `PRIMARY KEY`
+or a (non-partial, columns-only) `UNIQUE` index on the table, so the table
+has a `REPLICA IDENTITY`. Without that `pglogical_output` doesn't know what
+old key to send to allow the receiver to tell which tuple is being updated.
+
+## UNLOGGED tables aren't replicated
+
+Because `UNLOGGED` tables aren't written to WAL, they aren't replicated by
+logical or physical replication. You can only replicate `UNLOGGED` tables
+with trigger-based solutions.
+
+## Unchanged fields are often sent in `UPDATE`
+
+Because there's no tracking of dirty/clean fields when a tuple is updated,
+logical decoding can't tell if a given field was changed by an update.
+Unchanged fields can only be identified and omitted if they're a variable
+length TOASTable type and are big enough to get stored out-of-line in
+a TOAST table.
+
+# Troubleshooting and debugging
+
+## Non-destructively previewing pending data on a slot
+
+Using the json mode of `pglogical_output` you can examine pending transactions
+on a slot without consuming them, so they are still delivered to the usual
+client application that created/owns this slot. This is best done using the SQL
+interface to logical decoding, since it gives you finer control than using
+`pg_recvlogical`.
+
+You can only peek at a slot while there is no other client connected to that
+slot.
+
+Use `pg_logical_slot_peek_changes` to examine the change stream without
+destructively consuming changes. This is extremely helpful when trying to
+determine why an error occurs in a downstream, since you can examine a
+json-ified representation of the xact. It's necessary to supply a minimal
+set of required parameters to the output plugin.
+
+e.g. given setup:
+
+    CREATE TABLE discard_test(blah text);
+    SELECT 'init' FROM pg_create_logical_replication_slot('demo_slot', 'pglogical_output');
+    INSERT INTO discard_test(blah) VALUES('one');
+    INSERT INTO discard_test(blah) VALUES('two1'),('two2'),('two3');
+    INSERT INTO discard_test(blah) VALUES('three1'),('three2');
+
+you can peek at the change stream with:
+
+     SELECT location, xid, data
+     FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL,
+              'min_proto_version', '1', 'max_proto_version', '1',
+              'startup_params_format', '1', 'proto_format', 'json');
+
+The two `NULL`s mean you don't want to stop decoding after any particular
+LSN or any particular number of changes. Decoding will stop when there's nothing
+left to decode or you cancel the query.
+
+This will emit a key/value startup message then change data rows like:
+
+     location  | xid  |                                            data
+     0/4E8AAF0 | 5562 | {"action":"B", has_catalog_changes:"f", xid:"5562", first_lsn:"0/4E8AAF0", commit_time:"2015-11-13 14:26:21.404425+08"}
+     0/4E8AAF0 | 5562 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"one"}}
+     0/4E8AB70 | 5562 | {"action":"C", final_lsn:"0/4E8AB30", end_lsn:"0/4E8AB70"}
+     0/4E8ABA8 | 5563 | {"action":"B", has_catalog_changes:"f", xid:"5563", first_lsn:"0/4E8ABA8", commit_time:"2015-11-13 14:26:32.015611+08"}
+     0/4E8ABA8 | 5563 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"two1"}}
+     0/4E8ABE8 | 5563 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"two2"}}
+     0/4E8AC28 | 5563 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"two3"}}
+     0/4E8ACA8 | 5563 | {"action":"C", final_lsn:"0/4E8AC68", end_lsn:"0/4E8ACA8"}
+     ....
+
+The output is the LSN (log sequence number) associated with a change, the top
+level transaction ID that performed the change, and the change data as json.
+
+You can see the transaction boundaries by xid changes and by the "B"egin and
+"C"ommit messages, and you can see the individual row "I"nserts. Replication
+origins, commit timestamps, etc will be shown if known.
+
+See http://www.postgresql.org/docs/current/static/functions-admin.html for
+information on the peek functions.
+
+If you want the binary format you can get that with
+`pg_logical_slot_peek_binary_changes` and the `native` protocol, but that's
+generally much less useful.
+
+# Manually discarding a change from a slot
+
+Sometimes it's desirable to manually purge one or more changes from a
+replication slot. This is usually an error recovery step when problems arise
+with the downstream code that's replaying from the slot.
+
+You can use the peek functions to determine the point in the stream you want to
+discard up to, as identified by LSN (log sequence number). See
+"non-destructively previewing pending data on a slot" above for details.
+
+You can't control the point you start discarding from, it's always from the
+current stream position up to a point you specify. If the peek shows that
+there's data you still want to retain you must make sure that the downstream
+replays up to the point you want to keep changes and sends replay confirmation.
+In other words there's no way to cut a sequence of changes out of the middle of
+the pending change stream.
+
+Once you've peeked the stream and know the LSN you want to discard up to, you
+can use `pg_logical_slot_get_changes`, specifying an `upto_lsn`, to consume
+changes from the slot up to but not including that point, i.e. that will be the
+point at which replay resumes.
+
+For example, if you wanted to discard the first transaction in the example
+from the section above, i.e. discard xact 5562 and start decoding at xact
+5563 from its BEGIN lsn `0/4E8ABA8`, you'd run:
+
+      SELECT location, xid, data
+      FROM pg_logical_slot_get_changes('demo_slot', '0/4E8ABA8', NULL,
+               'min_proto_version', '1', 'max_proto_version', '1',
+               'startup_params_format', '1', 'proto_format', 'json');
+
+Note that `_get_changes` is used instead of `_peek_changes` and that
+the `upto_lsn` is `'0/4E8ABA8'` instead of `NULL`.
+
+
+
+
+
diff --git a/contrib/pglogical_output/doc/.gitignore b/contrib/pglogical_output/doc/.gitignore
new file mode 100644
index 0000000..2874bff
--- /dev/null
+++ b/contrib/pglogical_output/doc/.gitignore
@@ -0,0 +1 @@
+protocol.html
diff --git a/contrib/pglogical_output/doc/DESIGN.md b/contrib/pglogical_output/doc/DESIGN.md
new file mode 100644
index 0000000..05fb4d1
--- /dev/null
+++ b/contrib/pglogical_output/doc/DESIGN.md
@@ -0,0 +1,124 @@
+# Design decisions
+
+Explanations of why things are done the way they are.
+
+## Why does pglogical_output exist when there's wal2json etc?
+
+`pglogical_output` does plenty more than convert logical decoding change
+messages to a wire format and send them to the client.
+
+It handles format negotiations, sender-side filtering using pluggable hooks
+(and the associated plugin handling), etc. The protocol itself is also
+important, and incorporates elements like binary datum transfer that can't be
+easily or efficiently achieved with json.
+
+## Custom binary protocol
+
+Why do we have a custom binary protocol inside the walsender / copy both protocol,
+rather than using a json message representation?
+
+Speed and compactness. It's expensive to create json, with lots of allocations.
+It's expensive to decode it too. You can't represent raw binary in json, and must
+encode it, which adds considerable overhead for some data types. Using the
+obvious, easy to decode json representations also makes it difficult to do
+later enhancements planned for the protocol and decoder, like caching row
+metadata.
+
+The protocol implementation is fairly well encapsulated, so in future it should
+be possible to emit json instead for clients that request it. Right now that's
+not the priority as tools like wal2json already exist for that.
+
+## Column metadata
+
+The output plugin sends metadata for columns - at minimum, the column names -
+before each row. It will soon be changed to send the data before each row from
+a new, different table, so that streams of inserts from COPY etc don't repeat
+the metadata each time. That's just a pending feature.
+
+The reason metadata must be sent is that the upstream and downstream table's
+attnos don't necessarily correspond. The column names might, and their ordering
+might even be the same, but any column drop or column type change will result
+in a dropped column on one side. So at the user level the tables look the same,
+but their attnos don't match, and if we rely on attno for replication we'll get
+the wrong data in the wrong columns. Not pretty.
+
+That could be avoided by requiring that the downstream table be strictly
+maintained by DDL replication, but:
+
+* We don't want to require DDL replication
+* That won't work with multiple upstreams feeding into a table
+* The initial table creation still won't be correct if the table has dropped
+  columns, unless we (ab)use `pg_dump`'s `--binary-upgrade` support to emit
+  tables with dropped columns, which we don't want to do.
+
+So despite the bandwidth cost, we need to send metadata.
+
+In future a client-negotiated cache is planned, so that clients can announce
+to the output plugin that they can cache metadata across change series, and
+metadata can only be sent when invalidated by relation changes or when a new
+relation is seen.
+
+Support for type metadata is penciled in to the protocol so that clients that
+don't have table definitions at all - like queueing engines - can decode the
+data. That'll also permit type validation sanity checking on the apply side
+with logical replication.
+
+## Hook entry point as a SQL function
+
+The hooks entry point is a SQL function that populates a passed `internal`
+struct with hook function pointers.
+
+The reason for this is that hooks are specified by a remote peer over the
+network. We can't just let the peer say "dlsym() this arbitrary function name
+and call it with these arguments" for fairly obvious security reasons. At bare
+minimum all replication using hooks would have to be superuser-only if we did
+that.
+
+The SQL entry point is only called once per decoding session and the rest of
+the calls are plain C function pointers.
+
+## The startup reply message
+
+The protocol design choices available to `pg_logical` are constrained by being
+contained in the copy-both protocol within the fe/be protocol, running as a
+logical decoding plugin. The plugin has no direct access to the network socket
+and can't send or receive messages whenever it wants, only under the control of
+the walsender and logical decoding framework.
+
+The only opportunity for the client to send data directly to the logical
+decoding plugin is in the  `START_REPLICATION` parameters, and it can't send
+anything to the client before that point.
+
+This means there's no opportunity for a multi-way step negotiation between
+client and server. We have to do all the negotiation we're going to in a single
+exchange of messages - the setup parameters and then the replication start
+message. All the client can do if it doesn't like the offer the server makes is
+disconnect and try again with different parameters.
+
+That's what the startup message is for. It reports the plugin's capabilities
+and tells the client which requested options were honoured. This gives the
+client a chance to decide if it's happy with the output plugin's decision
+or if it wants to reconnect and try again with different options. Iterative
+negotiation, effectively.
+
+## Unrecognised parameters MUST be ignored by client and server
+
+To ensure upward and downward compatibility, the output plugin must ignore
+parameters set by the client if it doesn't recognise them, and the client
+must ignore parameters it doesn't recognise in the server's startup reply
+message.
+
+This ensures that older clients can talk to newer servers and vice versa.
+
+For this to work, the server must never enable new functionality such as
+protocol message types, row formats, etc without the client explicitly
+specifying via a startup parameter that it understands the new functionality.
+Everything must be negotiated.
+
+Similarly, a newer client talking to an older server may ask the server to
+enable functionality, but it can't assume the server will actually honour that
+request. It must check the server's startup reply message to see if the server
+confirmed that it enabled the requested functionality. It might choose to
+disconnect and report an error to the user if the server didn't do what it
+asked. This can be important, e.g. when a security-significant hook is
+specified.
diff --git a/contrib/pglogical_output/doc/protocol.txt b/contrib/pglogical_output/doc/protocol.txt
new file mode 100644
index 0000000..0ab41bf
--- /dev/null
+++ b/contrib/pglogical_output/doc/protocol.txt
@@ -0,0 +1,546 @@
+= Pg_logical protocol
+
+pglogical_output defines a libpq subprotocol for streaming tuples, metadata,
+etc, from the decoding plugin to receivers.
+
+This protocol is an inner layer in a stack:
+
+ * tcp or unix sockets
+ ** libpq protocol
+ *** libpq replication subprotocol (COPY BOTH etc)
+ **** pg_logical output plugin => consumer protocol
+
+so clients can simply use libpq's existing replication protocol support,
+directly or via their libpq-wrapper driver.
+
+This is a binary protocol intended for compact representation.
+
+`pglogical_output` also supports a json-based text protocol with json
+representations of the same changesets, supporting all the same hooks etc,
+intended mainly for tracing/debugging/diagnostics. That protocol is not
+discussed here.
+
+== ToC
+
+== Protocol flow
+
+The protocol flow is primarily from upstream walsender/decoding plugin to the
+downstream receiver.
+
+The only information that flows downstream-to-upstream is:
+
+ * The initial parameter list sent to `START_REPLICATION`; and
+ * replay progress messages
+
+We can accept an arbitrary list of params to `START_REPLICATION`. After
+that we have no general purpose channel for information to flow upstream. That
+means we can't do a multi-step negotiation/handshake for determining the
+replication options to use, binary protocol, etc.
+
+The main form of negotiation is the client getting a "take it or leave it" set
+of settings from the server in an initial startup message sent before any
+replication data (see below) and, if it doesn't like them, reconnecting with
+different startup options.
+
+Except for the negotiation via initial parameter list and then startup message
+the protocol flow is the same as any other walsender-based logical replication
+plugin. The data stream is sent in COPY BOTH mode as a series of CopyData
+messages encapsulating replication data, and ends when the client disconnects.
+There's no facility for ending the COPY BOTH mode and returning to the
+walsender command parser to issue new commands. This is a limitation of the
+walsender interface, not pglogical_output.
+
+== Protocol messages
+
+The individual protocol messages are discussed in the following sub-sections.
+Protocol flow and logic comes in the next major section.
+
+Absolutely all top-level protocol messages begin with a message type byte.
+While represented in code as a character, this is a signed byte with no
+associated encoding.
+
+Since the PostgreSQL libpq COPY protocol supplies a message length there’s no
+need for top-level protocol messages to embed a length in their header.
+
+=== BEGIN message
+
+A stream of rows starts with a `BEGIN` message. Rows may only be sent after a
+`BEGIN` and before a `COMMIT`.
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|Message type|signed char|Literal ‘**B**’ (0x42)
+|flags|uint8| * 0-3: Reserved, client _must_ ERROR if set and not recognised.
+|lsn|uint64|“final_lsn” in decoding context - currently it means lsn of commit
+|commit time|uint64|“commit_time” in decoding context
+|remote XID|uint32|“xid” in decoding context
+|===
+
+=== Forwarded transaction origin message
+
+The message after the `BEGIN` may be a _forwarded transaction origin_ message
+indicating what upstream node the transaction came from.
+
+Sent if the immediately prior message was a `BEGIN` message, the upstream
+transaction was forwarded from another node, and replication origin forwarding
+is enabled, i.e. `forward_changeset_origins` is `t` in the startup reply
+message.
+
+A "node" could be another host, another DB on the same host, or pretty much
+anything. Whatever origin name is found gets forwarded.  The origin identifier
+is of arbitrary and application-defined format.  Applications _should_ prefix
+their origin identifier with a fixed application name part, like `bdr_`,
+`myapp_`, etc. It is application-defined what an application does with
+forwarded transactions from other applications.
+
+An origin message with a zero-length origin name indicates that the origin
+could not be identified but was (probably) not the local node. It is
+client-defined what action is taken in this case.
+
+It is a protocol error to send/receive a forwarded transaction origin message
+at any time other than immediately after a `BEGIN` message.
+
+The origin identifier is typically closely related to replication slot names
+and replication origins’ names in an application system.
+
+For more detail see _Changeset Forwarding_ in the README.
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|Message type|signed char|Literal ‘**O**’ (0x4f)
+|flags|uint8| * 0-3: Reserved, application _must_ ERROR if set and not recognised
+|origin_lsn|uint64|Log sequence number (LSN, XLogRecPtr) of the transaction’s commit record on its origin node (as opposed to the forwarding node’s commit LSN, which is ‘lsn’ in the BEGIN message)
+|origin_identifier_length|uint8|Length in bytes of origin_identifier
+|origin_identifier|signed char[origin_identifier_length]|An origin identifier of arbitrary, upstream-application-defined structure. _Should_ be text in the same encoding as the upstream database. NULL-terminated. _Should_ be 7-bit ASCII.
+|===
+
+=== COMMIT message
+A stream of rows ends with a `COMMIT` message.
+
+There is no `ROLLBACK` message because aborted transactions are not sent by the
+upstream.
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|Message type|signed char|Literal ‘**C**’ (0x43)
+|Flags|uint8| * 0-3: Reserved, client _must_ ERROR if set and not recognised
+|Commit LSN|uint64|commit_lsn in decoding commit decode callback. This is the same value as in the BEGIN message, and marks the end of the transaction.
+|End LSN|uint64|end_lsn in decoding transaction context
+|Commit time|uint64|commit_time in decoding transaction context
+|===
+
+=== INSERT, UPDATE or DELETE message
+
+After a `BEGIN` or metadata message, the downstream should expect to receive
+zero or more row change messages, composed of an insert/update/delete message
+with zero or more tuple fields, each of which has one or more tuple field
+values.
+
+The row’s relidentifier _must_ match that of the most recently preceding
+metadata message. All consecutive row messages must currently have the same
+relidentifier. (_Later extensions to add metadata caching will relax these
+requirements for clients that advertise caching support; see the documentation
+on metadata messages for more detail_).
+
+It is an error to decode rows using metadata received after the row was
+received, or using metadata that is not the most recently received metadata
+revision that still predates the row. I.e. in the sequence M1, R1, R2, M2, R3,
+M3: R1 and R2 must be decoded using M1, and R3 must be decoded using M2. It is
+an error to use M3 to decode any of the rows, to use M1 to decode R3, or to use
+M2 to decode R1 and R2.
+
+Row messages _may not_ arrive except during a transaction as delimited by `BEGIN`
+and `COMMIT` messages. It is an error to receive a row message outside a
+transaction.
+
+Any unrecognised tuple type or tuple part type is an error on the downstream
+that must result in a client disconnect and error message. Downstreams are
+expected to negotiate compatibility, and upstreams must not add new tuple types
+or tuple field types without negotiation.
+
+The downstream reads rows until the next non-row message is received. There is
+no other end marker or any indication of how many rows to expect in a sequence.
+
+==== Row message header
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|Message type|signed char|Literal ‘**I**’nsert (0x49), ‘**U**’pdate’ (0x55) or ‘**D**’elete (0x44)
+|flags|uint8|Row flags (reserved)
+|relidentifier|uint32|relidentifier that matches the table metadata message sent for this row.
+(_Not present in BDR, which sends nspname and relname instead_)
+|[tuple parts]|[composite]|
+|===
+
+One or more tuple-parts fields follow.
+
+==== Tuple fields
+
+|===
+|Tuple type|signed char|Identifies the kind of tuple being sent.
+
+|tupleformat|signed char|‘**T**’ (0x54)
+|natts|uint16|Number of fields sent in this tuple part.
+(_Present in BDR, but meaning significantly different here)_
+|[tuple field values]|[composite]|
+|===
+
+===== Tuple tupleformat compatibility
+
+Unrecognised _tupleformat_ kinds are a protocol error for the downstream.
+
+==== Tuple field value fields
+
+These message parts describe individual fields within a tuple.
+
+There are two kinds of tuple value fields, abbreviated and full. Which is being
+read is determined based on the first field, _kind_.
+
+Abbreviated tuple value fields are nothing but the message kind:
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|kind|signed char| * ‘**n**’ull (0x6e) field
+|===
+
+Full tuple value fields have a length and datum:
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|kind|signed char| * ‘**i**’nternal binary (0x69) field
+|length|int4|Only defined for kind = i\|b\|t
+|data|[length]|Data in a format defined by the table metadata and column _kind_.
+|===
+
+===== Tuple field values kind compatibility
+
+Unrecognised field _kind_ values are a protocol error for the downstream. The
+downstream may not continue processing the protocol stream after this
+point.
+
+The upstream may not send ‘**i**’nternal or ‘**b**’inary format values to the
+downstream without the downstream negotiating acceptance of such values. The
+downstream will also generally negotiate to receive type information to use to
+decode the values. See the section on startup parameters and the startup
+message for details.
+
+=== Table/row metadata messages
+
+Before sending changed rows for a relation, a metadata message for the relation
+must be sent so the downstream knows the namespace, table name, column names,
+optional column types, etc. A relidentifier field, an arbitrary numeric value
+unique for that relation on that upstream connection, maps the metadata to
+following rows.
+
+A client should not assume that relation metadata will be followed immediately
+(or at all) by rows, since future changes may lead to metadata messages being
+delivered at other times. Metadata messages may arrive during or between
+transactions.
+
+The upstream may not assume that the downstream retains more metadata than the
+one most recent table metadata message. This applies across all tables, so a
+client is permitted to discard metadata for table x when getting metadata for
+table y. The upstream must send a new metadata message before sending rows for
+a different table, even if that metadata was already sent in the same session
+or even same transaction. _This requirement will later be weakened by the
+addition of client metadata caching, which will be advertised to the upstream
+with an output plugin parameter._
+
+Columns in metadata messages are numbered from 0 to natts-1, reading
+consecutively from start to finish. The column numbers do not have to be a
+complete description of the columns in the upstream relation, so long as all
+columns that will later have row values sent are described. The upstream may
+choose to omit columns it doesn’t expect to send changes for in any given
+series of rows. Column numbers are not necessarily stable across different sets
+of metadata for the same table, even if the table hasn’t changed structurally.
+
+A metadata message may not be used to decode rows received before that metadata
+message.
+
+==== Table metadata header
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|Message type|signed char|Literal ‘**R**’ (0x52)
+|flags|uint8| * 0-6: Reserved, client _must_ ERROR if set and not recognised.
+|relidentifier|uint32|Arbitrary relation id, unique for this upstream. In practice this will probably be the upstream table’s oid, but the downstream can’t assume anything.
+|nspnamelength|uint8|Length of namespace name
+|nspname|signed char[nspnamelength]|Relation namespace (null terminated)
+|relnamelength|uint8|Length of relation name
+|relname|char[relnamelength]|Relation name (null terminated)
+|attrs block|signed char|Literal: ‘**A**’ (0x41)
+|natts|uint16|number of attributes
+|[fields]|[composite]|Sequence of ‘natts’ column metadata blocks, each of which begins with a column delimiter followed by zero or more column metadata blocks, each with the same column metadata block header.
+
+This chunked format is used so that new metadata messages can be added without breaking existing clients.
+|===
+
+==== Column delimiter
+
+Each column’s metadata begins with a column metadata header. This comes
+immediately after the natts field in the table metadata header or after the
+last metadata block in the prior column.
+
+It has the same char header as all the others, and the flags field is the same
+size as the length field in other blocks, so it’s safe to read this as a column
+metadata block header.
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|blocktype|signed char|‘**C**’ (0x43) - column
+|flags|uint8|Column info flags
+|===
+
+==== Column metadata block header
+
+All column metadata blocks share the same header, which is the same length as a
+column delimiter:
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|blocktype|signed char|Identifies the kind of metadata block that follows.
+|blockbodylength|uint16|Length of block in bytes, excluding blocktype char and length field.
+|===
+
+==== Column name block
+
+This block just carries the name of the column, nothing more. It begins with a
+column metadata block, and the rest of the message is the column name.
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|[column metadata block header]|[composite]|blocktype = ‘**N**’ (0x4e)
+|colname|char[blockbodylength]|Column name.
+|===
+
+
+==== Column type block
+
+T.B.D.
+
+Not defined in first protocol revision.
+
+Likely to send a type identifier (probably the upstream oid) as a reference to
+a “type info” protocol message to be delivered before. Then we can cache the
+type descriptions and avoid repeating long schemas and names, just using the
+oids.
+
+Needs to have room to handle:
+
+ * built-in core types
+ * extension types (ext version may vary)
+ * enum types (CREATE TYPE … AS ENUM)
+ * range types (CREATE TYPE … AS RANGE)
+ * composite types (CREATE TYPE … AS (...))
+ * custom types (CREATE TYPE ( input = x_in, output = x_out ))
+
+… some of which can be nested
+
+== Startup message
+
+After processing output plugin arguments, the upstream output plugin must send
+a startup message as its first message on the wire. It is a trivial header
+followed by alternating key and value strings represented as null-terminated
+unsigned char strings.
+
+This message specifies the capabilities the output plugin enabled and describes
+the upstream server and plugin. This may change how the client decodes the data
+stream, and/or permit the client to disconnect and report an error to the user
+if the result isn’t acceptable.
+
+If replication is rejected because the client is incompatible or the server is
+unable to satisfy required options, the startup message may be followed by a
+libpq protocol FATAL message that terminates the session. See “Startup errors”
+below.
+
+The parameter names and values are sent as alternating key/value pairs as
+null-terminated strings, e.g.
+
++“key1\0parameter1\0key2\0value2\0”+
+
+|===
+|*Message*|*Type/Size*|*Notes*
+
+|Message type|signed char|‘**S**’ (0x53) - startup
+|Startup message version|uint8|Value is always “1”.
+|(parameters)|null-terminated key/value pairs|See table below for parameter definitions.
+|===
+
+=== Startup message parameters 
+
+Since all parameter values are sent as strings, the value types given below specify what the value must be reasonably interpretable as.
+
+|===
+|*Key name*|*Value type*|*Description*
+
+|max_proto_version|integer|Newest version of the protocol supported by output plugin.
+|min_proto_version|integer|Oldest protocol version supported by server.
+|proto_format|text|Protocol format requested. native (documented here) or json. Default is native.
+|coltypes|boolean|Column types will be sent in table metadata.
+|pg_version_num|integer|PostgreSQL server_version_num of server, if it’s PostgreSQL. e.g. 090400
+|pg_version|string|PostgreSQL server_version of server, if it’s PostgreSQL.
+|pg_catversion|uint32|Version of the PostgreSQL system catalogs on the upstream server, if it’s PostgreSQL.
+|binary|_set of parameters, specified separately_|See “_the __‘binary’__ parameters_” below, and “_Parameters relating to exchange of binary values_”
+|database_encoding|string|The native text encoding of the database the plugin is running in
+|encoding|string|Field values for textual data will be in this encoding in native protocol text, binary or internal representation. For the native protocol this is currently always the same as `database_encoding`. For text-mode json protocol this is always the same as `client_encoding`.
+|forward_changesets|bool|Specifies that all transactions, not just those originating on the upstream, will be forwarded. See “_Changeset forwarding_”.
+|forward_changeset_origins|bool|Tells the client that the server will send changeset origin information. Independent of forward_changesets. See “_Changeset forwarding_” for details.
+|no_txinfo|bool|Requests that variable transaction info such as XIDs, LSNs, and timestamps be omitted from output. Mainly for tests. Currently ignored for protos other than json.
+|===
+
+
+The ‘binary’ parameter set:
+
+|===
+|*Key name*|*Value type*|*Description*
+
+|binary.internal_basetypes|boolean|If true, PostgreSQL internal binary representations for row field data may be used for some or all row fields, if here the type is appropriate and the binary compatibility parameters of upstream and downstream match. See binary.want_internal_basetypes in the output plugin parameters for details.
+
+May only be true if _binary.want_internal_basetypes_ was set to true by the client in the parameters and the client’s accepted binary format matches that of the server.
+|binary.binary_basetypes|boolean|If true, external binary format (send/recv format) may be used for some or all row field data where the field type is a built-in base type whose send/recv format is compatible with binary.binary_pg_version .
+
+May only be set if _binary.want_binary_basetypes_ was set to true by the client in the parameters and the client’s accepted send/recv format matches that of the server.
+|binary.binary_pg_version|uint16|The PostgreSQL major version that send/recv format values will be compatible with. This is not necessarily the actual upstream PostgreSQL version.
+|binary.sizeof_int|uint8|sizeof(int) on the upstream.
+|binary.sizeof_long|uint8|sizeof(long) on the upstream.
+|binary.sizeof_datum|uint8|Same as sizeof_int, but for the PostgreSQL Datum typedef.
+|binary.maxalign|uint8|Upstream PostgreSQL server’s MAXIMUM_ALIGNOF value - platform dependent, determined at build time.
+|binary.bigendian|bool|True iff the upstream is big-endian.
+|binary.float4_byval|bool|Upstream PostgreSQL’s float4_byval compile option.
+|binary.float8_byval|bool|Upstream PostgreSQL’s float8_byval compile option.
+|binary.integer_datetimes|bool|Whether TIME, TIMESTAMP and TIMESTAMP WITH TIME ZONE will be sent using integer or floating point representation.
+
+Usually this is the value of the upstream PostgreSQL’s integer_datetimes compile option.
+|===
+== Startup errors
+
+If the server rejects the client’s connection - due to non-overlapping protocol
+support, unrecognised parameter formats, unsupported required parameters like
+hooks, etc - then it will follow the startup reply message with a
+normal libpq protocol error message. (Current versions send
+this before the startup message).
+
+== Arguments client supplies to output plugin
+
+The one opportunity for the downstream client to send information (other than replay feedback) to the upstream is at connect-time, as an array of arguments to the output plugin supplied to START LOGICAL REPLICATION.
+
+There is no back-and-forth, no handshake.
+
+As a result, the client mainly announces capabilities and makes requests of the output plugin. The output plugin will ERROR if required parameters are unset, or where incompatibilities that cannot be resolved are found. Otherwise the output plugin reports what it could and could not honour in the startup message it sends as the first message on the wire down to the client. The client chooses whether to continue replay or to disconnect and report an error to the user, then possibly reconnect with different options.
+
+=== Output plugin arguments
+
+The output plugin’s key/value arguments are specified in pairs, as key and value. They’re what’s passed to START_REPLICATION, etc.
+
+All parameters are passed in text form. They _should_ be limited to 7-bit ASCII, since the server’s text encoding is not known, but _may_ be normalized precomposed UTF-8. The types specified for parameters indicate what the output plugin should attempt to convert the text into. Clients should not send text values that are outside the range for that type.
+
+==== Capabilities
+
+Many values are capabilities flags for the client, indicating that it understands optional features like metadata caching, binary format transfers, etc. In general the output plugin _may_ disregard capabilities the client advertises as supported and act as if they are not supported. If a capability is advertised as unsupported or is not advertised the output plugin _must not_ enable the corresponding features.
+
+In other words, don’t send the client something it’s not expecting.
+
+==== Protocol versioning
+
+Two parameters max_proto_version and min_proto_version, which clients must always send, allow negotiation of the protocol version. The output plugin must ERROR if the client protocol support does not overlap its own protocol support range.
+
+The protocol version is only incremented when there are major breaking changes that all or most clients must be modified to accommodate. Most changes are done by adding new optional messages and/or by having clients advertise capabilities to opt in to features.
+
+Because these versions are expected to be incremented, to make it clear that the format of the startup parameters itself hasn’t changed, the first key/value pair _must_ be the parameter startup_params_format with value “1”.
+
+|===
+|*Key*|*Type*|*Value(s)*|*Notes*
+
+|startup_params_format|int8|1|The format version of this startup parameter set. Always the digit 1 (0x31), null terminated.
+|max_proto_version|int32|1|Newest version of the protocol supported by client. Output plugin must ERROR if supported version too old. *Required*, ERROR if missing.
+|min_proto_version|int32|1|Oldest version of the protocol supported by client. Output plugin must ERROR if the newest version it supports is older than this. *Required*, ERROR if missing.
+|===
+
+==== Client requirements and capabilities
+
+|===
+|*Key*|*Type*|*Default*|*Notes*
+
+|expected_encoding|string|null|The text encoding the downstream expects field values to be in. Applies to text, binary and internal representations of field values in native format. Has no effect on other protocol content. If specified, the upstream must honour it. For json protocol, must be unset or match `client_encoding`. (Current plugin versions ERROR if this is set for the native protocol and not equal to the upstream database's encoding).
+|forward_changesets|bool|false|Request that all transactions, not just those originating on the upstream, be forwarded. See “_Changeset forwarding_”.
+|want_coltypes|boolean|false|The client wants to receive data type information about columns.
+|===
+
+==== General client information
+
+These keys tell the output plugin about the client. They’re mainly for informational purposes. In particular, the versions must _not_ be used to determine compatibility for binary or send/recv format, as non-PostgreSQL clients will simply not send them at all but may still understand binary or send/recv format fields.
+
+|===
+|*Key*|*Type*|*Default*|*Notes*
+
+|pg_version_num|integer|null|PostgreSQL server_version_num of client, if it’s PostgreSQL. e.g. 90400
+|pg_version|string|null|PostgreSQL server_version of client, if it’s PostgreSQL.
+|===
+
+
+==== Parameters relating to exchange of binary values
+
+The downstream may specify to the upstream that it is capable of understanding binary (PostgreSQL internal binary datum format), and/or send/recv (PostgreSQL binary interchange) format data by setting the binary.want_binary_basetypes and/or binary.want_internal_basetypes options, or other yet-to-be-defined options.
+
+An upstream output plugin that does not support one or both formats _may_ ignore the downstream’s binary support and send text format, in which case it may ignore all binary. parameters. All downstreams _must_ support text format. An upstream output plugin _must not_ send binary or send/recv format unless the downstream has announced it can receive it. If both upstream and downstream support both formats an upstream should prefer binary format and fall back to send/recv, then to text, if compatibility requires.
+
+Internal and binary format selection should be done on a type-by-type basis. It is quite normal to send ‘text’ format for extension types while sending binary for built-in types.
+
+The downstream _must_ specify its compatibility requirements for internal and binary data if it requests either or both formats. The upstream _must_ honour these by falling back from binary to send/recv, and from send/recv to text, where the upstream and downstream are not compatible.
+
+An unspecified compatibility field _must_ be presumed to be unsupported by the downstream so that older clients that don’t know about a change in a newer version don’t receive unexpected data. For example, in the unlikely event that PostgreSQL 99.8 switched to 128-bit DPD (Densely Packed Decimal) representations of NUMERIC instead of the current arbitrary-length BCD (Binary Coded Decimal) format, a new binary.dpd_numerics parameter would be added. Clients that didn’t know about the change wouldn’t know to set it, so the upstream would presume it unsupported and send text format NUMERIC to those clients. This also means that clients that support the new format wouldn’t be able to receive the old format in binary from older servers since they’d specify dpd_numerics = true in their compatibility parameters.
+
+At this time a downstream may specify compatibility with only one value for a given option; i.e. a downstream cannot say it supports both 4-byte and 8-byte sizeof(int).  Leaving it unspecified means the upstream must assume the downstream supports neither. (A future protocol extension may allow clients to specify alternative sets of supported formats).
+
+The `pg_version` option _must not_ be used to decide compatibility. Use `binary.basetypes_major_version` instead.
+
+|===
+|*Key name*|*Value type*|*Default*|*Description*
+
+|binary.want_binary_basetypes|boolean|false|True if the client accepts binary interchange (send/recv) format rows for PostgreSQL built-in base types.
+|binary.want_internal_basetypes|boolean|false|True if the client accepts PostgreSQL internal-format binary output for base PostgreSQL types not otherwise specified elsewhere.
+|binary.basetypes_major_version|uint16|null|The PostgreSQL major version (x.y) the downstream expects binary and send/recv format values to be in. Represented as an integer in XXYY format (no leading zero since it’s an integer), e.g. 9.5 is 905. This corresponds to PG_VERSION_NUM/100 in PostgreSQL.
+|binary.sizeof_int|uint8|+null+|sizeof(int) on the downstream.
+|binary.sizeof_long|uint8|null|sizeof(long) on the downstream.
+|binary.sizeof_datum|uint8|null|Same as sizeof_int, but for the PostgreSQL Datum typedef.
+|binary.maxalign|uint8|null|Downstream PostgreSQL server’s maxalign value - platform dependent, determined at build time.
+|binary.bigendian|bool|null|True iff the downstream is big-endian.
+|binary.float4_byval|bool|null|Downstream PostgreSQL’s float4_byval compile option.
+|binary.float8_byval|bool|null|Downstream PostgreSQL’s float8_byval compile option.
+|binary.integer_datetimes|bool|null|Downstream PostgreSQL’s integer_datetimes compile option.
+|===
+
+== Extensibility
+
+Because of the use of optional parameters in output plugin arguments, and the
+confirmation/response sent in the startup packet, a basic handshake is possible
+between upstream and downstream, allowing negotiation of capabilities.
+
+The output plugin must never send non-optional data or change its wire format
+without confirmation from the client that it can understand the new data. It
+may send optional data without negotiation.
+
+When extending the output plugin arguments, add-ons are expected to prefix all
+keys with the extension name, and should preferably use a single top level key
+with a json object value to carry their extension information. Additions to the
+startup message should follow the same pattern.
+
+Hooks and plugins can be used to add functionality specific to a client.
+
+== JSON protocol
+
+If `proto_format` is set to `json` then the output plugin will emit JSON
+instead of the custom binary protocol. JSON support is intended mainly for
+debugging and diagnostics.
+
+The JSON format supports all the same hooks.
diff --git a/contrib/pglogical_output/expected/basic_json.out b/contrib/pglogical_output/expected/basic_json.out
new file mode 100644
index 0000000..271189e
--- /dev/null
+++ b/contrib/pglogical_output/expected/basic_json.out
@@ -0,0 +1,139 @@
+\i sql/basic_setup.sql
+SET synchronous_commit = on;
+-- Schema setup
+CREATE TABLE demo (
+	seq serial primary key,
+	tx text,
+	ts timestamp,
+	jsb jsonb,
+	js json,
+	ba bytea
+);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Queue up some work to decode with a variety of types
+INSERT INTO demo(tx) VALUES ('textval');
+INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
+INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
+INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
+-- Rolled back txn
+BEGIN;
+DELETE FROM demo;
+INSERT INTO demo(tx) VALUES ('blahblah');
+ROLLBACK;
+-- Multi-statement transaction with subxacts
+BEGIN;
+SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('row1');
+RELEASE SAVEPOINT sp1;
+SAVEPOINT sp2;
+UPDATE demo SET tx = 'update-rollback' WHERE tx = 'row1';
+ROLLBACK TO SAVEPOINT sp2;
+SAVEPOINT sp3;
+INSERT INTO demo(tx) VALUES ('row2');
+INSERT INTO demo(tx) VALUES ('row3');
+RELEASE SAVEPOINT sp3;
+SAVEPOINT sp4;
+DELETE FROM demo WHERE tx = 'row2';
+RELEASE SAVEPOINT sp4;
+SAVEPOINT sp5;
+UPDATE demo SET tx = 'updated' WHERE tx = 'row1';
+COMMIT;
+-- txn with catalog changes
+BEGIN;
+CREATE TABLE cat_test(id integer);
+INSERT INTO cat_test(id) VALUES (42);
+COMMIT;
+-- Aborted subxact with catalog changes
+BEGIN;
+INSERT INTO demo(tx) VALUES ('1');
+SAVEPOINT sp1;
+ALTER TABLE demo DROP COLUMN tx;
+ROLLBACK TO SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('2');
+COMMIT;
+-- Simple decode with text-format tuples
+TRUNCATE TABLE json_decoding_output;
+INSERT INTO json_decoding_output(ch, rn)
+SELECT
+  data::jsonb,
+  row_number() OVER ()
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+SELECT * FROM get_startup_params();
+               key                | value  
+----------------------------------+--------
+ binary.binary_basetypes          | "f"
+ binary.float4_byval              | "t"
+ binary.float8_byval              | "t"
+ binary.internal_basetypes        | "f"
+ binary.sizeof_datum              | "8"
+ binary.sizeof_int                | "4"
+ binary.sizeof_long               | "8"
+ coltypes                         | "f"
+ database_encoding                | "UTF8"
+ encoding                         | "UTF8"
+ forward_changeset_origins        | "f"
+ forward_changesets               | "f"
+ hooks.row_filter_enabled         | "f"
+ hooks.shutdown_hook_enabled      | "f"
+ hooks.startup_hook_enabled       | "f"
+ hooks.transaction_filter_enabled | "f"
+ max_proto_version                | "1"
+ min_proto_version                | "1"
+ no_txinfo                        | "t"
+(19 rows)
+
+SELECT * FROM get_queued_data();
+                                                                             data                                                                             
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "textval", "jsb": null, "seq": 1}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": "\\xdeadbeef0001", "js": null, "ts": null, "tx": null, "jsb": null, "seq": 2}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": "2045-09-12T12:34:56", "tx": "blah", "jsb": null, "seq": 3}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": {"key": "value"}, "ts": null, "tx": null, "jsb": {"key": "value"}, "seq": 4}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row1", "jsb": null, "seq": 6}, "relation": ["public", "demo"]}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row2", "jsb": null, "seq": 7}, "relation": ["public", "demo"]}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row3", "jsb": null, "seq": 8}, "relation": ["public", "demo"]}
+ {"action": "D", "oldtuple": {"ba": null, "js": null, "ts": null, "tx": null, "jsb": null, "seq": 7}, "relation": ["public", "demo"]}
+ {"action": "U", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "updated", "jsb": null, "seq": 6}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "t"}
+ {"action": "I", "newtuple": {"id": 42}, "relation": ["public", "cat_test"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "1", "jsb": null, "seq": 9}, "relation": ["public", "demo"]}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "2", "jsb": null, "seq": 10}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "t"}
+ {"action": "C"}
+(28 rows)
+
+TRUNCATE TABLE json_decoding_output;
+\i sql/basic_teardown.sql
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE demo;
+DROP TABLE cat_test;
diff --git a/contrib/pglogical_output/expected/basic_json_1.out b/contrib/pglogical_output/expected/basic_json_1.out
new file mode 100644
index 0000000..293a8e6
--- /dev/null
+++ b/contrib/pglogical_output/expected/basic_json_1.out
@@ -0,0 +1,108 @@
+\i sql/basic_setup.sql
+SET synchronous_commit = on;
+-- Schema setup
+CREATE TABLE demo (
+	seq serial primary key,
+	tx text,
+	ts timestamp,
+	jsb jsonb,
+	js json,
+	ba bytea
+);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Queue up some work to decode with a variety of types
+INSERT INTO demo(tx) VALUES ('textval');
+INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
+INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
+INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
+-- Rolled back txn
+BEGIN;
+DELETE FROM demo;
+INSERT INTO demo(tx) VALUES ('blahblah');
+ROLLBACK;
+-- Multi-statement transaction with subxacts
+BEGIN;
+SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('row1');
+RELEASE SAVEPOINT sp1;
+SAVEPOINT sp2;
+UPDATE demo SET tx = 'update-rollback' WHERE tx = 'row1';
+ROLLBACK TO SAVEPOINT sp2;
+SAVEPOINT sp3;
+INSERT INTO demo(tx) VALUES ('row2');
+INSERT INTO demo(tx) VALUES ('row3');
+RELEASE SAVEPOINT sp3;
+SAVEPOINT sp4;
+DELETE FROM demo WHERE tx = 'row2';
+RELEASE SAVEPOINT sp4;
+SAVEPOINT sp5;
+UPDATE demo SET tx = 'updated' WHERE tx = 'row1';
+COMMIT;
+-- txn with catalog changes
+BEGIN;
+CREATE TABLE cat_test(id integer);
+INSERT INTO cat_test(id) VALUES (42);
+COMMIT;
+-- Aborted subxact with catalog changes
+BEGIN;
+INSERT INTO demo(tx) VALUES ('1');
+SAVEPOINT sp1;
+ALTER TABLE demo DROP COLUMN tx;
+ROLLBACK TO SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('2');
+COMMIT;
+-- Simple decode with text-format tuples
+SELECT data::json
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+                                                                                                                                                                                                                                                                                                                                                                                         data                                                                                                                                                                                                                                                                                                                                                                                         
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ {"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"t","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"f","hooks.shutdown_hook_enabled":"f","hooks.row_filter_enabled":"f","hooks.transaction_filter_enabled":"f"}}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":1,"tx":"textval","ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":2,"tx":null,"ts":null,"jsb":null,"js":null,"ba":"\\xdeadbeef0001"}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":3,"tx":"blah","ts":"2045-09-12T12:34:56","jsb":null,"js":null,"ba":null}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":4,"tx":null,"ts":null,"jsb":{"key": "value"},"js":{"key":"value"},"ba":null}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":6,"tx":"row1","ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":7,"tx":"row2","ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":8,"tx":"row3","ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"D","relation":["public","demo"],"oldtuple":{"seq":7,"tx":null,"ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"U","relation":["public","demo"],"newtuple":{"seq":6,"tx":"updated","ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"t"}
+ {"action":"I","relation":["public","cat_test"],"newtuple":{"id":42}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":9,"tx":"1","ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"I","relation":["public","demo"],"newtuple":{"seq":10,"tx":"2","ts":null,"jsb":null,"js":null,"ba":null}}
+ {"action":"C"}
+(27 rows)
+
+\i sql/basic_teardown.sql
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE demo;
+DROP TABLE cat_test;
diff --git a/contrib/pglogical_output/expected/basic_native.out b/contrib/pglogical_output/expected/basic_native.out
new file mode 100644
index 0000000..a7c88f3
--- /dev/null
+++ b/contrib/pglogical_output/expected/basic_native.out
@@ -0,0 +1,99 @@
+\i sql/basic_setup.sql
+SET synchronous_commit = on;
+-- Schema setup
+CREATE TABLE demo (
+	seq serial primary key,
+	tx text,
+	ts timestamp,
+	jsb jsonb,
+	js json,
+	ba bytea
+);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Queue up some work to decode with a variety of types
+INSERT INTO demo(tx) VALUES ('textval');
+INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
+INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
+INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
+-- Rolled back txn
+BEGIN;
+DELETE FROM demo;
+INSERT INTO demo(tx) VALUES ('blahblah');
+ROLLBACK;
+-- Multi-statement transaction with subxacts
+BEGIN;
+SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('row1');
+RELEASE SAVEPOINT sp1;
+SAVEPOINT sp2;
+UPDATE demo SET tx = 'update-rollback' WHERE tx = 'row1';
+ROLLBACK TO SAVEPOINT sp2;
+SAVEPOINT sp3;
+INSERT INTO demo(tx) VALUES ('row2');
+INSERT INTO demo(tx) VALUES ('row3');
+RELEASE SAVEPOINT sp3;
+SAVEPOINT sp4;
+DELETE FROM demo WHERE tx = 'row2';
+RELEASE SAVEPOINT sp4;
+SAVEPOINT sp5;
+UPDATE demo SET tx = 'updated' WHERE tx = 'row1';
+COMMIT;
+-- txn with catalog changes
+BEGIN;
+CREATE TABLE cat_test(id integer);
+INSERT INTO cat_test(id) VALUES (42);
+COMMIT;
+-- Aborted subxact with catalog changes
+BEGIN;
+INSERT INTO demo(tx) VALUES ('1');
+SAVEPOINT sp1;
+ALTER TABLE demo DROP COLUMN tx;
+ROLLBACK TO SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('2');
+COMMIT;
+-- Simple decode with text-format tuples
+--
+-- It's still the logical decoding binary protocol and as such it has
+-- embedded timestamps, and pglogical its self has embedded LSNs, xids,
+-- etc. So all we can really do is say "yup, we got the expected number
+-- of messages".
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ count 
+-------
+    39
+(1 row)
+
+-- ... and send/recv binary format
+-- The main difference visible is that the bytea fields aren't encoded
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'binary.want_binary_basetypes', '1',
+	'binary.basetypes_major_version', (current_setting('server_version_num')::integer / 100)::text);
+ count 
+-------
+    39
+(1 row)
+
+\i sql/basic_teardown.sql
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE demo;
+DROP TABLE cat_test;
diff --git a/contrib/pglogical_output/expected/cleanup.out b/contrib/pglogical_output/expected/cleanup.out
new file mode 100644
index 0000000..e7a02c8
--- /dev/null
+++ b/contrib/pglogical_output/expected/cleanup.out
@@ -0,0 +1,4 @@
+DROP TABLE excluded_startup_keys;
+DROP TABLE json_decoding_output;
+DROP FUNCTION get_queued_data();
+DROP FUNCTION get_startup_params();
diff --git a/contrib/pglogical_output/expected/encoding_json.out b/contrib/pglogical_output/expected/encoding_json.out
new file mode 100644
index 0000000..82c719a
--- /dev/null
+++ b/contrib/pglogical_output/expected/encoding_json.out
@@ -0,0 +1,59 @@
+SET synchronous_commit = on;
+-- This file doesn't share common setup with the native tests,
+-- since it's specific to how the text protocol handles encodings.
+CREATE TABLE enctest(blah text);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+SET client_encoding = 'UTF-8';
+INSERT INTO enctest(blah)
+VALUES
+('áàä'),('ﬂ'), ('½⅓'), ('カンジ');
+RESET client_encoding;
+SET client_encoding = 'LATIN-1';
+-- Will ERROR, explicit encoding request doesn't match client_encoding
+SELECT data
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+ERROR:  expected_encoding must be unset or match client_encoding in text protocols
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Will succeed since we don't request any encoding
+-- then ERROR because it can't turn the kanjii into latin-1
+SELECT data
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+ERROR:  character with byte sequence 0xef 0xac 0x82 in encoding "UTF8" has no equivalent in encoding "LATIN1"
+-- Will succeed since it matches the current encoding
+-- then ERROR because it can't turn the kanjii into latin-1
+SELECT data
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'LATIN-1',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+ERROR:  character with byte sequence 0xef 0xac 0x82 in encoding "UTF8" has no equivalent in encoding "LATIN1"
+RESET client_encoding;
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE enctest;
diff --git a/contrib/pglogical_output/expected/hooks_json.out b/contrib/pglogical_output/expected/hooks_json.out
new file mode 100644
index 0000000..f7a0839
--- /dev/null
+++ b/contrib/pglogical_output/expected/hooks_json.out
@@ -0,0 +1,202 @@
+\i sql/hooks_setup.sql
+CREATE EXTENSION pglogical_output_plhooks;
+CREATE FUNCTION test_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+	IF nodeid <> 'foo' THEN
+	    RAISE EXCEPTION 'Expected nodeid <foo>, got <%>',nodeid;
+	END IF;
+	RETURN relid::regclass::text NOT LIKE '%_filter%';
+END
+$$;
+CREATE FUNCTION test_action_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+    RETURN action NOT IN ('U', 'D');
+END
+$$;
+CREATE FUNCTION wrong_signature_fn(relid regclass)
+returns bool stable language plpgsql as $$
+BEGIN
+END;
+$$;
+CREATE TABLE test_filter(id integer);
+CREATE TABLE test_nofilt(id integer);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+INSERT INTO test_filter(id) SELECT generate_series(1,10);
+INSERT INTO test_nofilt(id) SELECT generate_series(1,10);
+DELETE FROM test_filter WHERE id % 2 = 0;
+DELETE FROM test_nofilt WHERE id % 2 = 0;
+UPDATE test_filter SET id = id*100 WHERE id = 5;
+UPDATE test_nofilt SET id = id*100 WHERE id = 5;
+-- Test table filter
+TRUNCATE TABLE json_decoding_output;
+INSERT INTO json_decoding_output(ch, rn)
+SELECT
+  data::jsonb,
+  row_number() OVER ()
+FROM pg_logical_slot_peek_changes('regression_slot',
+ 	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_filter',
+	'pglo_plhooks.client_hook_arg', 'foo',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+SELECT * FROM get_startup_params();
+               key                | value  
+----------------------------------+--------
+ binary.binary_basetypes          | "f"
+ binary.float4_byval              | "t"
+ binary.float8_byval              | "t"
+ binary.internal_basetypes        | "f"
+ binary.sizeof_datum              | "8"
+ binary.sizeof_int                | "4"
+ binary.sizeof_long               | "8"
+ coltypes                         | "f"
+ database_encoding                | "UTF8"
+ encoding                         | "UTF8"
+ forward_changeset_origins        | "f"
+ forward_changesets               | "f"
+ hooks.row_filter_enabled         | "t"
+ hooks.shutdown_hook_enabled      | "t"
+ hooks.startup_hook_enabled       | "t"
+ hooks.transaction_filter_enabled | "t"
+ max_proto_version                | "1"
+ min_proto_version                | "1"
+ no_txinfo                        | "t"
+(19 rows)
+
+SELECT * FROM get_queued_data();
+                                      data                                       
+---------------------------------------------------------------------------------
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"id": 1}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 2}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 3}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 4}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 5}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 6}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 7}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 8}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 9}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 10}, "relation": ["public", "test_nofilt"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "U", "newtuple": {"id": 500}, "relation": ["public", "test_nofilt"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "t"}
+ {"action": "C"}
+(25 rows)
+
+-- test action filter
+TRUNCATE TABLE json_decoding_output;
+INSERT INTO json_decoding_output (ch, rn)
+SELECT
+  data::jsonb,
+  row_number() OVER ()
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_action_filter',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+SELECT * FROM get_startup_params();
+               key                | value  
+----------------------------------+--------
+ binary.binary_basetypes          | "f"
+ binary.float4_byval              | "t"
+ binary.float8_byval              | "t"
+ binary.internal_basetypes        | "f"
+ binary.sizeof_datum              | "8"
+ binary.sizeof_int                | "4"
+ binary.sizeof_long               | "8"
+ coltypes                         | "f"
+ database_encoding                | "UTF8"
+ encoding                         | "UTF8"
+ forward_changeset_origins        | "f"
+ forward_changesets               | "f"
+ hooks.row_filter_enabled         | "t"
+ hooks.shutdown_hook_enabled      | "t"
+ hooks.startup_hook_enabled       | "t"
+ hooks.transaction_filter_enabled | "t"
+ max_proto_version                | "1"
+ min_proto_version                | "1"
+ no_txinfo                        | "t"
+(19 rows)
+
+SELECT * FROM get_queued_data();
+                                      data                                      
+--------------------------------------------------------------------------------
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"id": 1}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 2}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 3}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 4}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 5}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 6}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 7}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 8}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 9}, "relation": ["public", "test_filter"]}
+ {"action": "I", "newtuple": {"id": 10}, "relation": ["public", "test_filter"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"id": 1}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 2}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 3}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 4}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 5}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 6}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 7}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 8}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 9}, "relation": ["public", "test_nofilt"]}
+ {"action": "I", "newtuple": {"id": 10}, "relation": ["public", "test_nofilt"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "t"}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "t"}
+ {"action": "C"}
+(36 rows)
+
+TRUNCATE TABLE json_decoding_output;
+\i sql/hooks_teardown.sql
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE test_filter;
+DROP TABLE test_nofilt;
+DROP FUNCTION test_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION test_action_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION wrong_signature_fn(relid regclass);
+DROP EXTENSION pglogical_output_plhooks;
diff --git a/contrib/pglogical_output/expected/hooks_json_1.out b/contrib/pglogical_output/expected/hooks_json_1.out
new file mode 100644
index 0000000..4f4a0c7
--- /dev/null
+++ b/contrib/pglogical_output/expected/hooks_json_1.out
@@ -0,0 +1,139 @@
+\i sql/hooks_setup.sql
+CREATE EXTENSION pglogical_output_plhooks;
+CREATE FUNCTION test_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+	IF nodeid <> 'foo' THEN
+	    RAISE EXCEPTION 'Expected nodeid <foo>, got <%>',nodeid;
+	END IF;
+	RETURN relid::regclass::text NOT LIKE '%_filter%';
+END
+$$;
+CREATE FUNCTION test_action_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+    RETURN action NOT IN ('U', 'D');
+END
+$$;
+CREATE FUNCTION wrong_signature_fn(relid regclass)
+returns bool stable language plpgsql as $$
+BEGIN
+END;
+$$;
+CREATE TABLE test_filter(id integer);
+CREATE TABLE test_nofilt(id integer);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+INSERT INTO test_filter(id) SELECT generate_series(1,10);
+INSERT INTO test_nofilt(id) SELECT generate_series(1,10);
+DELETE FROM test_filter WHERE id % 2 = 0;
+DELETE FROM test_nofilt WHERE id % 2 = 0;
+UPDATE test_filter SET id = id*100 WHERE id = 5;
+UPDATE test_nofilt SET id = id*100 WHERE id = 5;
+-- Test table filter
+SELECT data::json
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_filter',
+	'pglo_plhooks.client_hook_arg', 'foo',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+                                                                                                                                                                                                                                                                                                                                                                                         data                                                                                                                                                                                                                                                                                                                                                                                         
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ {"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"t","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"t","hooks.shutdown_hook_enabled":"t","hooks.row_filter_enabled":"t","hooks.transaction_filter_enabled":"t"}}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":1}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":2}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":3}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":4}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":5}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":6}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":7}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":8}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":9}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":10}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"U","relation":["public","test_nofilt"],"newtuple":{"id":500}}
+ {"action":"C"}
+(24 rows)
+
+-- test action filter
+SELECT data::json
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_action_filter',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+                                                                                                                                                                                                                                                                                                                                                                                         data                                                                                                                                                                                                                                                                                                                                                                                         
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ {"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"t","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"t","hooks.shutdown_hook_enabled":"t","hooks.row_filter_enabled":"t","hooks.transaction_filter_enabled":"t"}}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":1}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":2}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":3}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":4}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":5}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":6}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":7}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":8}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":9}}
+ {"action":"I","relation":["public","test_filter"],"newtuple":{"id":10}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":1}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":2}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":3}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":4}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":5}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":6}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":7}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":8}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":9}}
+ {"action":"I","relation":["public","test_nofilt"],"newtuple":{"id":10}}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+ {"action":"B", "has_catalog_changes":"f"}
+ {"action":"C"}
+(33 rows)
+
+\i sql/hooks_teardown.sql
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE test_filter;
+DROP TABLE test_nofilt;
+DROP FUNCTION test_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION test_action_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION wrong_signature_fn(relid regclass);
+DROP EXTENSION pglogical_output_plhooks;
diff --git a/contrib/pglogical_output/expected/hooks_native.out b/contrib/pglogical_output/expected/hooks_native.out
new file mode 100644
index 0000000..4a547cb
--- /dev/null
+++ b/contrib/pglogical_output/expected/hooks_native.out
@@ -0,0 +1,104 @@
+\i sql/hooks_setup.sql
+CREATE EXTENSION pglogical_output_plhooks;
+CREATE FUNCTION test_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+	IF nodeid <> 'foo' THEN
+	    RAISE EXCEPTION 'Expected nodeid <foo>, got <%>',nodeid;
+	END IF;
+	RETURN relid::regclass::text NOT LIKE '%_filter%';
+END
+$$;
+CREATE FUNCTION test_action_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+    RETURN action NOT IN ('U', 'D');
+END
+$$;
+CREATE FUNCTION wrong_signature_fn(relid regclass)
+returns bool stable language plpgsql as $$
+BEGIN
+END;
+$$;
+CREATE TABLE test_filter(id integer);
+CREATE TABLE test_nofilt(id integer);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+INSERT INTO test_filter(id) SELECT generate_series(1,10);
+INSERT INTO test_nofilt(id) SELECT generate_series(1,10);
+DELETE FROM test_filter WHERE id % 2 = 0;
+DELETE FROM test_nofilt WHERE id % 2 = 0;
+UPDATE test_filter SET id = id*100 WHERE id = 5;
+UPDATE test_nofilt SET id = id*100 WHERE id = 5;
+-- Regular hook setup
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_filter',
+	'pglo_plhooks.client_hook_arg', 'foo'
+	);
+ count 
+-------
+    40
+(1 row)
+
+-- Test action filter
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_action_filter'
+	);
+ count 
+-------
+    53
+(1 row)
+
+-- Invalid row fiter hook function
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.nosuchfunction'
+	);
+ERROR:  function public.nosuchfunction(regclass, "char", text) does not exist
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Hook filter functoin with wrong signature
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.wrong_signature_fn'
+	);
+ERROR:  function public.wrong_signature_fn(regclass, "char", text) does not exist
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+\i sql/hooks_teardown.sql
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
+DROP TABLE test_filter;
+DROP TABLE test_nofilt;
+DROP FUNCTION test_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION test_action_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION wrong_signature_fn(relid regclass);
+DROP EXTENSION pglogical_output_plhooks;
diff --git a/contrib/pglogical_output/expected/params_native.out b/contrib/pglogical_output/expected/params_native.out
new file mode 100644
index 0000000..9475035
--- /dev/null
+++ b/contrib/pglogical_output/expected/params_native.out
@@ -0,0 +1,118 @@
+SET synchronous_commit = on;
+-- no need to CREATE EXTENSION as we intentionally don't have any catalog presence
+-- Instead, just create a slot.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Minimal invocation with no data
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+--
+-- Various invalid parameter combos:
+--
+-- Text mode is not supported for native protocol
+SELECT data FROM pg_logical_slot_get_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  logical decoding output plugin "pglogical_output" produces binary output, but function "pg_logical_slot_get_changes(name,pg_lsn,integer,text[])" expects textual data
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  client sent min_proto_version=2 but we only support protocol 1 or lower
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+ERROR:  client sent min_proto_version=2 but we only support protocol 1 or lower
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- error, unrecognised startup params format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '2');
+ERROR:  client sent startup parameters in format 2 but we only support format 1
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Should be OK and result in proto version 1 selection, though we won't
+-- see that here.
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+-- no such encoding / encoding mismatch
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'bork',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  unrecognised encoding name bork passed to expected_encoding
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Different spellings of encodings are OK too
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF-8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+-- bogus param format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'invalid');
+ERROR:  client requested protocol invalid but only "json" or "native" are supported
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- native params format explicitly
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'native');
+ data 
+------
+(0 rows)
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
diff --git a/contrib/pglogical_output/expected/params_native_1.out b/contrib/pglogical_output/expected/params_native_1.out
new file mode 100644
index 0000000..e8d2745
--- /dev/null
+++ b/contrib/pglogical_output/expected/params_native_1.out
@@ -0,0 +1,118 @@
+SET synchronous_commit = on;
+-- no need to CREATE EXTENSION as we intentionally don't have any catalog presence
+-- Instead, just create a slot.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Minimal invocation with no data
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+--
+-- Various invalid parameter combos:
+--
+-- Text mode is not supported for native protocol
+SELECT data FROM pg_logical_slot_get_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  logical decoding output plugin "pglogical_output" produces binary output, but "pg_logical_slot_get_changes(name,pg_lsn,integer,text[])" expects textual data
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  client sent min_proto_version=2 but we only support protocol 1 or lower
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+ERROR:  client sent min_proto_version=2 but we only support protocol 1 or lower
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- error, unrecognised startup params format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '2');
+ERROR:  client sent startup parameters in format 2 but we only support format 1
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Should be OK and result in proto version 1 selection, though we won't
+-- see that here.
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+-- no such encoding / encoding mismatch
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'bork',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ERROR:  unrecognised encoding name bork passed to expected_encoding
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- Different spellings of encodings are OK too
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF-8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+ data 
+------
+(0 rows)
+
+-- bogus param format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'invalid');
+ERROR:  client requested protocol invalid but only "json" or "native" are supported
+CONTEXT:  slot "regression_slot", output plugin "pglogical_output", in the startup callback
+-- native params format explicitly
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'native');
+ data 
+------
+(0 rows)
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ drop
+(1 row)
+
diff --git a/contrib/pglogical_output/expected/prep.out b/contrib/pglogical_output/expected/prep.out
new file mode 100644
index 0000000..6501951
--- /dev/null
+++ b/contrib/pglogical_output/expected/prep.out
@@ -0,0 +1,26 @@
+CREATE TABLE excluded_startup_keys (key_name text primary key);
+INSERT INTO excluded_startup_keys
+VALUES
+('pg_version_num'),('pg_version'),('pg_catversion'),('binary.basetypes_major_version'),('binary.integer_datetimes'),('binary.bigendian'),('binary.maxalign'),('binary.binary_pg_version'),('sizeof_int'),('sizeof_long'),('sizeof_datum');
+CREATE UNLOGGED TABLE json_decoding_output(ch jsonb, rn integer);
+CREATE OR REPLACE FUNCTION get_startup_params()
+RETURNS TABLE ("key" text, "value" jsonb)
+LANGUAGE sql
+AS $$
+SELECT key, value
+FROM json_decoding_output
+CROSS JOIN LATERAL jsonb_each(ch -> 'params')
+WHERE rn = 1
+  AND key NOT IN (SELECT * FROM excluded_startup_keys)
+  AND ch ->> 'action' = 'S'
+ORDER BY key;
+$$;
+CREATE OR REPLACE FUNCTION get_queued_data()
+RETURNS TABLE (data jsonb)
+LANGUAGE sql
+AS $$
+SELECT ch
+FROM json_decoding_output
+WHERE rn > 1
+ORDER BY rn ASC;
+$$;
diff --git a/contrib/pglogical_output/pglogical_config.c b/contrib/pglogical_output/pglogical_config.c
new file mode 100644
index 0000000..cc22700
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_config.c
@@ -0,0 +1,499 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_config.c
+ *		  Logical Replication output plugin
+ *
+ * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_config.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "pglogical_config.h"
+#include "pglogical_output.h"
+
+#include "catalog/catversion.h"
+#include "catalog/namespace.h"
+
+#include "mb/pg_wchar.h"
+
+#include "nodes/makefuncs.h"
+
+#include "utils/builtins.h"
+#include "utils/int8.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+/* How an individual startup parameter's value should be parsed. */
+typedef enum PGLogicalOutputParamType
+{
+	OUTPUT_PARAM_TYPE_BOOL,
+	OUTPUT_PARAM_TYPE_UINT32,
+	OUTPUT_PARAM_TYPE_STRING,
+	OUTPUT_PARAM_TYPE_QUALIFIED_NAME
+} PGLogicalOutputParamType;
+
+/* param parsing */
+static Datum get_param_value(DefElem *elem, bool null_ok,
+		PGLogicalOutputParamType type);
+
+static Datum get_param(List *options, const char *name, bool missing_ok,
+					   bool null_ok, PGLogicalOutputParamType type,
+					   bool *found);
+static bool parse_param_bool(DefElem *elem);
+static uint32 parse_param_uint32(DefElem *elem);
+
+static void
+process_parameters_v1(List *options, PGLogicalOutputData *data);
+
+/*
+ * Keys identifying the startup parameters we understand, as returned by
+ * get_param_key().
+ *
+ * This is a typedef; the earlier form `enum { ... } OutputPluginParamKey;`
+ * accidentally declared a file-scope *variable* of anonymous enum type,
+ * polluting the global namespace.
+ */
+typedef enum OutputPluginParamKey
+{
+	PARAM_UNRECOGNISED,
+	PARAM_MAX_PROTOCOL_VERSION,
+	PARAM_MIN_PROTOCOL_VERSION,
+	PARAM_PROTOCOL_FORMAT,
+	PARAM_EXPECTED_ENCODING,
+	PARAM_BINARY_BIGENDIAN,
+	PARAM_BINARY_SIZEOF_DATUM,
+	PARAM_BINARY_SIZEOF_INT,
+	PARAM_BINARY_SIZEOF_LONG,
+	PARAM_BINARY_FLOAT4BYVAL,
+	PARAM_BINARY_FLOAT8BYVAL,
+	PARAM_BINARY_INTEGER_DATETIMES,
+	PARAM_BINARY_WANT_INTERNAL_BASETYPES,
+	PARAM_BINARY_WANT_BINARY_BASETYPES,
+	PARAM_BINARY_BASETYPES_MAJOR_VERSION,
+	PARAM_PG_VERSION,
+	PARAM_FORWARD_CHANGESETS,
+	PARAM_HOOKS_SETUP_FUNCTION,
+	PARAM_NO_TXINFO
+} OutputPluginParamKey;
+
+/* One entry mapping a startup parameter name to its enum key. */
+typedef struct {
+	const char * const paramname;
+	int paramkey;
+} OutputPluginParam;
+
+/* Oh, if only C had switch on strings */
+static OutputPluginParam param_lookup[] = {
+	{"max_proto_version", PARAM_MAX_PROTOCOL_VERSION},
+	{"min_proto_version", PARAM_MIN_PROTOCOL_VERSION},
+	{"proto_format", PARAM_PROTOCOL_FORMAT},
+	{"expected_encoding", PARAM_EXPECTED_ENCODING},
+	{"binary.bigendian", PARAM_BINARY_BIGENDIAN},
+	{"binary.sizeof_datum", PARAM_BINARY_SIZEOF_DATUM},
+	{"binary.sizeof_int", PARAM_BINARY_SIZEOF_INT},
+	{"binary.sizeof_long", PARAM_BINARY_SIZEOF_LONG},
+	{"binary.float4_byval", PARAM_BINARY_FLOAT4BYVAL},
+	{"binary.float8_byval", PARAM_BINARY_FLOAT8BYVAL},
+	{"binary.integer_datetimes", PARAM_BINARY_INTEGER_DATETIMES},
+	{"binary.want_internal_basetypes", PARAM_BINARY_WANT_INTERNAL_BASETYPES},
+	{"binary.want_binary_basetypes", PARAM_BINARY_WANT_BINARY_BASETYPES},
+	{"binary.basetypes_major_version", PARAM_BINARY_BASETYPES_MAJOR_VERSION},
+	{"pg_version", PARAM_PG_VERSION},
+	{"forward_changesets", PARAM_FORWARD_CHANGESETS},
+	{"hooks.setup_function", PARAM_HOOKS_SETUP_FUNCTION},
+	{"no_txinfo", PARAM_NO_TXINFO},
+	/* Sentinel: must remain the last entry */
+	{NULL, PARAM_UNRECOGNISED}
+};
+
+/*
+ * Map a startup parameter name to its enum key, or PARAM_UNRECOGNISED
+ * if the name is not in the lookup table.
+ */
+static int
+get_param_key(const char * const param_name)
+{
+	OutputPluginParam *entry;
+
+	for (entry = param_lookup; entry->paramname != NULL; entry++)
+	{
+		if (strcmp(entry->paramname, param_name) == 0)
+			return entry->paramkey;
+	}
+
+	return PARAM_UNRECOGNISED;
+}
+
+
+/*
+ * Parse the v1 key/value startup parameter list sent by the client and
+ * store the recognised values into *data.
+ *
+ * max_proto_version and min_proto_version are required; everything else is
+ * optional and left at its zeroed default when not supplied by the client.
+ */
+static void
+process_parameters_v1(List *options, PGLogicalOutputData *data)
+{
+	Datum		val;
+	bool    	found;
+	ListCell	*lc;
+
+	/*
+	 * max_proto_version and min_proto_version are specified
+	 * as required, and must be parsed before anything else.
+	 *
+	 * TODO: We should still parse them as optional and
+	 * delay the ERROR until after the startup reply.
+	 */
+	val = get_param(options, "max_proto_version", false, false,
+					OUTPUT_PARAM_TYPE_UINT32, &found);
+	data->client_max_proto_version = DatumGetUInt32(val);
+
+	val = get_param(options, "min_proto_version", false, false,
+					OUTPUT_PARAM_TYPE_UINT32, &found);
+	data->client_min_proto_version = DatumGetUInt32(val);
+
+	/* Examine all the other params in the v1 message. */
+	foreach(lc, options)
+	{
+		DefElem    *elem = lfirst(lc);
+
+		Assert(elem->arg == NULL || IsA(elem->arg, String));
+
+		/* Check each param, whether or not we recognise it */
+		switch(get_param_key(elem->defname))
+		{
+			case PARAM_MAX_PROTOCOL_VERSION:
+			case PARAM_MIN_PROTOCOL_VERSION:
+				/* Already handled above, before the main scan */
+				break;
+
+			case PARAM_BINARY_BIGENDIAN:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_bigendian_set = true;
+				data->client_binary_bigendian = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_SIZEOF_DATUM:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_sizeofdatum = DatumGetUInt32(val);
+				break;
+
+			case PARAM_BINARY_SIZEOF_INT:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_sizeofint = DatumGetUInt32(val);
+				break;
+
+			case PARAM_BINARY_SIZEOF_LONG:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_sizeoflong = DatumGetUInt32(val);
+				break;
+
+			case PARAM_BINARY_FLOAT4BYVAL:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_float4byval_set = true;
+				data->client_binary_float4byval = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_FLOAT8BYVAL:
+				/* Must set the float8 fields, not the float4 ones */
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_float8byval_set = true;
+				data->client_binary_float8byval = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_INTEGER_DATETIMES:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_binary_intdatetimes_set = true;
+				data->client_binary_intdatetimes = DatumGetBool(val);
+				break;
+
+			case PARAM_PROTOCOL_FORMAT:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_STRING);
+				data->client_protocol_format = DatumGetCString(val);
+				break;
+
+			case PARAM_EXPECTED_ENCODING:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_STRING);
+				data->client_expected_encoding = DatumGetCString(val);
+				break;
+
+			case PARAM_PG_VERSION:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_pg_version = DatumGetUInt32(val);
+				break;
+
+			case PARAM_FORWARD_CHANGESETS:
+				/*
+				 * Check to see if the client asked for changeset forwarding
+				 *
+				 * Note that we cannot support this on 9.4. We'll tell the client
+				 * in the startup reply message.
+				 */
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_forward_changesets_set = true;
+				data->client_forward_changesets = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_WANT_INTERNAL_BASETYPES:
+				/* check if we want to use internal data representation */
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_want_internal_basetypes_set = true;
+				data->client_want_internal_basetypes = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_WANT_BINARY_BASETYPES:
+				/* check if we want to use binary data representation */
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_want_binary_basetypes_set = true;
+				data->client_want_binary_basetypes = DatumGetBool(val);
+				break;
+
+			case PARAM_BINARY_BASETYPES_MAJOR_VERSION:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_UINT32);
+				data->client_binary_basetypes_major_version = DatumGetUInt32(val);
+				break;
+
+			case PARAM_HOOKS_SETUP_FUNCTION:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_QUALIFIED_NAME);
+				/* val is a pointer Datum; unwrap it with DatumGetPointer */
+				data->hooks_setup_funcname = (List*) DatumGetPointer(val);
+				break;
+
+			case PARAM_NO_TXINFO:
+				val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
+				data->client_no_txinfo = DatumGetBool(val);
+				break;
+
+			case PARAM_UNRECOGNISED:
+				ereport(DEBUG1,
+						(errmsg("Unrecognised pglogical parameter %s ignored", elem->defname)));
+				break;
+		}
+	}
+}
+
+/*
+ * Read parameters sent by client at startup and store recognised
+ * ones in the parameters PGLogicalOutputData.
+ *
+ * The PGLogicalOutputData must have all client-supplied parameter fields
+ * zeroed, such as by memset or palloc0, since values not supplied
+ * by the client are not set.
+ *
+ * Returns the startup_params_format requested by the client; only
+ * format 1 is actually parsed (by process_parameters_v1), the caller is
+ * expected to reject any other value.
+ */
+int
+process_parameters(List *options, PGLogicalOutputData *data)
+{
+	Datum	val;
+	bool    found;
+	int		params_format;
+
+	/* startup_params_format is required; get_param ERRORs if it's absent */
+	val = get_param(options, "startup_params_format", false, false,
+					OUTPUT_PARAM_TYPE_UINT32, &found);
+
+	params_format = DatumGetUInt32(val);
+
+	if (params_format == 1)
+	{
+		process_parameters_v1(options, data);
+	}
+
+	return params_format;
+}
+
+/*
+ * Extract and parse the value of a single DefElem according to the
+ * requested parameter type.
+ *
+ * Returns (Datum) 0 for an absent value when null_ok, otherwise ERRORs.
+ * String and qualified-name results are palloc'd in the current context.
+ */
+static Datum
+get_param_value(DefElem *elem, bool null_ok, PGLogicalOutputParamType type)
+{
+	/* Check for NULL value */
+	if (elem->arg == NULL || strVal(elem->arg) == NULL)
+	{
+		if (null_ok)
+			return (Datum) 0;
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("parameter \"%s\" cannot be NULL", elem->defname)));
+	}
+
+	switch (type)
+	{
+		case OUTPUT_PARAM_TYPE_UINT32:
+			return UInt32GetDatum(parse_param_uint32(elem));
+		case OUTPUT_PARAM_TYPE_BOOL:
+			return BoolGetDatum(parse_param_bool(elem));
+		case OUTPUT_PARAM_TYPE_STRING:
+			return PointerGetDatum(pstrdup(strVal(elem->arg)));
+		case OUTPUT_PARAM_TYPE_QUALIFIED_NAME:
+			/* cstring_to_text copies its input, so no pstrdup is needed */
+			return PointerGetDatum(textToQualifiedNameList(cstring_to_text(strVal(elem->arg))));
+		default:
+			elog(ERROR, "unknown parameter type %d", type);
+	}
+
+	return (Datum) 0;			/* unreachable; quiets compiler warnings */
+}
+
+/*
+ * Param parsing
+ *
+ * Scan the option list for a parameter whose name matches 'name'
+ * (case-insensitively), setting *found accordingly. When the parameter
+ * is absent an ERROR is raised unless missing_ok, in which case a zero
+ * Datum is returned.
+ *
+ * This is not exactly fast but since it's only called on replication start
+ * we'll leave it for now.
+ */
+static Datum
+get_param(List *options, const char *name, bool missing_ok, bool null_ok,
+		  PGLogicalOutputParamType type, bool *found)
+{
+	ListCell	   *cell;
+
+	*found = false;
+
+	foreach(cell, options)
+	{
+		DefElem    *elem = lfirst(cell);
+
+		Assert(elem->arg == NULL || IsA(elem->arg, String));
+
+		if (pg_strcasecmp(name, elem->defname) == 0)
+		{
+			/* Matching parameter found */
+			*found = true;
+			return get_param_value(elem, null_ok, type);
+		}
+	}
+
+	if (!missing_ok)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("missing required parameter \"%s\"", name)));
+
+	return (Datum) 0;
+}
+
+/*
+ * Parse a boolean parameter value with the core parse_bool();
+ * raises an ERROR on unparseable input.
+ */
+static bool
+parse_param_bool(DefElem *elem)
+{
+	bool		res;
+
+	if (!parse_bool(strVal(elem->arg), &res))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse boolean value \"%s\" for parameter \"%s\"",
+						strVal(elem->arg), elem->defname)));
+
+	return res;
+}
+
+/*
+ * Parse an unsigned 32-bit integer parameter value.
+ *
+ * Parsed via scanint8 into a wider int64 first so that values outside
+ * the uint32 range can be detected cleanly; ERRORs if the value does
+ * not parse or does not fit.
+ */
+static uint32
+parse_param_uint32(DefElem *elem)
+{
+	int64		res;
+
+	if (!scanint8(strVal(elem->arg), true, &res))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse integer value \"%s\" for parameter \"%s\"",
+						strVal(elem->arg), elem->defname)));
+
+	if (res > PG_UINT32_MAX || res < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("value \"%s\" out of range for parameter \"%s\"",
+						strVal(elem->arg), elem->defname)));
+
+	return (uint32) res;
+}
+
+/* Append a string-valued key/value pair to a startup-message param list. */
+static List*
+add_startup_msg_s(List *l, char *key, char *val)
+{
+	return lappend(l, makeDefElem(key, (Node*)makeString(val)));
+}
+
+/* Append an integer-valued param, formatted as decimal text. */
+static List*
+add_startup_msg_i(List *l, char *key, int val)
+{
+	return lappend(l, makeDefElem(key, (Node*)makeString(psprintf("%d", val))));
+}
+
+/* Append a boolean param, encoded as "t" or "f". */
+static List*
+add_startup_msg_b(List *l, char *key, bool val)
+{
+	return lappend(l, makeDefElem(key, (Node*)makeString(val ? "t" : "f")));
+}
+
+/*
+ * This builds the protocol startup message, which is always the first
+ * message on the wire after the client sends START_REPLICATION.
+ *
+ * It confirms to the client that we could apply requested options, and
+ * tells the client our capabilities.
+ *
+ * Any additional parameters provided by the startup hook are also output
+ * now.
+ *
+ * Returns a List of DefElem key/value pairs, palloc'd in the current
+ * memory context, for the protocol layer to serialise.
+ *
+ * This is a bit less efficient than direct pq_sendblah calls, but
+ * separates config handling from the protocol implementation, and
+ * it's not like startup msg performance matters much.
+ */
+List *
+prepare_startup_message(PGLogicalOutputData *data)
+{
+	ListCell *lc;
+	List *l = NIL;
+
+	l = add_startup_msg_s(l, "max_proto_version", "1");
+	l = add_startup_msg_s(l, "min_proto_version", "1");
+
+	/* We don't yet support sending column type information */
+	l = add_startup_msg_b(l, "coltypes", false);
+
+	/* Info about our Pg host */
+	l = add_startup_msg_i(l, "pg_version_num", PG_VERSION_NUM);
+	l = add_startup_msg_s(l, "pg_version", PG_VERSION);
+	l = add_startup_msg_i(l, "pg_catversion", CATALOG_VERSION_NO);
+
+	l = add_startup_msg_s(l, "database_encoding", (char*)GetDatabaseEncodingName());
+
+	l = add_startup_msg_s(l, "encoding", (char*)pg_encoding_to_char(data->field_datum_encoding));
+
+	l = add_startup_msg_b(l, "forward_changesets",
+			data->forward_changesets);
+	l = add_startup_msg_b(l, "forward_changeset_origins",
+			data->forward_changeset_origins);
+
+	/* binary options enabled */
+	l = add_startup_msg_b(l, "binary.internal_basetypes",
+			data->allow_internal_basetypes);
+	l = add_startup_msg_b(l, "binary.binary_basetypes",
+			data->allow_binary_basetypes);
+
+	/* Binary format characteristics of server */
+	l = add_startup_msg_i(l, "binary.basetypes_major_version", PG_VERSION_NUM/100);
+	l = add_startup_msg_i(l, "binary.sizeof_int", sizeof(int));
+	l = add_startup_msg_i(l, "binary.sizeof_long", sizeof(long));
+	l = add_startup_msg_i(l, "binary.sizeof_datum", sizeof(Datum));
+	l = add_startup_msg_i(l, "binary.maxalign", MAXIMUM_ALIGNOF);
+	l = add_startup_msg_b(l, "binary.bigendian", server_bigendian());
+	l = add_startup_msg_b(l, "binary.float4_byval", server_float4_byval());
+	l = add_startup_msg_b(l, "binary.float8_byval", server_float8_byval());
+	l = add_startup_msg_b(l, "binary.integer_datetimes", server_integer_datetimes());
+	/* We don't know how to send in anything except our host's format */
+	l = add_startup_msg_i(l, "binary.binary_pg_version",
+			PG_VERSION_NUM/100);
+
+	l = add_startup_msg_b(l, "no_txinfo", data->client_no_txinfo);
+
+
+	/*
+	 * Confirm that we've enabled any requested hook functions.
+	 */
+	l = add_startup_msg_b(l, "hooks.startup_hook_enabled",
+			data->hooks.startup_hook != NULL);
+	l = add_startup_msg_b(l, "hooks.shutdown_hook_enabled",
+			data->hooks.shutdown_hook != NULL);
+	l = add_startup_msg_b(l, "hooks.row_filter_enabled",
+			data->hooks.row_filter_hook != NULL);
+	l = add_startup_msg_b(l, "hooks.transaction_filter_enabled",
+			data->hooks.txn_filter_hook != NULL);
+
+	/*
+	 * Output any extra params supplied by a startup hook by appending
+	 * them verbatim to the params list.
+	 */
+	foreach(lc, data->extra_startup_params)
+	{
+		DefElem *param = (DefElem*)lfirst(lc);
+		Assert(IsA(param->arg, String) && strVal(param->arg) != NULL);
+		l = lappend(l, param);
+	}
+
+	return l;
+}
diff --git a/contrib/pglogical_output/pglogical_config.h b/contrib/pglogical_output/pglogical_config.h
new file mode 100644
index 0000000..3af3ce8
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_config.h
@@ -0,0 +1,55 @@
+#ifndef PG_LOGICAL_CONFIG_H
+#define PG_LOGICAL_CONFIG_H
+
+#ifndef PG_VERSION_NUM
+#error <postgres.h> must be included first
+#endif
+
+inline static bool
+server_float4_byval(void)
+{
+	/* Compile-time capability: true iff this server passes float4 by value */
+	bool byval = false;
+
+#ifdef USE_FLOAT4_BYVAL
+	byval = true;
+#endif
+
+	return byval;
+}
+
+inline static bool
+server_float8_byval(void)
+{
+	/* Compile-time capability: true iff this server passes float8 by value */
+	bool byval = false;
+
+#ifdef USE_FLOAT8_BYVAL
+	byval = true;
+#endif
+
+	return byval;
+}
+
+inline static bool
+server_integer_datetimes(void)
+{
+	/* Compile-time capability: true iff built with integer datetimes */
+	bool intdt = false;
+
+#ifdef USE_INTEGER_DATETIMES
+	intdt = true;
+#endif
+
+	return intdt;
+}
+
+inline static bool
+server_bigendian(void)
+{
+	/* Compile-time capability: true iff this host is big-endian */
+	bool bigend = false;
+
+#ifdef WORDS_BIGENDIAN
+	bigend = true;
+#endif
+
+	return bigend;
+}
+
+typedef struct List List;
+typedef struct PGLogicalOutputData PGLogicalOutputData;
+
+extern int process_parameters(List *options, PGLogicalOutputData *data);
+
+extern List * prepare_startup_message(PGLogicalOutputData *data);
+
+#endif
diff --git a/contrib/pglogical_output/pglogical_hooks.c b/contrib/pglogical_output/pglogical_hooks.c
new file mode 100644
index 0000000..73e8120
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_hooks.c
@@ -0,0 +1,232 @@
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+
+#include "replication/origin.h"
+
+#include "parser/parse_func.h"
+
+#include "utils/acl.h"
+#include "utils/lsyscache.h"
+
+#include "miscadmin.h"
+
+#include "pglogical_hooks.h"
+#include "pglogical_output.h"
+
+/*
+ * Returns Oid of the hooks function specified in funcname.
+ *
+ * Error is thrown if the function doesn't exist, doesn't return the
+ * correct datatype, is volatile, or the current user lacks EXECUTE
+ * permission on it.
+ */
+static Oid
+get_hooks_function_oid(List *funcname)
+{
+	Oid			funcid;
+	Oid			funcargtypes[1];
+
+	/* The hook setup function takes a single 'internal' argument */
+	funcargtypes[0] = INTERNALOID;
+
+	/* find the function; last arg 'false' makes lookup failure an ERROR */
+	funcid = LookupFuncName(funcname, 1, funcargtypes, false);
+
+	/* Validate that the function returns void */
+	if (get_func_rettype(funcid) != VOIDOID)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s must return void",
+						NameListToString(funcname))));
+	}
+
+	if (func_volatile(funcid) == PROVOLATILE_VOLATILE)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s must not be VOLATILE",
+						NameListToString(funcname))));
+	}
+
+	if (pg_proc_aclcheck(funcid, GetUserId(), ACL_EXECUTE) != ACLCHECK_OK)
+	{
+		const char * username;
+		/* GetUserNameFromId grew a noerr argument in 9.5 */
+#if PG_VERSION_NUM >= 90500
+		username = GetUserNameFromId(GetUserId(), false);
+#else
+		username = GetUserNameFromId(GetUserId());
+#endif
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("current user %s does not have permission to call function %s",
+					 username, NameListToString(funcname))));
+	}
+
+	return funcid;
+}
+
+/*
+ * If a hook setup function was specified in the startup parameters, look it up
+ * in the catalogs, check permissions, call it, and store the resulting hook
+ * info struct.
+ *
+ * Catalog lookups require a transaction, so one is started here (and
+ * committed at the end) if we are not already inside one.
+ */
+void
+load_hooks(PGLogicalOutputData *data)
+{
+	Oid hooks_func;
+	MemoryContext old_ctxt;
+	bool txn_started = false;
+
+	if (!IsTransactionState())
+	{
+		txn_started = true;
+		StartTransactionCommand();
+	}
+
+	if (data->hooks_setup_funcname != NIL)
+	{
+		hooks_func = get_hooks_function_oid(data->hooks_setup_funcname);
+
+		/* Run the setup function in the hooks memory context so anything
+		 * it allocates survives beyond this call */
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+		(void) OidFunctionCall1(hooks_func, PointerGetDatum(&data->hooks));
+		MemoryContextSwitchTo(old_ctxt);
+
+		elog(DEBUG3, "pglogical_output: Loaded hooks from function %u. Hooks are: \n"
+				"\tstartup_hook: %p\n"
+				"\tshutdown_hook: %p\n"
+				"\trow_filter_hook: %p\n"
+				"\ttxn_filter_hook: %p\n"
+				"\thooks_private_data: %p\n",
+				hooks_func,
+				data->hooks.startup_hook,
+				data->hooks.shutdown_hook,
+				data->hooks.row_filter_hook,
+				data->hooks.txn_filter_hook,
+				data->hooks.hooks_private_data);
+	}
+
+	if (txn_started)
+		CommitTransactionCommand();
+}
+
+/*
+ * Invoke the client-supplied startup hook, if one was registered, passing
+ * it the plugin startup parameters.
+ *
+ * The hook runs in the hooks memory context, inside a transaction (started
+ * here if necessary). Parameters the hook returns in out_params are saved
+ * for inclusion in the startup reply message, and the hook may replace its
+ * private data pointer.
+ */
+void
+call_startup_hook(PGLogicalOutputData *data, List *plugin_params)
+{
+	struct PGLogicalStartupHookArgs args;
+	MemoryContext old_ctxt;
+
+	if (data->hooks.startup_hook != NULL)
+	{
+		bool tx_started = false;
+
+		args.private_data = data->hooks.hooks_private_data;
+		args.in_params = plugin_params;
+		args.out_params = NIL;
+
+		elog(DEBUG3, "calling pglogical startup hook");
+
+		if (!IsTransactionState())
+		{
+			tx_started = true;
+			StartTransactionCommand();
+		}
+
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+		(void) (*data->hooks.startup_hook)(&args);
+		MemoryContextSwitchTo(old_ctxt);
+
+		if (tx_started)
+			CommitTransactionCommand();
+
+		data->extra_startup_params = args.out_params;
+		/* The startup hook might change the private data seg */
+		data->hooks.hooks_private_data = args.private_data;
+
+		elog(DEBUG3, "called pglogical startup hook");
+	}
+}
+
+/*
+ * Invoke the client-supplied shutdown hook, if one was registered,
+ * running it in the hooks memory context. The hook may replace its
+ * private data pointer.
+ */
+void
+call_shutdown_hook(PGLogicalOutputData *data)
+{
+	struct PGLogicalShutdownHookArgs args;
+	MemoryContext saved_ctxt;
+
+	if (data->hooks.shutdown_hook == NULL)
+		return;
+
+	args.private_data = data->hooks.hooks_private_data;
+
+	elog(DEBUG3, "calling pglogical shutdown hook");
+
+	saved_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+	(void) (*data->hooks.shutdown_hook)(&args);
+	MemoryContextSwitchTo(saved_ctxt);
+
+	data->hooks.hooks_private_data = args.private_data;
+
+	elog(DEBUG3, "called pglogical shutdown hook");
+}
+
+/*
+ * Decide if the individual change should be filtered out by
+ * calling a client-provided hook.
+ *
+ * Returns true (replicate the row) when no hook is registered, otherwise
+ * whatever the hook returns.
+ *
+ * NOTE(review): the 'txn' argument is not used here; presumably retained
+ * for hook API symmetry — confirm before removing.
+ */
+bool
+call_row_filter_hook(PGLogicalOutputData *data, ReorderBufferTXN *txn,
+		Relation rel, ReorderBufferChange *change)
+{
+	struct  PGLogicalRowFilterArgs hook_args;
+	MemoryContext old_ctxt;
+	bool ret = true;
+
+	if (data->hooks.row_filter_hook != NULL)
+	{
+		hook_args.change_type = change->action;
+		hook_args.private_data = data->hooks.hooks_private_data;
+		hook_args.changed_rel = rel;
+
+		elog(DEBUG3, "calling pglogical row filter hook");
+
+		old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+		ret = (*data->hooks.row_filter_hook)(&hook_args);
+		MemoryContextSwitchTo(old_ctxt);
+
+		/* Filter hooks shouldn't change the private data ptr */
+		Assert(data->hooks.hooks_private_data == hook_args.private_data);
+
+		elog(DEBUG3, "called pglogical row filter hook, returned %d", (int)ret);
+	}
+
+	return ret;
+}
+
+/*
+ * Ask the client-provided transaction filter hook whether a transaction
+ * originating at txn_origin should be replicated. Returns true (replicate)
+ * when no hook is registered.
+ */
+bool
+call_txn_filter_hook(PGLogicalOutputData *data, RepOriginId txn_origin)
+{
+	struct PGLogicalTxnFilterArgs hook_args;
+	MemoryContext saved_ctxt;
+	bool ret = true;
+
+	if (data->hooks.txn_filter_hook == NULL)
+		return ret;
+
+	hook_args.private_data = data->hooks.hooks_private_data;
+	hook_args.origin_id = txn_origin;
+
+	elog(DEBUG3, "calling pglogical txn filter hook");
+
+	saved_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
+	ret = (*data->hooks.txn_filter_hook)(&hook_args);
+	MemoryContextSwitchTo(saved_ctxt);
+
+	/* Filter hooks shouldn't change the private data ptr */
+	Assert(data->hooks.hooks_private_data == hook_args.private_data);
+
+	elog(DEBUG3, "called pglogical txn filter hook, returned %d", (int)ret);
+
+	return ret;
+}
diff --git a/contrib/pglogical_output/pglogical_hooks.h b/contrib/pglogical_output/pglogical_hooks.h
new file mode 100644
index 0000000..df661f3
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_hooks.h
@@ -0,0 +1,22 @@
+#ifndef PGLOGICAL_HOOKS_H
+#define PGLOGICAL_HOOKS_H
+
+#include "replication/reorderbuffer.h"
+
+/* public interface for hooks */
+#include "pglogical_output/hooks.h"
+
+extern void load_hooks(PGLogicalOutputData *data);
+
+extern void call_startup_hook(PGLogicalOutputData *data, List *plugin_params);
+
+extern void call_shutdown_hook(PGLogicalOutputData *data);
+
+extern bool call_row_filter_hook(PGLogicalOutputData *data,
+		ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change);
+
+extern bool call_txn_filter_hook(PGLogicalOutputData *data,
+		RepOriginId txn_origin);
+
+
+#endif
diff --git a/contrib/pglogical_output/pglogical_output.c b/contrib/pglogical_output/pglogical_output.c
new file mode 100644
index 0000000..b8fc55e
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output.c
@@ -0,0 +1,537 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_output.c
+ *		  Logical Replication output plugin
+ *
+ * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_output.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "pglogical_config.h"
+#include "pglogical_output.h"
+#include "pglogical_proto.h"
+#include "pglogical_hooks.h"
+
+#include "access/hash.h"
+#include "access/sysattr.h"
+#include "access/xact.h"
+
+#include "catalog/pg_class.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+
+#include "mb/pg_wchar.h"
+
+#include "nodes/parsenodes.h"
+
+#include "parser/parse_func.h"
+
+#include "replication/output_plugin.h"
+#include "replication/logical.h"
+#include "replication/origin.h"
+
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/guc.h"
+#include "utils/int8.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+PG_MODULE_MAGIC;
+
+extern void		_PG_output_plugin_init(OutputPluginCallbacks *cb);
+
+/* These must be available to pg_dlsym() */
+static void pg_decode_startup(LogicalDecodingContext * ctx,
+							  OutputPluginOptions *opt, bool is_init);
+static void pg_decode_shutdown(LogicalDecodingContext * ctx);
+static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
+					ReorderBufferTXN *txn);
+static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_change(LogicalDecodingContext *ctx,
+				 ReorderBufferTXN *txn, Relation rel,
+				 ReorderBufferChange *change);
+
+static bool pg_decode_origin_filter(LogicalDecodingContext *ctx,
+						RepOriginId origin_id);
+
+static void send_startup_message(LogicalDecodingContext *ctx,
+		PGLogicalOutputData *data, bool last_message);
+
+static bool startup_message_sent = false;
+
+/*
+ * Plugin entry point: register our logical decoding callbacks with the
+ * core logical decoding machinery. The server locates this symbol via
+ * pg_dlsym() when the output plugin is loaded.
+ */
+void
+_PG_output_plugin_init(OutputPluginCallbacks *cb)
+{
+	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
+
+	cb->startup_cb = pg_decode_startup;
+	cb->begin_cb = pg_decode_begin_txn;
+	cb->change_cb = pg_decode_change;
+	cb->commit_cb = pg_decode_commit_txn;
+	cb->filter_by_origin_cb = pg_decode_origin_filter;
+	cb->shutdown_cb = pg_decode_shutdown;
+}
+
+/*
+ * Compare the binary-format attributes the client reported against this
+ * server's own characteristics. Returns false if any mismatch would make
+ * binary (internal) datum transfer unsafe; attributes the client did not
+ * supply are not checked.
+ */
+static bool
+check_binary_compatibility(PGLogicalOutputData *data)
+{
+	if (data->client_binary_basetypes_major_version != PG_VERSION_NUM / 100)
+	{
+		/* Log the rejection reason, like every other check below */
+		elog(DEBUG1, "Binary mode rejected: Server and client major version mismatch");
+		return false;
+	}
+
+	if (data->client_binary_bigendian_set
+		&& data->client_binary_bigendian != server_bigendian())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client endian mismatch");
+		return false;
+	}
+
+	/* The following messages previously said "endian" due to copy/paste */
+	if (data->client_binary_sizeofdatum != 0
+		&& data->client_binary_sizeofdatum != sizeof(Datum))
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client sizeof(Datum) mismatch");
+		return false;
+	}
+
+	if (data->client_binary_sizeofint != 0
+		&& data->client_binary_sizeofint != sizeof(int))
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client sizeof(int) mismatch");
+		return false;
+	}
+
+	if (data->client_binary_sizeoflong != 0
+		&& data->client_binary_sizeoflong != sizeof(long))
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client sizeof(long) mismatch");
+		return false;
+	}
+
+	if (data->client_binary_float4byval_set
+		&& data->client_binary_float4byval != server_float4_byval())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client float4byval mismatch");
+		return false;
+	}
+
+	if (data->client_binary_float8byval_set
+		&& data->client_binary_float8byval != server_float8_byval())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client float8byval mismatch");
+		return false;
+	}
+
+	if (data->client_binary_intdatetimes_set
+		&& data->client_binary_intdatetimes != server_integer_datetimes())
+	{
+		elog(DEBUG1, "Binary mode rejected: Server and client integer datetimes mismatch");
+		return false;
+	}
+
+	return true;
+}
+
+/* initialize this plugin */
+static void
+pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
+				  bool is_init)
+{
+	PGLogicalOutputData  *data = palloc0(sizeof(PGLogicalOutputData));
+
+	data->context = AllocSetContextCreate(TopMemoryContext,
+										  "pglogical conversion context",
+										  ALLOCSET_DEFAULT_MINSIZE,
+										  ALLOCSET_DEFAULT_INITSIZE,
+										  ALLOCSET_DEFAULT_MAXSIZE);
+	data->allow_internal_basetypes = false;
+	data->allow_binary_basetypes = false;
+
+
+	ctx->output_plugin_private = data;
+
+	/*
+	 * This is replication start and not slot initialization.
+	 *
+	 * Parse and validate options passed by the client.
+	 */
+	if (!is_init)
+	{
+		int		params_format;
+
+		 /*
+		 * Ideally we'd send the startup message immediately. That way
+		 * it'd arrive before any error we emit if we see incompatible
+		 * options sent by the client here. That way the client could
+		 * possibly adjust its options and reconnect. It'd also make
+		 * sure the client gets the startup message in a timely way if
+		 * the server is idle, since otherwise it could be a while
+		 * before the next callback.
+		 *
+		 * The decoding plugin API doesn't let us write to the stream
+		 * from here, though, so we have to delay the startup message
+		 * until the first change processed on the stream, in a begin
+		 * callback.
+		 *
+		 * If we ERROR there, the startup message is buffered but not
+		 * sent since the callback didn't finish. So we'd have to send
+		 * the startup message, finish the callback and check in the
+		 * next callback if we need to ERROR.
+		 *
+		 * That's a bit much hoop jumping, so for now ERRORs are
+		 * immediate. A way to emit a message from the startup callback
+		 * is really needed to change that.
+		 */
+		startup_message_sent = false;
+
+		/* Now parse the rest of the params and ERROR if we see any we don't recognise */
+		params_format = process_parameters(ctx->output_plugin_options, data);
+
+		if (params_format != 1)
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("client sent startup parameters in format %d but we only support format 1",
+					params_format)));
+
+		if (data->client_min_proto_version > PG_LOGICAL_PROTO_VERSION_NUM)
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("client sent min_proto_version=%d but we only support protocol %d or lower",
+					 data->client_min_proto_version, PG_LOGICAL_PROTO_VERSION_NUM)));
+
+		if (data->client_max_proto_version < PG_LOGICAL_PROTO_MIN_VERSION_NUM)
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("client sent max_proto_version=%d but we only support protocol %d or higher",
+				 	data->client_max_proto_version, PG_LOGICAL_PROTO_MIN_VERSION_NUM)));
+
+		/*
+		 * Set correct protocol format.
+		 *
+		 * This is the output plugin protocol format, this is different
+		 * from the individual fields binary vs textual format.
+		 */
+		if (data->client_protocol_format != NULL
+				&& strcmp(data->client_protocol_format, "json") == 0)
+		{
+			data->api = pglogical_init_api(PGLogicalProtoJson);
+			opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
+		}
+		else if ((data->client_protocol_format != NULL
+			     && strcmp(data->client_protocol_format, "native") == 0)
+				 || data->client_protocol_format == NULL)
+		{
+			data->api = pglogical_init_api(PGLogicalProtoNative);
+			opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+
+			if (data->client_no_txinfo)
+			{
+				elog(WARNING, "no_txinfo option ignored for protocols other than json");
+				data->client_no_txinfo = false;
+			}
+		}
+		else
+		{
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("client requested protocol %s but only \"json\" or \"native\" are supported",
+				 	data->client_protocol_format)));
+		}
+
+		/* check for encoding match if specific encoding demanded by client */
+		if (data->client_expected_encoding != NULL
+				&& strlen(data->client_expected_encoding) != 0)
+		{
+			int wanted_encoding = pg_char_to_encoding(data->client_expected_encoding);
+
+			if (wanted_encoding == -1)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("unrecognised encoding name %s passed to expected_encoding",
+								data->client_expected_encoding)));
+
+			if (opt->output_type == OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
+			{
+				/*
+				 * datum encoding must match assigned client_encoding in text
+				 * proto, since everything is subject to client_encoding
+				 * conversion.
+				 */
+				if (wanted_encoding != pg_get_client_encoding())
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("expected_encoding must be unset or match client_encoding in text protocols")));
+			}
+			else
+			{
+				/*
+				 * currently in the binary protocol we can only emit encoded
+				 * datums in the server encoding. There's no support for encoding
+				 * conversion.
+				 */
+				if (wanted_encoding != GetDatabaseEncoding())
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("encoding conversion for binary datum not supported yet"),
+							 errdetail("expected_encoding %s must be unset or match server_encoding %s",
+								 data->client_expected_encoding, GetDatabaseEncodingName())));
+			}
+
+			data->field_datum_encoding = wanted_encoding;
+		}
+
+		/*
+		 * It's obviously not possible to send binary representatio of data
+		 * unless we use the binary output.
+		 */
+		if (opt->output_type == OUTPUT_PLUGIN_BINARY_OUTPUT &&
+			data->client_want_internal_basetypes)
+		{
+			data->allow_internal_basetypes =
+				check_binary_compatibility(data);
+		}
+
+		if (opt->output_type == OUTPUT_PLUGIN_BINARY_OUTPUT &&
+			data->client_want_binary_basetypes &&
+			data->client_binary_basetypes_major_version == PG_VERSION_NUM / 100)
+		{
+			data->allow_binary_basetypes = true;
+		}
+
+		/*
+		 * Will we forward changesets? We have to if we're on 9.4;
+		 * otherwise honour the client's request.
+		 */
+		if (PG_VERSION_NUM/100 == 904)
+		{
+			/*
+			 * 9.4 unconditionally forwards changesets due to lack of
+			 * replication origins, and it can't ever send origin info
+			 * for the same reason.
+			 */
+			data->forward_changesets = true;
+			data->forward_changeset_origins = false;
+
+			if (data->client_forward_changesets_set
+				&& !data->client_forward_changesets)
+			{
+				ereport(DEBUG1,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("Cannot disable changeset forwarding on PostgreSQL 9.4")));
+			}
+		}
+		else if (data->client_forward_changesets_set
+				 && data->client_forward_changesets)
+		{
+			/* Client explicitly asked for forwarding; forward csets and origins */
+			data->forward_changesets = true;
+			data->forward_changeset_origins = true;
+		}
+		else
+		{
+			/* Default to not forwarding or honour client's request not to fwd */
+			data->forward_changesets = false;
+			data->forward_changeset_origins = false;
+		}
+
+		if (data->hooks_setup_funcname != NIL)
+		{
+
+			data->hooks_mctxt = AllocSetContextCreate(ctx->context,
+					"pglogical_output hooks context",
+					ALLOCSET_SMALL_MINSIZE,
+					ALLOCSET_SMALL_INITSIZE,
+					ALLOCSET_SMALL_MAXSIZE);
+
+			load_hooks(data);
+			call_startup_hook(data, ctx->output_plugin_options);
+		}
+	}
+}
+
+/*
+ * BEGIN callback
+ *
+ * Sends the deferred startup message first (if it has not gone out yet),
+ * then the BEGIN message, optionally followed by a replication origin
+ * message when origin forwarding is enabled and the txn is non-local.
+ */
+void
+pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+	PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
+	bool send_replication_origin = data->forward_changeset_origins;
+
+	/*
+	 * The startup message is delayed to the first output call because the
+	 * output plugin API does not let us write from the startup callback.
+	 */
+	if (!startup_message_sent)
+		send_startup_message(ctx, data, false /* can't be last message */);
+
+	/* If the record didn't originate locally, send origin info */
+	send_replication_origin &= txn->origin_id != InvalidRepOriginId;
+
+	/* BEGIN is the last message only if no origin message follows it */
+	OutputPluginPrepareWrite(ctx, !send_replication_origin);
+	data->api->write_begin(ctx->out, data, txn);
+
+	if (send_replication_origin)
+	{
+		char *origin;
+
+		/* Message boundary */
+		OutputPluginWrite(ctx, false);
+		OutputPluginPrepareWrite(ctx, true);
+
+		/*
+		 * XXX: which behaviour we want here?
+		 *
+		 * Alternatives:
+		 *  - don't send origin message if origin name not found
+		 *    (that's what we do now)
+		 *  - throw error - that will break replication, not good
+		 *  - send some special "unknown" origin
+		 */
+		if (data->api->write_origin &&
+			replorigin_by_oid(txn->origin_id, true, &origin))
+			data->api->write_origin(ctx->out, origin, txn->origin_lsn);
+	}
+
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT callback
+ *
+ * Emits a single COMMIT message through the negotiated protocol API,
+ * framed as the final message of the transaction.
+ */
+void
+pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					 XLogRecPtr commit_lsn)
+{
+	PGLogicalOutputData *plugin_data;
+
+	plugin_data = (PGLogicalOutputData *) ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+	plugin_data->api->write_commit(ctx->out, plugin_data, txn, commit_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * CHANGE callback: emit one INSERT/UPDATE/DELETE.
+ *
+ * The row filter hook runs first and may suppress the change entirely.
+ * All per-change allocations happen in data->context, which is reset on
+ * exit so nothing accumulates across changes.
+ */
+void
+pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 Relation relation, ReorderBufferChange *change)
+{
+	PGLogicalOutputData *data = ctx->output_plugin_private;
+	MemoryContext old;
+
+	/* First check the table filter */
+	if (!call_row_filter_hook(data, txn, relation, change))
+		return;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	/* TODO: add caching (send only if changed) */
+	if (data->api->write_rel)
+	{
+		/* protocols with relation metadata send it before each change */
+		OutputPluginPrepareWrite(ctx, false);
+		data->api->write_rel(ctx->out, relation);
+		OutputPluginWrite(ctx, false);
+	}
+
+	/* Send the data */
+	switch (change->action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			OutputPluginPrepareWrite(ctx, true);
+			data->api->write_insert(ctx->out, data, relation,
+									&change->data.tp.newtuple->tuple);
+			OutputPluginWrite(ctx, true);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			{
+				/* oldtuple may be absent; the writer handles NULL */
+				HeapTuple oldtuple = change->data.tp.oldtuple ?
+					&change->data.tp.oldtuple->tuple : NULL;
+
+				OutputPluginPrepareWrite(ctx, true);
+				data->api->write_update(ctx->out, data, relation, oldtuple,
+										&change->data.tp.newtuple->tuple);
+				OutputPluginWrite(ctx, true);
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (change->data.tp.oldtuple)
+			{
+				OutputPluginPrepareWrite(ctx, true);
+				data->api->write_delete(ctx->out, data, relation,
+										&change->data.tp.oldtuple->tuple);
+				OutputPluginWrite(ctx, true);
+			}
+			else
+				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+			break;
+		default:
+			/* no other ReorderBufferChange actions reach this callback */
+			Assert(false);
+	}
+
+	/* Cleanup */
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+}
+
+/*
+ * Decide whether a whole transaction with the given origin should be
+ * filtered out. Returns true to drop the transaction.
+ */
+static bool
+pg_decode_origin_filter(LogicalDecodingContext *ctx,
+						RepOriginId origin_id)
+{
+	PGLogicalOutputData *data = ctx->output_plugin_private;
+	bool		filter_it = false;
+
+	/* a hook veto filters the transaction out */
+	if (!call_txn_filter_hook(data, origin_id))
+		filter_it = true;
+	/* drop non-local transactions unless forwarding was negotiated */
+	else if (origin_id != InvalidRepOriginId && !data->forward_changesets)
+		filter_it = true;
+
+	return filter_it;
+}
+
+/*
+ * Send the one-off startup message that reports the negotiated options
+ * back to the client. Must be called at most once per decoding session
+ * (enforced by the Assert and the startup_message_sent flag).
+ */
+static void
+send_startup_message(LogicalDecodingContext *ctx,
+		PGLogicalOutputData *data, bool last_message)
+{
+	List *msg;
+
+	Assert(!startup_message_sent);
+
+	msg = prepare_startup_message(data);
+
+	/*
+	 * We could free the extra_startup_params DefElem list here, but it's
+	 * pretty harmless to just ignore it, since it's in the decoding memory
+	 * context anyway, and we don't know if it's safe to free the defnames or
+	 * not.
+	 */
+
+	OutputPluginPrepareWrite(ctx, last_message);
+	data->api->write_startup_message(ctx->out, msg);
+	OutputPluginWrite(ctx, last_message);
+
+	/*
+	 * NOTE(review): pfree() on a List releases only the header node, not
+	 * the cells; the remainder is reclaimed with the decoding memory
+	 * context, so this is at worst a cosmetic leak.
+	 */
+	pfree(msg);
+
+	startup_message_sent = true;
+}
+
+/*
+ * Shutdown callback: run the shutdown hook (if any) and release the
+ * hook memory context.
+ */
+static void
+pg_decode_shutdown(LogicalDecodingContext *ctx)
+{
+	PGLogicalOutputData *plugin_data = (PGLogicalOutputData *) ctx->output_plugin_private;
+
+	call_shutdown_hook(plugin_data);
+
+	if (plugin_data->hooks_mctxt == NULL)
+		return;
+
+	MemoryContextDelete(plugin_data->hooks_mctxt);
+	plugin_data->hooks_mctxt = NULL;
+}
diff --git a/contrib/pglogical_output/pglogical_output.h b/contrib/pglogical_output/pglogical_output.h
new file mode 100644
index 0000000..a874c40
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output.h
@@ -0,0 +1,105 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_output.h
+ *		pglogical output plugin
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		pglogical_output.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_OUTPUT_H
+#define PG_LOGICAL_OUTPUT_H
+
+#include "nodes/parsenodes.h"
+
+#include "replication/logical.h"
+#include "replication/output_plugin.h"
+
+#include "storage/lock.h"
+
+#include "pglogical_output/hooks.h"
+
+#include "pglogical_proto.h"
+
+#define PG_LOGICAL_PROTO_VERSION_NUM 1
+#define PG_LOGICAL_PROTO_MIN_VERSION_NUM 1
+
+/*
+ * The name of a hook function. This fixed-size struct is used instead of
+ * the usual List* of name parts because it can serve as a hash key.
+ *
+ * Must be zeroed on allocation if used as a hash key since padding is
+ * *not* ignored on compare.
+ */
+typedef struct HookFuncName
+{
+	/* funcname is more likely to be unique, so goes first */
+	char    function[NAMEDATALEN];
+	char    schema[NAMEDATALEN];
+} HookFuncName;
+
+/*
+ * Per-decoding-session plugin state, stored in ctx->output_plugin_private.
+ */
+typedef struct PGLogicalOutputData
+{
+	/* per-change scratch context, reset after every change */
+	MemoryContext context;
+
+	/* wire-format callbacks selected at startup (native or json) */
+	PGLogicalProtoAPI *api;
+
+	/* protocol */
+	bool	allow_internal_basetypes;
+	bool	allow_binary_basetypes;
+	bool	forward_changesets;
+	bool	forward_changeset_origins;
+	int		field_datum_encoding;
+
+	/*
+	 * client info
+	 *
+	 * Lots of this should move to a separate shorter-lived struct used only
+	 * during parameter reading, since it contains what the client asked for.
+	 * Once we've processed this during startup we don't refer to it again.
+	 */
+	uint32	client_pg_version;
+	uint32	client_max_proto_version;
+	uint32	client_min_proto_version;
+	const char *client_expected_encoding;
+	const char *client_protocol_format;
+	uint32  client_binary_basetypes_major_version;
+	bool	client_want_internal_basetypes_set;
+	bool	client_want_internal_basetypes;
+	bool	client_want_binary_basetypes_set;
+	bool	client_want_binary_basetypes;
+	bool	client_binary_bigendian_set;
+	bool	client_binary_bigendian;
+	uint32	client_binary_sizeofdatum;
+	uint32	client_binary_sizeofint;
+	uint32	client_binary_sizeoflong;
+	bool	client_binary_float4byval_set;
+	bool	client_binary_float4byval;
+	bool	client_binary_float8byval_set;
+	bool	client_binary_float8byval;
+	bool	client_binary_intdatetimes_set;
+	bool	client_binary_intdatetimes;
+	bool	client_forward_changesets_set;
+	bool	client_forward_changesets;
+	bool	client_no_txinfo;
+
+	/* hooks */
+	List *hooks_setup_funcname;
+	struct PGLogicalHooks hooks;
+	MemoryContext hooks_mctxt;
+
+	/* DefElem<String> list populated by startup hook */
+	List *extra_startup_params;
+} PGLogicalOutputData;
+
+/*
+ * Scratch space for a decomposed heap tuple, statically sized for the
+ * theoretical maximum attribute count rather than per-relation.
+ */
+typedef struct PGLogicalTupleData
+{
+	Datum	values[MaxTupleAttributeNumber];
+	bool	nulls[MaxTupleAttributeNumber];
+	bool	changed[MaxTupleAttributeNumber];
+} PGLogicalTupleData;
+
+#endif /* PG_LOGICAL_OUTPUT_H */
diff --git a/contrib/pglogical_output/pglogical_output/README b/contrib/pglogical_output/pglogical_output/README
new file mode 100644
index 0000000..5480e5c
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output/README
@@ -0,0 +1,7 @@
+/*
+ * This directory contains the public header files for the pglogical_output
+ * extension. It is installed into the PostgreSQL source tree when the extension
+ * is installed.
+ *
+ * These headers are not part of the PostgreSQL project itself.
+ */
diff --git a/contrib/pglogical_output/pglogical_output/hooks.h b/contrib/pglogical_output/pglogical_output/hooks.h
new file mode 100644
index 0000000..b20fa72
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_output/hooks.h
@@ -0,0 +1,72 @@
+#ifndef PGLOGICAL_OUTPUT_HOOKS_H
+#define PGLOGICAL_OUTPUT_HOOKS_H
+
+#include "access/xlogdefs.h"
+#include "nodes/pg_list.h"
+#include "utils/rel.h"
+#include "utils/palloc.h"
+#include "replication/reorderbuffer.h"
+
+struct PGLogicalOutputData;
+typedef struct PGLogicalOutputData PGLogicalOutputData;
+
+/*
+ * This header is to be included by extensions that implement pglogical output
+ * plugin callback hooks for transaction origin and row filtering, etc. It is
+ * installed as "pglogical_output/hooks.h"
+ *
+ * See the README.md and the example in examples/hooks/ for details on hooks.
+ */
+
+
+/* Passed to the startup hook; out_params is the DefElem list the hook may populate. */
+struct PGLogicalStartupHookArgs
+{
+	void	   *private_data;
+	List	   *in_params;
+	List	   *out_params;
+};
+
+typedef void (*pglogical_startup_hook_fn)(struct PGLogicalStartupHookArgs *args);
+
+
+/* Passed to the transaction filter hook for each replication origin id. */
+struct PGLogicalTxnFilterArgs
+{
+	void 	   *private_data;
+	RepOriginId	origin_id;
+};
+
+/* Return false to filter the transaction out of the stream. */
+typedef bool (*pglogical_txn_filter_hook_fn)(struct PGLogicalTxnFilterArgs *args);
+
+
+/* Passed to the row filter hook for each candidate row change. */
+struct PGLogicalRowFilterArgs
+{
+	void 	   *private_data;
+	Relation	changed_rel;
+	enum ReorderBufferChangeType	change_type;
+};
+
+/* Return false to filter the row change out of the stream. */
+typedef bool (*pglogical_row_filter_hook_fn)(struct PGLogicalRowFilterArgs *args);
+
+
+/* Passed to the shutdown hook when the decoding session ends. */
+struct PGLogicalShutdownHookArgs
+{
+	void	   *private_data;
+};
+
+typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *args);
+
+/*
+ * This struct is passed to the pglogical_get_hooks_fn as the first argument,
+ * typed 'internal', and is unwrapped with `DatumGetPointer`.
+ *
+ * NOTE(review): hooks_private_data appears to be handed back to each hook
+ * via the args structs' private_data field — confirm against pglogical_hooks.c.
+ */
+struct PGLogicalHooks
+{
+	pglogical_startup_hook_fn startup_hook;
+	pglogical_shutdown_hook_fn shutdown_hook;
+	pglogical_txn_filter_hook_fn txn_filter_hook;
+	pglogical_row_filter_hook_fn row_filter_hook;
+	void *hooks_private_data;
+};
+
+
+#endif /* PGLOGICAL_OUTPUT_HOOKS_H */
diff --git a/contrib/pglogical_output/pglogical_proto.c b/contrib/pglogical_output/pglogical_proto.c
new file mode 100644
index 0000000..47a883f
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto.c
@@ -0,0 +1,49 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto.c
+ * 		pglogical protocol functions
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "pglogical_output.h"
+#include "pglogical_proto.h"
+#include "pglogical_proto_native.h"
+#include "pglogical_proto_json.h"
+
+/*
+ * Allocate and fill a protocol dispatch table for the requested wire
+ * format. Slots not assigned here stay NULL courtesy of palloc0(); the
+ * json protocol has no relation-metadata or origin messages.
+ */
+PGLogicalProtoAPI *
+pglogical_init_api(PGLogicalProtoType typ)
+{
+	PGLogicalProtoAPI  *api = palloc0(sizeof(PGLogicalProtoAPI));
+
+	switch (typ)
+	{
+		case PGLogicalProtoJson:
+			/* write_rel and write_origin intentionally left NULL */
+			api->write_begin = pglogical_json_write_begin;
+			api->write_commit = pglogical_json_write_commit;
+			api->write_insert = pglogical_json_write_insert;
+			api->write_update = pglogical_json_write_update;
+			api->write_delete = pglogical_json_write_delete;
+			api->write_startup_message = json_write_startup_message;
+			break;
+
+		default:
+			/* native binary protocol */
+			api->write_rel = pglogical_write_rel;
+			api->write_begin = pglogical_write_begin;
+			api->write_commit = pglogical_write_commit;
+			api->write_origin = pglogical_write_origin;
+			api->write_insert = pglogical_write_insert;
+			api->write_update = pglogical_write_update;
+			api->write_delete = pglogical_write_delete;
+			api->write_startup_message = write_startup_message;
+			break;
+	}
+
+	return api;
+}
diff --git a/contrib/pglogical_output/pglogical_proto.h b/contrib/pglogical_output/pglogical_proto.h
new file mode 100644
index 0000000..b56ff6f
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto.h
@@ -0,0 +1,57 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto.h
+ *		pglogical protocol
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_PROTO_H
+#define PG_LOGICAL_PROTO_H
+
+/* Wire-format callback signatures; one implementation set per protocol. */
+typedef void (*pglogical_write_rel_fn)(StringInfo out, Relation rel);
+
+typedef void (*pglogical_write_begin_fn)(StringInfo out, PGLogicalOutputData *data,
+							 ReorderBufferTXN *txn);
+typedef void (*pglogical_write_commit_fn)(StringInfo out, PGLogicalOutputData *data,
+							 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+
+typedef void (*pglogical_write_origin_fn)(StringInfo out, const char *origin,
+							 XLogRecPtr origin_lsn);
+
+typedef void (*pglogical_write_insert_fn)(StringInfo out, PGLogicalOutputData *data,
+							 Relation rel, HeapTuple newtuple);
+typedef void (*pglogical_write_update_fn)(StringInfo out, PGLogicalOutputData *data,
+							 Relation rel, HeapTuple oldtuple,
+							 HeapTuple newtuple);
+typedef void (*pglogical_write_delete_fn)(StringInfo out, PGLogicalOutputData *data,
+							 Relation rel, HeapTuple oldtuple);
+
+typedef void (*write_startup_message_fn)(StringInfo out, List *msg);
+
+/*
+ * Dispatch table of protocol callbacks, filled by pglogical_init_api().
+ * write_rel and write_origin may be NULL (the json protocol sets neither).
+ */
+typedef struct PGLogicalProtoAPI
+{
+	pglogical_write_rel_fn		write_rel;
+	pglogical_write_begin_fn	write_begin;
+	pglogical_write_commit_fn	write_commit;
+	pglogical_write_origin_fn	write_origin;
+	pglogical_write_insert_fn	write_insert;
+	pglogical_write_update_fn	write_update;
+	pglogical_write_delete_fn	write_delete;
+	write_startup_message_fn	write_startup_message;
+} PGLogicalProtoAPI;
+
+
+/* Supported output wire formats */
+typedef enum PGLogicalProtoType
+{
+	PGLogicalProtoNative,
+	PGLogicalProtoJson
+} PGLogicalProtoType;
+
+extern PGLogicalProtoAPI *pglogical_init_api(PGLogicalProtoType typ);
+
+#endif /* PG_LOGICAL_PROTO_H */
diff --git a/contrib/pglogical_output/pglogical_proto_json.c b/contrib/pglogical_output/pglogical_proto_json.c
new file mode 100644
index 0000000..ae5a591
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto_json.c
@@ -0,0 +1,204 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto_json.c
+ * 		pglogical protocol functions for json support
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto_json.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "pglogical_output.h"
+#include "pglogical_proto_json.h"
+
+#include "access/sysattr.h"
+#include "access/tuptoaster.h"
+#include "access/xact.h"
+
+#include "catalog/catversion.h"
+#include "catalog/index.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "libpq/pqformat.h"
+
+#include "mb/pg_wchar.h"
+
+#ifdef HAVE_REPLICATION_ORIGINS
+#include "replication/origin.h"
+#endif
+
+#include "utils/builtins.h"
+#include "utils/json.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/typcache.h"
+
+
+/*
+ * Write BEGIN to the output stream as a json object.
+ *
+ * Transaction metadata (xid, lsns, commit time) is suppressed when the
+ * client requested no_txinfo, which keeps output stable across runs.
+ */
+void
+pglogical_json_write_begin(StringInfo out, PGLogicalOutputData *data, ReorderBufferTXN *txn)
+{
+	appendStringInfoChar(out, '{');
+	appendStringInfoString(out, "\"action\":\"B\"");
+	appendStringInfo(out, ", \"has_catalog_changes\":\"%c\"",
+		txn->has_catalog_changes ? 't' : 'f');
+#ifdef HAVE_REPLICATION_ORIGINS
+	if (txn->origin_id != InvalidRepOriginId)
+		appendStringInfo(out, ", \"origin_id\":\"%u\"", txn->origin_id);
+#endif
+	if (!data->client_no_txinfo)
+	{
+		appendStringInfo(out, ", \"xid\":\"%u\"", txn->xid);
+		appendStringInfo(out, ", \"first_lsn\":\"%X/%X\"",
+			(uint32)(txn->first_lsn >> 32), (uint32)(txn->first_lsn));
+#ifdef HAVE_REPLICATION_ORIGINS
+		appendStringInfo(out, ", \"origin_lsn\":\"%X/%X\"",
+			(uint32)(txn->origin_lsn >> 32), (uint32)(txn->origin_lsn));
+#endif
+		/*
+		 * Braces added here: the original mis-indented this if body at the
+		 * enclosing level, inviting a "goto fail"-style mistake on later
+		 * edits. Behavior is unchanged.
+		 */
+		if (txn->commit_time != 0)
+		{
+			appendStringInfo(out, ", \"commit_time\":\"%s\"",
+				timestamptz_to_str(txn->commit_time));
+		}
+	}
+	appendStringInfoChar(out, '}');
+}
+
+/*
+ * Write COMMIT to the output stream as a json object. LSN fields are
+ * omitted when the client asked for no_txinfo.
+ */
+void
+pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBufferTXN *txn,
+						XLogRecPtr commit_lsn)
+{
+	appendStringInfoString(out, "{\"action\":\"C\"");
+
+	if (!data->client_no_txinfo)
+	{
+		appendStringInfo(out, ", \"final_lsn\":\"%X/%X\", \"end_lsn\":\"%X/%X\"",
+			(uint32) (txn->final_lsn >> 32), (uint32) txn->final_lsn,
+			(uint32) (txn->end_lsn >> 32), (uint32) txn->end_lsn);
+	}
+
+	appendStringInfoChar(out, '}');
+}
+
+/*
+ * Serialise one heap tuple as a json object using row_to_json().
+ */
+static void
+json_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
+{
+	Datum		rowdatum;
+	Datum		jsondatum;
+
+	/* row_to_json() wants the tuple wrapped as a composite datum */
+	rowdatum = heap_copy_tuple_as_datum(tuple, RelationGetDescr(rel));
+	jsondatum = DirectFunctionCall1(row_to_json, rowdatum);
+
+	appendStringInfoString(out, TextDatumGetCString(jsondatum));
+}
+
+/*
+ * Write change.
+ *
+ * Generic function handling DML changes: emits one json object carrying
+ * the action code, the [schema, table] relation pair, and the old/new
+ * tuples when present (INSERT has no oldtuple, DELETE has no newtuple).
+ */
+static void
+pglogical_json_write_change(StringInfo out, const char *change, Relation rel,
+							HeapTuple oldtuple, HeapTuple newtuple)
+{
+	appendStringInfoChar(out, '{');
+	appendStringInfo(out, "\"action\":\"%s\",\"relation\":[\"%s\",\"%s\"]",
+					 change,
+					 get_namespace_name(RelationGetNamespace(rel)),
+					 RelationGetRelationName(rel));
+
+	if (oldtuple)
+	{
+		appendStringInfoString(out, ",\"oldtuple\":");
+		json_write_tuple(out, rel, oldtuple);
+	}
+	if (newtuple)
+	{
+		appendStringInfoString(out, ",\"newtuple\":");
+		json_write_tuple(out, rel, newtuple);
+	}
+	appendStringInfoChar(out, '}');
+}
+
+/*
+ * Write INSERT to the output stream. 'data' is unused here but kept for
+ * the common protocol-callback signature.
+ */
+void
+pglogical_json_write_insert(StringInfo out, PGLogicalOutputData *data,
+							Relation rel, HeapTuple newtuple)
+{
+	pglogical_json_write_change(out, "I", rel, NULL, newtuple);
+}
+
+/*
+ * Write UPDATE to the output stream. oldtuple may be NULL when the
+ * replica identity did not require it.
+ */
+void
+pglogical_json_write_update(StringInfo out, PGLogicalOutputData *data,
+							Relation rel, HeapTuple oldtuple,
+							HeapTuple newtuple)
+{
+	pglogical_json_write_change(out, "U", rel, oldtuple, newtuple);
+}
+
+/*
+ * Write DELETE to the output stream. Only the old tuple (key) is sent.
+ */
+void
+pglogical_json_write_delete(StringInfo out, PGLogicalOutputData *data,
+							Relation rel, HeapTuple oldtuple)
+{
+	pglogical_json_write_change(out, "D", rel, oldtuple, NULL);
+}
+
+/*
+ * Emit the startup message as a json object with one key/value pair per
+ * DefElem list member, e.g. {"action":"S", "params": {...}}.
+ */
+void
+json_write_startup_message(StringInfo out, List *msg)
+{
+	ListCell   *lc;
+
+	appendStringInfoString(out, "{\"action\":\"S\", \"params\": {");
+	foreach (lc, msg)
+	{
+		DefElem    *param = (DefElem *) lfirst(lc);
+
+		Assert(IsA(param->arg, String) && strVal(param->arg) != NULL);
+
+		/* comma-separate every pair after the first */
+		if (lc != list_head(msg))
+			appendStringInfoChar(out, ',');
+
+		escape_json(out, param->defname);
+		appendStringInfoChar(out, ':');
+		escape_json(out, strVal(param->arg));
+	}
+	appendStringInfoString(out, "}}");
+}
diff --git a/contrib/pglogical_output/pglogical_proto_json.h b/contrib/pglogical_output/pglogical_proto_json.h
new file mode 100644
index 0000000..d853e9e
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto_json.h
@@ -0,0 +1,32 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto_json.h
+ *		pglogical protocol, json implementation
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto_json.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_PROTO_JSON_H
+#define PG_LOGICAL_PROTO_JSON_H
+
+
+/* json implementations of the protocol callbacks (see pglogical_proto.h) */
+extern void pglogical_json_write_begin(StringInfo out, PGLogicalOutputData *data,
+								 ReorderBufferTXN *txn);
+extern void pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data,
+								 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+
+extern void pglogical_json_write_insert(StringInfo out, PGLogicalOutputData *data,
+								 Relation rel, HeapTuple newtuple);
+extern void pglogical_json_write_update(StringInfo out, PGLogicalOutputData *data,
+								 Relation rel, HeapTuple oldtuple,
+								 HeapTuple newtuple);
+extern void pglogical_json_write_delete(StringInfo out, PGLogicalOutputData *data,
+								 Relation rel, HeapTuple oldtuple);
+
+/* startup message: one json key/value per DefElem in msg */
+extern void json_write_startup_message(StringInfo out, List *msg);
+
+#endif /* PG_LOGICAL_PROTO_JSON_H */
diff --git a/contrib/pglogical_output/pglogical_proto_native.c b/contrib/pglogical_output/pglogical_proto_native.c
new file mode 100644
index 0000000..baaf324
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto_native.c
@@ -0,0 +1,494 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto_native.c
+ * 		pglogical binary protocol functions
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto_native.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "pglogical_output.h"
+#include "pglogical_proto_native.h"
+
+#include "access/sysattr.h"
+#include "access/tuptoaster.h"
+#include "access/xact.h"
+
+#include "catalog/catversion.h"
+#include "catalog/index.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "libpq/pqformat.h"
+
+#include "mb/pg_wchar.h"
+
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/typcache.h"
+
+/* attribute flag bit: column is part of the replica identity */
+#define IS_REPLICA_IDENTITY 1
+
+/* forward declarations for local helpers */
+static void pglogical_write_attrs(StringInfo out, Relation rel);
+static void pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
+								   Relation rel, HeapTuple tuple);
+static char decide_datum_transfer(Form_pg_attribute att,
+								  Form_pg_type typclass,
+								  bool allow_internal_basetypes,
+								  bool allow_binary_basetypes);
+
+/*
+ * Write relation description to the output stream.
+ *
+ * Layout: 'R', flags byte, relation Oid (4 bytes), then length-prefixed
+ * schema and table names (lengths include the NUL terminator), followed
+ * by the attribute block.
+ */
+void
+pglogical_write_rel(StringInfo out, Relation rel)
+{
+	const char *nspname;
+	uint8		nspnamelen;
+	const char *relname;
+	uint8		relnamelen;
+	uint8		flags = 0;
+
+	pq_sendbyte(out, 'R');		/* sending RELATION */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	nspname = get_namespace_name(rel->rd_rel->relnamespace);
+	if (nspname == NULL)
+		elog(ERROR, "cache lookup failed for namespace %u",
+			 rel->rd_rel->relnamespace);
+	/* length includes NUL; fits in uint8 because names are NAMEDATALEN-bounded */
+	nspnamelen = strlen(nspname) + 1;
+
+	relname = NameStr(rel->rd_rel->relname);
+	relnamelen = strlen(relname) + 1;
+
+	pq_sendbyte(out, nspnamelen);		/* schema name length */
+	pq_sendbytes(out, nspname, nspnamelen);
+
+	pq_sendbyte(out, relnamelen);		/* table name length */
+	pq_sendbytes(out, relname, relnamelen);
+
+	/* send the attribute info */
+	pglogical_write_attrs(out, rel);
+}
+
+/*
+ * Write relation attributes to the outputstream.
+ *
+ * Layout: 'A', 2-byte live-attribute count, then for each non-dropped
+ * column a 'C' record (flags byte) containing an 'N' name block with a
+ * 2-byte length (including the NUL) and the column name bytes.
+ */
+static void
+pglogical_write_attrs(StringInfo out, Relation rel)
+{
+	TupleDesc	desc;
+	int			i;
+	uint16		nliveatts = 0;
+	Bitmapset  *idattrs;
+
+	desc = RelationGetDescr(rel);
+
+	pq_sendbyte(out, 'A');			/* sending ATTRS */
+
+	/* send number of live attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		if (desc->attrs[i]->attisdropped)
+			continue;
+		nliveatts++;
+	}
+	pq_sendint(out, nliveatts, 2);
+
+	/* fetch bitmap of REPLICATION IDENTITY attributes */
+	idattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	/* send the attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = desc->attrs[i];
+		uint8			flags = 0;
+		uint16			len;
+		const char	   *attname;
+
+		if (att->attisdropped)
+			continue;
+
+		/* attnum is offset so system attributes map into the bitmapset */
+		if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						  idattrs))
+			flags |= IS_REPLICA_IDENTITY;
+
+		pq_sendbyte(out, 'C');		/* column definition follows */
+		pq_sendbyte(out, flags);
+
+		pq_sendbyte(out, 'N');		/* column name block follows */
+		attname = NameStr(att->attname);
+		len = strlen(attname) + 1;
+		pq_sendint(out, len, 2);
+		pq_sendbytes(out, attname, len); /* data */
+	}
+}
+
+/*
+ * Write BEGIN to the output stream.
+ *
+ * Layout: 'B', flags byte, final_lsn (8 bytes), commit_time (8 bytes),
+ * xid (4 bytes).
+ */
+void
+pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
+					  ReorderBufferTXN *txn)
+{
+	uint8	flags = 0;
+
+	pq_sendbyte(out, 'B');		/* BEGIN */
+
+	/* send the flags field itself */
+	pq_sendbyte(out, flags);
+
+	/* fixed fields */
+	pq_sendint64(out, txn->final_lsn);
+	pq_sendint64(out, txn->commit_time);
+	pq_sendint(out, txn->xid, 4);
+}
+
+/*
+ * Write COMMIT to the output stream.
+ *
+ * Layout: 'C', flags byte, commit_lsn (8 bytes), end_lsn (8 bytes),
+ * commit_time (8 bytes).
+ */
+void
+pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
+					   ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'C');		/* sending COMMIT */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fixed fields */
+	pq_sendint64(out, commit_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Write ORIGIN to the output stream.
+ *
+ * Layout: 'O', flags byte, origin lsn (8 bytes), then the origin name as
+ * a 1-byte length (including the NUL) followed by the name bytes.
+ */
+void
+pglogical_write_origin(StringInfo out, const char *origin,
+						XLogRecPtr origin_lsn)
+{
+	uint8	flags = 0;
+	uint8	len;
+
+	/*
+	 * NOTE(review): this bound is only checked in assert-enabled builds;
+	 * an origin name of 255+ bytes would silently wrap the uint8 length
+	 * below in production builds.
+	 */
+	Assert(strlen(origin) < 255);
+
+	pq_sendbyte(out, 'O');		/* ORIGIN */
+
+	/* send the flags field itself */
+	pq_sendbyte(out, flags);
+
+	/* fixed fields */
+	pq_sendint64(out, origin_lsn);
+
+	/* origin */
+	len = strlen(origin) + 1;
+	pq_sendbyte(out, len);
+	pq_sendbytes(out, origin, len);
+}
+
+/*
+ * Write INSERT to the output stream.
+ *
+ * Layout: 'I', flags byte, relation Oid (4 bytes), 'N', then the new
+ * tuple.
+ */
+void
+pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
+						Relation rel, HeapTuple newtuple)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'I');		/* action INSERT */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	pglogical_write_tuple(out, data, rel, newtuple);
+}
+
+/*
+ * Write UPDATE to the output stream.
+ *
+ * Layout: 'U', flags byte, relation Oid (4 bytes), an optional 'K' old
+ * key tuple, then 'N' and the new tuple.
+ */
+void
+pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
+						Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'U');		/* action UPDATE */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	/* FIXME support whole tuple (O tuple type) */
+	if (oldtuple != NULL)
+	{
+		pq_sendbyte(out, 'K');	/* old key follows */
+		pglogical_write_tuple(out, data, rel, oldtuple);
+	}
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	pglogical_write_tuple(out, data, rel, newtuple);
+}
+
+/*
+ * Write DELETE to the output stream.
+ *
+ * Wire format: message byte 'D', a flags byte (always zero for now), the
+ * 4-byte relation Oid, then the old key tuple as a 'K' block.  Unlike
+ * UPDATE, the key tuple is sent unconditionally here.
+ */
+void
+pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
+						Relation rel, HeapTuple oldtuple)
+{
+	uint8 flags = 0;
+
+	pq_sendbyte(out, 'D');		/* action DELETE */
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* use Oid as relation identifier */
+	pq_sendint(out, RelationGetRelid(rel), 4);
+
+	/* FIXME support whole tuple (O tuple type) */
+	pq_sendbyte(out, 'K');	/* old key follows */
+	pglogical_write_tuple(out, data, rel, oldtuple);
+}
+
+/*
+ * Write the startup message to the output stream.
+ *
+ * Most of the brains for startup message creation lives in
+ * pglogical_config.c, so this presently just sends the set of key/value
+ * pairs: message byte 'S', a 1-byte startup-message version (currently 1),
+ * then each parameter as two NUL-terminated strings (key, then value) in
+ * the client encoding.  Each list entry must be a DefElem whose arg is a
+ * non-NULL String node, per the Assert.
+ */
+void
+write_startup_message(StringInfo out, List *msg)
+{
+	ListCell *lc;
+
+	pq_sendbyte(out, 'S');	/* message type field */
+	pq_sendbyte(out, 1); 	/* startup message version */
+	foreach (lc, msg)
+	{
+		DefElem *param = (DefElem*)lfirst(lc);
+		Assert(IsA(param->arg, String) && strVal(param->arg) != NULL);
+		/* null-terminated key and value pairs, in client_encoding */
+		pq_sendstring(out, param->defname);
+		pq_sendstring(out, strVal(param->arg));
+	}
+}
+
+/*
+ * Write a tuple to the outputstream, in the most efficient format possible.
+ *
+ * Wire format: 'T', a 2-byte count of live (non-dropped) attributes, then
+ * one entry per live column.  Each entry begins with a single kind byte:
+ * 'n' (null), 'u' (unchanged toast datum, value not sent), 'i' (internal
+ * on-disk binary), 'b' (send/recv binary) or 't' (text output).  For 'i',
+ * 'b' and 't' a 4-byte length and the value bytes follow.  The per-column
+ * format is chosen by decide_datum_transfer() based on what the client
+ * said it can accept.
+ */
+static void
+pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
+					   Relation rel, HeapTuple tuple)
+{
+	TupleDesc	desc;
+	Datum		values[MaxTupleAttributeNumber];
+	bool		isnull[MaxTupleAttributeNumber];
+	int			i;
+	uint16		nliveatts = 0;
+
+	desc = RelationGetDescr(rel);
+
+	pq_sendbyte(out, 'T');			/* sending TUPLE */
+
+	/* count live columns; dropped ones are omitted from the message */
+	for (i = 0; i < desc->natts; i++)
+	{
+		if (desc->attrs[i]->attisdropped)
+			continue;
+		nliveatts++;
+	}
+	pq_sendint(out, nliveatts, 2);
+
+	/* try to allocate enough memory from the get go */
+	enlargeStringInfo(out, tuple->t_len +
+					  nliveatts * (1 + 4));
+
+	/*
+	 * XXX: should this prove to be a relevant bottleneck, it might be
+	 * interesting to inline heap_deform_tuple() here, we don't actually need
+	 * the information in the form we get from it.
+	 */
+	heap_deform_tuple(tuple, desc, values, isnull);
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		HeapTuple	typtup;
+		Form_pg_type typclass;
+		Form_pg_attribute att = desc->attrs[i];
+		char		transfer_type;
+
+		/* skip dropped columns */
+		if (att->attisdropped)
+			continue;
+
+		if (isnull[i])
+		{
+			pq_sendbyte(out, 'n');	/* null column */
+			continue;
+		}
+		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
+		{
+			pq_sendbyte(out, 'u');	/* unchanged toast column */
+			continue;
+		}
+
+		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
+		if (!HeapTupleIsValid(typtup))
+			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
+		typclass = (Form_pg_type) GETSTRUCT(typtup);
+
+		transfer_type = decide_datum_transfer(att, typclass,
+											  data->allow_internal_basetypes,
+											  data->allow_binary_basetypes);
+
+		switch (transfer_type)
+		{
+			case 'i':
+				pq_sendbyte(out, 'i');	/* internal-format binary data follows */
+
+				/* pass by value */
+				if (att->attbyval)
+				{
+					pq_sendint(out, att->attlen, 4); /* length */
+
+					enlargeStringInfo(out, att->attlen);
+					store_att_byval(out->data + out->len, values[i],
+									att->attlen);
+					out->len += att->attlen;
+					out->data[out->len] = '\0';
+				}
+				/* fixed length non-varlena pass-by-reference type */
+				else if (att->attlen > 0)
+				{
+					pq_sendint(out, att->attlen, 4); /* length */
+
+					appendBinaryStringInfo(out, DatumGetPointer(values[i]),
+										   att->attlen);
+				}
+				/* varlena type */
+				else if (att->attlen == -1)
+				{
+					/*
+					 * Renamed from "data", which shadowed this function's
+					 * PGLogicalOutputData *data parameter.
+					 */
+					char	   *vardata = DatumGetPointer(values[i]);
+
+					/* send indirect datums inline */
+					if (VARATT_IS_EXTERNAL_INDIRECT(values[i]))
+					{
+						struct varatt_indirect redirect;
+						VARATT_EXTERNAL_GET_POINTER(redirect, vardata);
+						vardata = (char *) redirect.pointer;
+					}
+
+					Assert(!VARATT_IS_EXTERNAL(vardata));
+
+					pq_sendint(out, VARSIZE_ANY(vardata), 4); /* length */
+
+					appendBinaryStringInfo(out, vardata, VARSIZE_ANY(vardata));
+				}
+				else
+					elog(ERROR, "unsupported tuple type");
+
+				break;
+
+			case 'b':
+				{
+					bytea	   *outputbytes;
+					int			len;
+
+					pq_sendbyte(out, 'b');	/* binary send/recv data follows */
+
+					outputbytes = OidSendFunctionCall(typclass->typsend,
+													  values[i]);
+
+					len = VARSIZE(outputbytes) - VARHDRSZ;
+					pq_sendint(out, len, 4); /* length */
+					pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
+					pfree(outputbytes);
+				}
+				break;
+
+			default:
+				{
+					char   	   *outputstr;
+					int			len;
+
+					pq_sendbyte(out, 't');	/* 'text' data follows */
+
+					outputstr =	OidOutputFunctionCall(typclass->typoutput,
+													  values[i]);
+					/* length includes the terminating NUL */
+					len = strlen(outputstr) + 1;
+					pq_sendint(out, len, 4); /* length */
+					appendBinaryStringInfo(out, outputstr, len); /* data */
+					pfree(outputstr);
+				}
+		}
+
+		ReleaseSysCache(typtup);
+	}
+}
+
+/*
+ * Make the executive decision about which protocol to use.
+ *
+ * Returns 'i' for internal (on-disk) binary format, 'b' for send/recv
+ * binary format, or 't' for text output, which is always safe.
+ */
+static char
+decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
+					  bool allow_internal_basetypes,
+					  bool allow_binary_basetypes)
+{
+	bool		is_builtin = att->atttypid < FirstNormalObjectId;
+
+	/*
+	 * Built-in plain base types may go over the wire in internal format
+	 * if the client said that's acceptable.
+	 */
+	if (allow_internal_basetypes &&
+		typclass->typtype == 'b' &&
+		is_builtin &&
+		typclass->typelem == InvalidOid)
+		return 'i';
+
+	/*
+	 * Otherwise fall back to send/recv binary, if permitted and the type
+	 * has a receive function.
+	 *
+	 * XXX: non-builtin array and composite types are excluded for now due
+	 * to the embedded oids.
+	 */
+	if (allow_binary_basetypes &&
+		OidIsValid(typclass->typreceive) &&
+		(is_builtin || typclass->typtype != 'c') &&
+		(is_builtin || typclass->typelem == InvalidOid))
+		return 'b';
+
+	return 't';
+}
diff --git a/contrib/pglogical_output/pglogical_proto_native.h b/contrib/pglogical_output/pglogical_proto_native.h
new file mode 100644
index 0000000..729bee0
--- /dev/null
+++ b/contrib/pglogical_output/pglogical_proto_native.h
@@ -0,0 +1,37 @@
+/*-------------------------------------------------------------------------
+ *
+ * pglogical_proto_native.h
+ *		pglogical protocol, native implementation
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  pglogical_proto_native.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_PROTO_NATIVE_H
+#define PG_LOGICAL_PROTO_NATIVE_H
+
+
+/* Relation metadata message */
+extern void pglogical_write_rel(StringInfo out, Relation rel);
+
+/* Transaction boundary messages ('B' / 'C') */
+extern void pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
+							ReorderBufferTXN *txn);
+extern void pglogical_write_commit(StringInfo out,PGLogicalOutputData *data,
+							ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+
+/* Replication origin message ('O') */
+extern void pglogical_write_origin(StringInfo out, const char *origin,
+							XLogRecPtr origin_lsn);
+
+/* Row change messages ('I' / 'U' / 'D') */
+extern void pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
+							Relation rel, HeapTuple newtuple);
+extern void pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
+							Relation rel, HeapTuple oldtuple,
+							HeapTuple newtuple);
+extern void pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
+							Relation rel, HeapTuple oldtuple);
+
+/* Startup parameter reply message ('S') */
+extern void write_startup_message(StringInfo out, List *msg);
+
+#endif /* PG_LOGICAL_PROTO_NATIVE_H */
diff --git a/contrib/pglogical_output/regression.conf b/contrib/pglogical_output/regression.conf
new file mode 100644
index 0000000..367f706
--- /dev/null
+++ b/contrib/pglogical_output/regression.conf
@@ -0,0 +1,2 @@
+wal_level = logical
+max_replication_slots = 4
diff --git a/contrib/pglogical_output/sql/basic_json.sql b/contrib/pglogical_output/sql/basic_json.sql
new file mode 100644
index 0000000..e8a2352
--- /dev/null
+++ b/contrib/pglogical_output/sql/basic_json.sql
@@ -0,0 +1,24 @@
+\i sql/basic_setup.sql
+
+-- Simple decode with text-format tuples
+TRUNCATE TABLE json_decoding_output;
+
+INSERT INTO json_decoding_output(ch, rn)
+SELECT
+  data::jsonb,
+  row_number() OVER ()
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+
+SELECT * FROM get_startup_params();
+SELECT * FROM get_queued_data();
+
+TRUNCATE TABLE json_decoding_output;
+
+\i sql/basic_teardown.sql
diff --git a/contrib/pglogical_output/sql/basic_native.sql b/contrib/pglogical_output/sql/basic_native.sql
new file mode 100644
index 0000000..5b6ec4b
--- /dev/null
+++ b/contrib/pglogical_output/sql/basic_native.sql
@@ -0,0 +1,27 @@
+\i sql/basic_setup.sql
+
+-- Simple decode with text-format tuples
+--
+-- It's still the logical decoding binary protocol and as such it has
+-- embedded timestamps, and pglogical its self has embedded LSNs, xids,
+-- etc. So all we can really do is say "yup, we got the expected number
+-- of messages".
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- ... and send/recv binary format
+-- The main difference visible is that the bytea fields aren't encoded
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'binary.want_binary_basetypes', '1',
+	'binary.basetypes_major_version', (current_setting('server_version_num')::integer / 100)::text);
+
+\i sql/basic_teardown.sql
diff --git a/contrib/pglogical_output/sql/basic_setup.sql b/contrib/pglogical_output/sql/basic_setup.sql
new file mode 100644
index 0000000..19e154c
--- /dev/null
+++ b/contrib/pglogical_output/sql/basic_setup.sql
@@ -0,0 +1,62 @@
+SET synchronous_commit = on;
+
+-- Schema setup
+
+CREATE TABLE demo (
+	seq serial primary key,
+	tx text,
+	ts timestamp,
+	jsb jsonb,
+	js json,
+	ba bytea
+);
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+
+-- Queue up some work to decode with a variety of types
+
+INSERT INTO demo(tx) VALUES ('textval');
+INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
+INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
+INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
+
+-- Rolled back txn
+BEGIN;
+DELETE FROM demo;
+INSERT INTO demo(tx) VALUES ('blahblah');
+ROLLBACK;
+
+-- Multi-statement transaction with subxacts
+BEGIN;
+SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('row1');
+RELEASE SAVEPOINT sp1;
+SAVEPOINT sp2;
+UPDATE demo SET tx = 'update-rollback' WHERE tx = 'row1';
+ROLLBACK TO SAVEPOINT sp2;
+SAVEPOINT sp3;
+INSERT INTO demo(tx) VALUES ('row2');
+INSERT INTO demo(tx) VALUES ('row3');
+RELEASE SAVEPOINT sp3;
+SAVEPOINT sp4;
+DELETE FROM demo WHERE tx = 'row2';
+RELEASE SAVEPOINT sp4;
+SAVEPOINT sp5;
+UPDATE demo SET tx = 'updated' WHERE tx = 'row1';
+COMMIT;
+
+
+-- txn with catalog changes
+BEGIN;
+CREATE TABLE cat_test(id integer);
+INSERT INTO cat_test(id) VALUES (42);
+COMMIT;
+
+-- Aborted subxact with catalog changes
+BEGIN;
+INSERT INTO demo(tx) VALUES ('1');
+SAVEPOINT sp1;
+ALTER TABLE demo DROP COLUMN tx;
+ROLLBACK TO SAVEPOINT sp1;
+INSERT INTO demo(tx) VALUES ('2');
+COMMIT;
diff --git a/contrib/pglogical_output/sql/basic_teardown.sql b/contrib/pglogical_output/sql/basic_teardown.sql
new file mode 100644
index 0000000..d7a752f
--- /dev/null
+++ b/contrib/pglogical_output/sql/basic_teardown.sql
@@ -0,0 +1,4 @@
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+
+DROP TABLE demo;
+DROP TABLE cat_test;
diff --git a/contrib/pglogical_output/sql/cleanup.sql b/contrib/pglogical_output/sql/cleanup.sql
new file mode 100644
index 0000000..e7a02c8
--- /dev/null
+++ b/contrib/pglogical_output/sql/cleanup.sql
@@ -0,0 +1,4 @@
+DROP TABLE excluded_startup_keys;
+DROP TABLE json_decoding_output;
+DROP FUNCTION get_queued_data();
+DROP FUNCTION get_startup_params();
diff --git a/contrib/pglogical_output/sql/encoding_json.sql b/contrib/pglogical_output/sql/encoding_json.sql
new file mode 100644
index 0000000..543c306
--- /dev/null
+++ b/contrib/pglogical_output/sql/encoding_json.sql
@@ -0,0 +1,58 @@
+SET synchronous_commit = on;
+
+-- This file doesn't share common setup with the native tests,
+-- since it's specific to how the text protocol handles encodings.
+
+CREATE TABLE enctest(blah text);
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+
+
+SET client_encoding = 'UTF-8';
+INSERT INTO enctest(blah)
+VALUES
+('áàä'),('ﬂ'), ('½⅓'), ('カンジ');
+RESET client_encoding;
+
+
+SET client_encoding = 'LATIN-1';
+
+-- Will ERROR, explicit encoding request doesn't match client_encoding
+SELECT data
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+
+-- Will succeed since we don't request any encoding
+-- then ERROR because it can't turn the kanjii into latin-1
+SELECT data
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+
+-- Will succeed since it matches the current encoding
+-- then ERROR because it can't turn the kanjii into latin-1
+SELECT data
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'LATIN-1',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+
+RESET client_encoding;
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+
+DROP TABLE enctest;
diff --git a/contrib/pglogical_output/sql/hooks_json.sql b/contrib/pglogical_output/sql/hooks_json.sql
new file mode 100644
index 0000000..cd58960
--- /dev/null
+++ b/contrib/pglogical_output/sql/hooks_json.sql
@@ -0,0 +1,49 @@
+\i sql/hooks_setup.sql
+
+
+-- Test table filter
+TRUNCATE TABLE json_decoding_output;
+
+INSERT INTO json_decoding_output(ch, rn)
+SELECT
+  data::jsonb,
+  row_number() OVER ()
+FROM pg_logical_slot_peek_changes('regression_slot',
+ 	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_filter',
+	'pglo_plhooks.client_hook_arg', 'foo',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+
+SELECT * FROM get_startup_params();
+SELECT * FROM get_queued_data();
+
+-- test action filter
+TRUNCATE TABLE json_decoding_output;
+
+INSERT INTO json_decoding_output (ch, rn)
+SELECT
+  data::jsonb,
+  row_number() OVER ()
+FROM pg_logical_slot_peek_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_action_filter',
+	'proto_format', 'json',
+	'no_txinfo', 't');
+
+SELECT * FROM get_startup_params();
+SELECT * FROM get_queued_data();
+
+TRUNCATE TABLE json_decoding_output;
+
+\i sql/hooks_teardown.sql
diff --git a/contrib/pglogical_output/sql/hooks_native.sql b/contrib/pglogical_output/sql/hooks_native.sql
new file mode 100644
index 0000000..e2bfc54
--- /dev/null
+++ b/contrib/pglogical_output/sql/hooks_native.sql
@@ -0,0 +1,48 @@
+\i sql/hooks_setup.sql
+
+-- Regular hook setup
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_filter',
+	'pglo_plhooks.client_hook_arg', 'foo'
+	);
+
+-- Test action filter
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.test_action_filter'
+	);
+
+-- Invalid row fiter hook function
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.nosuchfunction'
+	);
+
+-- Hook filter functoin with wrong signature
+SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'hooks.setup_function', 'public.pglo_plhooks_setup_fn',
+	'pglo_plhooks.row_filter_hook', 'public.wrong_signature_fn'
+	);
+
+\i sql/hooks_teardown.sql
diff --git a/contrib/pglogical_output/sql/hooks_setup.sql b/contrib/pglogical_output/sql/hooks_setup.sql
new file mode 100644
index 0000000..4de15b7
--- /dev/null
+++ b/contrib/pglogical_output/sql/hooks_setup.sql
@@ -0,0 +1,37 @@
+CREATE EXTENSION pglogical_output_plhooks;
+
+CREATE FUNCTION test_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+	IF nodeid <> 'foo' THEN
+	    RAISE EXCEPTION 'Expected nodeid <foo>, got <%>',nodeid;
+	END IF;
+	RETURN relid::regclass::text NOT LIKE '%_filter%';
+END
+$$;
+
+CREATE FUNCTION test_action_filter(relid regclass, action "char", nodeid text)
+returns bool stable language plpgsql AS $$
+BEGIN
+    RETURN action NOT IN ('U', 'D');
+END
+$$;
+
+CREATE FUNCTION wrong_signature_fn(relid regclass)
+returns bool stable language plpgsql as $$
+BEGIN
+END;
+$$;
+
+CREATE TABLE test_filter(id integer);
+CREATE TABLE test_nofilt(id integer);
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+
+INSERT INTO test_filter(id) SELECT generate_series(1,10);
+INSERT INTO test_nofilt(id) SELECT generate_series(1,10);
+
+DELETE FROM test_filter WHERE id % 2 = 0;
+DELETE FROM test_nofilt WHERE id % 2 = 0;
+UPDATE test_filter SET id = id*100 WHERE id = 5;
+UPDATE test_nofilt SET id = id*100 WHERE id = 5;
diff --git a/contrib/pglogical_output/sql/hooks_teardown.sql b/contrib/pglogical_output/sql/hooks_teardown.sql
new file mode 100644
index 0000000..837e2d0
--- /dev/null
+++ b/contrib/pglogical_output/sql/hooks_teardown.sql
@@ -0,0 +1,10 @@
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
+
+DROP TABLE test_filter;
+DROP TABLE test_nofilt;
+
+DROP FUNCTION test_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION test_action_filter(relid regclass, action "char", nodeid text);
+DROP FUNCTION wrong_signature_fn(relid regclass);
+
+DROP EXTENSION pglogical_output_plhooks;
diff --git a/contrib/pglogical_output/sql/params_native.sql b/contrib/pglogical_output/sql/params_native.sql
new file mode 100644
index 0000000..8b08732
--- /dev/null
+++ b/contrib/pglogical_output/sql/params_native.sql
@@ -0,0 +1,95 @@
+SET synchronous_commit = on;
+
+-- no need to CREATE EXTENSION as we intentionally don't have any catalog presence
+-- Instead, just create a slot.
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
+
+-- Minimal invocation with no data
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+--
+-- Various invalid parameter combos:
+--
+
+-- Text mode is not supported for native protocol
+SELECT data FROM pg_logical_slot_get_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- error, only supports proto v1
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '2',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+
+-- error, unrecognised startup params format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '2');
+
+-- Should be OK and result in proto version 1 selection, though we won't
+-- see that here.
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '2',
+	'startup_params_format', '1');
+
+-- no such encoding / encoding mismatch
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'bork',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- Different spellings of encodings are OK too
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF-8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1');
+
+-- bogus param format
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'invalid');
+
+-- native params format explicitly
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot',
+	NULL, NULL,
+	'expected_encoding', 'UTF8',
+	'min_proto_version', '1',
+	'max_proto_version', '1',
+	'startup_params_format', '1',
+	'proto_format', 'native');
+
+SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/pglogical_output/sql/prep.sql b/contrib/pglogical_output/sql/prep.sql
new file mode 100644
index 0000000..26e79c8
--- /dev/null
+++ b/contrib/pglogical_output/sql/prep.sql
@@ -0,0 +1,30 @@
+CREATE TABLE excluded_startup_keys (key_name text primary key);
+
+INSERT INTO excluded_startup_keys
+VALUES
+('pg_version_num'),('pg_version'),('pg_catversion'),('binary.basetypes_major_version'),('binary.integer_datetimes'),('binary.bigendian'),('binary.maxalign'),('binary.binary_pg_version'),('sizeof_int'),('sizeof_long'),('sizeof_datum');
+
+CREATE UNLOGGED TABLE json_decoding_output(ch jsonb, rn integer);
+
+CREATE OR REPLACE FUNCTION get_startup_params()
+RETURNS TABLE ("key" text, "value" jsonb)
+LANGUAGE sql
+AS $$
+SELECT key, value
+FROM json_decoding_output
+CROSS JOIN LATERAL jsonb_each(ch -> 'params')
+WHERE rn = 1
+  AND key NOT IN (SELECT * FROM excluded_startup_keys)
+  AND ch ->> 'action' = 'S'
+ORDER BY key;
+$$;
+
+CREATE OR REPLACE FUNCTION get_queued_data()
+RETURNS TABLE (data jsonb)
+LANGUAGE sql
+AS $$
+SELECT ch
+FROM json_decoding_output
+WHERE rn > 1
+ORDER BY rn ASC;
+$$;
diff --git a/contrib/pglogical_output_plhooks/.gitignore b/contrib/pglogical_output_plhooks/.gitignore
new file mode 100644
index 0000000..140f8cf
--- /dev/null
+++ b/contrib/pglogical_output_plhooks/.gitignore
@@ -0,0 +1 @@
+*.so
diff --git a/contrib/pglogical_output_plhooks/Makefile b/contrib/pglogical_output_plhooks/Makefile
new file mode 100644
index 0000000..ecd3f89
--- /dev/null
+++ b/contrib/pglogical_output_plhooks/Makefile
@@ -0,0 +1,13 @@
+# contrib/pglogical_output_plhooks/Makefile
+
+MODULES = pglogical_output_plhooks
+EXTENSION = pglogical_output_plhooks
+DATA = pglogical_output_plhooks--1.0.sql
+DOCS = README.pglogical_output_plhooks
+
+subdir = contrib/pglogical_output_plhooks
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+
+# Allow the hook plugin to see the pglogical_output headers
+# Necessary because !PGXS builds don't respect PG_CPPFLAGS
+override CPPFLAGS := $(CPPFLAGS) -I$(top_srcdir)/contrib/pglogical_output
diff --git a/contrib/pglogical_output_plhooks/README.pglogical_output_plhooks b/contrib/pglogical_output_plhooks/README.pglogical_output_plhooks
new file mode 100644
index 0000000..f2ad9d4
--- /dev/null
+++ b/contrib/pglogical_output_plhooks/README.pglogical_output_plhooks
@@ -0,0 +1,158 @@
+pglogical_output_plhooks is an example module for pglogical_output, showing how
+hooks can be implemented.
+
+It provides C wrappers to allow hooks to be written in any supported PL,
+such as PL/PgSQL.
+
+No effort is made to be efficient. To avoid the need to set up cache
+invalidation handling, function calls are done via oid each time, with no
+FmgrInfo caching.  Also, memory contexts are reset rather freely. If you
+want efficiency, write your hook in C.
+
+(Catalog timetravel is another reason not to write hooks in PLs; see below).
+
+Simple pointless example
+===
+
+To compile and install, just "make USE_PGXS=1 install". Note that pglogical
+must already be installed so that its headers can be found. You might have
+to set the `PATH` so that `pg_config` can be found.
+
+To use it:
+
+    CREATE EXTENSION pglogical_output_plhooks WITH SCHEMA public;
+
+in the target database.
+
+Then create at least one hook procedure, of the supported hooks listed below.
+For the sake of this example we'll use some of the toy examples provided in the
+extension:
+
+* startup function: pglo_plhooks_demo_startup
+* row filter: pglo_plhooks_demo_row_filter
+* txn filter: pglo_plhooks_demo_txn_filter
+* shutdown function: pglo_plhooks_demo_shutdown
+
+Now add some arguments to your pglogical_output client's logical decoding setup
+parameters to specify the hook setup function and to tell
+pglogical_output_plhooks about one or more of the hooks you wish it to run. For
+example you might add the following parameters:
+
+	hooks.setup_function, public.pglo_plhooks_setup_fn,
+	pglo_plhooks.startup_hook, pglo_plhooks_demo_startup,
+	pglo_plhooks.row_filter_hook, pglo_plhooks_demo_row_filter,
+	pglo_plhooks.txn_filter_hook, pglo_plhooks_demo_txn_filter,
+	pglo_plhooks.shutdown_hook, pglo_plhooks_demo_shutdown,
+	pglo_plhooks.client_hook_arg, 'whatever-you-want'
+
+to configure the extension to load its hooks, then configure all the demo hooks.
+
+Why the preference for C hooks?
+===
+
+Speed. The row filter hook is called for *every single row* replicated.
+
+If a hook raises an ERROR then replication will probably stop. You won't be
+able to fix it either, because when you change the hook definition the new
+definition won't be visible in the catalogs at the current replay position due
+to catalog time travel. The old definition that raises an error will keep being
+used. You'll need to remove the problem hook from your logical decoding startup
+parameters, which will disable use of the hook entirely, until replay proceeds
+past the point you fixed the problem with the hook function.
+
+Similarly, if you try to add use of a newly defined hook on an existing
+replication slot that hasn't replayed past the point you defined the hook yet,
+you'll get an error complaining that the hook function doesn't exist. Even
+though it clearly does when you look at it in psql. The reason is the same: in
+the time traveled catalogs it really doesn't exist. You have to replay past the
+point the hook was created then enable it. In this case the
+pglogical_output_plhooks startup hook will actually see your functions, but
+fail when it tries to call them during decoding since they'll appear to have
+vanished.
+
+If you write your hooks in C you can redefine them rather more easily, since
+the function definition is not subject to catalog timetravel. More importantly,
+it'll probably be a lot faster. The plhooks code has to do a lot of translation
+to pass information to the PL functions and more to get results back; it also
+has to do a lot of memory allocations and a memory context reset after each
+call. That all adds up.
+
+(You could actually write C functions to be called by this extension, but
+that'd be crazy.)
+
+Available hooks
+===
+
+The four hooks provided by pglogical_output are exposed by the module. See the
+pglogical_output documentation for details on what each hook does and when it
+runs.
+
+A function for each hook must have *exactly* the specified parameters and
+return value, or you'll get an error.
+
+None of the functions may return NULL. If they do you'll get an error.
+
+If you specified `pglo_plhooks.client_hook_arg` in the startup parameters it is
+passed as `client_hook_arg` to all hooks. If not specified the empty string is
+passed.
+
+You can find some toy examples in `pglogical_output_plhooks--1.0.sql`.
+
+
+
+Startup hook
+---
+
+Configured with `pglo_plhooks.startup_hook` startup parameter. Runs when
+logical decoding starts.
+
+Signature *must* be:
+
+    CREATE FUNCTION whatever_funcname(startup_params text[], client_hook_arg text)
+    RETURNS text[]
+
+startup_params is an array of the startup params passed to the pglogical output
+plugin, as alternating key/value elements in text representation.
+
+client_hook_arg is also passed.
+
+The return value is an array of alternating key/value elements forming a set
+of parameters you wish to add to the startup reply message sent by pglogical
+on decoding start. It must not be null; return `ARRAY[]::text[]` if you don't
+want to add any params.
+
+Transaction filter
+---
+
+The arguments are the replication origin identifier and the client hook param.
+
+The return value is true to keep the transaction, false to discard it.
+
+Signature:
+
+	CREATE FUNCTION whatevername(origin_id int, client_hook_arg text)
+	RETURNS boolean
+
+Row filter
+--
+
+Called for each row. Return true to replicate the row, false to discard it.
+
+Arguments are the oid of the affected relation, and the change type: 'I'nsert,
+'U'pdate or 'D'elete. There is no way to access the change data - columns changed,
+new values, etc.
+
+Signature:
+
+	CREATE FUNCTION whatevername(affected_rel regclass, change_type "char", client_hook_arg text)
+	RETURNS boolean
+
+Shutdown hook
+--
+
+Pretty uninteresting, but included for completeness.
+
+Signature:
+
+	CREATE FUNCTION whatevername(client_hook_arg text)
+	RETURNS void
diff --git a/contrib/pglogical_output_plhooks/pglogical_output_plhooks--1.0.sql b/contrib/pglogical_output_plhooks/pglogical_output_plhooks--1.0.sql
new file mode 100644
index 0000000..cdd2af3
--- /dev/null
+++ b/contrib/pglogical_output_plhooks/pglogical_output_plhooks--1.0.sql
@@ -0,0 +1,89 @@
+\echo Use "CREATE EXTENSION pglogical_output_plhooks" to load this file. \quit
+
+-- Use @extschema@ or leave search_path unchanged, don't use explicit schema
+
+-- C entry point that registers the hook function pointers with
+-- pglogical_output (implemented in pglogical_output_plhooks.c).
+CREATE FUNCTION pglo_plhooks_setup_fn(internal)
+RETURNS void
+STABLE
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION pglo_plhooks_setup_fn(internal)
+IS 'Register pglogical output pl hooks. See docs for how to specify functions';
+
+--
+-- Called as the startup hook.
+--
+-- There's no useful way to expose the private data segment, so you
+-- just don't get to use that from pl hooks at this point. The C
+-- wrapper will extract a startup param named pglo_plhooks.client_hook_arg
+-- for you and pass it as client_hook_arg to all callbacks, though.
+--
+-- For implementation convenience, a null client_hook_arg is passed
+-- as the empty string.
+--
+-- Must return the empty array, not NULL, if it has nothing to add.
+--
+CREATE FUNCTION pglo_plhooks_demo_startup(startup_params text[], client_hook_arg text)
+RETURNS text[]
+LANGUAGE plpgsql AS $$
+DECLARE
+	elem text;
+	paramname text;
+	paramvalue text;
+BEGIN
+	-- startup_params is a flat array of alternating key/value elements.
+	-- Walk it in pairs and report each parameter as a NOTICE.
+	--
+	-- (The previous version reset both variables in a third branch without
+	-- consuming the current element, which dropped every third element and
+	-- made key/value pairing drift after the first pair.)
+	FOREACH elem IN ARRAY startup_params
+	LOOP
+		IF elem IS NULL THEN
+				RAISE EXCEPTION 'Startup params may not be null';
+		END IF;
+
+		IF paramname IS NULL THEN
+				paramname := elem;
+		ELSE
+				-- Second element of the pair: report it, then reset for
+				-- the next pair.
+				paramvalue := elem;
+				RAISE NOTICE 'got param: % = %', paramname, paramvalue;
+				paramname := NULL;
+				paramvalue := NULL;
+		END IF;
+	END LOOP;
+
+	-- The returned key/value pairs are added to the startup reply message.
+	-- Must not be NULL; return ARRAY[]::text[] to add nothing.
+	RETURN ARRAY['pglo_plhooks_demo_startup_ran', 'true', 'otherparam', '42'];
+END;
+$$;
+
+-- Demo transaction filter: logs the replication origin of each transaction
+-- and keeps everything (returning true replicates the transaction,
+-- false would discard it).
+CREATE FUNCTION pglo_plhooks_demo_txn_filter(origin_id int, client_hook_arg text)
+RETURNS boolean
+LANGUAGE plpgsql AS $$
+BEGIN
+		-- Not much to filter on really...
+		RAISE NOTICE 'Got tx with origin %',origin_id;
+		RETURN true;
+END;
+$$;
+
+-- Demo row filter: keep a row only if the decoding user holds the matching
+-- table privilege for the change type ('I'nsert/'U'pdate/'D'elete).
+CREATE FUNCTION pglo_plhooks_demo_row_filter(affected_rel regclass, change_type "char", client_hook_arg text)
+RETURNS boolean
+LANGUAGE plpgsql AS $$
+BEGIN
+		-- This is a totally absurd test, since it checks if the upstream user
+		-- doing replication has rights to make modifications that have already
+		-- been committed and are being decoded for replication. Still, it shows
+		-- how the hook works...
+		--
+		-- NOTE(review): the CASE has no ELSE branch, so an unexpected
+		-- change_type yields NULL; the IF then falls through to the ELSE and
+		-- the row is discarded. Presumably intentional -- confirm.
+		IF pg_catalog.has_table_privilege(current_user, affected_rel,
+				CASE change_type WHEN 'I' THEN 'INSERT' WHEN 'U' THEN 'UPDATE' WHEN 'D' THEN 'DELETE' END)
+		THEN
+				RETURN true;
+		ELSE
+				RETURN false;
+		END IF;
+END;
+$$;
+
+-- Demo shutdown hook: just logs that decoding shut down.
+CREATE FUNCTION pglo_plhooks_demo_shutdown(client_hook_arg text)
+RETURNS void
+LANGUAGE plpgsql AS $$
+BEGIN
+		RAISE NOTICE 'Decoding shutdown';
+END;
+$$;
diff --git a/contrib/pglogical_output_plhooks/pglogical_output_plhooks.c b/contrib/pglogical_output_plhooks/pglogical_output_plhooks.c
new file mode 100644
index 0000000..a5144f0
--- /dev/null
+++ b/contrib/pglogical_output_plhooks/pglogical_output_plhooks.c
@@ -0,0 +1,414 @@
+#include "postgres.h"
+
+#include "pglogical_output/hooks.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_type.h"
+
+#include "nodes/makefuncs.h"
+
+#include "parser/parse_func.h"
+
+#include "replication/reorderbuffer.h"
+
+#include "utils/acl.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+#include "fmgr.h"
+#include "miscadmin.h"
+
+PG_MODULE_MAGIC;
+
+PGDLLEXPORT extern Datum pglo_plhooks_setup_fn(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(pglo_plhooks_setup_fn);
+
+void pglo_plhooks_startup(struct PGLogicalStartupHookArgs *startup_args);
+void pglo_plhooks_shutdown(struct PGLogicalShutdownHookArgs *shutdown_args);
+bool pglo_plhooks_row_filter(struct PGLogicalRowFilterArgs *rowfilter_args);
+bool pglo_plhooks_txn_filter(struct PGLogicalTxnFilterArgs *txnfilter_args);
+
+/*
+ * Per-decoding-session state, stored in the pglogical hooks private_data
+ * slot by the startup hook and read back by every other hook.
+ */
+typedef struct PLHPrivate
+{
+	/* Value of pglo_plhooks.client_hook_arg; "" when not specified */
+	const char *client_arg;
+	/* Oids of the user's PL hook functions; InvalidOid when not configured */
+	Oid startup_hook;
+	Oid shutdown_hook;
+	Oid row_filter_hook;
+	Oid txn_filter_hook;
+	/* Short-lived context the hooks are called in; reset after each call */
+	MemoryContext hook_call_context;
+} PLHPrivate;
+
+static void read_parameters(PLHPrivate *private, List *in_params);
+static Oid find_startup_hook(const char *proname);
+static Oid find_shutdown_hook(const char *proname);
+static Oid find_row_filter_hook(const char *proname);
+static Oid find_txn_filter_hook(const char *proname);
+static void exec_user_startup_hook(PLHPrivate *private, List *in_params, List **out_params);
+
+/*
+ * pglogical startup hook.
+ *
+ * Allocates and initializes our private state, reads our parameters from
+ * the client-supplied startup parameter list (resolving the configured PL
+ * hook functions to oids), creates the per-call memory context, and then
+ * runs the user's startup hook if one was configured.
+ */
+void
+pglo_plhooks_startup(struct PGLogicalStartupHookArgs *startup_args)
+{
+	PLHPrivate *private;
+
+	/* pglogical_output promises to call us in a tx */
+	Assert(IsTransactionState());
+
+	/* Allocated in hook memory context, scoped to the logical decoding session: */
+	startup_args->private_data = private = (PLHPrivate*)palloc(sizeof(PLHPrivate));
+
+	private->startup_hook = InvalidOid;
+	private->shutdown_hook = InvalidOid;
+	private->row_filter_hook = InvalidOid;
+	private->txn_filter_hook = InvalidOid;
+	/* client_arg is the empty string when not specified to simplify function calls */
+	private->client_arg = "";
+
+	/* Parse pglo_plhooks.* options and look up the named functions */
+	read_parameters(private, startup_args->in_params);
+
+	private->hook_call_context = AllocSetContextCreate(CurrentMemoryContext,
+			                    "pglogical_output plhooks hook call context",
+			                    ALLOCSET_SMALL_MINSIZE,
+			                    ALLOCSET_SMALL_INITSIZE,
+			                    ALLOCSET_SMALL_MAXSIZE);
+
+
+	if (private->startup_hook != InvalidOid)
+		exec_user_startup_hook(private, startup_args->in_params, &startup_args->out_params);
+}
+
+/*
+ * pglogical shutdown hook: invoke the user's shutdown function, if one was
+ * configured, with the client hook argument. Runs in (and then resets) the
+ * short-lived hook call context.
+ */
+void
+pglo_plhooks_shutdown(struct PGLogicalShutdownHookArgs *shutdown_args)
+{
+	PLHPrivate *private = (PLHPrivate*)shutdown_args->private_data;
+	MemoryContext old_ctx;
+
+	Assert(private != NULL);
+
+	if (OidIsValid(private->shutdown_hook))
+	{
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+		elog(DEBUG3, "calling pglo shutdown hook with %s", private->client_arg);
+		/* Return value (void) deliberately ignored */
+		(void) OidFunctionCall1(
+				private->shutdown_hook,
+				CStringGetTextDatum(private->client_arg));
+		elog(DEBUG3, "called pglo shutdown hook");
+		MemoryContextSwitchTo(old_ctx);
+		MemoryContextReset(private->hook_call_context);
+	}
+}
+
+/*
+ * pglogical row filter hook.
+ *
+ * Maps the reorder buffer change kind to a single character ('I'/'U'/'D')
+ * and calls the user's row filter function with (relation oid, change type,
+ * client arg). Returns the function's boolean result: true keeps the row,
+ * false discards it. When no row filter is configured, all rows are kept.
+ */
+bool
+pglo_plhooks_row_filter(struct PGLogicalRowFilterArgs *rowfilter_args)
+{
+	PLHPrivate *private = (PLHPrivate*)rowfilter_args->private_data;
+	bool ret = true;
+	MemoryContext old_ctx;
+
+	Assert(private != NULL);
+
+	if (OidIsValid(private->row_filter_hook))
+	{
+		char change_type;
+		switch (rowfilter_args->change_type)
+		{
+			case REORDER_BUFFER_CHANGE_INSERT:
+				change_type = 'I';
+				break;
+			case REORDER_BUFFER_CHANGE_UPDATE:
+				change_type = 'U';
+				break;
+			case REORDER_BUFFER_CHANGE_DELETE:
+				change_type = 'D';
+				break;
+			default:
+				elog(ERROR, "unknown change type %d", rowfilter_args->change_type);
+				change_type = '0';	/* silence compiler */
+		}
+
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+		elog(DEBUG3, "calling pglo row filter hook with (%u,%c,%s)",
+				rowfilter_args->changed_rel->rd_id, change_type,
+				private->client_arg);
+		ret = DatumGetBool(OidFunctionCall3(
+				private->row_filter_hook,
+				ObjectIdGetDatum(rowfilter_args->changed_rel->rd_id),
+				CharGetDatum(change_type),
+				CStringGetTextDatum(private->client_arg)));
+		elog(DEBUG3, "called pglo row filter hook, returns %d", (int)ret);
+		MemoryContextSwitchTo(old_ctx);
+		MemoryContextReset(private->hook_call_context);
+	}
+
+	return ret;
+}
+
+/*
+ * pglogical transaction filter hook.
+ *
+ * Calls the user's txn filter function with (origin_id, client arg).
+ * Returns the function's boolean result: true replicates the transaction,
+ * false discards it. When no txn filter is configured, all transactions
+ * are kept.
+ */
+bool
+pglo_plhooks_txn_filter(struct PGLogicalTxnFilterArgs *txnfilter_args)
+{
+	PLHPrivate *private = (PLHPrivate*)txnfilter_args->private_data;
+	bool ret = true;
+	MemoryContext old_ctx;
+
+	Assert(private != NULL);
+
+
+	if (OidIsValid(private->txn_filter_hook))
+	{
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+
+		elog(DEBUG3, "calling pglo txn filter hook with (%hu,%s)",
+				txnfilter_args->origin_id, private->client_arg);
+		/* SQL-level signature is (origin_id int, client_hook_arg text) */
+		ret = DatumGetBool(OidFunctionCall2(
+					private->txn_filter_hook,
+					UInt16GetDatum(txnfilter_args->origin_id),
+					CStringGetTextDatum(private->client_arg)));
+		/* was "calling ..." -- fixed to match the row filter hook's message */
+		elog(DEBUG3, "called pglo txn filter hook, returns %d", (int)ret);
+
+		MemoryContextSwitchTo(old_ctx);
+		MemoryContextReset(private->hook_call_context);
+	}
+
+	return ret;
+}
+
+/*
+ * SQL-callable setup function, named by the client in pglogical's startup
+ * parameters. Receives the PGLogicalHooks struct (as internal) and fills
+ * in our hook function pointers.
+ */
+Datum
+pglo_plhooks_setup_fn(PG_FUNCTION_ARGS)
+{
+	struct PGLogicalHooks *hooks = (struct PGLogicalHooks*) PG_GETARG_POINTER(0);
+
+	/* Your code doesn't need this, it's just for the tests: */
+	Assert(hooks != NULL);
+	Assert(hooks->hooks_private_data == NULL);
+	Assert(hooks->startup_hook == NULL);
+	Assert(hooks->shutdown_hook == NULL);
+	Assert(hooks->row_filter_hook == NULL);
+	Assert(hooks->txn_filter_hook == NULL);
+
+	/*
+	 * Just assign the hook pointers. We're not meant to do much
+	 * work here.
+	 *
+	 * Note that private_data is left untouched, to be set up by the
+	 * startup hook.
+	 */
+	hooks->startup_hook = pglo_plhooks_startup;
+	hooks->shutdown_hook = pglo_plhooks_shutdown;
+	hooks->row_filter_hook = pglo_plhooks_row_filter;
+	hooks->txn_filter_hook = pglo_plhooks_txn_filter;
+	elog(DEBUG3, "configured pglo hooks");
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Run the user's startup hook.
+ *
+ * Flattens in_params into a text[] of alternating key/value elements (NULL
+ * values become the empty string), calls the hook, then deconstructs the
+ * returned text[] back into a DefElem list in *out_params for pglogical to
+ * add to its startup reply message.
+ */
+static void
+exec_user_startup_hook(PLHPrivate *private, List *in_params, List **out_params)
+{
+		ArrayType *startup_params;
+		Datum ret;
+		ListCell *lc;
+		Datum *startup_params_elems;
+		bool  *startup_params_isnulls;
+		int   n_startup_params;
+		int   i;
+		MemoryContext old_ctx;
+
+
+		old_ctx = MemoryContextSwitchTo(private->hook_call_context);
+
+		/*
+		 * Build the input parameter array. NULL parameters are passed as the
+		 * empty string for the sake of convenience. Each param is two
+		 * elements, a key then a value element.
+		 */
+		n_startup_params = list_length(in_params) * 2;
+		startup_params_elems = (Datum*)palloc0(sizeof(Datum)*n_startup_params);
+
+		i = 0;
+		foreach (lc, in_params)
+		{
+			DefElem * elem = (DefElem*)lfirst(lc);
+			const char *val;
+
+			if (elem->arg == NULL || strVal(elem->arg) == NULL)
+				val = "";
+			else
+				val = strVal(elem->arg);
+
+			startup_params_elems[i++] = CStringGetTextDatum(elem->defname);
+			startup_params_elems[i++] = CStringGetTextDatum(val);
+		}
+		Assert(i == n_startup_params);
+
+		startup_params = construct_array(startup_params_elems, n_startup_params,
+				TEXTOID, -1, false, 'i');
+
+		ret = OidFunctionCall2(
+				private->startup_hook,
+				PointerGetDatum(startup_params),
+				CStringGetTextDatum(private->client_arg));
+
+		/*
+		 * Switch back to the caller's context before building the output
+		 * list: hook_call_context is reset below, so anything we hand back
+		 * via *out_params must not be allocated in it. (Previously the list
+		 * was built in hook_call_context and returned dangling.)
+		 */
+		MemoryContextSwitchTo(old_ctx);
+
+		/*
+		 * Deconstruct the returned array and add pairs of results to a
+		 * DefElem list. deconstruct_array takes the *element* type
+		 * (TEXTOID), matching the construct_array call above, not the
+		 * array type.
+		 */
+		deconstruct_array(DatumGetArrayTypeP(ret), TEXTOID,
+				-1, false, 'i', &startup_params_elems, &startup_params_isnulls,
+				&n_startup_params);
+
+		if (n_startup_params % 2 != 0)
+			elog(ERROR, "startup hook returned an array of %d elements; expected alternating key/value pairs",
+				 n_startup_params);
+
+		*out_params = NIL;
+		for (i = 0; i < n_startup_params; i = i + 2)
+		{
+			char *value;
+			DefElem *elem;
+
+			if (startup_params_isnulls[i])
+				elog(ERROR, "Array entry corresponding to a key was null at idx=%d", i);
+
+			/* NULL values are returned to pglogical as the empty string */
+			if (startup_params_isnulls[i+1])
+				value = "";
+			else
+				value = TextDatumGetCString(startup_params_elems[i+1]);
+
+			elem = makeDefElem(
+					TextDatumGetCString(startup_params_elems[i]),
+					(Node*)makeString(value));
+
+			*out_params = lcons(elem, *out_params);
+		}
+
+		MemoryContextReset(private->hook_call_context);
+}
+
+/*
+ * Scan the startup parameter list for our pglo_plhooks.* options and fill
+ * in the corresponding PLHPrivate fields, resolving hook function names to
+ * oids. Errors out if any of our options has a NULL value.
+ */
+static void
+read_parameters(PLHPrivate *private, List *in_params)
+{
+	ListCell   *lc;
+
+	foreach(lc, in_params)
+	{
+		DefElem	   *param = lfirst(lc);
+		const char *name = param->defname;
+		bool		has_value;
+
+		has_value = (param->arg != NULL && strVal(param->arg) != NULL);
+
+		/* The option names are mutually exclusive, so chain with else-if */
+		if (pg_strcasecmp("pglo_plhooks.client_hook_arg", name) == 0)
+		{
+			if (!has_value)
+				elog(ERROR, "pglo_plhooks.client_hook_arg may not be NULL");
+			private->client_arg = pstrdup(strVal(param->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.startup_hook", name) == 0)
+		{
+			if (!has_value)
+				elog(ERROR, "pglo_plhooks.startup_hook may not be NULL");
+			private->startup_hook = find_startup_hook(strVal(param->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.shutdown_hook", name) == 0)
+		{
+			if (!has_value)
+				elog(ERROR, "pglo_plhooks.shutdown_hook may not be NULL");
+			private->shutdown_hook = find_shutdown_hook(strVal(param->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.txn_filter_hook", name) == 0)
+		{
+			if (!has_value)
+				elog(ERROR, "pglo_plhooks.txn_filter_hook may not be NULL");
+			private->txn_filter_hook = find_txn_filter_hook(strVal(param->arg));
+		}
+		else if (pg_strcasecmp("pglo_plhooks.row_filter_hook", name) == 0)
+		{
+			if (!has_value)
+				elog(ERROR, "pglo_plhooks.row_filter_hook may not be NULL");
+			private->row_filter_hook = find_row_filter_hook(strVal(param->arg));
+		}
+	}
+}
+
+/*
+ * Resolve a possibly schema-qualified function name, verify its argument
+ * and return types, and check that the current user may execute it.
+ * Errors out on any failure; otherwise returns the function's oid.
+ */
+static Oid
+find_hook_fn(const char *funcname, Oid funcargtypes[], int nfuncargtypes, Oid returntype)
+{
+	Oid			funcid;
+	List	   *qname;
+
+	qname = stringToQualifiedNameList(funcname);
+
+	/* find the function; missing_ok = false, so this ERRORs if absent */
+	funcid = LookupFuncName(qname, nfuncargtypes, funcargtypes, false);
+
+	/* Check expected return type */
+	if (get_func_rettype(funcid) != returntype)
+	{
+		/* Oid is unsigned, so %u (the original %d was a format mismatch) */
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s doesn't return expected type %u",
+						NameListToString(qname), returntype)));
+	}
+
+	if (pg_proc_aclcheck(funcid, GetUserId(), ACL_EXECUTE) != ACLCHECK_OK)
+	{
+		const char * username;
+#if PG_VERSION_NUM >= 90500
+		username = GetUserNameFromId(GetUserId(), false);
+#else
+		username = GetUserNameFromId(GetUserId());
+#endif
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("current user %s does not have permission to call function %s",
+					 username, NameListToString(qname))));
+	}
+
+	list_free_deep(qname);
+
+	return funcid;
+}
+
+static Oid
+find_startup_hook(const char *proname)
+{
+	Oid argtypes[2];
+
+	argtypes[0] = TEXTARRAYOID;
+	argtypes[1] = TEXTOID;
+
+	/*
+	 * The startup hook must return text[] (see the README and the demo SQL;
+	 * exec_user_startup_hook deconstructs the result as a text array), so
+	 * require TEXTARRAYOID here -- the previous VOIDOID check rejected
+	 * every correctly-written startup hook.
+	 */
+	return find_hook_fn(proname, argtypes, 2, TEXTARRAYOID);
+}
+
+/* Shutdown hook signature: (client_hook_arg text) RETURNS void */
+static Oid
+find_shutdown_hook(const char *proname)
+{
+	Oid argtypes[] = { TEXTOID };
+
+	return find_hook_fn(proname, argtypes, lengthof(argtypes), VOIDOID);
+}
+
+/*
+ * Row filter signature:
+ * (affected_rel regclass, change_type "char", client_hook_arg text)
+ * RETURNS boolean
+ */
+static Oid
+find_row_filter_hook(const char *proname)
+{
+	Oid argtypes[] = { REGCLASSOID, CHAROID, TEXTOID };
+
+	return find_hook_fn(proname, argtypes, lengthof(argtypes), BOOLOID);
+}
+
+/* Txn filter signature: (origin_id int, client_hook_arg text) RETURNS boolean */
+static Oid
+find_txn_filter_hook(const char *proname)
+{
+	Oid argtypes[] = { INT4OID, TEXTOID };
+
+	return find_hook_fn(proname, argtypes, lengthof(argtypes), BOOLOID);
+}
diff --git a/contrib/pglogical_output_plhooks/pglogical_output_plhooks.control b/contrib/pglogical_output_plhooks/pglogical_output_plhooks.control
new file mode 100644
index 0000000..647b9ef
--- /dev/null
+++ b/contrib/pglogical_output_plhooks/pglogical_output_plhooks.control
@@ -0,0 +1,4 @@
+comment = 'pglogical_output pl hooks'
+default_version = '1.0'
+module_pathname = '$libdir/pglogical_output_plhooks'
+relocatable = false
-- 
2.1.0

