Logical replication timeout

Started by RECHTÉ Marc · about 1 year ago · 17 messages
#1 RECHTÉ Marc
marc.rechte@meteo.fr

Hello,

For some unknown reason (probably a very big transaction at the source), we experienced a logical decoding breakdown,
due to a timeout on the subscriber side (either wal_receiver_timeout or a connection drop by network equipment due to inactivity).

The problem is that, because of that failure, the wal_receiver process stops. When the wal_sender is ready to send some data, it finds the connection broken and exits.
A new wal_sender process is created that restarts from the beginning (restart LSN). This is an endless loop.

Checking the network connection between wal_sender and wal_receiver, we found that no traffic occurs for hours.

We first increased wal_receiver_timeout up to 12h and still got a disconnection on the receiver side:

2024-10-17 16:31:58.645 GMT [1356203:2] user=,db=,app=,client= ERROR: terminating logical replication worker due to timeout
2024-10-17 16:31:58.648 GMT [849296:212] user=,db=,app=,client= LOG: background worker "logical replication worker" (PID 1356203) exited with exit code 1

We then set this parameter to 0, but then got disconnected by the network (note the slight difference in the message):

2024-10-21 11:45:42.867 GMT [1697787:2] user=,db=,app=,client= ERROR: could not receive data from WAL stream: could not receive data from server: Connection timed out
2024-10-21 11:45:42.869 GMT [849296:40860] user=,db=,app=,client= LOG: background worker "logical replication worker" (PID 1697787) exited with exit code 1

The message is generated in the libpqrcv_receive function (replication/libpqwalreceiver/libpqwalreceiver.c), which calls pqsecure_raw_read (interfaces/libpq/fe-secure.c).

The last part, "Connection timed out", is the errno translation from the recv system call:

ETIMEDOUT Connection timed out (POSIX.1-2001)

When those timeouts occurred, the sender was still busy deleting files from data/pg_replslot/bdcpb21_sene, which had accumulated more than 6 million small ".spill" files.
It seems this very long pause happens at the cleanup stage, where PG blindly tries to delete those files.

strace on the wal sender shows tons of calls like:

unlink("pg_replslot/bdcpb21_sene/xid-2 721 821 917-lsn-439C-0.spill") = -1 ENOENT (Aucun fichier ou dossier de ce type)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-1000000.spill") = -1 ENOENT (Aucun fichier ou dossier de ce type)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-2000000.spill") = -1 ENOENT (Aucun fichier ou dossier de ce type)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-3000000.spill") = -1 ENOENT (Aucun fichier ou dossier de ce type)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-4000000.spill") = -1 ENOENT (Aucun fichier ou dossier de ce type)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-5000000.spill") = -1 ENOENT (Aucun fichier ou dossier de ce type)

This occurs in ReorderBufferRestoreCleanup (backend/replication/logical/reorderbuffer.c).
The call stack suggests this probably happens in DecodeCommit or DecodeAbort (backend/replication/logical/decode.c):

unlink("pg_replslot/bdcpb21_sene/xid-2730444214-lsn-43A6-88000000.spill") = -1 ENOENT (Aucun fichier ou dossier de ce type)

/usr/lib64/libc-2.17.so(unlink+0x7) [0xf12e7]
/usr/pgsql-15/bin/postgres(ReorderBufferRestoreCleanup.isra.17+0x5d) [0x769e3d]
/usr/pgsql-15/bin/postgres(ReorderBufferCleanupTXN+0x166) [0x76aec6] <=== replication/logical/reorderbuffer.c:1480 (but this (static) function is only used within this module ...)
/usr/pgsql-15/bin/postgres(xact_decode+0x1e7) [0x75f217] <=== replication/logical/decode.c:175
/usr/pgsql-15/bin/postgres(LogicalDecodingProcessRecord+0x73) [0x75eee3] <=== replication/logical/decode.c:90, calls rmgr.rm_decode(ctx, &buf), one of the 6 resource manager methods
/usr/pgsql-15/bin/postgres(XLogSendLogical+0x4e) [0x78294e]
/usr/pgsql-15/bin/postgres(WalSndLoop+0x151) [0x785121]
/usr/pgsql-15/bin/postgres(exec_replication_command+0xcba) [0x785f4a]
/usr/pgsql-15/bin/postgres(PostgresMain+0xfa8) [0x7d0588]
/usr/pgsql-15/bin/postgres(ServerLoop+0xa8a) [0x493b97]
/usr/pgsql-15/bin/postgres(PostmasterMain+0xe6c) [0x74d66c]
/usr/pgsql-15/bin/postgres(main+0x1c5) [0x494a05]
/usr/lib64/libc-2.17.so(__libc_start_main+0xf4) [0x22554]
/usr/pgsql-15/bin/postgres(_start+0x28) [0x494fb8]

We did not find any option other than deleting the subscription to stop that loop and creating a new one (thus losing transactions).

The publisher is PostgreSQL 15.6
The subscriber is PostgreSQL 14.5

Thanks

#2 Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: RECHTÉ Marc (#1)
Re: Logical replication timeout

On Wed, Nov 6, 2024 at 1:07 PM RECHTÉ Marc <marc.rechte@meteo.fr> wrote:

...

I think we need a call to rb->update_progress_txn(rb, txn,
change->lsn) at regular intervals in ReorderBufferRestoreCleanup(),
similar to ReorderBufferProcessTXN(), and maybe at more places where
we have potentially long-running loops.
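
For illustration, an untested sketch of that idea against the current ReorderBufferRestoreCleanup() loop; the availability of the update_progress_txn callback, the 100-file interval, and reporting txn->final_lsn are assumptions here, not a reviewed change:

```
static void
ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	XLogSegNo	first;
	XLogSegNo	cur;
	XLogSegNo	last;
	int			nunlinked = 0;

	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
	XLByteToSeg(txn->final_lsn, last, wal_segment_size);

	/* iterate over all possible filenames, and delete them */
	for (cur = first; cur <= last; cur++)
	{
		char		path[MAXPGPATH];

		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
		if (unlink(path) != 0 && errno != ENOENT)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not remove file \"%s\": %m", path)));

		/* every 100 files, give the output plugin a chance to send a keepalive */
		if (rb->update_progress_txn && (++nunlinked % 100) == 0)
			rb->update_progress_txn(rb, txn, txn->final_lsn);
	}
}
```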

--
Best Wishes,
Ashutosh Bapat

#3 Shlok Kyal
shlok.kyal.oss@gmail.com
In reply to: RECHTÉ Marc (#1)
Re: Logical replication timeout

On Wed, 6 Nov 2024 at 13:07, RECHTÉ Marc <marc.rechte@meteo.fr> wrote:

...

Hi,

Do you have a reproducible test case for the above scenario? If so, please
share it.
I am also trying to reproduce the issue by generating a large number
of spill files.

Thanks and Regards,
Shlok Kyal

#4 RECHTÉ Marc
marc.rechte@meteo.fr
In reply to: Shlok Kyal (#3)
Re: Logical replication timeout

This is how to reproduce the problem.

Session 1:

psql -c "CREATE TABLE test (i int)" -c "INSERT INTO test SELECT generate_series(1, 2_000_000)"

Session 2:

pg_recvlogical -d postgres --slot=test --create-slot
pg_recvlogical -d postgres --slot=test --start -f -

Session 3:

cd data/pg_replslot
watch 'ls test | wc -l'

Session 1:

date
time psql -c "BEGIN" -c "
DO LANGUAGE plpgsql
\$\$
DECLARE
cur CURSOR FOR SELECT * FROM test FOR UPDATE;
rec record;
BEGIN
FOR rec IN cur LOOP
BEGIN
UPDATE test SET i = i + 1 WHERE CURRENT OF cur;
EXCEPTION
WHEN no_data_found THEN
RAISE NOTICE 'no data found exception';
END;
END LOOP;
END;
\$\$
" -c "ROLLBACK"

date
Wed 11 Dec 2024 08:59:03 CET
BEGIN
DO
ROLLBACK

real 0m17,071s
user 0m0,003s
sys 0m0,000s
Wed 11 Dec 2024 08:59:21 CET

Session 3: Watch session

Count increases up to

Wed Dec 11 09:00:02 2024
1434930

Then decreases down to 1

Wed Dec 11 09:03:17 2024
1

Session 2:

The pg_recvlogical output appears last (after the spill files have been deleted):

BEGIN 12874409
COMMIT 12874409

Conclusion:

- The exception block is responsible for generating subtransactions.
- Although the transaction lasted 17s, the decoding itself was only somewhat behind (about 40 seconds), but
- it spent an extra 200s deleting the spill files!


#5 Shlok Kyal
shlok.kyal.oss@gmail.com
In reply to: RECHTÉ Marc (#4)
1 attachment(s)
Re: Logical replication timeout

On Wed, 11 Dec 2024 at 14:29, RECHTÉ Marc <marc.rechte@meteo.fr> wrote:

...

Hi,

Thanks for sharing the test case.
Unfortunately I do not have a powerful machine that would generate
such a large number of spill files. But I created a patch as per your
suggestion in point (2) in thread [1]. Can you test with this patch on
your machine?

With this patch, instead of calling unlink for every WAL segment, we
first read the directory, filter the files related to our
transaction, and then unlink those files.
You can apply the patch to your publisher source code and check. I
created this patch on top of PostgreSQL 15.6.

[1]: /messages/by-id/1430556325.185731745.1731484846410.JavaMail.zimbra@meteo.fr

Thanks and Regards,
Shlok Kyal

Attachments:

scan_dir.patch (application/x-patch)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 3a68a393d2..2f5077c0b6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -4409,27 +4409,38 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void
 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-	XLogSegNo	first;
-	XLogSegNo	cur;
-	XLogSegNo	last;
+	DIR		   *spill_dir;
+	struct dirent *spill_de;
+	struct stat statbuf;
+	char		path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
+	char		file_filter[MAXPGPATH];
 
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
+	sprintf(path, "%s/%s", PG_REPLSLOT_DIR,
+			NameStr(MyReplicationSlot->data.name));
 
-	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
-	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+	/* we're only handling directories here, skip if it's not ours */
+	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+		return;
+
+	snprintf(file_filter, MAXPGPATH, "xid-%u-", txn->xid);
 
-	/* iterate over all possible filenames, and delete them */
-	for (cur = first; cur <= last; cur++)
+	spill_dir = AllocateDir(path);
+	while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
 	{
-		char		path[MAXPGPATH];
+		/* only look at names that can be ours */
+		if (strstr(spill_de->d_name, file_filter))
+		{
+			snprintf(path, sizeof(path),
+					 "%s/%s/%s", PG_REPLSLOT_DIR,
+					 NameStr(MyReplicationSlot->data.name), spill_de->d_name);
 
-		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
-		if (unlink(path) != 0 && errno != ENOENT)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m", path)));
+			if (unlink(path) != 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not remove file \"%s\": %m", path)));
+		}
 	}
+	FreeDir(spill_dir);
 }
 
 /*
#6 RECHTÉ Marc
marc.rechte@meteo.fr
In reply to: Shlok Kyal (#5)
1 attachment(s)
Re: Logical replication timeout

Thanks for the patch.

I tried it, but it does not compile.

Attached another version that I tested on PostgreSQL 17.2.

This is much worse: it deletes only 3 files per second!

The problem here is that, for a given xid, there is just one spill file to delete.
ReorderBufferRestoreCleanup is called over a million times, so on each call
it has to open the directory and scan it to find the one file to delete.

By the way, you don't need a powerful machine to test, as spill files are very small.

Marc

Attachments:

scan_dir2.patch (text/x-patch)
--- a/src/backend/replication/logical/reorderbuffer.c	2024-12-12 10:21:12.962202250 +0100
+++ b/src/backend/replication/logical/reorderbuffer.c	2024-12-12 11:40:24.217634527 +0100
@@ -4567,27 +4567,35 @@
 static void
 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-	XLogSegNo	first;
-	XLogSegNo	cur;
-	XLogSegNo	last;
+	DIR *spill_dir;
+	struct dirent *spill_de;
+	char path[MAXPGPATH];
+	char spill_path[MAXPGPATH];
+	char file_filter[MAXPGPATH];
+	int len_filter;
+ 
+	snprintf(spill_path, MAXPGPATH, "pg_replslot/%s", 
+			NameStr(MyReplicationSlot->data.name));
 
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
-	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
-	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
-
-	/* iterate over all possible filenames, and delete them */
-	for (cur = first; cur <= last; cur++)
-	{
-		char		path[MAXPGPATH];
-
-		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
-		if (unlink(path) != 0 && errno != ENOENT)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m", path)));
-	}
+ 	snprintf(file_filter, sizeof(file_filter), "xid-%u-", txn->xid);
+	len_filter = strlen(file_filter);
+ 
+	spill_dir = AllocateDir(spill_path);
+	while ((spill_de = ReadDirExtended(spill_dir, spill_path, INFO)) != NULL)
+ 	{
+		/* only look at names that can be ours */
+		if (strncmp(spill_de->d_name, file_filter, len_filter) == 0)
+		{
+			snprintf(path, sizeof(path), "%s/%s", spill_path,
+					spill_de->d_name);
+ 
+			if (unlink(path) != 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not remove file \"%s\": %m", path)));
+		}
+ 	}
+	FreeDir(spill_dir);
 }
 
 /*
#7 Shlok Kyal
shlok.kyal.oss@gmail.com
In reply to: RECHTÉ Marc (#6)
1 attachment(s)
Re: Logical replication timeout

On Thu, 12 Dec 2024 at 19:20, RECHTÉ Marc <marc.rechte@meteo.fr> wrote:

...

Thanks for sharing the analysis.

I tested the patch on my machine as well, and it performs worse for me too.
I came up with an alternate approach: keep track of the WAL segments the
transaction is part of, so that cleanup only has to iterate over the
required files.

On my machine, running the test case you provided in [1], which generates
~1.9 million spill files, the transaction completed in 56 sec.
Cleanup (deletion of the spill files) took roughly the following time:
With HEAD: ~5 min
With the latest patch (attached here): ~2 min

Can you test whether this improves performance for you?

The patch applies on HEAD.

[1]: /messages/by-id/1430556325.185731745.1731484846410.JavaMail.zimbra@meteo.fr

Thanks and Regards,
Shlok Kyal

Attachments:

track_wal_segments.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e3a5c7b660..61943e72d8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -149,7 +149,7 @@ typedef struct ReorderBufferIterTXNEntry
 	ReorderBufferChange *change;
 	ReorderBufferTXN *txn;
 	TXNEntryFile file;
-	XLogSegNo	segno;
+	int	restore_from;
 } ReorderBufferIterTXNEntry;
 
 typedef struct ReorderBufferIterTXNState
@@ -254,7 +254,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-										TXNEntryFile *file, XLogSegNo *segno);
+										TXNEntryFile *file, int *restore_from);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -432,6 +432,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 	/* InvalidCommandId is not zero, so set it explicitly */
 	txn->command_id = InvalidCommandId;
 	txn->output_plugin_private = NULL;
+	txn->walsgmts = NIL;
 
 	return txn;
 }
@@ -1305,7 +1306,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	for (off = 0; off < state->nr_txns; off++)
 	{
 		state->entries[off].file.vfd = -1;
-		state->entries[off].segno = 0;
+		state->entries[off].restore_from = 0;
 	}
 
 	/* allocate heap */
@@ -1333,7 +1334,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			/* serialize remaining changes */
 			ReorderBufferSerializeTXN(rb, txn);
 			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
-										&state->entries[off].segno);
+										&state->entries[off].restore_from);
 		}
 
 		cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1363,7 +1364,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				ReorderBufferSerializeTXN(rb, cur_txn);
 				ReorderBufferRestoreChanges(rb, cur_txn,
 											&state->entries[off].file,
-											&state->entries[off].segno);
+											&state->entries[off].restore_from);
 			}
 			cur_change = dlist_head_element(ReorderBufferChange, node,
 											&cur_txn->changes);
@@ -1448,7 +1449,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 */
 		rb->totalBytes += entry->txn->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
-										&state->entries[off].segno))
+										&state->entries[off].restore_from))
 		{
 			/* successfully restored changes from disk */
 			ReorderBufferChange *next_change =
@@ -3715,6 +3716,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
+	MemoryContext oldcontext;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3758,7 +3760,17 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 			/* open segment, create it if necessary */
 			fd = OpenTransientFile(path,
-								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+								   O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY);
+
+			if (fd < 0)
+				fd = OpenTransientFile(path,
+									   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+			else
+			{
+				oldcontext = MemoryContextSwitchTo(rb->context);
+				txn->walsgmts = lappend(txn->walsgmts, curOpenSegNo);
+				MemoryContextSwitchTo(oldcontext);
+			}
 
 			if (fd < 0)
 				ereport(ERROR,
@@ -4255,16 +4267,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							TXNEntryFile *file, XLogSegNo *segno)
+							TXNEntryFile *file,	int *restore_from)
 {
 	Size		restored = 0;
-	XLogSegNo	last_segno;
 	dlist_mutable_iter cleanup_iter;
 	File	   *fd = &file->vfd;
 
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
 	/* free current entries, so we have memory for more */
 	dlist_foreach_modify(cleanup_iter, &txn->changes)
 	{
@@ -4277,9 +4285,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	txn->nentries_mem = 0;
 	Assert(dlist_is_empty(&txn->changes));
 
-	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
-
-	while (restored < max_changes_in_memory && *segno <= last_segno)
+	while (restored < max_changes_in_memory &&
+		   (*restore_from) < txn->walsgmts->length)
 	{
 		int			readBytes;
 		ReorderBufferDiskChange *ondisk;
@@ -4289,19 +4296,21 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		if (*fd == -1)
 		{
 			char		path[MAXPGPATH];
+			ListCell *lc;
+			XLogSegNo segno;
 
-			/* first time in */
-			if (*segno == 0)
-				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
+			/* next wal segment for the transaction */
+			lc = list_nth_cell(txn->walsgmts, *restore_from);
+			segno = lfirst(lc);
 
-			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
+			Assert(segno != 0 || dlist_is_empty(&txn->changes));
 
 			/*
 			 * No need to care about TLIs here, only used during a single run,
 			 * so each LSN only maps to a specific WAL record.
 			 */
 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
-										*segno);
+										segno);
 
 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
 
@@ -4311,7 +4320,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			if (*fd < 0 && errno == ENOENT)
 			{
 				*fd = -1;
-				(*segno)++;
+				(*restore_from)++;
 				continue;
 			}
 			else if (*fd < 0)
@@ -4336,7 +4345,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		{
 			FileClose(*fd);
 			*fd = -1;
-			(*segno)++;
+			(*restore_from)++;
 			continue;
 		}
 		else if (readBytes < 0)
@@ -4567,27 +4576,26 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void
 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-	XLogSegNo	first;
-	XLogSegNo	cur;
-	XLogSegNo	last;
-
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
-	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
-	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+	ListCell *cell;
 
 	/* iterate over all possible filenames, and delete them */
-	for (cur = first; cur <= last; cur++)
+	foreach(cell, txn->walsgmts)
 	{
+		XLogSegNo curr_segno = (XLogSegNo) lfirst(cell);
 		char		path[MAXPGPATH];
 
-		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
 		if (unlink(path) != 0 && errno != ENOENT)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not remove file \"%s\": %m", path)));
 	}
+
+	if(txn->walsgmts != NIL)
+	{
+		pfree(txn->walsgmts);
+		txn->walsgmts = NIL;
+	}
 }
 
 /*
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3bc365a7b0..c23fa877dc 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -426,6 +426,11 @@ typedef struct ReorderBufferTXN
 	 * Private data pointer of the output plugin.
 	 */
 	void	   *output_plugin_private;
+
+	/*
+	 * List of wal segments this txn is part of
+	 */
+	List	*walsgmts;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
#8 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: RECHTÉ Marc (#1)
RE: Logical replication timeout

Dear Marc,

For some unknown reason (probably a very big transaction at the source), we
experienced a logical decoding breakdown,

...

When those timeout occurred, the sender was still busy deleting files from
data/pg_replslot/bdcpb21_sene, accumulating more than 6 millions small
".spill" files. It seems this very long pause is at cleanup stage were PG is
blindly trying to delete those files.

Thanks for reporting the issue! We will discuss it and provide a fix if possible.
Apart from the code fix, I have some comments from another perspective.

The publisher is PostgreSQL 15.6
The subscriber is PostgreSQL 14.5

Can you set the parameter "streaming" to on on your system [1]? It allows
in-progress transactions to be streamed to the subscriber side. I feel this can avoid
the case where many .spill files pile up on the publisher side.

Another approach is to tune the logical_decoding_work_mem parameter [2].
This specifies the maximum amount of memory used by logical decoding, and
changes are spilled to disk when it exceeds the limit. Naively, this setting
can reduce the number of files.

I hope both settings can optimize your system.

[1]: https://www.postgresql.org/docs/14/sql-createsubscription.html
[2]: https://www.postgresql.org/docs/14/runtime-config-resource.html#GUC-LOGICAL-DECODING-WORK-MEM
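
For reference, the settings in question look roughly like this (the subscription name "mysub" and the 1GB value are only placeholders for your setup):

```
-- On the subscriber: stream large in-progress transactions instead of
-- letting the publisher spill them all to disk
ALTER SUBSCRIPTION mysub SET (streaming = on);

-- On the publisher: allow more decoded changes to stay in memory before
-- they are spilled to .spill files
ALTER SYSTEM SET logical_decoding_work_mem = '1GB';
SELECT pg_reload_conf();
```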

Best regards,
Hayato Kuroda
FUJITSU LIMITED

#9 RECHTÉ Marc
marc.rechte@meteo.fr
In reply to: Hayato Kuroda (Fujitsu) (#8)
Re: Logical replication timeout

...

Dear Hayato,

Thanks for your suggestions, both of which we had already tested. In our (real) case (a single transaction with 12 million sub-transactions):

1) setting the subscription to streaming just delays the spill file surge a bit. It does not prevent the creation of spill files.

2) we set logical_decoding_work_mem to 20GB, which probably also delayed the problem, but did not solve it.

The real problem is the spill file deletion, which can take days in this particular case!

Marc

#10 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: RECHTÉ Marc (#9)
RE: Logical replication timeout

Dear Marc,

Thanks for the reply!

Thanks for your suggestions that were both already tested. In our (real) case (a
single transaction with 12 millions sub-transactions):

1) setting the subscription as streaming, just delay a bit the spill file surge. It does
not prevent the creation of spill files.

That is a bit surprising to me, because I don't know of a path where transactions can
be serialized even in the streaming=on case. Let me think it over...

2) we set logical_decoding_work_mem to 20GB, which probably also delayed the
problem, but did not solve it.

Oh, I see that you've already increased the parameter to an appropriate
value in your environment. Is that right?

Best regards,
Hayato Kuroda
FUJITSU LIMITED

#11 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Shlok Kyal (#7)
RE: Logical replication timeout

Dear Shlok,

...

I'm not sure about the performance either, but I can post some comments.
I'm not sure your patch handles the list operations properly.

```
+                oldcontext = MemoryContextSwitchTo(rb->context);
+                txn->walsgmts = lappend(txn->walsgmts, curOpenSegNo);
+                MemoryContextSwitchTo(oldcontext);
+
```

IIUC lappend() accepts a pointer as its datum, but here a plain value is passed.
Should we define a new struct to represent a list node and append it
after it is palloc()'d?
Or is your code sufficient for some reason?

```
     /* iterate over all possible filenames, and delete them */
-    for (cur = first; cur <= last; cur++)
+    foreach(cell, txn->walsgmts)
     {
+        XLogSegNo curr_segno = (XLogSegNo) lfirst(cell);
         char        path[MAXPGPATH];
-        ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+        ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
         if (unlink(path) != 0 && errno != ENOENT)
             ereport(ERROR,
                     (errcode_for_file_access(),
                      errmsg("could not remove file \"%s\": %m", path)));
     }
+
+    if(txn->walsgmts != NIL)
+    {
+        pfree(txn->walsgmts);
+        txn->walsgmts = NIL;
+    }
```

If the above comment is accepted, I feel you can use foreach_delete_current().

=======

Also, even when we optimize the removal of the files, there is a possibility that
replication times out. Can you also create a patch which implements [1]?

[1]: /messages/by-id/CAExHW5s2_T9mULDQRKsdV72wpnA+NLT63cX51b51QQVEV4sG5g@mail.gmail.com

Best regards,
Hayato Kuroda
FUJITSU LIMITED

#12 RECHTÉ Marc
marc.rechte@meteo.fr
In reply to: Shlok Kyal (#7)
Re: Logical replication timeout

...

Thanks again for this new patch.

Unfortunately it does not compile (17.2 source):

reorderbuffer.c: In function 'ReorderBufferSerializeTXN':
reorderbuffer.c:3771:72: error: passing argument 2 of 'lappend' makes pointer from integer without a cast [-Wint-conversion]
3771 | txn->walsgmts = lappend(txn->walsgmts, curOpenSegNo);
| ^~~~~~~~~~~~
| |
| XLogSegNo {aka long unsigned int}

and

reorderbuffer.c: In function 'ReorderBufferRestoreChanges':
reorderbuffer.c:4304:31: error: assignment to 'XLogSegNo' {aka 'long unsigned int'} from 'void *' makes integer from pointer without a cast [-Wint-conversion]
4304 | segno = lfirst(lc);
| ^

#13 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: RECHTÉ Marc (#12)
1 attachment(s)
RE: Logical replication timeout

Dear Marc,

Thanks again for this new patch.

Unfortunately it does not compile (17.2 source):

Right, because of the reason I posted in [1].

I updated the patch, keeping the same approach. It passes my CI.
Could you please apply it on 17.2 and test it?

[1]: /messages/by-id/OSCPR01MB14966B646506E0C9B81B3A4CFF5022@OSCPR01MB14966.jpnprd01.prod.outlook.com

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

0001-WIP-track-wal-segments.patch (application/octet-stream)
From 99ffc05998a157c2632711aaa8b5d7c4e22ebd70 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 25 Dec 2024 08:02:33 +0000
Subject: [PATCH] WIP: track wal segments

---
 .../replication/logical/reorderbuffer.c       | 84 +++++++++++--------
 src/include/replication/reorderbuffer.h       |  7 ++
 2 files changed, 58 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9c742e96eb..b2bb4423b8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -149,7 +149,7 @@ typedef struct ReorderBufferIterTXNEntry
 	ReorderBufferChange *change;
 	ReorderBufferTXN *txn;
 	TXNEntryFile file;
-	XLogSegNo	segno;
+	int	restore_from;
 } ReorderBufferIterTXNEntry;
 
 typedef struct ReorderBufferIterTXNState
@@ -215,6 +215,11 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 /* GUC variable */
 int			debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
 
+typedef struct WalSgmtsEntry
+{
+	XLogSegNo segno;
+} WalSgmtsEntry;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -254,7 +259,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-										TXNEntryFile *file, XLogSegNo *segno);
+										TXNEntryFile *file, int *restore_from);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -432,6 +437,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 	/* InvalidCommandId is not zero, so set it explicitly */
 	txn->command_id = InvalidCommandId;
 	txn->output_plugin_private = NULL;
+	txn->walsgmts = NIL;
 
 	return txn;
 }
@@ -1305,7 +1311,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	for (off = 0; off < state->nr_txns; off++)
 	{
 		state->entries[off].file.vfd = -1;
-		state->entries[off].segno = 0;
+		state->entries[off].restore_from = 0;
 	}
 
 	/* allocate heap */
@@ -1333,7 +1339,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			/* serialize remaining changes */
 			ReorderBufferSerializeTXN(rb, txn);
 			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
-										&state->entries[off].segno);
+										&state->entries[off].restore_from);
 		}
 
 		cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1363,7 +1369,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				ReorderBufferSerializeTXN(rb, cur_txn);
 				ReorderBufferRestoreChanges(rb, cur_txn,
 											&state->entries[off].file,
-											&state->entries[off].segno);
+											&state->entries[off].restore_from);
 			}
 			cur_change = dlist_head_element(ReorderBufferChange, node,
 											&cur_txn->changes);
@@ -1448,7 +1454,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 */
 		rb->totalBytes += entry->txn->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
-										&state->entries[off].segno))
+										&state->entries[off].restore_from))
 		{
 			/* successfully restored changes from disk */
 			ReorderBufferChange *next_change =
@@ -3715,6 +3721,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
+	MemoryContext oldcontext;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3758,7 +3765,23 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 			/* open segment, create it if necessary */
 			fd = OpenTransientFile(path,
-								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+								   O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY);
+
+			if (fd < 0)
+				fd = OpenTransientFile(path,
+									   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+			else
+			{
+				WalSgmtsEntry *entry;
+
+				oldcontext = MemoryContextSwitchTo(rb->context);
+
+				entry = palloc(sizeof(WalSgmtsEntry));
+				entry->segno = curOpenSegNo;
+
+				txn->walsgmts = lappend(txn->walsgmts, entry);
+				MemoryContextSwitchTo(oldcontext);
+			}
 
 			if (fd < 0)
 				ereport(ERROR,
@@ -4255,16 +4278,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							TXNEntryFile *file, XLogSegNo *segno)
+							TXNEntryFile *file,	int *restore_from)
 {
 	Size		restored = 0;
-	XLogSegNo	last_segno;
 	dlist_mutable_iter cleanup_iter;
 	File	   *fd = &file->vfd;
 
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
 	/* free current entries, so we have memory for more */
 	dlist_foreach_modify(cleanup_iter, &txn->changes)
 	{
@@ -4277,9 +4296,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	txn->nentries_mem = 0;
 	Assert(dlist_is_empty(&txn->changes));
 
-	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
-
-	while (restored < max_changes_in_memory && *segno <= last_segno)
+	while (restored < max_changes_in_memory &&
+		   (*restore_from) < txn->walsgmts->length)
 	{
 		int			readBytes;
 		ReorderBufferDiskChange *ondisk;
@@ -4289,19 +4307,23 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		if (*fd == -1)
 		{
 			char		path[MAXPGPATH];
+			ListCell *lc;
+			WalSgmtsEntry *entry;
+			XLogSegNo segno;
 
-			/* first time in */
-			if (*segno == 0)
-				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
+			/* Next wal segment for the transaction */
+			lc = list_nth_cell(txn->walsgmts, *restore_from);
+			entry = (WalSgmtsEntry *) lfirst(lc);
+			segno = entry->segno;
 
-			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
+			Assert(segno != 0 || dlist_is_empty(&txn->changes));
 
 			/*
 			 * No need to care about TLIs here, only used during a single run,
 			 * so each LSN only maps to a specific WAL record.
 			 */
 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
-										*segno);
+										segno);
 
 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
 
@@ -4311,7 +4333,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			if (*fd < 0 && errno == ENOENT)
 			{
 				*fd = -1;
-				(*segno)++;
+				(*restore_from)++;
 				continue;
 			}
 			else if (*fd < 0)
@@ -4336,7 +4358,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		{
 			FileClose(*fd);
 			*fd = -1;
-			(*segno)++;
+			(*restore_from)++;
 			continue;
 		}
 		else if (readBytes < 0)
@@ -4567,26 +4589,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void
 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-	XLogSegNo	first;
-	XLogSegNo	cur;
-	XLogSegNo	last;
-
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
-	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
-	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+	ListCell *cell;
 
 	/* iterate over all possible filenames, and delete them */
-	for (cur = first; cur <= last; cur++)
+	foreach(cell, txn->walsgmts)
 	{
+		WalSgmtsEntry *entry = (WalSgmtsEntry *)lfirst(cell);
+		XLogSegNo curr_segno = entry->segno;
 		char		path[MAXPGPATH];
 
-		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
 		if (unlink(path) != 0 && errno != ENOENT)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not remove file \"%s\": %m", path)));
+
+		txn->walsgmts = foreach_delete_current(txn->walsgmts, cell);
 	}
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7de50462dc..b2ef2640aa 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -422,6 +422,13 @@ typedef struct ReorderBufferTXN
 	 * Private data pointer of the output plugin.
 	 */
 	void	   *output_plugin_private;
+
+	/*
+	 * List of wal segments this txn is part of.
+	 *
+	 * XXX: check whether the attribute doesn't break ABI 
+	 */
+	List	*walsgmts;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
2.43.0

#14 vignesh C
vignesh21@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#13)
Re: Logical replication timeout

On Wed, 25 Dec 2024 at 13:55, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Dear Marc,

Thanks again for this new patch.

Unfortunately it does not compile (17.2 source):

Right, because of the reason I posted [1].

I updated the patch which did the same approach. It could pass my CI.

Let's conduct some performance tests with varying numbers of spill
files (e.g., small ones like 1, 5, and 10, and larger ones like 100,
1000, and 10,000) along with different levels of concurrent
transactions. We can then compare the results with the current HEAD.

Regards,
Vignesh

#15 RECHTÉ Marc
marc.rechte@meteo.fr
In reply to: Hayato Kuroda (Fujitsu) (#13)
Re: Logical replication timeout

Right, because of the reason I posted [1].

I updated the patch which did the same approach. It could pass my CI.
Could you please apply on 17.2 and test it?

[1]:
/messages/by-id/OSCPR01MB14966B646506E0C9B81B3A4CFF5022@OSCPR01MB14966.jpnprd01.prod.outlook.com

This is a considerable improvement: the cleanup phase took less than 30s (compared to the former 200s).

However, we are talking about a 12s transaction that takes 64s overall to be replicated.
In this particular case, the replication system spends most of its time creating / deleting small files.
Would it not be possible to create just one spill file for the main transaction?

#16 Shlok Kyal
shlok.kyal.oss@gmail.com
In reply to: vignesh C (#14)
2 attachment(s)
Re: Logical replication timeout

On Fri, 27 Dec 2024 at 09:41, vignesh C <vignesh21@gmail.com> wrote:

...

Let's conduct some performance tests with varying numbers of spill
files (e.g., small ones like 1, 5, and 10, and larger ones like 100,
1000, and 10,000) along with different levels of concurrent
transactions. We can then compare the results with the current HEAD.

Hi Vignesh,
I did the performance testing with the patch and compared the time
taken for cleanup of the spill files on HEAD and with the patch.
Following are the results (all timings are averages of 3 runs):

No. of Spill Files | HEAD avg (ns) | Patch avg (ns) | Speedup (HEAD / Patch)
-------------------+---------------+----------------+-----------------------
             10000 |      68057452 |    442923.3333 |            153.6551518
              1000 |   737519.6667 |    46049.33333 |             16.0158598
               500 |        199108 |    19401.66667 |            10.26241732
               100 |   12566.66667 |           3921 |             3.20496472
                10 |           576 |    466.3333333 |            1.235167977
                 1 |            60 |    56.33333333 |            1.065088757

I have made a patch to log the cleanup time for each transaction, and I
have attached the test Perl script here as well.
To get the time to clean up the spill files, I run the Perl script and
then check the publisher log for the cleanup time of the very first
transaction.
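
For reference, a workload of this shape can be generated with a TAP-style
script along the following lines. This is only a minimal sketch, not the
attached 101_test.pl; the node and table names, row count, and
logical_decoding_work_mem value are arbitrary assumptions:

use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Single node doing the decoding; a low logical_decoding_work_mem forces
# the reorder buffer to spill changes to .spill files early.
my $node = PostgreSQL::Test::Cluster->new('publisher');
$node->init(allows_streaming => 'logical');
$node->append_conf('postgresql.conf', "logical_decoding_work_mem = 64kB");
$node->start;

# Create the slot before the large transaction so its changes are decoded.
$node->safe_psql('postgres', q{
	CREATE TABLE spill_test (id int, payload text);
	SELECT pg_create_logical_replication_slot('regression_slot', 'test_decoding');
});

# One large transaction; the number of .spill files grows with the number
# of WAL segments the transaction's changes span.
$node->safe_psql('postgres', q{
	BEGIN;
	INSERT INTO spill_test
		SELECT i, repeat('x', 1000) FROM generate_series(1, 1000000) i;
	COMMIT;
});

# Decoding and then discarding the transaction goes through
# ReorderBufferRestoreCleanup(); with cleanup_logs.patch applied, the
# elapsed time shows up as a LOG line in the publisher log.
$node->safe_psql('postgres',
	"SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);");

$node->stop;
done_testing();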

Thanks and Regards,
Shlok Kyal

Attachments:

cleanup_logs.patch (application/octet-stream):
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9313d2b01e..de4a6f7baa 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -88,6 +88,7 @@
 
 #include <unistd.h>
 #include <sys/stat.h>
+#include <sys/time.h>
 
 #include "access/detoast.h"
 #include "access/heapam.h"
@@ -1515,6 +1516,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	bool		found;
 	dlist_mutable_iter iter;
 	Size		mem_freed = 0;
+	ListCell *cell;
+	struct timeval stop, start;
+	gettimeofday(&start, NULL);
 
 	/* cleanup subtransactions & their changes */
 	dlist_foreach_modify(iter, &txn->subtxns)
@@ -1614,6 +1618,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
 
+	gettimeofday(&stop, NULL);
+	elog(LOG, "Time to cleanup txn %u: %lu us", txn->xid, (unsigned long) ((stop.tv_sec - start.tv_sec) * 1000000 + stop.tv_usec - start.tv_usec));
+
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
 }
101_test.pl (application/octet-stream)
#17RECHTÉ Marc
marc.rechte@meteo.fr
In reply to: Shlok Kyal (#16)
1 attachment(s)
Re: Logical replication timeout

Hayato Kuroda kindly rebased the patch.

Attachments:

v2-0001-WIP-track-wal-segments.patch (application/mbox):
From 1f78955508dd31887f681d802c44b77eae921a30 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 25 Dec 2024 08:02:33 +0000
Subject: [PATCH v2] WIP: track wal segments

---
 .../replication/logical/reorderbuffer.c       | 84 +++++++++++--------
 src/include/replication/reorderbuffer.h       |  7 ++
 2 files changed, 58 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5186ad2a39..5775b5022c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -150,7 +150,7 @@ typedef struct ReorderBufferIterTXNEntry
 	ReorderBufferChange *change;
 	ReorderBufferTXN *txn;
 	TXNEntryFile file;
-	XLogSegNo	segno;
+	int	restore_from;
 } ReorderBufferIterTXNEntry;
 
 typedef struct ReorderBufferIterTXNState
@@ -216,6 +216,11 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 /* GUC variable */
 int			debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
 
+typedef struct WalSgmtsEntry
+{
+	XLogSegNo segno;
+} WalSgmtsEntry;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -255,7 +260,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-										TXNEntryFile *file, XLogSegNo *segno);
+										TXNEntryFile *file, int *restore_from);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -435,6 +440,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 	/* InvalidCommandId is not zero, so set it explicitly */
 	txn->command_id = InvalidCommandId;
 	txn->output_plugin_private = NULL;
+	txn->walsgmts = NIL;
 
 	return txn;
 }
@@ -1308,7 +1314,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	for (off = 0; off < state->nr_txns; off++)
 	{
 		state->entries[off].file.vfd = -1;
-		state->entries[off].segno = 0;
+		state->entries[off].restore_from = 0;
 	}
 
 	/* allocate heap */
@@ -1336,7 +1342,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			/* serialize remaining changes */
 			ReorderBufferSerializeTXN(rb, txn);
 			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
-										&state->entries[off].segno);
+										&state->entries[off].restore_from);
 		}
 
 		cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1366,7 +1372,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				ReorderBufferSerializeTXN(rb, cur_txn);
 				ReorderBufferRestoreChanges(rb, cur_txn,
 											&state->entries[off].file,
-											&state->entries[off].segno);
+											&state->entries[off].restore_from);
 			}
 			cur_change = dlist_head_element(ReorderBufferChange, node,
 											&cur_txn->changes);
@@ -1451,7 +1457,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 */
 		rb->totalBytes += entry->txn->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
-										&state->entries[off].segno))
+										&state->entries[off].restore_from))
 		{
 			/* successfully restored changes from disk */
 			ReorderBufferChange *next_change =
@@ -3838,6 +3844,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
+	MemoryContext oldcontext;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3881,7 +3888,23 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 			/* open segment, create it if necessary */
 			fd = OpenTransientFile(path,
-								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+								   O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY);
+
+			if (fd < 0)
+				fd = OpenTransientFile(path,
+									   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+			else
+			{
+				WalSgmtsEntry *entry;
+
+				oldcontext = MemoryContextSwitchTo(rb->context);
+
+				entry = palloc(sizeof(WalSgmtsEntry));
+				entry->segno = curOpenSegNo;
+
+				txn->walsgmts = lappend(txn->walsgmts, entry);
+				MemoryContextSwitchTo(oldcontext);
+			}
 
 			if (fd < 0)
 				ereport(ERROR,
@@ -4378,16 +4401,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							TXNEntryFile *file, XLogSegNo *segno)
+							TXNEntryFile *file,	int *restore_from)
 {
 	Size		restored = 0;
-	XLogSegNo	last_segno;
 	dlist_mutable_iter cleanup_iter;
 	File	   *fd = &file->vfd;
 
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
 	/* free current entries, so we have memory for more */
 	dlist_foreach_modify(cleanup_iter, &txn->changes)
 	{
@@ -4400,9 +4419,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	txn->nentries_mem = 0;
 	Assert(dlist_is_empty(&txn->changes));
 
-	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
-
-	while (restored < max_changes_in_memory && *segno <= last_segno)
+	while (restored < max_changes_in_memory &&
+		   (*restore_from) < txn->walsgmts->length)
 	{
 		int			readBytes;
 		ReorderBufferDiskChange *ondisk;
@@ -4412,19 +4430,23 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		if (*fd == -1)
 		{
 			char		path[MAXPGPATH];
+			ListCell *lc;
+			WalSgmtsEntry *entry;
+			XLogSegNo segno;
 
-			/* first time in */
-			if (*segno == 0)
-				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
+			/* Next wal segment for the transaction */
+			lc = list_nth_cell(txn->walsgmts, *restore_from);
+			entry = (WalSgmtsEntry *) lfirst(lc);
+			segno = entry->segno;
 
-			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
+			Assert(segno != 0 || dlist_is_empty(&txn->changes));
 
 			/*
 			 * No need to care about TLIs here, only used during a single run,
 			 * so each LSN only maps to a specific WAL record.
 			 */
 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
-										*segno);
+										segno);
 
 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
 
@@ -4434,7 +4456,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			if (*fd < 0 && errno == ENOENT)
 			{
 				*fd = -1;
-				(*segno)++;
+				(*restore_from)++;
 				continue;
 			}
 			else if (*fd < 0)
@@ -4459,7 +4481,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		{
 			FileClose(*fd);
 			*fd = -1;
-			(*segno)++;
+			(*restore_from)++;
 			continue;
 		}
 		else if (readBytes < 0)
@@ -4690,26 +4712,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void
 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-	XLogSegNo	first;
-	XLogSegNo	cur;
-	XLogSegNo	last;
-
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
-	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
-	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+	ListCell *cell;
 
 	/* iterate over all possible filenames, and delete them */
-	for (cur = first; cur <= last; cur++)
+	foreach(cell, txn->walsgmts)
 	{
+		WalSgmtsEntry *entry = (WalSgmtsEntry *)lfirst(cell);
+		XLogSegNo curr_segno = entry->segno;
 		char		path[MAXPGPATH];
 
-		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
 		if (unlink(path) != 0 && errno != ENOENT)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not remove file \"%s\": %m", path)));
+
+		txn->walsgmts = foreach_delete_current(txn->walsgmts, cell);
 	}
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 517a8e3634..fa72b040fb 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -452,6 +452,13 @@ typedef struct ReorderBufferTXN
 	 * Private data pointer of the output plugin.
 	 */
 	void	   *output_plugin_private;
+
+	/*
+	 * List of WAL segments this transaction has spill files for.
+	 *
+	 * XXX: check whether adding this field breaks ABI compatibility.
+	 */
+	List	*walsgmts;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
2.43.5