#!/bin/bash

# Related files:
#    instances.sh:       start the instances with assertions enabled
#    instances_fast.sh:  start the instances without assertions
#    testset.sh:         calls pgbench_derail2.sh (this file)
#    pubsub.sh:          dumps some replication state info
#
# Peculiar to my (the author's) setup:
#    the server version string is stamped at compile time with a date and the commit hash
#    information_schema.sql_packages is (mis)used at compile time to store patch information
#    the instances live in $pg_stuff_dir/pg_installations/pgsql.<project name>

pg_stuff_dir=$HOME/pg_stuff

# Both instances are assumed to be running already:
#   port 6972 = master (publisher), port 6973 = replica (subscriber).
# logfileN is the server log of instance N.
port1=6972  logfile1=$pg_stuff_dir/pg_installations/pgsql.logical_replication/logfile.logical_replication
port2=6973  logfile2=$pg_stuff_dir/pg_installations/pgsql.logical_replication2/logfile.logical_replication2

# Clear the server logs so they only cover this run.  ': >' truncates to zero
# bytes (echo would leave a stray newline); quoting guards against spaces in
# $HOME.
: > "$logfile1"
: > "$logfile2"

function pgsettings()
{
  # Print one summary line per instance ($port1, then $port2) with the
  # settings relevant to this test:
  #   -- <column names>   <values>   <md5 fingerprint>
  # psql's unaligned '|' field separators are shown as spaces.  The
  # fingerprint is characters 2-10 of the md5 of the value line.
  # Loop variables are local so nothing leaks into the caller's scope.
  local port hdr1 bod1 md1
  local sql1="select 
   current_setting('port')                            as port
 , current_setting('shared_buffers')                  as sb
 , current_setting('work_mem')                        as wm
 , current_setting('effective_cache_size')            as ecs
 , current_setting('maintenance_work_mem')            as mwm
 , current_setting('max_replication_slots')           as mrs
 , current_setting('max_worker_processes')            as mwp
 , current_setting('max_logical_replication_workers') as mlrw
 , current_setting('max_wal_senders')                 as mws
 , current_setting('wal_sender_timeout')              as wst
 , current_setting('debug_assertions')                as da

"
  for port in "$port1" "$port2"
  do
    # first output line of the non-tuples-only run = the column header
    hdr1=$( echo "$sql1" | psql -qAX  -p "$port" | head -n1 )
    bod1=$( echo "$sql1" | psql -qtAX -p "$port" )
    md1=$( echo "$bod1" | md5sum )
    echo "-- $hdr1   $bod1   ${md1:1:9}" | tr '|' ' '
  done
}

# Timestamp of this run; also the default for the 4th argument.
logfile_ts=$( date +"%Y%m%d_%H_%M_%S" )

# Positional arguments, all optional (an empty argument counts as absent):
#   $1 scale (default 1)     $2 clients (default 1)
#   $3 duration in seconds (default 60)
#   $4 date string used in the archived log file names (default $logfile_ts)
#   $5 CLEAN_ONLY: any non-empty value = only clean up pub/sub state, then exit
scale=${1:-1}
clients=${2:-1}
duration=${3:-60}
date_str=${4:-$logfile_ts}
CLEAN_ONLY=${5:-}

  threads=8   # pgbench worker threads (-j)

# Per-instance metadata for the run header.  server_version is queried once
# per instance; the commit hash is the 6th '_'-separated field of that string
# (this setup stamps date + commit hash into the version at compile time), so
# it is cut from the value already fetched instead of asking the server again.
     master_version=$( psql -qtAXc "show server_version" -p "$port1" )
    replica_version=$( psql -qtAXc "show server_version" -p "$port2" )
 master_commit_hash=$( cut -d _ -f 6 <<< "$master_version" )
replica_commit_hash=$( cut -d _ -f 6 <<< "$replica_version" )
  master_start_time=$( psql -qtAXc "select pg_postmaster_start_time()" -p "$port1" )
 replica_start_time=$( psql -qtAXc "select pg_postmaster_start_time()" -p "$port2" )
