>From 7f1e6c2aaeb17f2ee33fc88de5e06a6057634751 Mon Sep 17 00:00:00 2001
From: Abhijit Menon-Sen <ams@2ndQuadrant.com>
Date: Fri, 11 Jan 2013 14:36:48 +0530
Subject: [PATCH 18/19] wal_decoding: Add test_logical_replication extension
 for easier testing of logical decoding

This extension provides three functions for manipulating replication slots:
* init_logical_replication - initiate a replication slot and wait for consistent state
* start_logical_replication - return all changes since the last call up to now, without blocking
* free_logical_replication - free the logical slot again

Those are pretty direct synonyms for the replication connection commands.

Due to questions about how to integrate logical replication tests this module
also contains the current tests of logical replication itself.

Author: Abhijit Menon-Sen
---
 contrib/Makefile                                   |   1 +
 contrib/test_logical_replication/Makefile          |  20 ++
 contrib/test_logical_replication/expected/ddl.out  | 250 +++++++++++++++
 contrib/test_logical_replication/sql/ddl.sql       | 154 +++++++++
 .../test_logical_replication--1.0.sql              |  14 +
 .../test_logical_replication.c                     | 352 +++++++++++++++++++++
 .../test_logical_replication.control               |   5 +
 7 files changed, 796 insertions(+)
 create mode 100644 contrib/test_logical_replication/Makefile
 create mode 100644 contrib/test_logical_replication/expected/ddl.out
 create mode 100644 contrib/test_logical_replication/sql/ddl.sql
 create mode 100644 contrib/test_logical_replication/test_logical_replication--1.0.sql
 create mode 100644 contrib/test_logical_replication/test_logical_replication.c
 create mode 100644 contrib/test_logical_replication/test_logical_replication.control

diff --git a/contrib/Makefile b/contrib/Makefile
index 432e915..1cc30fe 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -50,6 +50,7 @@ SUBDIRS = \
 		tcn		\
 		test_parser	\
 		test_decoding	\
+		test_logical_replication \
 		tsearch2	\
 		unaccent	\
 		vacuumlo	\
