# Copyright (c) 2021-2023, PostgreSQL Global Development Group

# Tests for already-propagated WAL segments ending in incomplete WAL records.

use strict;
use warnings;

use File::Copy;
use PostgreSQL::Test::Cluster;
use Test::More;
use Fcntl qw(SEEK_SET);

use integer;    # causes / operator to use integer math

# Properties of the primary server.  They are filled in once the primary is
# running, and are then used by the WAL arithmetic helpers below.
my ($WAL_SEGMENT_SIZE, $WAL_BLOCK_SIZE, $TLI);

# Format the file name of WAL segment number $segment on timeline $tli as
# three zero-padded, upper-case, 8-digit hex fields: the timeline, a 0, and
# the segment number.
# NOTE(review): the middle field is hard-coded to 0, so the name is only
# right while $segment fits in the last field.  That holds for the small
# LSNs this test produces.
sub wal_segment_name
{
	my ($tli, $segment) = @_;
	my @fields = ($tli, 0, $segment);
	return sprintf('%08X' x 3, @fields);
}

# Split an LSN, given as an integer byte position, into the number of the
# WAL segment that contains it and the byte offset inside that segment.
# The division truncates because of the file-level "use integer".
sub lsn_to_segment_and_offset
{
	my ($lsn) = @_;
	my $segno = $lsn / $WAL_SEGMENT_SIZE;
	my $seg_offset = $lsn % $WAL_SEGMENT_SIZE;
	return ($segno, $seg_offset);
}

# Overwrite bytes of an existing WAL segment file in place.
#
# $node is the cluster whose pg_wal/ directory is modified.  $tli is the
# timeline of the segment to patch, and $lsn is the byte position (as an
# integer) at which $data is written.  The segment file is opened read-write
# without truncation, so bytes outside the written range are kept.
#
# This should be called while the cluster is not running.
#
# Returns the path of the modified segment file.  Dies if the file cannot be
# opened, positioned, written or closed.
sub write_wal
{
	my $node = shift;
	my $tli = shift;
	my $lsn = shift;
	my $data = shift;

	my ($segment, $offset) = lsn_to_segment_and_offset($lsn);
	my $segment_name = wal_segment_name($tli, $segment);
	my $path = sprintf("%s/pg_wal/%s", $node->data_dir, $segment_name);

	open my $fh, "+<:raw", $path or die "could not open \"$path\": $!";
	seek($fh, $offset, SEEK_SET)
	  or die "could not seek to offset $offset in \"$path\": $!";
	print $fh $data or die "could not write to \"$path\": $!";
	# Write errors on a buffered handle may only surface at close time.
	close $fh or die "could not close \"$path\": $!";

	return $path;
}

# Insert a logical message of $size 'a' characters into the WAL of $node,
# through pg_logical_emit_message() with transactional = true.  Returns the
# LSN reported by that call, as an integer byte position (LSN minus '0/0').
sub emit_message
{
	my ($node, $size) = @_;
	my $sql =
	  "SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'";
	my $end_lsn = $node->safe_psql('postgres', $sql);
	return int($end_lsn);
}

# Return the current WAL insert position of $node as an integer byte
# position (pg_current_wal_insert_lsn() minus '0/0').
sub get_insert_lsn
{
	my ($node) = @_;
	my $lsn_bytes = $node->safe_psql('postgres',
		"SELECT pg_current_wal_insert_lsn() - '0/0'");
	return int($lsn_bytes);
}

# Look up the server setting $name in pg_settings on $node and return its
# "setting" column converted with int().
sub get_int_setting
{
	my ($node, $name) = @_;
	my $query = "SELECT setting FROM pg_settings WHERE name = '$name'";
	my $value = $node->safe_psql('postgres', $query);
	return int($value);
}

# Round an LSN (integer byte position) down to the start of the WAL page
# that contains it.  The bit mask assumes $WAL_BLOCK_SIZE is a power of two.
sub start_of_page
{
	my ($lsn) = @_;
	my $page_mask = ~($WAL_BLOCK_SIZE - 1);
	return $lsn & $page_mask;
}

# Make sure the insert position of $node is far enough from the end of its
# WAL page that a couple of small records can still be inserted on it.
# While the position lies in the last quarter of a page, emit messages of a
# quarter page each and re-check.  Returns the final insert LSN, in bytes.
sub advance_out_of_record_splitting_zone
{
	my ($node) = @_;

	my $page_threshold = $WAL_BLOCK_SIZE / 4;
	my $page_limit = $WAL_BLOCK_SIZE - $page_threshold;

	my $insert_lsn;
	while (1)
	{
		$insert_lsn = get_insert_lsn($node);
		last if $insert_lsn % $WAL_BLOCK_SIZE < $page_limit;
		emit_message($node, $page_threshold);
	}
	return $insert_lsn;
}

