#!/bin/bash

# TCP ports of the three local clusters. The standby and subscriber ports are
# allocated consecutively after the primary's (6634 and 6635).
port_primary=6633
port_standby=$((port_primary + 1))
port_subscriber=$((port_primary + 2))

echo '=========='
echo '=Clean up='
echo '=========='

# Stop any servers left behind by a previous run. A data directory that does
# not exist cannot host a running server, so it is skipped instead of letting
# pg_ctl report an error for it.
for datadir in ../../pubdb ../../standbydb ../../subdb; do
  if [[ -d "$datadir" ]]; then
    ./pg_ctl stop -D "$datadir/"
  fi
done

# Remove the old data directories and server logs. -f keeps the first run
# (when none of these exist yet) free of "No such file or directory" errors.
rm -rf ../../pubdb ../../subdb ../../standbydb
rm -f ../../pub.log ../../sub.log ../../standby.log

echo '======================='
echo '=Set up primary server='
echo '======================='

primary_dir=../../pubdb
primary_log=../../pub.log

./initdb -D "$primary_dir"

# Append the replication settings to the freshly initialised configuration.
# The heredoc delimiter is unquoted so that $port_primary is expanded.
cat >> "$primary_dir/postgresql.conf" << EOF
wal_level = logical
port = $port_primary
synchronized_standby_slots = 'standby_1'
max_wal_senders=500
max_worker_processes=1000
max_replication_slots=500
log_replication_commands = 'on'
EOF

# Trust authentication from any IPv4 address, for both regular and
# replication connections.
cat >> "$primary_dir/pg_hba.conf" << EOF
host all,replication all 0.0.0.0/0 trust
EOF

./pg_ctl -D "$primary_dir/" -w -l "$primary_log" start

# Create the physical slot named in synchronized_standby_slots and the role
# the standby connects as.
for sql in \
  "SELECT pg_create_physical_replication_slot('standby_1');" \
  "CREATE ROLE replication WITH REPLICATION PASSWORD 'password' LOGIN;"; do
  ./psql -p "$port_primary" -d postgres -c "$sql"
done

# Restart the primary (stop, then start) before the standby is cloned.
for action in stop start; do
  ./pg_ctl -D "$primary_dir/" -w -l "$primary_log" "$action"
done



echo '========================='
echo '=Set up standby server='
echo '========================='

# Clone the primary into the standby's data directory over a replication
# connection as role "replication". -R writes the standby/recovery
# configuration, -X s streams WAL while the backup runs, -P shows progress.
./pg_basebackup -h 127.0.0.1 -D ../../standbydb  -R -P -U replication -X s  -p "$port_primary"

# Settings appended here come after the ones copied from the primary's
# postgresql.conf, so they take precedence: the standby gets its own port,
# streams through the 'standby_1' physical slot, and clears
# synchronized_standby_slots.
cat << EOF >> ../../standbydb/postgresql.conf
port = $port_standby
primary_slot_name = 'standby_1'
hot_standby_feedback = on
synchronized_standby_slots = ''
wal_receiver_status_interval=1s
log_min_messages='DEBUG1'
EOF

# Append a primary_conninfo that also names a database (dbname=postgres).
# The port comes from $port_primary rather than a hard-coded literal so it
# always matches the port the primary was configured with.
# NOTE(review): dbname is presumably what slot synchronization needs in
# primary_conninfo - confirm against the PostgreSQL documentation.
cat << EOF >> ../../standbydb/postgresql.auto.conf
primary_conninfo = 'user=replication host=127.0.0.1 port=$port_primary dbname=postgres'
EOF

./pg_ctl -D ../../standbydb/ -w -l ../../standby.log start

echo '========================='
echo '=primary: create logical slot and perform DDLs'
echo '========================='

# Create the logical slot 'sub1' (pgoutput plugin) up front; the last three
# arguments are passed as false, false, true.
./psql -p "$port_primary" -d postgres -c "SELECT pg_create_logical_replication_slot('sub1','pgoutput', false, false, true);"

# Two tables to publish later.
for tbl in tab1 tab2; do
  ./psql -p "$port_primary" -d postgres -c "create table $tbl (id int primary key, a int);"
done

# Fixed pause so the standby can catch up with the primary.
sleep 2