diff --git a/contrib/test_logical_replication/Makefile b/contrib/test_logical_replication/Makefile
new file mode 100644
index 0000000..7ebbc44
--- /dev/null
+++ b/contrib/test_logical_replication/Makefile
@@ -0,0 +1,20 @@
+MODULE_big = test_logical_replication
+OBJS = test_logical_replication.o
+
+EXTENSION = test_logical_replication
+DATA = test_logical_replication--1.0.sql
+
+REGRESS = ddl
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_logical_replication
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+test_logical_replication.o: test_logical_replication.c
diff --git a/contrib/test_logical_replication/expected/ddl.out b/contrib/test_logical_replication/expected/ddl.out
new file mode 100644
index 0000000..226f8f8
--- /dev/null
+++ b/contrib/test_logical_replication/expected/ddl.out
@@ -0,0 +1,250 @@
+CREATE EXTENSION test_logical_replication;
+-- predictability
+SET synchronous_commit = on;
+-- faster startup
+CHECKPOINT;
+SELECT 'init' FROM init_logical_replication('test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
+BEGIN;
+INSERT INTO replication_example(somedata, text) VALUES (1, 1);
+INSERT INTO replication_example(somedata, text) VALUES (1, 2);
+COMMIT;
+ALTER TABLE replication_example ADD COLUMN bar int;
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 1, 4);
+BEGIN;
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 2, 4);
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 3, 4);
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 4, NULL);
+COMMIT;
+ALTER TABLE replication_example DROP COLUMN bar;
+INSERT INTO replication_example(somedata, text) VALUES (3, 1);
+BEGIN;
+INSERT INTO replication_example(somedata, text) VALUES (3, 2);
+INSERT INTO replication_example(somedata, text) VALUES (3, 3);
+COMMIT;
+ALTER TABLE replication_example RENAME COLUMN text TO somenum;
+INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
+-- collect all changes
+SELECT data FROM start_logical_replication('now');
+                                                                                                                                         data                                                                                                                                         
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table "replication_example_id_seq": INSERT: sequence_name[name]:replication_example_id_seq last_value[int8]:1 start_value[int8]:1 increment_by[int8]:1 max_value[int8]:9223372036854775807 min_value[int8]:1 cache_value[int8]:1 log_cnt[int8]:0 is_cycled[bool]:f is_called[bool]:f
+ COMMIT
+ BEGIN
+ table "replication_example": INSERT: id[int4]:1 somedata[int4]:1 text[varchar]:1
+ table "replication_example": INSERT: id[int4]:2 somedata[int4]:1 text[varchar]:2
+ COMMIT
+ BEGIN
+ COMMIT
+ BEGIN
+ table "replication_example": INSERT: id[int4]:3 somedata[int4]:2 text[varchar]:1 bar[int4]:4
+ COMMIT
+ BEGIN
+ table "replication_example": INSERT: id[int4]:4 somedata[int4]:2 text[varchar]:2 bar[int4]:4
+ table "replication_example": INSERT: id[int4]:5 somedata[int4]:2 text[varchar]:3 bar[int4]:4
+ table "replication_example": INSERT: id[int4]:6 somedata[int4]:2 text[varchar]:4 bar[int4]:(null)
+ COMMIT
+ BEGIN
+ COMMIT
+ BEGIN
+ table "replication_example": INSERT: id[int4]:7 somedata[int4]:3 text[varchar]:1
+ COMMIT
+ BEGIN
+ table "replication_example": INSERT: id[int4]:8 somedata[int4]:3 text[varchar]:2
+ table "replication_example": INSERT: id[int4]:9 somedata[int4]:3 text[varchar]:3
+ COMMIT
+ BEGIN
+ COMMIT
+ BEGIN
+ table "replication_example": INSERT: id[int4]:10 somedata[int4]:4 somenum[varchar]:1
+ COMMIT
+(31 rows)
+
+ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
+-- throw away changes, they contain oids
+SELECT count(data) FROM start_logical_replication('now');
+ count 
+-------
+    12
+(1 row)
+
+INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
+BEGIN;
+INSERT INTO replication_example(somedata, somenum) VALUES (6, 1);
+ALTER TABLE replication_example ADD COLUMN zaphod1 int;
+INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 2, 1);
+ALTER TABLE replication_example ADD COLUMN zaphod2 int;
+INSERT INTO replication_example(somedata, somenum, zaphod2) VALUES (6, 3, 1);
+INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2);
+COMMIT;
+/*
+ * check whether the correct indexes are chosen for deletions
+ */
+CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
+INSERT INTO tr_unique(data) VALUES(10);
+--show deletion with unique index
+DELETE FROM tr_unique;
+ALTER TABLE tr_unique RENAME TO tr_pkey;
+-- show changes
+SELECT data FROM start_logical_replication('now');
+                                                                                                                                data                                                                                                                                
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table "replication_example": INSERT: id[int4]:11 somedata[int4]:5 somenum[int4]:1
+ COMMIT
+ BEGIN
+ table "replication_example": INSERT: id[int4]:12 somedata[int4]:6 somenum[int4]:1
+ table "replication_example": INSERT: id[int4]:13 somedata[int4]:6 somenum[int4]:2 zaphod1[int4]:1
+ table "replication_example": INSERT: id[int4]:14 somedata[int4]:6 somenum[int4]:3 zaphod1[int4]:(null) zaphod2[int4]:1
+ table "replication_example": INSERT: id[int4]:15 somedata[int4]:6 somenum[int4]:4 zaphod1[int4]:2 zaphod2[int4]:(null)
+ COMMIT
+ BEGIN
+ table "tr_unique_id2_seq": INSERT: sequence_name[name]:tr_unique_id2_seq last_value[int8]:1 start_value[int8]:1 increment_by[int8]:1 max_value[int8]:9223372036854775807 min_value[int8]:1 cache_value[int8]:1 log_cnt[int8]:0 is_cycled[bool]:f is_called[bool]:f
+ COMMIT
+ BEGIN
+ table "tr_unique": INSERT: id2[int4]:1 data[int4]:10
+ COMMIT
+ BEGIN
+ table "tr_unique": DELETE: id2[int4]:1
+ COMMIT
+ BEGIN
+ COMMIT
+(20 rows)
+
+-- hide changes bc of oid visible in full table rewrites
+ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
+SELECT count(data) FROM start_logical_replication('now');
+ count 
+-------
+     3
+(1 row)
+
+INSERT INTO tr_pkey(data) VALUES(1);
+--show deletion with primary key
+DELETE FROM tr_pkey;
+/* display results */
+SELECT data FROM start_logical_replication('now');
+                             data                             
+--------------------------------------------------------------
+ BEGIN
+ table "tr_pkey": INSERT: id2[int4]:2 data[int4]:1 id[int4]:1
+ COMMIT
+ BEGIN
+ table "tr_pkey": DELETE: id[int4]:1
+ COMMIT
+(6 rows)
+
+/*
+ * check that disk spooling works
+ */
+BEGIN;
+CREATE TABLE tr_etoomuch (id serial primary key, data int);
+INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+DELETE FROM tr_etoomuch WHERE id < 5000;
+UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
+COMMIT;
+/* display results, but hide most of the output */
+SELECT count(*), min(data), max(data)
+FROM start_logical_replication('now')
+GROUP BY substring(data, 1, 24)
+ORDER BY 1;
+ count |                                                                                                                                 min                                                                                                                                  |                                                                                                                                 max                                                                                                                                  
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+     1 | COMMIT                                                                                                                                                                                                                                                               | COMMIT
+     1 | BEGIN                                                                                                                                                                                                                                                                | BEGIN
+     1 | table "tr_etoomuch_id_seq": INSERT: sequence_name[name]:tr_etoomuch_id_seq last_value[int8]:1 start_value[int8]:1 increment_by[int8]:1 max_value[int8]:9223372036854775807 min_value[int8]:1 cache_value[int8]:1 log_cnt[int8]:0 is_cycled[bool]:f is_called[bool]:f | table "tr_etoomuch_id_seq": INSERT: sequence_name[name]:tr_etoomuch_id_seq last_value[int8]:1 start_value[int8]:1 increment_by[int8]:1 max_value[int8]:9223372036854775807 min_value[int8]:1 cache_value[int8]:1 log_cnt[int8]:0 is_cycled[bool]:f is_called[bool]:f
+  4999 | table "tr_etoomuch": DELETE: id[int4]:1                                                                                                                                                                                                                              | table "tr_etoomuch": DELETE: id[int4]:999
+  5234 | table "tr_etoomuch": UPDATE: id[int4]:10000 data[int4]:-10000                                                                                                                                                                                                        | table "tr_etoomuch": UPDATE: id[int4]:9999 data[int4]:-9999
+ 10234 | table "tr_etoomuch": INSERT: id[int4]:10000 data[int4]:10000                                                                                                                                                                                                         | table "tr_etoomuch": INSERT: id[int4]:9 data[int4]:9
+(6 rows)
+
+/*
+ * check whether we subtransactions correctly in relation with each other
+ */
+CREATE TABLE tr_sub (id serial primary key, path text);
+-- toplevel, subtxn, toplevel, subtxn, subtxn
+BEGIN;
+INSERT INTO tr_sub(path) VALUES ('1-top-#1');
+SAVEPOINT a;
+INSERT INTO tr_sub(path) VALUES ('1-top-1-#1');
+INSERT INTO tr_sub(path) VALUES ('1-top-1-#2');
+RELEASE SAVEPOINT a;
+SAVEPOINT b;
+SAVEPOINT c;
+INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#1');
+INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#2');
+RELEASE SAVEPOINT c;
+INSERT INTO tr_sub(path) VALUES ('1-top-2-#1');
+RELEASE SAVEPOINT b;
+COMMIT;
+SELECT data FROM start_logical_replication('now');
+                                                                                                                            data                                                                                                                            
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table "tr_sub_id_seq": INSERT: sequence_name[name]:tr_sub_id_seq last_value[int8]:1 start_value[int8]:1 increment_by[int8]:1 max_value[int8]:9223372036854775807 min_value[int8]:1 cache_value[int8]:1 log_cnt[int8]:0 is_cycled[bool]:f is_called[bool]:f
+ COMMIT
+ BEGIN
+ table "tr_sub": INSERT: id[int4]:1 path[text]:1-top-#1
+ table "tr_sub": INSERT: id[int4]:2 path[text]:1-top-1-#1
+ table "tr_sub": INSERT: id[int4]:3 path[text]:1-top-1-#2
+ table "tr_sub": INSERT: id[int4]:4 path[text]:1-top-2-1-#1
+ table "tr_sub": INSERT: id[int4]:5 path[text]:1-top-2-1-#2
+ table "tr_sub": INSERT: id[int4]:6 path[text]:1-top-2-#1
+ COMMIT
+(11 rows)
+
+-- check that we handle xlog assignments correctly
+BEGIN;
+-- nest 80 subtxns
+SAVEPOINT subtop;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+-- assign xid by inserting
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#1');
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#2');
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#3');
+RELEASE SAVEPOINT subtop;
+INSERT INTO tr_sub(path) VALUES ('2-top-#1');
+COMMIT;
+SELECT data FROM start_logical_replication('now');
+                             data                             
+--------------------------------------------------------------
+ BEGIN
+ table "tr_sub": INSERT: id[int4]:7 path[text]:2-top-1...--#1
+ table "tr_sub": INSERT: id[int4]:8 path[text]:2-top-1...--#2
+ table "tr_sub": INSERT: id[int4]:9 path[text]:2-top-1...--#3
+ table "tr_sub": INSERT: id[int4]:10 path[text]:2-top-#1
+ COMMIT
+(6 rows)
+
+-- done, free logical replication slot
+SELECT data FROM start_logical_replication('now');
+ data 
+------
+(0 rows)
+
+SELECT stop_logical_replication();
+ stop_logical_replication 
+--------------------------
+                        0
+(1 row)
+
diff --git a/contrib/test_logical_replication/sql/ddl.sql b/contrib/test_logical_replication/sql/ddl.sql
new file mode 100644
index 0000000..ce18b7e
--- /dev/null
+++ b/contrib/test_logical_replication/sql/ddl.sql
@@ -0,0 +1,154 @@
+CREATE EXTENSION test_logical_replication;
+-- predictability
+SET synchronous_commit = on;
+
+-- faster startup
+CHECKPOINT;
+
+SELECT 'init' FROM init_logical_replication('test_decoding');
+
+CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
+BEGIN;
+INSERT INTO replication_example(somedata, text) VALUES (1, 1);
+INSERT INTO replication_example(somedata, text) VALUES (1, 2);
+COMMIT;
+
+ALTER TABLE replication_example ADD COLUMN bar int;
+
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 1, 4);
+
+BEGIN;
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 2, 4);
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 3, 4);
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 4, NULL);
+COMMIT;
+
+ALTER TABLE replication_example DROP COLUMN bar;
+INSERT INTO replication_example(somedata, text) VALUES (3, 1);
+
+BEGIN;
+INSERT INTO replication_example(somedata, text) VALUES (3, 2);
+INSERT INTO replication_example(somedata, text) VALUES (3, 3);
+COMMIT;
+
+ALTER TABLE replication_example RENAME COLUMN text TO somenum;
+
+INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
+
+-- collect all changes
+SELECT data FROM start_logical_replication('now');
+
+ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
+-- throw away changes, they contain oids
+SELECT count(data) FROM start_logical_replication('now');
+
+INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
+
+BEGIN;
+INSERT INTO replication_example(somedata, somenum) VALUES (6, 1);
+ALTER TABLE replication_example ADD COLUMN zaphod1 int;
+INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 2, 1);
+ALTER TABLE replication_example ADD COLUMN zaphod2 int;
+INSERT INTO replication_example(somedata, somenum, zaphod2) VALUES (6, 3, 1);
+INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2);
+COMMIT;
+
+/*
+ * check whether the correct indexes are chosen for deletions
+ */
+
+CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
+INSERT INTO tr_unique(data) VALUES(10);
+--show deletion with unique index
+DELETE FROM tr_unique;
+
+ALTER TABLE tr_unique RENAME TO tr_pkey;
+
+-- show changes
+SELECT data FROM start_logical_replication('now');
+
+-- hide changes bc of oid visible in full table rewrites
+ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
+SELECT count(data) FROM start_logical_replication('now');
+
+INSERT INTO tr_pkey(data) VALUES(1);
+--show deletion with primary key
+DELETE FROM tr_pkey;
+
+/* display results */
+SELECT data FROM start_logical_replication('now');
+
+/*
+ * check that disk spooling works
+ */
+BEGIN;
+CREATE TABLE tr_etoomuch (id serial primary key, data int);
+INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+DELETE FROM tr_etoomuch WHERE id < 5000;
+UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
+COMMIT;
+
+/* display results, but hide most of the output */
+SELECT count(*), min(data), max(data)
+FROM start_logical_replication('now')
+GROUP BY substring(data, 1, 24)
+ORDER BY 1;
+
+/*
+ * check whether we subtransactions correctly in relation with each other
+ */
+CREATE TABLE tr_sub (id serial primary key, path text);
+
+-- toplevel, subtxn, toplevel, subtxn, subtxn
+BEGIN;
+INSERT INTO tr_sub(path) VALUES ('1-top-#1');
+
+SAVEPOINT a;
+INSERT INTO tr_sub(path) VALUES ('1-top-1-#1');
+INSERT INTO tr_sub(path) VALUES ('1-top-1-#2');
+RELEASE SAVEPOINT a;
+
+SAVEPOINT b;
+SAVEPOINT c;
+INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#1');
+INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#2');
+RELEASE SAVEPOINT c;
+INSERT INTO tr_sub(path) VALUES ('1-top-2-#1');
+RELEASE SAVEPOINT b;
+COMMIT;
+
+SELECT data FROM start_logical_replication('now');
+
+-- check that we handle xlog assignments correctly
+BEGIN;
+-- nest 80 subtxns
+SAVEPOINT subtop;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+-- assign xid by inserting
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#1');
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#2');
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#3');
+RELEASE SAVEPOINT subtop;
+INSERT INTO tr_sub(path) VALUES ('2-top-#1');
+COMMIT;
+
+SELECT data FROM start_logical_replication('now');
+
+
+-- done, free logical replication slot
+SELECT data FROM start_logical_replication('now');
+SELECT stop_logical_replication();
diff --git a/contrib/test_logical_replication/test_logical_replication--1.0.sql b/contrib/test_logical_replication/test_logical_replication--1.0.sql
new file mode 100644
index 0000000..724ac20
--- /dev/null
+++ b/contrib/test_logical_replication/test_logical_replication--1.0.sql
@@ -0,0 +1,14 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_logical_replication" to load this file. \quit
+
+CREATE FUNCTION init_logical_replication (plugin text, OUT slot_name text, OUT xlog_position text)
+AS 'MODULE_PATHNAME', 'init_logical_replication'
+LANGUAGE C IMMUTABLE STRICT;
+
+CREATE FUNCTION start_logical_replication (pos text, OUT location text, OUT xid bigint, OUT data text) RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'start_logical_replication'
+LANGUAGE C IMMUTABLE STRICT;
+
+CREATE FUNCTION stop_logical_replication () RETURNS int
+AS 'MODULE_PATHNAME', 'stop_logical_replication'
+LANGUAGE C IMMUTABLE STRICT;
diff --git a/contrib/test_logical_replication/test_logical_replication.c b/contrib/test_logical_replication/test_logical_replication.c
new file mode 100644
index 0000000..35b69b1
--- /dev/null
+++ b/contrib/test_logical_replication/test_logical_replication.c
@@ -0,0 +1,352 @@
+#include "postgres.h"
+
+#include "access/timeline.h"
+#include "access/xlog_internal.h"
+#include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/snapbuild.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/syscache.h"
+#include "miscadmin.h"
+#include "funcapi.h"
+
+PG_MODULE_MAGIC;
+
+Datum init_logical_replication(PG_FUNCTION_ARGS);
+Datum start_logical_replication(PG_FUNCTION_ARGS);
+Datum stop_logical_replication(PG_FUNCTION_ARGS);
+
+static const char *slot_name = NULL;
+static Tuplestorestate *tupstore = NULL;
+static TupleDesc tupdesc;
+
+extern void XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count);
+
+static int
+test_read_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen,
+                 char* cur_page, TimeLineID *pageTLI)
+{
+    XLogRecPtr flushptr, loc;
+    int count;
+
+	loc = targetPagePtr + reqLen;
+	while (1) {
+		flushptr = GetFlushRecPtr();
+		if (loc <= flushptr)
+			break;
+		pg_usleep(1000L);
+	}
+
+    /* more than one block available */
+    if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+        count = XLOG_BLCKSZ;
+    /* not enough data there */
+    else if (targetPagePtr + reqLen > flushptr)
+        return -1;
+    /* part of the page available */
+    else
+        count = flushptr - targetPagePtr;
+
+    /* FIXME: more sensible/efficient implementation */
+    XLogRead(cur_page, ThisTimeLineID, targetPagePtr, XLOG_BLCKSZ);
+
+    return count;
+}
+
+static void store_tuple(XLogRecPtr ptr, TransactionId xid, StringInfo si)
+{
+	Datum values[3];
+	bool nulls[3];
+	char buf[60];
+
+	sprintf(buf, "%X/%X", (uint32)(ptr >> 32), (uint32)ptr);
+
+	memset(nulls, 0, sizeof(nulls));
+	values[0] = CStringGetTextDatum(buf);
+	values[1] = Int64GetDatum(xid);
+	values[2] = CStringGetTextDatum(si->data);
+
+	tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+}
+
+static void
+begin_txn_wrapper(ReorderBuffer* cache, ReorderBufferTXN* txn)
+{
+	ReaderApplyState *state = cache->private_data;
+	bool send;
+
+	resetStringInfo(state->out);
+
+	send = state->begin_cb(state->user_private, state->out, txn);
+
+	if (send)
+		store_tuple(txn->lsn, txn->xid, state->out);
+}
+
+static void
+commit_txn_wrapper(ReorderBuffer* cache, ReorderBufferTXN* txn, XLogRecPtr commit_lsn)
+{
+	ReaderApplyState *state = cache->private_data;
+	bool send;
+
+	resetStringInfo(state->out);
+
+	send = state->commit_cb(state->user_private, state->out, txn, commit_lsn);
+
+	if (send)
+		store_tuple(commit_lsn, txn->xid, state->out);
+}
+
+static void
+change_wrapper(ReorderBuffer* cache, ReorderBufferTXN* txn, ReorderBufferChange* change)
+{
+	ReaderApplyState *state = cache->private_data;
+	bool send;
+	HeapTuple table;
+	Oid reloid;
+
+	resetStringInfo(state->out);
+
+	table = LookupRelationByRelFileNode(&change->relnode);
+	Assert(table);
+	reloid = HeapTupleHeaderGetOid(table->t_data);
+	ReleaseSysCache(table);
+
+	send = state->change_cb(state->user_private, state->out, txn,
+							reloid, change);
+
+	if (send)
+		store_tuple(change->lsn, txn->xid, state->out);
+}
+
+
+PG_FUNCTION_INFO_V1(init_logical_replication);
+
+Datum
+init_logical_replication(PG_FUNCTION_ARGS)
+{
+	const char *plugin;
+	char		xpos[MAXFNAMELEN];
+	XLogReaderState *logical_reader;
+
+	TupleDesc   tupdesc;
+	HeapTuple   tuple;
+	Datum       result;
+	Datum       values[2];
+	bool        nulls[2];
+
+	if (slot_name)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("sorry, can't init logical replication twice"))));
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* Acquire a logical replication slot */
+	plugin = text_to_cstring(PG_GETARG_TEXT_P(0));
+	CheckLogicalReplicationRequirements();
+	LogicalDecodingAcquireFreeSlot(plugin);
+
+	/*
+	 * Use the same initial_snapshot_reader, but with our own read_page
+	 * callback that does not depend on walsender.
+	 */
+	logical_reader = initial_snapshot_reader(MyLogicalDecodingSlot->last_required_checkpoint,
+											 MyLogicalDecodingSlot->xmin,
+											 NameStr(MyLogicalDecodingSlot->plugin));
+	logical_reader->read_page = test_read_page;
+
+	/* Wait for a consistent starting point */
+	for (;;)
+	{
+		XLogRecord *record;
+		XLogRecordBuffer buf;
+		ReaderApplyState* apply_state = logical_reader->private_data;
+		char *err = NULL;
+
+		/* the read_page callback waits for new WAL */
+		record = XLogReadRecord(logical_reader, InvalidXLogRecPtr, &err);
+		if (err)
+			elog(ERROR, "%s", err);
+
+		Assert(record);
+
+		buf.origptr = logical_reader->ReadRecPtr;
+		buf.record = *record;
+		buf.record_data = XLogRecGetData(record);
+		DecodeRecordIntoReorderBuffer(logical_reader, apply_state, &buf);
+
+		if (initial_snapshot_ready(logical_reader))
+			break;
+	}
+
+	/* Extract the values we want */
+	MyLogicalDecodingSlot->confirmed_flush = logical_reader->EndRecPtr;
+	slot_name = NameStr(MyLogicalDecodingSlot->name);
+	snprintf(xpos, sizeof(xpos), "%X/%X",
+			 (uint32) (MyLogicalDecodingSlot->confirmed_flush >> 32),
+			 (uint32) MyLogicalDecodingSlot->confirmed_flush);
+
+	/* Release the slot and return the values */
+	LogicalDecodingReleaseSlot();
+
+	values[0] = CStringGetTextDatum(slot_name);
+	values[1] = CStringGetTextDatum(xpos);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+
+	PG_RETURN_DATUM(result);
+}
+
+PG_FUNCTION_INFO_V1(start_logical_replication);
+
+Datum
+start_logical_replication(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+
+	XLogRecPtr now;
+	XLogReaderState *logical_reader;
+	ReaderApplyState *apply_state;
+	ReorderBuffer *reorder;
+
+	ResourceOwner old_resowner = CurrentResourceOwner;
+
+	if (!slot_name)
+		ereport(ERROR,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 (errmsg("sorry, can't start logical replication outside of an init/stop pair"))));
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * XXX: It's impolite to ignore our argument and keep decoding
+	 * until the current position.
+	 */
+	now = GetFlushRecPtr();
+
+	/*
+	 * We need to create a normal_snapshot_reader, but adjust it to use
+	 * our page_read callback, and also make its reorder buffer use our
+	 * callback wrappers that don't depend on walsender.
+	 */
+
+	CheckLogicalReplicationRequirements();
+	LogicalDecodingReAcquireSlot(slot_name);
+	logical_reader = normal_snapshot_reader(MyLogicalDecodingSlot->last_required_checkpoint,
+											MyLogicalDecodingSlot->xmin,
+											NameStr(MyLogicalDecodingSlot->plugin),
+											MyLogicalDecodingSlot->confirmed_flush);
+
+	logical_reader->read_page = test_read_page;
+	apply_state = (ReaderApplyState *)logical_reader->private_data;
+
+	reorder = apply_state->reorderbuffer;
+	reorder->begin = begin_txn_wrapper;
+	reorder->apply_change = change_wrapper;
+	reorder->commit = commit_txn_wrapper;
+
+	elog(DEBUG1, "Starting logical replication from %X/%X to %X/%x",
+		 (uint32)(MyLogicalDecodingSlot->last_required_checkpoint>>32), (uint32)MyLogicalDecodingSlot->last_required_checkpoint,
+		 (uint32)(now>>32), (uint32)now);
+
+	CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
+
+	for (;;)
+	{
+		XLogRecord *record;
+		char *errm = NULL;
+
+		record = XLogReadRecord(logical_reader, InvalidXLogRecPtr, &errm);
+		if (errm)
+			elog(ERROR, "%s", errm);
+
+		if (record != NULL)
+		{
+			XLogRecPtr rp;
+			XLogRecordBuffer buf;
+			ReaderApplyState* apply_state = logical_reader->private_data;
+
+			buf.origptr = logical_reader->ReadRecPtr;
+			buf.record = *record;
+			buf.record_data = XLogRecGetData(record);
+
+			/*
+			 * The {begin_txn,change,commit_txn}_wrapper callbacks above
+			 * will store the description into our tuplestore.
+			 */
+			DecodeRecordIntoReorderBuffer(logical_reader, apply_state, &buf);
+
+			rp = logical_reader->EndRecPtr;
+			if (rp >= now)
+			{
+				elog(DEBUG1, "Reached endpoint (wanted: %X/%X, got: %X/%X)",
+					 (uint32)(now>>32), (uint32)now,
+					 (uint32)(rp>>32), (uint32)rp);
+				break;
+			}
+		}
+	}
+
+	tuplestore_donestoring(tupstore);
+
+	CurrentResourceOwner = old_resowner;
+
+	/* Next time, start where we left off */
+	MyLogicalDecodingSlot->confirmed_flush = logical_reader->EndRecPtr;
+
+	LogicalDecodingReleaseSlot();
+
+	return (Datum) 0;
+}
+
+PG_FUNCTION_INFO_V1(stop_logical_replication);
+
+Datum
+stop_logical_replication(PG_FUNCTION_ARGS)
+{
+	if (!slot_name)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("sorry, can't stop logical replication before init"))));
+
+	CheckLogicalReplicationRequirements();
+	LogicalDecodingFreeSlot(slot_name);
+	slot_name = NULL;
+
+	PG_RETURN_INT32(0);
+}
diff --git a/contrib/test_logical_replication/test_logical_replication.control b/contrib/test_logical_replication/test_logical_replication.control
new file mode 100644
index 0000000..e73b797
--- /dev/null
+++ b/contrib/test_logical_replication/test_logical_replication.control
@@ -0,0 +1,5 @@
+# test_logical_replication extension
+comment = 'test logical replication'
+default_version = '1.0'
+module_pathname = '$libdir/test_logical_replication'
+relocatable = true
-- 
1.7.12.289.g0ce9864.dirty

