#!/bin/bash

# The intent of this script is to reproduce the issue caused by the slot's
# confirmed_flush_lsn moving backward, without using the two_phase option.
# The problem arises when changes applied by the table sync worker are duplicated by
# the apply worker if the confirmed_flush_lsn moves backward after the table sync.
#
# This script reproduces this issue with the help of three injection points
# (table-sync-wait-2 and reread-sub on the subscriber, process-replies on the
# primary).
#
# After this script runs, the following error is expected in the subscriber's
# log (sub.log) as a result of the issue described above:
# ERROR:  conflict detected on relation "public.tab2": conflict=insert_exists
# DETAIL:  Key already exists in unique index "tab2_a_key", modified in transaction ....
#	Key (a)=(2); existing local tuple (2); remote tuple (2).

# Ports for the two clusters. The defaults match the original setup; set
# PORT_PRIMARY / PORT_SUBSCRIBER to use different ports.
readonly port_primary=${PORT_PRIMARY:-5432}
readonly port_subscriber=${PORT_SUBSCRIBER:-5434}

echo '=========='
echo '=Clean up='
echo '=========='

# Stop clusters left over from a previous run. Directories that do not exist
# (e.g. on the first run) are skipped. pg_ctl still reports, harmlessly, when a
# data directory exists but its server is not running.
for datadir in data_primary data_subscriber; do
  if [[ -d $datadir ]]; then
    pg_ctl stop -D "$datadir"
  fi
done

# Remove only what this script creates: the two data directories and the two
# server logs written by "pg_ctl ... -l". Broad globs here could also delete
# unrelated files in the current directory.
rm -rf -- data_primary data_subscriber primary.log sub.log

echo '======================='
echo '=Set up primary server='
echo '======================='

# Nothing below is meaningful without a cluster, so stop on failure.
initdb -D data_primary || exit 1

# Each parameter is listed once. When postgresql.conf repeats a parameter,
# only the last entry takes effect, so duplicates only obscure the effective
# value. The very large sender/receiver timeouts presumably keep connections
# alive while processes are parked on injection points (NOTE(review): confirm).
cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB

max_wal_senders=550
max_replication_slots=550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

max_prepared_transactions = 10
log_min_messages = 'debug1'

EOF


# -w waits for startup. Abort if the server cannot start (for example if
# shared_buffers cannot be allocated on this machine).
pg_ctl -D data_primary start -w -l primary.log || exit 1

# Physical slot named 'physical'. Within this script it is referenced only by
# the commented-out standby_slot_names line above.
psql -d postgres -p "$port_primary" -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '==================='
echo '=Set up subscriber='
echo '==================='

# Nothing below is meaningful without a cluster, so stop on failure.
initdb -D data_subscriber || exit 1

# wal_receiver_status_interval = 1 presumably makes the apply worker report
# its positions to the publisher frequently (NOTE(review): confirm). The
# later steps rely on that feedback reaching the walsender.
cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber


checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s


wal_receiver_status_interval = 1
max_prepared_transactions = 10
log_min_messages = 'debug1'
EOF

# -w is passed explicitly to match the primary's start command. Abort if the
# subscriber cannot start.
pg_ctl start -D data_subscriber -w -l sub.log || exit 1


# Put an injection point in the tablesync worker after the initial copy is
# done and before it can update the state to SUBREL_STATE_SYNCWAIT.
# It is important to stop tablesync before it updates the state to
# SUBREL_STATE_SYNCWAIT, because we want the apply worker to keep processing
# changes instead of loop-waiting for the table sync to finish.
psql -d postgres -p "$port_subscriber" -c "create extension injection_points;SELECT injection_points_attach('table-sync-wait-2', 'wait');"

psql -d postgres -p "$port_primary" -c "CREATE TABLE tab1(a int); INSERT INTO tab1 VALUES(1); CREATE PUBLICATION pub FOR TABLE tab1;"

psql -d postgres -p "$port_subscriber" -c "CREATE TABLE tab1(a int);"

# Start the subscription with copy_data = false, so the existing row in tab1
# is not copied. Presumably this also keeps tab1 from ever reaching the
# table-sync-wait-2 injection point attached above (NOTE(review): confirm).
# $port_primary is expanded by the shell inside the connection string.
psql -d postgres -p "$port_subscriber" -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (slot_name='logicalslot', create_slot=true, copy_data = false, two_phase=false, failover=true, enabled=true)"

sleep 2

psql -d postgres -p "$port_primary" -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Insert data into tab1. After this insert:
# --the apply worker's origin_lsn on sub will be advanced to this INSERT's lsn (say lsn1)
# --confirmed_flush on pub will also be advanced to the same lsn (lsn1)
psql -d postgres -p "$port_primary" -c "INSERT INTO tab1 VALUES(2);"

sleep 1

