#!/bin/bash

set -ue
set -o pipefail

cd "${BASH_SOURCE[0]%/*}" || exit

POSTGRES_VERSION=17

# Adjust these for the system under test
PATH="/usr/pgsql-$POSTGRES_VERSION/bin:$PATH"
PRIMARY_DIR="/var/lib/pgsql/$POSTGRES_VERSION/primary"
REPLICA_DIR="/var/lib/pgsql/$POSTGRES_VERSION/replica"

PRIMARY_PORT=5340
REPLICA_PORT=5341

DBNAME="test_db"
TABLE="test_table"
TRIGGER_FILE="./stop_inserting"

CYAN="\e[1;36m"
RESET="\e[0m"

TEST_NUM=0
reset_databases() (
    test_start_time=$(date +"%Y-%m-%d_%H-%M-%S")

    if pg_ctl status -D "$REPLICA_DIR" > /dev/null 2>&1 && ! pg_ctl stop -m immediate -D "$REPLICA_DIR"; then
        echo "ERROR: Could not stop replica."
        return 1
    fi

    if ! rm -rf "$REPLICA_DIR"; then
        echo "ERROR: Could not delete replica directory \"$REPLICA_DIR\"."
        return 1
    fi

    if pg_ctl status -D "$PRIMARY_DIR" > /dev/null 2>&1 && ! pg_ctl stop -m immediate -D "$PRIMARY_DIR"; then
        echo "ERROR: Could not stop primary."
        return 1
    fi

    if ! rm -rf "$PRIMARY_DIR"; then
        echo "ERROR: Could not delete primary directory \"$PRIMARY_DIR\"."
        return 1
    fi

    if ! initdb -D "$PRIMARY_DIR" --locale=C; then
        echo "ERROR: Could not create primary."
        return 1
    fi

    cat >> "$PRIMARY_DIR"/postgresql.auto.conf << EOF
port = $PRIMARY_PORT
wal_level = logical
max_worker_processes = 10
max_replication_slots = 10
max_wal_senders = 10
max_connections = 1000
log_directory = '$PWD/log'
log_filename = 'postgresql-$test_start_time-test-$TEST_NUM.log'
log_checkpoints = on
log_connections = on
log_disconnections = on
log_replication_commands = on
log_duration = off
log_min_duration_statement = 0
log_statement = all
log_replication_commands = on
log_line_prefix = '%m [%p] port=$PRIMARY_PORT %q%u@%d/%a '
EOF

    cat >> "$PRIMARY_DIR"/pg_hba.conf << EOF
host   replication      postgres     127.0.0.1/32            trust
host       test_db      postgres     127.0.0.1/32            trust
host       test_db      postgres     127.0.0.1/32            trust
EOF

    if ! pg_ctl start -D "$PRIMARY_DIR"; then
        echo "ERROR: Could not start primary."
        return 1
    fi

    if ! pg_basebackup -h localhost -U postgres -p "$PRIMARY_PORT" -D "$REPLICA_DIR" -P -Xs -R; then
        echo "ERROR: Could not create replica."
        return 1
    fi

    cat >> "$REPLICA_DIR"/postgresql.auto.conf << EOF
port = $REPLICA_PORT
archive_mode = off
hot_standby = on
log_directory = '$PWD/log'
log_filename = 'postgresql-$test_start_time-test-$TEST_NUM.log'
log_checkpoints = on
log_connections = on
log_disconnections = on
log_replication_commands = on
log_duration = off
log_min_duration_statement = 0
log_statement = all
log_replication_commands = on
log_line_prefix = '%m [%p] port=$REPLICA_PORT %q%u@%d/%a '
EOF

    if ! pg_ctl start -D "$REPLICA_DIR"; then
        echo "ERROR: Could not start replica."
        return 1
    fi
)

create_test_database() (
    psql -p "$PRIMARY_PORT" -c "CREATE DATABASE $DBNAME"
    psql -p "$PRIMARY_PORT" -d "$DBNAME" -c "CREATE TABLE $TABLE (f1 int primary key, insert_time timestamp without time zone)"
)

# Adjust these variables to tweak the load characteristics
NUM_THREADS=${NUM_THREADS:-20}
INSERT_WIDTH=${INSERT_WIDTH:-100}

add_records_to_test_table() (
    id=$1
    start_time=$(date +%s)
    start=$((id * INSERT_WIDTH)) end=0

    while true; do
        end=$((start + INSERT_WIDTH - 1))
        echo "INSERT INTO $TABLE SELECT val, CURRENT_TIMESTAMP FROM generate_series($start, $end) S(val);" |
            psql -p "$PRIMARY_PORT" -d "$DBNAME" > /dev/null
        start=$((start + NUM_THREADS * INSERT_WIDTH))

        if [[ -f "$TRIGGER_FILE" ]] || (( $(date "+%s") - start_time > 15 )); then
            break
        fi
    done

    return 0
)

INSERT_PIDS=()
start_insert_threads() {
    echo "*** STARTING $NUM_THREADS LOOPS INSERTING $INSERT_WIDTH RECORDS PER ITERATION ..."
    INSERT_PIDS=()
    for id in $(seq 0 $((NUM_THREADS - 1))); do
        add_records_to_test_table "$id" &
        INSERT_PIDS+=($!)
    done
}

create_subscriber() (
    echo "*** Stopping replica, then running pg_createsubscriber..."

    pg_ctl stop -m fast -D "$REPLICA_DIR"

    pg_createsubscriber -D "$REPLICA_DIR" \
        --database="$DBNAME" \
        --subscription="sub" \
        --publication="pub" \
        --publisher-server="host=localhost port=$PRIMARY_PORT dbname=$DBNAME"

    pg_ctl start -D "$REPLICA_DIR"

    echo "*** Successfully started logical replica on port $REPLICA_PORT."
)

run_test() (
    create_test_database
    rm -f "$TRIGGER_FILE"
    start_insert_threads
    sleep 2
    create_subscriber
    sleep 0.1

    touch "$TRIGGER_FILE"
    errors=0
    for pid in "${INSERT_PIDS[@]}"; do
        if ! wait "$pid"; then
            echo "ERROR: insert thread with PID $pid failed"
            errors=1
        fi
    done
    (( errors )) && return 1

    echo "*** ALL INSERT LOOPS FINISHED"

    last_src_count=0
    last_dest_count=0

    # wait until the counts stop increasing
    while true; do
        src_count=$(psql -qt -p "$PRIMARY_PORT" -d "$DBNAME" -c "SELECT COUNT(f1) FROM $TABLE")
        dest_count=$(psql -qt -p "$REPLICA_PORT" -d "$DBNAME" -c "SELECT COUNT(f1) FROM $TABLE")

        if [[ $dest_count -ne $last_dest_count ]] || [[ $src_count -ne $last_src_count ]]; then
            last_dest_count=$dest_count
            last_src_count=$src_count
            sleep 0.5s
            continue;
        fi

        echo "SOURCE COUNT = $src_count"
        echo "DEST COUNT   = $dest_count"

        if (( src_count != dest_count )); then
            echo "ERROR: record count mismatch"
            return 1
        fi

        break
    done

    echo -e "*** TEST PASSED\n"
)

for TEST_NUM in {1..10000}; do
    echo -e "${CYAN}*** STARTING TEST #$TEST_NUM${RESET}"
    reset_databases && run_test
done