# Show the slots' xmin horizons next to the xmin of tab1's pg_class row.
./psql -p "$port_primary" -d postgres -c "select slot_name,xmin, catalog_xmin from pg_replication_slots"
./psql -p "$port_primary" -d postgres -c "select xmin from pg_class where relname='tab1';"


echo '==================='
echo '=Set up subscriber='
echo '==================='

subscriber_dir=../../subdb

./initdb -D "$subscriber_dir"

# Give the subscriber its own port and raise the worker/slot limits.
# The heredoc delimiter is unquoted so that $port_subscriber is expanded.
cat >> "$subscriber_dir/postgresql.conf" << EOF
port = $port_subscriber
max_wal_senders=500
max_worker_processes=1000
max_replication_slots=500
max_logical_replication_workers=500
EOF

./pg_ctl start -D "$subscriber_dir" -w -l ../../sub.log


echo '==================='
echo '=Create publication and subscription using already created logical slot.'
echo '==================='
./psql -p "$port_primary" -d postgres -c "create publication pub1 for all tables;"

# The subscriber needs matching table definitions before it subscribes.
./psql -p "$port_subscriber" -d postgres -c "create table tab1 (id int primary key, a int);"
./psql -p "$port_subscriber" -d postgres -c "create table tab2 (id int primary key, a int);"

# create_slot = false: the subscription is not asked to create a slot; the
# 'sub1' slot was already created on the primary. The publisher port is taken
# from $port_primary instead of a hard-coded literal so the connection string
# always matches the port the primary was configured with.
./psql -p "$port_subscriber" -d postgres -c "create subscription sub1 connection 'dbname=postgres host=localhost port=$port_primary' publication pub1 with (create_slot = false, failover=true);"

# let the subscriber start and catch up.
sleep 2

./psql -p "$port_primary" -d postgres -c "select slot_name,xmin, catalog_xmin from pg_replication_slots"


echo '==================='
echo '=Sync replication slots on standby'
echo '==================='
./psql -p "$port_standby" -d postgres -c "SELECT pg_sync_replication_slots();"

sleep 1

echo '==================='
echo '=Check both primary and synced slot after sync'
echo '==================='

# The same query is run on both servers so the logical slots can be compared
# side by side.
slot_state_sql="select slot_name, synced, catalog_xmin, restart_lsn, confirmed_flush_lsn from pg_replication_slots where slot_type='logical'"

echo '=PRIMARY='
./psql -p "$port_primary" -d postgres -c "$slot_state_sql"
echo '=STANDBY='
./psql -p "$port_standby" -d postgres -c "$slot_state_sql"


echo '==================='
echo '=New DDLs on pub and sub'
echo '==================='

# Create tab3 and tab4 on the publisher first, then on the subscriber.
for server_port in "$port_primary" "$port_subscriber"; do
  for tbl in tab3 tab4; do
    ./psql -p "$server_port" -d postgres -c "create table $tbl (id int primary key, a int);"
  done
done

# Refresh the subscription now that the new tables exist on both sides.
./psql -p "$port_subscriber" -d postgres -c "alter subscription sub1 refresh publication;"

# Fixed pause to give the subscriber time to catch up.
sleep 2

./psql -p "$port_primary" -d postgres -c "select slot_name,xmin, catalog_xmin from pg_replication_slots"

echo '==================='
echo '=Sync replication slots multiple times on standby'
echo '==================='

# First sync, before any new rows are written.
./psql -p "$port_standby" -d postgres -c "SELECT pg_sync_replication_slots();"

# Write one row into each of the new tables on the primary.
for tbl in tab3 tab4; do
  ./psql -p "$port_primary" -d postgres -c "insert into $tbl values (10,10);"
done
sleep 2

# Second sync, after the inserts.
./psql -p "$port_standby" -d postgres -c "SELECT pg_sync_replication_slots();"

sleep 2

echo '==================='
echo '=Check both primary and synced slot after sync'
echo '==================='

# Run the identical slot query on both servers for comparison.
final_slot_sql="select slot_name, synced, catalog_xmin, restart_lsn, confirmed_flush_lsn from pg_replication_slots where slot_type='logical'"

echo '=PRIMARY='
./psql -p "$port_primary" -d postgres -c "$final_slot_sql"
echo '=STANDBY='
./psql -p "$port_standby" -d postgres -c "$final_slot_sql"