# Set up a primary with WAL archiving enabled, and take the base backup that
# the standbys are later initialized from.
my $primary = PostgreSQL::Test::Cluster->new('primary');
$primary->init(allows_streaming => 1);

# We need these settings for stability of WAL behavior.
my $stable_wal_conf = qq(
autovacuum = off
wal_keep_size = 1GB
);
$primary->append_conf('postgresql.conf', $stable_wal_conf);
$primary->enable_archiving();
$primary->start;
$primary->backup('backup');

$primary->safe_psql('postgres', "CREATE TABLE t AS SELECT 0");

# Server properties needed by the WAL arithmetic helpers.
$WAL_SEGMENT_SIZE = get_int_setting($primary, 'wal_segment_size');
$WAL_BLOCK_SIZE = get_int_setting($primary, 'wal_block_size');
$TLI = $primary->safe_psql('postgres',
	"SELECT timeline_id FROM pg_control_checkpoint()");

# Make sure that the current page has enough space to fit the beginning of
# the record that will span two pages.
emit_message($primary, 0);
my $end_lsn = advance_out_of_record_splitting_zone($primary);

# Size the next message so that its payload alone fills what remains of the
# current page.  With the record headers added, the record has to continue
# onto the next page.
my $overflow_size = $WAL_BLOCK_SIZE - ($end_lsn % $WAL_BLOCK_SIZE);

# Write the record and get the LSN where it ends.
$end_lsn = emit_message($primary, $overflow_size);
$primary->stop('immediate');

# Find the beginning of the page holding the continuation of the record, and
# fill the entire page with zero bytes to simulate broken replication.
my $start_page = start_of_page($end_lsn);
my $wal_file =
  write_wal($primary, $TLI, $start_page, "\x00" x $WAL_BLOCK_SIZE);

# Copy the segment we just corrupted to the archive, which the standbys
# restore WAL from.  A silent copy failure would make the test meaningless.
copy($wal_file, $primary->archive_dir)
  or die "could not copy \"$wal_file\" to archive: $!";

# Start the standby nodes and make sure they replayed our file from the
# archive.  Both restore WAL from the primary's archive, which now contains
# the corrupted segment.
my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
$standby1->init_from_backup($primary, 'backup');
$standby1->set_standby_mode();
$standby1->enable_restoring($primary, 0);

my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
$standby2->init_from_backup($primary, 'backup');
$standby2->set_standby_mode();
$standby2->enable_restoring($primary, 0);

# Remember where each log currently ends, so that only messages written
# after startup are searched.  -s returns undef for a missing file; fall
# back to offset 0 in that case.
my $log_size1 = (-s $standby1->logfile) // 0;
my $log_size2 = (-s $standby2->logfile) // 0;

$standby1->start;
$standby2->start;

my ($segment, $offset) = lsn_to_segment_and_offset($start_page);
my $segment_name = wal_segment_name($TLI, $segment);
my $pattern =
  qq(invalid magic number 0000 .* segment $segment_name.* offset $offset);

# We expect both standby nodes to complain about the zero-filled page when
# trying to assemble the record that spans two pages.  Recovery may not
# have reached that page yet when start() returns, so wait for the message
# instead of inspecting the log only once.
ok($standby1->wait_for_log($pattern, $log_size1),
	'no split record on standby1');
ok($standby2->wait_for_log($pattern, $log_size2),
	'no split record on standby2');


$standby1->promote;

# A WAL switch right after promotion is the scenario that can send standby2
# into an infinite loop, reading the continuation record from the page
# filled with zero bytes.  In that case wait_for_replay_catchup() below
# fails with a timeout.
$standby1->safe_psql('postgres', 'SELECT pg_switch_wal()');

# Make sure WAL moves forward.
$standby1->safe_psql('postgres',
	'INSERT INTO t SELECT * FROM generate_series(1, 1000)');

# Configure standby2 to stream from the just-promoted standby1.  It also
# keeps pulling WAL files from the archive.
$standby2->enable_streaming($standby1);
$standby2->reload;

# If there was a WAL switch shortly after promotion, standby2 could fail to
# catch up, and its log would fill with error messages like:
# invalid magic number 0000 in WAL segment 000000020000000000000003, LSN 0/301A000, offset 106496
$standby1->wait_for_replay_catchup($standby2);

my $result = $standby2->safe_psql('postgres', "SELECT count(*) FROM t");
# Use note() rather than print(), so the diagnostic does not land on the
# TAP output stream as an unparsed line.
note "standby2: $result";
is($result, qq(1001), 'check streamed content on standby2');

done_testing();