# md5 of the patch information this setup stores in information_schema.sql_packages
      patch_md5_sql="select md5(comments) from information_schema.sql_packages where feature_name = 'patch md5'"
   master_patch_md5=$( psql -qtAXp "$port1" <<< "$patch_md5_sql" )
  replica_patch_md5=$( psql -qtAXp "$port2" <<< "$patch_md5_sql" )
         master_s_c=$( psql -qtAXc "show synchronous_commit" -p "$port1" )
        replica_s_c=$( psql -qtAXc "show synchronous_commit" -p "$port2" )
      master_assert=$( psql -qtAXc "show debug_assertions" -p "$port1" )
     replica_assert=$( psql -qtAXc "show debug_assertions" -p "$port2" )

printf '%s\n' \
  "============================================================================" \
  "-- scale $scale   clients $clients   duration $duration   CLEAN_ONLY=$CLEAN_ONLY" \
  "============================================================================"
printf '%s\n' "-- hostname: $( hostname -s )"
printf '%s\n' "-- time: $( date +"%Y.%m.%d %H:%M:%S" )"
printf '%s\n' "-- timestamp: $date_str"
#echo -n "-- "; ps -ef f | grep 6972 | grep -Evw 'grep|xterm|screen|SCREEN' | less -iS
#echo -n "-- "; ps -ef f | grep 6973 | grep -Evw 'grep|xterm|screen|SCREEN' | less -iS
printf '%s\n' \
  "-- master_version:  ${master_version}" \
  "-- replica_version: ${replica_version}" \
  "-- master_start_time $master_start_time     replica_start_time $replica_start_time"

# Both instances must be built from the same patch (md5 stored in
# information_schema.sql_packages at build time); otherwise stop right here.
if [[ "$master_patch_md5" != "$replica_patch_md5" ]]
then
    printf '%s\n' \
      "-- master  patch-md5 [$master_patch_md5] - replica patch-md5 NOT the same, bailing out" \
      "-- replica patch-md5 [$replica_patch_md5] - replica patch-md5 NOT the same, bailing out"
    exit -1
fi
printf '%s\n' \
  "-- master  patch-md5 [$master_patch_md5]" \
  "-- replica patch-md5 [$replica_patch_md5] (ok)"

printf '%s\n' \
  "-- synchronous_commit, master  [$master_s_c]  replica [$replica_s_c]" \
  "-- master_assert  [$master_assert]  replica_assert [$replica_assert]"
# Deliberately unquoted: word splitting turns md5sum's "hash  file" into
# "hash file" in the report.
# shellcheck disable=SC2046,SC2086
echo "-- self md5 "$( md5sum $0 )

pgsettings

# Every psql/pgbench call below that names no database uses testdb, and no
# connection service is taken from the environment.  PGPORT, PGDATA and
# PGHOST are deliberately left as inherited.
export PGDATABASE=testdb
unset PGSERVICE PGSERVICEFILE

# Data directories of the two instances.
pgdata_master=$pg_stuff_dir/pg_installations/pgsql.logical_replication/data
pgdata_replica=$pg_stuff_dir/pg_installations/pgsql.logical_replication2/data