# Check both confirmed_flush and origin_lsn; both should be lsn1.
# NOTE(review): local_id = 1 assumes the subscription's replication origin is
# the first origin created on this freshly initialized subscriber.
psql -d postgres -p "$port_subscriber" -c "select * from pg_replication_origin_status where local_id = 1;"
psql -d postgres -p "$port_primary" -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Now add another table to the publication.
psql -d postgres -p "$port_primary" -c "CREATE TABLE tab2 (a int UNIQUE); ALTER PUBLICATION pub ADD TABLE tab2;"

psql -d postgres -p "$port_subscriber" -c "CREATE TABLE tab2 (a int UNIQUE);"

# Refresh the subscription.
# This starts tablesync for tab2. The tablesync worker will do the initial copy
# but will wait before updating the state to SUBREL_STATE_SYNCWAIT because of
# the injection point table-sync-wait-2.
psql -d postgres -p "$port_subscriber" -c "ALTER SUBSCRIPTION sub REFRESH PUBLICATION"

sleep 1

# Insert data into tab2. The tablesync worker on sub will not consume it yet.
# The apply worker will see this change and ignore it.
# Let's say the remote lsn for this change is lsn2.
psql -d postgres -p "$port_primary" -c "INSERT INTO tab2 VALUES(2);"
sleep 3

psql -d postgres -p "$port_subscriber" -c "select * from pg_replication_origin_status where local_id = 1;"

# By this time confirmed_flush must have moved to lsn2 (where lsn2 > lsn1)
# because of keepalive message handling in the apply worker.
psql -d postgres -p "$port_primary" -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Before we wake up the tablesync worker to catch up, block the apply worker
# before it does maybe_reread_subscription() and process_syncing_tables().
# This keeps the state from moving to SUBREL_STATE_READY.
psql -d postgres -p "$port_subscriber" -c "SELECT injection_points_attach('reread-sub', 'wait');"

# Wake up the tablesync worker. It will now catch up and consume the data
# inserted into tab2 above. The tablesync worker's origin_lsn will then be lsn2.
psql -d postgres -p "$port_subscriber" -c "SELECT injection_points_wakeup('table-sync-wait-2');SELECT injection_points_detach('table-sync-wait-2')"

psql -d postgres -p "$port_subscriber" -c "alter subscription sub disable;"

sleep 1

# Now wake up the apply worker. It will re-read the subscription and exit,
# because the subscription is disabled, before moving the state to
# SUBREL_STATE_READY.
psql -d postgres -p "$port_subscriber" -c "SELECT injection_points_wakeup('reread-sub');SELECT injection_points_detach('reread-sub')"

sleep 1

psql -d postgres -p "$port_subscriber" -c "alter subscription sub enable;"

# Now let the apply worker start and move the state to SUBREL_STATE_READY,
# and let the table sync finish and exit.
sleep 3

# Now the state is:
# --table sync is finished on sub; changes are synced up to lsn2
# --apply worker has processed and ignored the changes up to lsn2 without updating origin_lsn
# --apply worker's origin_lsn at sub is still lsn1
# --confirmed_flush on pub is at lsn2
#
# Now disable sub. Before re-enabling it, stop the walsender from processing
# any more replies or sending any more keepalives.
psql -d postgres -p "$port_primary" -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"
psql -d postgres -p "$port_subscriber" -c "alter subscription sub disable;"

sleep 1

psql -d postgres -p "$port_primary" -c "create extension injection_points;;SELECT injection_points_attach('process-replies', 'wait');"

psql -d postgres -p "$port_subscriber" -c "alter subscription sub enable;"

# With no message arriving from the walsender, let the apply worker send
# feedback with flush position lsn1 (its origin_lsn).
# The walsender will only process this feedback after we release the
# injection point 'process-replies'.

sleep 1

# Check that the origin is still at lsn1 and confirmed_flush is at lsn2.
psql -d postgres -p "$port_subscriber" -c "select * from pg_replication_origin_status where local_id = 1;"
psql -d postgres -p "$port_primary" -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Disable the subscription and release the walsender's wait before ProcessRepliesIfAny().
psql -d postgres -p "$port_subscriber" -c "alter subscription sub disable;"

psql -d postgres -p "$port_primary" -c "; SELECT injection_points_wakeup('process-replies');SELECT injection_points_detach('process-replies');"

# After the injection point is released, let the previous walsender process
# the reply from the apply worker and move confirmed_flush back to lsn1.
# It will then exit.
sleep 1

# Check that confirmed_flush has moved back to lsn1.
psql -d postgres -p "$port_primary" -c "SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name='logicalslot'"

# Now let a new walsender start streaming from lsn1. This replays
# 'INSERT to tab2 (lsn2)' and duplicates data in tab2. The resulting
# conflict is logged by the subscriber.
psql -d postgres -p "$port_subscriber" -c "alter subscription sub enable;"

exit