function cb()
{
  # Compare the pgbench tables of master ($port1) and replica ($port2).
  #
  # Prints one line per instance:
  #   <port> a,b,t,h: <row counts>  <md5 prefixes>  master|replica
  # a,b,t,h stand for pgbench_accounts, -branches, -tellers, -history; each
  # md5 prefix is the first 9 hex digits of the md5 of that table's full
  # contents in key order.  The replica line ends in " ok" when its four md5
  # prefixes equal the master's and in " NOK" otherwise, so a converged run
  # produces "... replica ok".
  #
  # If either instance does not have exactly 4 pgbench_* tables, prints
  # "pgbench tables not 4 - exit" and exits with status 1.  Callers use
  # $(cb), so this only ends that subshell.
  local port num_tables spec table key
  local -a md5s cnts md5_total=()

  # Pass -p explicitly: without it psql would connect to libpq's default
  # port, which this script never sets.
  for port in "$port1" "$port2"
  do
    num_tables=$( echo "select count(*) from pg_class where relkind = 'r' and relname ~ '^pgbench_'" | psql -qtAXp "$port" )
    if [[ $num_tables -ne 4 ]]
    then
      echo "pgbench tables not 4 - exit"
      exit 1
    fi
  done

  for port in "$port1" "$port2"
  do
    md5s=() cnts=()
    # <table suffix>:<ordering key>, in a,b,t,h output order
    for spec in accounts:aid branches:bid tellers:tid history:hid
    do
      table=pgbench_${spec%%:*}
      key=${spec#*:}
      md5s+=( "$(echo "select * from $table order by $key" | psql -qtAXp "$port" | md5sum | cut -b 1-9)" )
      cnts+=( "$(echo "select count(*) from $table" | psql -qtAXp "$port")" )
    done
    md5_total[$port]=$( printf '%s %s %s %s\n' "${md5s[@]}" | md5sum )
    printf '%s a,b,t,h: %8d %6d %6d %6d' "$port" "${cnts[@]}"
    printf '  %s %s %s %s' "${md5s[@]}"
    if   [[ $port -eq $port1 ]]; then echo    " master"
    elif [[ $port -eq $port2 ]]; then echo -n " replica"
    else                              echo    "           ERROR  "
    fi
  done

  # completes the replica line
  if [[ "${md5_total[$port1]}" == "${md5_total[$port2]}" ]]
  then
    echo " ok"
  else
    echo " NOK"
  fi
}

# Print the result of "select count(*) from <relation>" on one instance.
#   $1  catalog table/view to count   $2  port
_pubsub_count()
{
   echo "select count(*) from $1" | psql -qtAXp "$2"
}

# One cleanup pass over this test's logical replication objects.  Every step
# counts first and only acts when something is left.
#   $1  non-empty: print a line before each action (used by the retry loop)
_clean_pubsub_pass()
{
   local verbose=$1

   # subscriber: drop sub1, then wipe leftover subscription catalog rows
   if [[ $( _pubsub_count pg_subscription "$port2" ) -ne 0 ]]; then
     if [[ -n $verbose ]]; then echo "sub_count -ne 0 : deleting sub1 (plain)"; fi
     echo "drop subscription if exists sub1"              | psql -qXp "$port2"
     echo "delete from pg_subscription;
           delete from pg_subscription_rel;
           delete from pg_replication_origin; "  | psql -qXp "$port2"
   fi

   # subscriber: these slots are counted on $port2, so drop them there
   # (dropping on $port1 could never bring this count to 0).  Active slots
   # cannot be dropped; they are left for the next pass.
   if [[ $( _pubsub_count pg_replication_slots "$port2" ) -ne 0 ]]; then
     if [[ -n $verbose ]]; then echo "sub_repl_slot_count -ne 0 - deleting"; fi
     echo "select pg_drop_replication_slot(slot_name) from pg_replication_slots where not active" | psql -Xp "$port2"
   fi

   # publisher: publication pub1
   if [[ $( _pubsub_count pg_publication "$port1" ) -ne 0 ]]; then
     if [[ -n $verbose ]]; then echo "pub_count -ne 0 - deleting pub1"; fi
     echo "drop publication if exists pub1" | psql -qXp "$port1"
   fi

   # publisher: slot 'sub1'
   if [[ $( _pubsub_count pg_replication_slots "$port1" ) -ne 0 ]]; then
     if [[ -n $verbose ]]; then echo "pub_repl_slot_count -ne 0 - deleting (sub1)"; fi
     echo "select pg_drop_replication_slot('sub1')" | psql -qXp "$port1"
   fi
}

function clean_pubsub()
{
   # Remove this test's publication, subscription and replication slots from
   # both instances, and wait until they are really gone.
   #   $1  label printed first (e.g. "clean-at-start-call")
   # Globals read: port1 (publisher), port2 (subscriber).
   # Retries every 10 s.  Exits the whole script with status 11 when the
   # state is still not clean after 25 retries.
   local pub_count pub_repl_slot_count sub_count sub_repl_slot_count
   local try_count=0

   echo "$1"

   pubsub.sh "$port1"

   _clean_pubsub_pass ""

   while :
   do
       pub_count=$(           _pubsub_count pg_publication       "$port1" )
       pub_repl_slot_count=$( _pubsub_count pg_replication_slots "$port1" )
       sub_count=$(           _pubsub_count pg_subscription      "$port2" )
       sub_repl_slot_count=$( _pubsub_count pg_replication_slots "$port2" )

       if [[ $pub_count -eq 0 && $sub_count -eq 0 \
          && $pub_repl_slot_count -eq 0 && $sub_repl_slot_count -eq 0 ]]
       then
           break
       fi

       sleep 10
       try_count=$(( try_count + 1 ))
       if [[ $try_count -gt 25 ]]
       then
           echo "-- clean_pubsub: replication state still not clean after 25 retries - exit 11" >&2
           exit 11
       fi
       echo "
             pub_count  $pub_count
   pub_repl_slot_count  $pub_repl_slot_count
             sub_count  $sub_count
   sub_repl_slot_count  $sub_repl_slot_count             (try_count  $try_count)"
       echo

       _clean_pubsub_pass verbose
   done
}

table_info_on_fail()
{
   # Diagnostics for a run that does not converge: show pg_subscription_rel
   # and pg_replication_origin_status on the subscriber ($port2), then
   # pg_replication_slots on the publisher ($port1).  psql -a echoes each
   # statement before its result.
   local subscriber_sql="table pg_subscription_rel; table pg_replication_origin_status;"
   local publisher_sql="table pg_replication_slots;"
   psql -aqX -p "$port2" <<< "$subscriber_sql"
   psql -aqX -p "$port1" <<< "$publisher_sql"
}

# Start from a clean slate: remove pub/sub state a previous run may have left.
clean_pubsub "clean-at-start-call"

# A non-empty 5th argument (CLEAN_ONLY) means: clean up only, run no test.
if [[ -n "$CLEAN_ONLY" ]]; then exit 0; fi

   # Recreate the pgbench tables:
   #   - drop them on both instances;
   #   - initialise them on the master with pgbench -i, using the scale with
   #     '_' separators removed (e.g. 1_000 -> 1000);
   #   - add a serial primary key hid to pgbench_history (cb orders history
   #     rows by hid);
   #   - copy only the table definitions, no data, to the replica via
   #     pg_dump | pg_restore.
   # ON_ERROR_STOP makes psql exit non-zero on SQL errors, so the && chain
   # really stops at the first failure.  Without these tables the rest of
   # the run is meaningless, so bail out instead of carrying on.
drop_pgbench_sql="drop table if exists pgbench_accounts; drop table if exists pgbench_branches; drop table if exists pgbench_tellers; drop table if exists pgbench_history;"

if ! {    echo "$drop_pgbench_sql" | psql -v ON_ERROR_STOP=1 -qXp "$port1" \
       && echo "$drop_pgbench_sql" | psql -v ON_ERROR_STOP=1 -qXp "$port2" \
       && pgbench -p "$port1" -qis "${scale//_/}" \
       && echo " alter table pgbench_history add column hid serial primary key;" | psql -v ON_ERROR_STOP=1 -q1Xp "$port1" \
       && pg_dump -F c -p "$port1" \
            --exclude-table-data=pgbench_history  --exclude-table-data=pgbench_accounts --exclude-table-data=pgbench_branches --exclude-table-data=pgbench_tellers \
            -t pgbench_history  -t pgbench_accounts -t pgbench_branches -t pgbench_tellers \
          | pg_restore -1 -p "$port2" -d testdb
     }
then
  echo "-- creating the pgbench tables failed, bailing out" >&2
  exit 1
fi

# Publish all tables on the master and subscribe to them from the replica.
# The subscription is created disabled and enabled in a second statement;
# its connection to the master carries application_name=$appname.
appname=derail2
psql -p "$port1" -aqtAX <<< "create publication pub1 for all tables;"
psql -p "$port2" -aqtAX <<EOF
create subscription sub1 connection 'port=${port1} application_name=$appname' publication pub1 with(enabled=false);
alter subscription sub1 enable;
EOF

pubsub.sh "$port1"

# Set to 0 to skip running pgbench (the else branch only prints a note).
RUN_PGBENCH=1

if [[ $RUN_PGBENCH -eq 1 ]]
then
  # Progress report every duration/5 seconds.  pgbench rejects -P 0, so
  # clamp to at least 1 s; durations under 5 s would otherwise abort the
  # load run.  Shell arithmetic also avoids depending on bc.
  pseconds=$(( duration / 5 ))
  if (( pseconds < 1 )); then pseconds=1; fi
  echo "-- pgbench -c $clients -j $threads -T $duration -P $pseconds -n   --  scale $scale"
           pgbench -c "$clients" -j "$threads" -T "$duration" -P "$pseconds" -n    #  scale $scale
else
  echo "-- not running pgbench..."
fi

# Optional fixed pause before the first comparison (currently none).
waiting1=0
printf '%s\n' "-- waiting ${waiting1}s... (always)"
sleep "$waiting1"

# Total seconds spent waiting for the replica so far.
wait_total=$waiting1

pubsub.sh "$port1"

# First comparison of master and replica, bracketed by timestamps.
# cb_text1_md5 is the baseline for detecting "no progress" later.
date +"%Y.%m.%d %H:%M:%S"
printf '%s\n' "-- getting md5 (cb)"
cb_text1=$(cb)
cb_text1_md5=$(md5sum <<< "$cb_text1")
printf '%s  %s\n' "$cb_text1" "${cb_text1_md5:1:9}"
date +"%Y.%m.%d %H:%M:%S"

# Poll until the replica has caught up with the master:
#   - stop as soon as the latest cb output contains "replica ok";
#   - otherwise sleep $wait_chunk s and run cb again;
#   - when 5 consecutive polls produced unchanged cb output, dump
#     subscription/slot state once (table_info_on_fail);
#   - give up once loop_counter * wait_chunk exceeds $wait_max s, or after
#     more than 20 consecutive polls with unchanged cb output.
# number of completed poll iterations
loop_counter=0
# seconds to sleep between polls
wait_chunk=15
# upper bound on polling time, in seconds
wait_max=3600
# consecutive polls whose cb output was identical to the previous one
unchanged=0

while [[ 1 -eq 1 ]];
do
    # normal case: the replica's table fingerprints match the master's
    if echo "$cb_text1" | grep -qw 'replica ok';
    then
       echo "-- All is well."
       echo "-- ${wait_total} seconds total.  scale $scale  clients $clients  -T $duration"
       break
    fi

    # unchanged passes through 5 only once per streak, so this dump runs once
    if [[ $unchanged -eq 5 ]]
    then
       table_info_on_fail
       #wal_info_on_fail
    fi
    waited_already=$(( $loop_counter * $wait_chunk ))
    if [[ $waited_already -gt $wait_max ]]
    then
       echo "-- Not good, but breaking out of wait (waited more than ${wait_max} s)"
       #wal_info_on_fail
       #wal_info_on_fail
       echo "-- (wait_total ${wait_total} s)"
       break
    fi
    echo "-- wait another ${wait_chunk} s   (total ${wait_total} s) (unchanged $unchanged)"
    sleep $wait_chunk;
    wait_total=$(( $wait_total + $wait_chunk ))
    echo "-- getting md5 (cb)"
    cb_text1=$(cb)
    # compare the whole cb output with the previous poll's, via md5
    cb_text1_md5_new=$( echo "$cb_text1" | md5sum )
    if [[ "$cb_text1_md5_new" == "$cb_text1_md5" ]] 
    then
        unchanged=$(( $unchanged + 1 ))
    else
        unchanged=0
    fi
    echo "${cb_text1}   ${cb_text1_md5_new:1:9}"
    # no change at all for more than 20 polls: replication looks stuck
    if [[ $unchanged -gt 20 ]]
    then
       echo "-- Not good, but breaking out of wait ($unchanged times no change)"
       table_info_on_fail
       #wal_info_on_fail
       echo "-- (wait_total ${wait_total} s)"
       break
    fi
    cb_text1_md5=$cb_text1_md5_new
    loop_counter=$(( loop_counter + 1 ))
done

# Remove the publication, subscription and slots again, whatever the outcome:
clean_pubsub "clean-at-end-call"

  dest_dir='logfiles'
_time_=$( date +"%H%M" )

# Keep both server logs of this run in $dest_dir.  The file names record the
# run's date string, the time of copying (HHMM), scale, client count and the
# outcome: "ok" if the last cb output contained "replica ok", "NOK" otherwise.
# The directory check now honours $dest_dir instead of a hard-coded name.
mkdir -p -- "$dest_dir"

if echo "${cb_text1}" | grep -qw 'replica ok'
then
  outcome=ok
else
  outcome=NOK
fi

log_prefix="${dest_dir}/logrep.${date_str}.${_time_}"
log_suffix="scale_${scale}.clients_${clients}.${outcome}.log"
cp -- "$logfile1" "${log_prefix}.1.${log_suffix}"
cp -- "$logfile2" "${log_prefix}.2.${log_suffix}"

