Logical replication timeout
Hello,
For some unknown reason (probably a very big transaction at the source), we experienced a logical decoding breakdown,
due to a timeout on the subscriber side (either wal_receiver_timeout or a connection drop by network equipment due to inactivity).
The problem is that, because of this failure, the wal_receiver process stops. When the wal_sender is ready to send some data, it finds the connection broken and exits.
A new wal_sender process is created that restarts from the beginning (restart LSN). This is an endless loop.
Checking the network connection between wal_sender and wal_receiver, we found that no traffic occurs for hours.
We first increased wal_receiver_timeout up to 12h and still got a disconnection on the receiver side:
2024-10-17 16:31:58.645 GMT [1356203:2] user=,db=,app=,client= ERROR: terminating logical replication worker due to timeout
2024-10-17 16:31:58.648 GMT [849296:212] user=,db=,app=,client= LOG: background worker "logical replication worker" (PID 1356203) exited with exit code 1
We then set this parameter to 0, but were then disconnected by the network (note the slight difference in the message):
2024-10-21 11:45:42.867 GMT [1697787:2] user=,db=,app=,client= ERROR: could not receive data from WAL stream: could not receive data from server: Connection timed out
2024-10-21 11:45:42.869 GMT [849296:40860] user=,db=,app=,client= LOG: background worker "logical replication worker" (PID 1697787) exited with exit code 1
The message is generated in the libpqrcv_receive function (replication/libpqwalreceiver/libpqwalreceiver.c), which calls pqsecure_raw_read (interfaces/libpq/fe-secure.c).
The last message "Connection timed out" is the errno translation from the recv system function:
ETIMEDOUT Connection timed out (POSIX.1-2001)
When those timeouts occurred, the sender was still busy deleting files from data/pg_replslot/bdcpb21_sene, which had accumulated more than 6 million small ".spill" files.
It seems this very long pause happens at the cleanup stage, where PG is blindly trying to delete those files.
strace on the wal sender shows tons of calls like:
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-0.spill") = -1 ENOENT (No such file or directory)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-1000000.spill") = -1 ENOENT (No such file or directory)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-2000000.spill") = -1 ENOENT (No such file or directory)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-3000000.spill") = -1 ENOENT (No such file or directory)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-4000000.spill") = -1 ENOENT (No such file or directory)
unlink("pg_replslot/bdcpb21_sene/xid-2721821917-lsn-439C-5000000.spill") = -1 ENOENT (No such file or directory)
This occurs in ReorderBufferRestoreCleanup (backend/replication/logical/reorderbuffer.c).
The call stack suggests this probably occurs in DecodeCommit or DecodeAbort (backend/replication/logical/decode.c):
unlink("pg_replslot/bdcpb21_sene/xid-2730444214-lsn-43A6-88000000.spill") = -1 ENOENT (No such file or directory)
/usr/lib64/libc-2.17.so(unlink+0x7) [0xf12e7]
/usr/pgsql-15/bin/postgres(ReorderBufferRestoreCleanup.isra.17+0x5d) [0x769e3d]
/usr/pgsql-15/bin/postgres(ReorderBufferCleanupTXN+0x166) [0x76aec6] <=== replication/logical/reorderbuffer.c:1480 (but this (static) function is only used within this module ...)
/usr/pgsql-15/bin/postgres(xact_decode+0x1e7) [0x75f217] <=== replication/logical/decode.c:175
/usr/pgsql-15/bin/postgres(LogicalDecodingProcessRecord+0x73) [0x75eee3] <=== replication/logical/decode.c:90, calls rmgr.rm_decode(ctx, &buf), one of the 6 methods of the resource manager
/usr/pgsql-15/bin/postgres(XLogSendLogical+0x4e) [0x78294e]
/usr/pgsql-15/bin/postgres(WalSndLoop+0x151) [0x785121]
/usr/pgsql-15/bin/postgres(exec_replication_command+0xcba) [0x785f4a]
/usr/pgsql-15/bin/postgres(PostgresMain+0xfa8) [0x7d0588]
/usr/pgsql-15/bin/postgres(ServerLoop+0xa8a) [0x493b97]
/usr/pgsql-15/bin/postgres(PostmasterMain+0xe6c) [0x74d66c]
/usr/pgsql-15/bin/postgres(main+0x1c5) [0x494a05]
/usr/lib64/libc-2.17.so(__libc_start_main+0xf4) [0x22554]
/usr/pgsql-15/bin/postgres(_start+0x28) [0x494fb8]
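To illustrate why so many unlink calls can come back with ENOENT, here is a minimal sketch. It assumes the cleanup loop probes one candidate file name per WAL segment between a transaction's first and final LSN, and that segments are 16 MB (the default). The helper names and the WAL_SEGMENT_SIZE constant are made up for this example; only the file-name pattern is taken from the strace output above.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Assumed default WAL segment size (16 MB); a real cluster may differ. */
#define WAL_SEGMENT_SIZE UINT64_C(0x1000000)

/*
 * Number of candidate spill files probed for one transaction when the
 * loop walks every segment from first_lsn to final_lsn (hypothetical helper).
 */
static uint64_t
candidate_segments(uint64_t first_lsn, uint64_t final_lsn)
{
    assert(first_lsn <= final_lsn);
    return final_lsn / WAL_SEGMENT_SIZE - first_lsn / WAL_SEGMENT_SIZE + 1;
}

/*
 * Build the spill file name for one segment, following the pattern seen
 * in the strace output: xid-<xid>-lsn-<high 32 bits>-<low 32 bits>.spill
 */
static int
spill_file_name(char *buf, size_t len, const char *slot,
                uint32_t xid, uint64_t segno)
{
    uint64_t seg_start = segno * WAL_SEGMENT_SIZE;

    return snprintf(buf, len,
                    "pg_replslot/%s/xid-%" PRIu32 "-lsn-%" PRIX32 "-%" PRIX32 ".spill",
                    slot, xid,
                    (uint32_t) (seg_start >> 32), (uint32_t) seg_start);
}
```

Under these assumptions, a transaction spanning LSNs 439C/0 to 439C/5000000 yields six candidate names, which matches the six probes in the first strace excerpt, whether or not a spill file was ever written for each segment.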
We did not find any option other than dropping the subscription to stop that loop and creating a new one (thus losing transactions).
The publisher is PostgreSQL 15.6
The subscriber is PostgreSQL 14.5
Thanks
On Wed, Nov 6, 2024 at 1:07 PM RECHTÉ Marc <marc.rechte@meteo.fr> wrote:
I think we need a call to rb->update_progress_txn(rb, txn,
change->lsn) at regular intervals in ReorderBufferRestoreCleanup(),
similar to ReorderBufferProcessTXN(), and maybe in more places where
we have potentially long-running loops.
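The idea could look roughly like the following standalone sketch. It is not PostgreSQL code: progress_cb is a hypothetical stand-in for rb->update_progress_txn(), and PROGRESS_INTERVAL is an assumed threshold chosen only for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for rb->update_progress_txn(); not the real signature. */
typedef void (*progress_cb) (void *arg);

/* Assumed reporting interval; the real threshold may differ. */
#define PROGRESS_INTERVAL 100

/* Example callback: counts how many times progress was reported. */
static void
count_report(void *arg)
{
    (*(size_t *) arg)++;
}

/*
 * Run nsteps units of work (for instance one unlink per step) and call
 * report() once every PROGRESS_INTERVAL steps, so the peer keeps seeing
 * activity during a long loop. Returns the number of reports issued.
 */
static size_t
cleanup_with_progress(size_t nsteps,
                      void (*step) (size_t i, void *arg),
                      progress_cb report, void *arg)
{
    size_t      since_report = 0;
    size_t      reports = 0;

    assert(report != NULL);
    for (size_t i = 0; i < nsteps; i++)
    {
        if (step != NULL)
            step(i, arg);       /* the actual cleanup work */

        if (++since_report >= PROGRESS_INTERVAL)
        {
            report(arg);
            reports++;
            since_report = 0;
        }
    }
    return reports;
}
```

Counting iterations rather than checking the clock keeps the per-step overhead small; whether an iteration count or a time-based check fits better here is an open design choice.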
--
Best Wishes,
Ashutosh Bapat
On Wed, 6 Nov 2024 at 13:07, RECHTÉ Marc <marc.rechte@meteo.fr> wrote:
Hi,
Do you have a reproducible test case for the above scenario? Please
share it.
I am also trying to reproduce the above issue by generating a large number
of spill files.
Thanks and Regards,
Shlok Kyal
This is how to reproduce the problem.
Session 1:
psql -c "CREATE TABLE test (i int)" -c "INSERT INTO test SELECT generate_series(1, 2_000_000)"
Session 2:
pg_recvlogical -d postgres --slot=test --create-slot
pg_recvlogical -d postgres --slot=test --start -f -
Session 3:
cd data/pg_replslot
watch 'ls test | wc -l'
Session 1:
date
time psql -c "BEGIN" -c "
DO LANGUAGE plpgsql
\$\$
DECLARE
cur CURSOR FOR SELECT * FROM test FOR UPDATE;
rec record;
BEGIN
FOR rec IN cur LOOP
BEGIN
UPDATE test SET i = i + 1 WHERE CURRENT OF cur;
EXCEPTION
WHEN no_data_found THEN
RAISE NOTICE 'no data found exception';
END;
END LOOP;
END;
\$\$
" -c "ROLLBACK"
date
Wed Dec 11 08:59:03 CET 2024
BEGIN
DO
ROLLBACK
real 0m17,071s
user 0m0,003s
sys 0m0,000s
Wed Dec 11 08:59:21 CET 2024
Session 3: Watch session
Count increases up to
Wed Dec 11 09:00:02 2024
1434930
Then decreases down to 1
Wed Dec 11 09:03:17 2024
1
Session 2:
Appears last (after spill files deleted)
BEGIN 12874409
COMMIT 12874409
Conclusion:
- The exception block is responsible for generating subtransactions.
- Although the transaction lasted 17 s, the decoding finished somewhat later (about 40 s after the transaction ended).
- About 200 s more were then spent deleting the spill files!
On Wed, 11 Dec 2024 at 14:29, RECHTÉ Marc <marc.rechte@meteo.fr> wrote:
Hi,
Thanks for sharing the test case.
Unfortunately I do not have a powerful machine that would generate
such a large number of spill files. But I created a patch as per your
suggestion in point (2) in thread [1]. Can you test with this patch on
your machine?
With this patch, instead of calling unlink for every WAL segment, we
first read the directory, filter the files related to our
transaction, and then unlink those files.
You can apply the patch to your publisher source code and check. I
have created this patch on top of Postgres 15.6.
[1]: /messages/by-id/1430556325.185731745.1731484846410.JavaMail.zimbra@meteo.fr
Thanks and Regards,
Shlok Kyal
Attachments:
scan_dir.patch (application/x-patch)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 3a68a393d2..2f5077c0b6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -4409,27 +4409,38 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void
ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
- XLogSegNo first;
- XLogSegNo cur;
- XLogSegNo last;
+ DIR *spill_dir;
+ struct dirent *spill_de;
+ struct stat statbuf;
+ char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
+ char file_filter[MAXPGPATH];
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
+ sprintf(path, "%s/%s", PG_REPLSLOT_DIR,
+ NameStr(MyReplicationSlot->data.name));
- XLByteToSeg(txn->first_lsn, first, wal_segment_size);
- XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+ /* we're only handling directories here, skip if it's not ours */
+ if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+ return;
+
+ snprintf(file_filter, MAXPGPATH, "xid-%u-", txn->xid);
- /* iterate over all possible filenames, and delete them */
- for (cur = first; cur <= last; cur++)
+ spill_dir = AllocateDir(path);
+ while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
{
- char path[MAXPGPATH];
+ /* only look at names that can be ours */
+ if (strstr(spill_de->d_name, file_filter))
+ {
+ snprintf(path, sizeof(path),
+ "%s/%s/%s", PG_REPLSLOT_DIR,
+ NameStr(MyReplicationSlot->data.name), spill_de->d_name);
- ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
- if (unlink(path) != 0 && errno != ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not remove file \"%s\": %m", path)));
+ if (unlink(path) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\": %m", path)));
+ }
}
+ FreeDir(spill_dir);
}
/*
Thanks for the patch.
I tried it, but it does not compile.
Attached is another version that I tested on PostgreSQL 17.2.
This is much worse: it deletes only 3 files/s!
The problem here is that for a given xid, there is just one spill file to delete.
ReorderBufferRestoreCleanup is called over a million times, so for each call,
it has to open the directory and filter the one file to delete.
By the way, you don't need a powerful machine to test, as spill files are very small.
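A back-of-the-envelope sketch of the cost described above, assuming each cleanup call rescans the whole slot directory and then removes exactly one file, and ignoring the few non-spill entries in the directory. The function name is made up for this example.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Total directory entries examined when every cleanup call rescans the
 * directory and removes one file: n + (n - 1) + ... + 1 = n * (n + 1) / 2.
 */
static uint64_t
entries_scanned_rescan(uint64_t nfiles)
{
    /* keep n * (n + 1) within 64 bits */
    assert(nfiles < UINT64_C(4000000000));
    return nfiles * (nfiles + 1) / 2;
}
```

For the 1434930 files counted in the reproduction above, this gives roughly 1.03e12 entries examined, compared with one unlink attempt per candidate segment in the original loop. That quadratic growth would be consistent with the very low deletion rate observed.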
Marc
Attachments:
scan_dir2.patch (text/x-patch)
--- a/src/backend/replication/logical/reorderbuffer.c 2024-12-12 10:21:12.962202250 +0100
+++ b/src/backend/replication/logical/reorderbuffer.c 2024-12-12 11:40:24.217634527 +0100
@@ -4567,27 +4567,35 @@
static void
ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
- XLogSegNo first;
- XLogSegNo cur;
- XLogSegNo last;
+ DIR *spill_dir;
+ struct dirent *spill_de;
+ char path[MAXPGPATH];
+ char spill_path[MAXPGPATH];
+ char file_filter[MAXPGPATH];
+ int len_filter;
+
+ snprintf(spill_path, MAXPGPATH, "pg_replslot/%s",
+ NameStr(MyReplicationSlot->data.name));
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
- XLByteToSeg(txn->first_lsn, first, wal_segment_size);
- XLByteToSeg(txn->final_lsn, last, wal_segment_size);
-
- /* iterate over all possible filenames, and delete them */
- for (cur = first; cur <= last; cur++)
- {
- char path[MAXPGPATH];
-
- ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
- if (unlink(path) != 0 && errno != ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not remove file \"%s\": %m", path)));
- }
+ snprintf(file_filter, sizeof(file_filter), "xid-%u-", txn->xid);
+ len_filter = strlen(file_filter);
+
+ spill_dir = AllocateDir(spill_path);
+ while ((spill_de = ReadDirExtended(spill_dir, spill_path, INFO)) != NULL)
+ {
+ /* only look at names that can be ours */
+ if (strncmp(spill_de->d_name, file_filter, len_filter) == 0)
+ {
+ snprintf(path, sizeof(path), "%s/%s", spill_path,
+ spill_de->d_name);
+
+ if (unlink(path) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\": %m", path)));
+ }
+ }
+ FreeDir(spill_dir);
}
/*
On Thu, 12 Dec 2024 at 19:20, RECHTÉ Marc <marc.rechte@meteo.fr> wrote:
Thanks for sharing the analysis.
I tested the patch on my machine and it performs worse for me
as well.
I came up with an alternate approach. In this approach we keep track
of the WAL segments the transaction is part of. This lets us iterate
through only the required files during cleanup.
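As a rough, standalone illustration of that bookkeeping (not the attached patch itself, which detects newly created files with O_EXCL and stores segment numbers in a list on the transaction), the sketch below uses a hypothetical SpillSegments array and assumes segments are recorded in non-decreasing order.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical per-transaction record of segments that received spilled changes. */
typedef struct SpillSegments
{
    uint64_t   *segnos;
    size_t      count;
    size_t      capacity;
} SpillSegments;

/*
 * Remember segno the first time the transaction spills into it.
 * Assumes segments arrive in non-decreasing order, so only the last
 * entry can be a duplicate. Returns 0 on success, -1 on allocation failure.
 */
static int
spill_segments_add(SpillSegments *s, uint64_t segno)
{
    assert(s != NULL);
    if (s->count > 0 && s->segnos[s->count - 1] == segno)
        return 0;

    if (s->count == s->capacity)
    {
        size_t      newcap = s->capacity ? s->capacity * 2 : 4;
        uint64_t   *p = realloc(s->segnos, newcap * sizeof(*p));

        if (p == NULL)
            return -1;
        s->segnos = p;
        s->capacity = newcap;
    }
    s->segnos[s->count++] = segno;
    return 0;
}

/*
 * Visit only the recorded segments; remove_one() would unlink the
 * matching spill file. Frees the record and returns how many were visited.
 */
static size_t
spill_segments_cleanup(SpillSegments *s, void (*remove_one) (uint64_t segno))
{
    size_t      visited = s->count;

    for (size_t i = 0; i < s->count; i++)
        if (remove_one != NULL)
            remove_one(s->segnos[i]);

    free(s->segnos);
    s->segnos = NULL;
    s->count = s->capacity = 0;
    return visited;
}
```

With such a record, cleanup work is proportional to the number of spill files actually written rather than to the number of WAL segments between the first and final LSN.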
On my machine, I ran the test case you provided in [1]. It
generated ~1.9 million spill files, and the transaction completed
in 56 s.
Cleanup (deletion of spill files) took approximately:
With HEAD: ~5 min
With the latest patch (attached here): ~2 min
Can you test if this improves performance for you?
The patch applies on HEAD.
Thanks and Regards,
Shlok Kyal
Attachments:
track_wal_segments.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e3a5c7b660..61943e72d8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -149,7 +149,7 @@ typedef struct ReorderBufferIterTXNEntry
ReorderBufferChange *change;
ReorderBufferTXN *txn;
TXNEntryFile file;
- XLogSegNo segno;
+ int restore_from;
} ReorderBufferIterTXNEntry;
typedef struct ReorderBufferIterTXNState
@@ -254,7 +254,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- TXNEntryFile *file, XLogSegNo *segno);
+ TXNEntryFile *file, int *restore_from);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *data);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -432,6 +432,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
/* InvalidCommandId is not zero, so set it explicitly */
txn->command_id = InvalidCommandId;
txn->output_plugin_private = NULL;
+ txn->walsgmts = NIL;
return txn;
}
@@ -1305,7 +1306,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
for (off = 0; off < state->nr_txns; off++)
{
state->entries[off].file.vfd = -1;
- state->entries[off].segno = 0;
+ state->entries[off].restore_from = 0;
}
/* allocate heap */
@@ -1333,7 +1334,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn);
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
- &state->entries[off].segno);
+ &state->entries[off].restore_from);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1363,7 +1364,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializeTXN(rb, cur_txn);
ReorderBufferRestoreChanges(rb, cur_txn,
&state->entries[off].file,
- &state->entries[off].segno);
+ &state->entries[off].restore_from);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
&cur_txn->changes);
@@ -1448,7 +1449,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
*/
rb->totalBytes += entry->txn->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
- &state->entries[off].segno))
+ &state->entries[off].restore_from))
{
/* successfully restored changes from disk */
ReorderBufferChange *next_change =
@@ -3715,6 +3716,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
Size size = txn->size;
+ MemoryContext oldcontext;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
@@ -3758,7 +3760,17 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* open segment, create it if necessary */
fd = OpenTransientFile(path,
- O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+ O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY);
+
+ if (fd < 0)
+ fd = OpenTransientFile(path,
+ O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+ else
+ {
+ oldcontext = MemoryContextSwitchTo(rb->context);
+ txn->walsgmts = lappend(txn->walsgmts, curOpenSegNo);
+ MemoryContextSwitchTo(oldcontext);
+ }
if (fd < 0)
ereport(ERROR,
@@ -4255,16 +4267,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- TXNEntryFile *file, XLogSegNo *segno)
+ TXNEntryFile *file, int *restore_from)
{
Size restored = 0;
- XLogSegNo last_segno;
dlist_mutable_iter cleanup_iter;
File *fd = &file->vfd;
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
/* free current entries, so we have memory for more */
dlist_foreach_modify(cleanup_iter, &txn->changes)
{
@@ -4277,9 +4285,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
txn->nentries_mem = 0;
Assert(dlist_is_empty(&txn->changes));
- XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
-
- while (restored < max_changes_in_memory && *segno <= last_segno)
+ while (restored < max_changes_in_memory &&
+ (*restore_from) < txn->walsgmts->length)
{
int readBytes;
ReorderBufferDiskChange *ondisk;
@@ -4289,19 +4296,21 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (*fd == -1)
{
char path[MAXPGPATH];
+ ListCell *lc;
+ XLogSegNo segno;
- /* first time in */
- if (*segno == 0)
- XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
+ /* next wal segment for the transaction */
+ lc = list_nth_cell(txn->walsgmts, *restore_from);
+ segno = lfirst(lc);
- Assert(*segno != 0 || dlist_is_empty(&txn->changes));
+ Assert(segno != 0 || dlist_is_empty(&txn->changes));
/*
* No need to care about TLIs here, only used during a single run,
* so each LSN only maps to a specific WAL record.
*/
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
- *segno);
+ segno);
*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
@@ -4311,7 +4320,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
- (*segno)++;
+ (*restore_from)++;
continue;
}
else if (*fd < 0)
@@ -4336,7 +4345,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
{
FileClose(*fd);
*fd = -1;
- (*segno)++;
+ (*restore_from)++;
continue;
}
else if (readBytes < 0)
@@ -4567,27 +4576,26 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void
ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
- XLogSegNo first;
- XLogSegNo cur;
- XLogSegNo last;
-
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
- XLByteToSeg(txn->first_lsn, first, wal_segment_size);
- XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+ ListCell *cell;
/* iterate over all possible filenames, and delete them */
- for (cur = first; cur <= last; cur++)
+ foreach(cell, txn->walsgmts)
{
+ XLogSegNo curr_segno = (XLogSegNo) lfirst(cell);
char path[MAXPGPATH];
- ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+ ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
if (unlink(path) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
}
+
+ if(txn->walsgmts != NIL)
+ {
+ pfree(txn->walsgmts);
+ txn->walsgmts = NIL;
+ }
}
/*
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3bc365a7b0..c23fa877dc 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -426,6 +426,11 @@ typedef struct ReorderBufferTXN
* Private data pointer of the output plugin.
*/
void *output_plugin_private;
+
+ /*
+ * List of wal segments this txn is part of
+ */
+ List *walsgmts;
} ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */
Dear Marc,
For some unknown reason (probably a very big transaction at the source), we
experienced a logical decoding breakdown,
...
When those timeout occurred, the sender was still busy deleting files from
data/pg_replslot/bdcpb21_sene, accumulating more than 6 millions small
".spill" files. It seems this very long pause is at cleanup stage were PG is
blindly trying to delete those files.
Thanks for reporting the issue! We will discuss it and provide a fix if possible.
Apart from the code fix, I have some comments from another perspective.
The publisher is PostgreSQL 15.6
The subscriber is PostgreSQL 14.5
Can you set the "streaming" parameter to on for your subscription [1]? It allows
in-progress transactions to be streamed to the subscriber side. I feel this can avoid
the case where there are many .spill files on the publisher side.
Another approach is to tune the logical_decoding_work_mem parameter [2].
This specifies the maximum amount of memory used by logical decoding, and
some changes are spilled to disk when usage exceeds that limit. Naively, raising this
setting can reduce the number of files.
I hope both settings can optimize your system.
[1]: https://www.postgresql.org/docs/14/sql-createsubscription.html
[2]: https://www.postgresql.org/docs/14/runtime-config-resource.html#GUC-LOGICAL-DECODING-WORK-MEM
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Dear Hayato,
Thanks for your suggestions; both had already been tested. In our (real) case (a single transaction with 12 million sub-transactions):
1) setting the subscription to streaming only delays the spill file surge a bit. It does not prevent the creation of spill files.
2) we set logical_decoding_work_mem to 20GB, which probably also delayed the problem, but did not solve it.
The real problem is spill file deletion, which can take days in this particular case!
Marc
Dear Marc,
Thanks for the reply!
Thanks for your suggestions that were both already tested. In our (real) case (a
single transaction with 12 millions sub-transactions):
1) setting the subscription as streaming, just delay a bit the spill file surge. It does
not prevent the creation of spill files.
That is a bit surprising to me, because I don't know of a code path by which transactions
can be serialized even in the streaming=on case. Let me think it over...
2) we set logical_decoding_work_mem to 20GB, which probably also delayed the
problem, but did not solve it.
Oh, I understand that you've already increased the parameter to an appropriate
value in your environment. Is that right?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Dear Shlok,
Thanks for sharing the analysis.
I tested the patch on my machine as well and it has worse performance
for me as well.
I came up with an alternate approach. In this approach we keep track
of wal segment the transaction is part of. This helps to iterate
through only required files during clean up.
On my machine, I am running the testcase provided by you in [1]. It is
generating ~1.9 million spill files. For me the transaction completed
in 56sec.
Cleanup (deletion of spill files) took around following time:
With HEAD : ~ 5min
With latest patch (attached here) : ~2min
Can you test if this improves performance for you?
I'm also not sure about the performance, but I can post my comments.
I'm not sure your patch handles the list operations properly.
```
+ oldcontext = MemoryContextSwitchTo(rb->context);
+ txn->walsgmts = lappend(txn->walsgmts, curOpenSegNo);
+ MemoryContextSwitchTo(oldcontext);
+
```
IIUC lappend() accepts a pointer to a datum, but here a plain value is passed.
Should we define a new struct that represents a list node and append it
after it is palloc()'d?
Or is your code sufficient for some reason?
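The concern can be illustrated outside PostgreSQL. The sketch below is not PostgreSQL code: `PtrList`, `ptr_list_append`, `track_segment`, and `SegEntry` are hypothetical stand-ins for `List`, `lappend()`, and a palloc()'d node. It assumes only that the list stores `void *` cells, which is why a 64-bit segment number is wrapped in an allocated struct instead of being passed by value.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for a pointer list: each cell holds a void *. */
typedef struct PtrList
{
    void  **cells;
    int     length;
    int     capacity;
} PtrList;

/* Hypothetical node type wrapping one WAL segment number. */
typedef struct SegEntry
{
    uint64_t segno;
} SegEntry;

/* Append a pointer, growing the cell array as needed; returns 0 on success. */
static int
ptr_list_append(PtrList *list, void *datum)
{
    if (list->length == list->capacity)
    {
        int     newcap = list->capacity ? list->capacity * 2 : 4;
        void  **tmp = realloc(list->cells, (size_t) newcap * sizeof(void *));

        if (tmp == NULL)
            return -1;
        list->cells = tmp;
        list->capacity = newcap;
    }
    list->cells[list->length++] = datum;
    return 0;
}

/* Allocate a node for segno and append its address; returns 0 on success. */
static int
track_segment(PtrList *list, uint64_t segno)
{
    SegEntry *entry = malloc(sizeof(SegEntry));

    if (entry == NULL)
        return -1;
    entry->segno = segno;
    if (ptr_list_append(list, entry) != 0)
    {
        free(entry);
        return -1;
    }
    return 0;
}

/* Read back the segment number stored in the n-th cell. */
static uint64_t
segment_at(const PtrList *list, int n)
{
    assert(n >= 0 && n < list->length);
    return ((SegEntry *) list->cells[n])->segno;
}

/* Free every node, then the cell array itself. */
static void
ptr_list_free(PtrList *list)
{
    for (int i = 0; i < list->length; i++)
        free(list->cells[i]);
    free(list->cells);
    list->cells = NULL;
    list->length = list->capacity = 0;
}
```

Storing the node's address keeps the full 64-bit value intact regardless of pointer width, at the cost of one allocation per tracked segment.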
```
/* iterate over all possible filenames, and delete them */
- for (cur = first; cur <= last; cur++)
+ foreach(cell, txn->walsgmts)
{
+ XLogSegNo curr_segno = (XLogSegNo) lfirst(cell);
char path[MAXPGPATH];
- ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+ ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
if (unlink(path) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
}
+
+ if(txn->walsgmts != NIL)
+ {
+ pfree(txn->walsgmts);
+ txn->walsgmts = NIL;
+ }
```
If the above comment is accepted, I think you can use foreach_delete_current().
=======
Also, even if we optimize the removal of files, replication may still time out.
Can you also create a patch that implements [1]?
[1]: /messages/by-id/CAExHW5s2_T9mULDQRKsdV72wpnA+NLT63cX51b51QQVEV4sG5g@mail.gmail.com
Best regards,
Hayato Kuroda
FUJITSU LIMITED
I came up with an alternate approach. In this approach we keep track
of wal segment the transaction is part of. This helps to iterate
through only required files during clean up.
On my machine, I am running the testcase provided by you in [1]. It is
generating ~1.9 million spill files. For me the transaction completed
in 56sec.
Cleanup (deletion of spill files) took around following time:
With HEAD : ~ 5min
With latest patch (attached here) : ~2min
Can you test if this improves performance for you?
The patch applies on HEAD.
Thanks again for this new patch.
Unfortunately it does not compile (17.2 source):
reorderbuffer.c: In function 'ReorderBufferSerializeTXN':
reorderbuffer.c:3771:72: error: passing argument 2 of 'lappend' makes pointer from integer without a cast [-Wint-conversion]
3771 | txn->walsgmts = lappend(txn->walsgmts, curOpenSegNo);
| ^~~~~~~~~~~~
| |
| XLogSegNo {aka long unsigned int}
and
reorderbuffer.c: In function 'ReorderBufferRestoreChanges':
reorderbuffer.c:4304:31: error: assignment to 'XLogSegNo' {aka 'long unsigned int'} from 'void *' makes integer from pointer without a cast [-Wint-conversion]
4304 | segno = lfirst(lc);
| ^
Dear Marc,
Thanks again for this new patch.
Unfortunately it does not compile (17.2 source):
Right, because of the reason I posted in [1].
I updated the patch, keeping the same approach. It passes my CI.
Could you please apply it on 17.2 and test it?
[1]: /messages/by-id/OSCPR01MB14966B646506E0C9B81B3A4CFF5022@OSCPR01MB14966.jpnprd01.prod.outlook.com
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
0001-WIP-track-wal-segments.patch (application/octet-stream)
From 99ffc05998a157c2632711aaa8b5d7c4e22ebd70 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 25 Dec 2024 08:02:33 +0000
Subject: [PATCH] WIP: track wal segments
---
.../replication/logical/reorderbuffer.c | 84 +++++++++++--------
src/include/replication/reorderbuffer.h | 7 ++
2 files changed, 58 insertions(+), 33 deletions(-)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9c742e96eb..b2bb4423b8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -149,7 +149,7 @@ typedef struct ReorderBufferIterTXNEntry
ReorderBufferChange *change;
ReorderBufferTXN *txn;
TXNEntryFile file;
- XLogSegNo segno;
+ int restore_from;
} ReorderBufferIterTXNEntry;
typedef struct ReorderBufferIterTXNState
@@ -215,6 +215,11 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
/* GUC variable */
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
+typedef struct WalSgmtsEntry
+{
+ XLogSegNo segno;
+} WalSgmtsEntry;
+
/* ---------------------------------------
* primary reorderbuffer support routines
* ---------------------------------------
@@ -254,7 +259,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- TXNEntryFile *file, XLogSegNo *segno);
+ TXNEntryFile *file, int *restore_from);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *data);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -432,6 +437,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
/* InvalidCommandId is not zero, so set it explicitly */
txn->command_id = InvalidCommandId;
txn->output_plugin_private = NULL;
+ txn->walsgmts = NIL;
return txn;
}
@@ -1305,7 +1311,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
for (off = 0; off < state->nr_txns; off++)
{
state->entries[off].file.vfd = -1;
- state->entries[off].segno = 0;
+ state->entries[off].restore_from = 0;
}
/* allocate heap */
@@ -1333,7 +1339,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn);
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
- &state->entries[off].segno);
+ &state->entries[off].restore_from);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1363,7 +1369,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializeTXN(rb, cur_txn);
ReorderBufferRestoreChanges(rb, cur_txn,
&state->entries[off].file,
- &state->entries[off].segno);
+ &state->entries[off].restore_from);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
&cur_txn->changes);
@@ -1448,7 +1454,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
*/
rb->totalBytes += entry->txn->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
- &state->entries[off].segno))
+ &state->entries[off].restore_from))
{
/* successfully restored changes from disk */
ReorderBufferChange *next_change =
@@ -3715,6 +3721,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
Size size = txn->size;
+ MemoryContext oldcontext;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
@@ -3758,7 +3765,23 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* open segment, create it if necessary */
fd = OpenTransientFile(path,
- O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+ O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY);
+
+ if (fd < 0)
+ fd = OpenTransientFile(path,
+ O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+ else
+ {
+ WalSgmtsEntry *entry;
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ entry = palloc(sizeof(WalSgmtsEntry));
+ entry->segno = curOpenSegNo;
+
+ txn->walsgmts = lappend(txn->walsgmts, entry);
+ MemoryContextSwitchTo(oldcontext);
+ }
if (fd < 0)
ereport(ERROR,
@@ -4255,16 +4278,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- TXNEntryFile *file, XLogSegNo *segno)
+ TXNEntryFile *file, int *restore_from)
{
Size restored = 0;
- XLogSegNo last_segno;
dlist_mutable_iter cleanup_iter;
File *fd = &file->vfd;
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
/* free current entries, so we have memory for more */
dlist_foreach_modify(cleanup_iter, &txn->changes)
{
@@ -4277,9 +4296,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
txn->nentries_mem = 0;
Assert(dlist_is_empty(&txn->changes));
- XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
-
- while (restored < max_changes_in_memory && *segno <= last_segno)
+ while (restored < max_changes_in_memory &&
+ (*restore_from) < txn->walsgmts->length)
{
int readBytes;
ReorderBufferDiskChange *ondisk;
@@ -4289,19 +4307,23 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (*fd == -1)
{
char path[MAXPGPATH];
+ ListCell *lc;
+ WalSgmtsEntry *entry;
+ XLogSegNo segno;
- /* first time in */
- if (*segno == 0)
- XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
+ /* Next wal segment for the transaction */
+ lc = list_nth_cell(txn->walsgmts, *restore_from);
+ entry = (WalSgmtsEntry *) lfirst(lc);
+ segno = entry->segno;
- Assert(*segno != 0 || dlist_is_empty(&txn->changes));
+ Assert(segno != 0 || dlist_is_empty(&txn->changes));
/*
* No need to care about TLIs here, only used during a single run,
* so each LSN only maps to a specific WAL record.
*/
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
- *segno);
+ segno);
*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
@@ -4311,7 +4333,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
- (*segno)++;
+ (*restore_from)++;
continue;
}
else if (*fd < 0)
@@ -4336,7 +4358,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
{
FileClose(*fd);
*fd = -1;
- (*segno)++;
+ (*restore_from)++;
continue;
}
else if (readBytes < 0)
@@ -4567,26 +4589,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void
ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
- XLogSegNo first;
- XLogSegNo cur;
- XLogSegNo last;
-
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
- XLByteToSeg(txn->first_lsn, first, wal_segment_size);
- XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+ ListCell *cell;
/* iterate over all possible filenames, and delete them */
- for (cur = first; cur <= last; cur++)
+ foreach(cell, txn->walsgmts)
{
+ WalSgmtsEntry *entry = (WalSgmtsEntry *)lfirst(cell);
+ XLogSegNo curr_segno = entry->segno;
char path[MAXPGPATH];
- ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+ ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
if (unlink(path) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
+
+ txn->walsgmts = foreach_delete_current(txn->walsgmts, cell);
}
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7de50462dc..b2ef2640aa 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -422,6 +422,13 @@ typedef struct ReorderBufferTXN
* Private data pointer of the output plugin.
*/
void *output_plugin_private;
+
+ /*
+ * List of wal segments this txn is part of.
+ *
+ * XXX: check whether the attribute doesn't break ABI
+ */
+ List *walsgmts;
} ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */
--
2.43.0
On Wed, 25 Dec 2024 at 13:55, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Marc,
Thanks again for this new patch.
Unfortunately it does not compile (17.2 source):
Right, because of the reason I posted [1].
I updated the patch which did the same approach. It could pass my CI.
Let's conduct some performance tests with varying numbers of spill
files (e.g., small ones like 1, 5, and 10, and larger ones like 100,
1000, and 10,000) along with different levels of concurrent
transactions. We can then compare the results with the current HEAD.
Regards,
Vignesh
Right, because of the reason I posted [1].
I updated the patch which did the same approach. It could pass my CI.
Could you please apply on 17.2 and test it?
[1]: /messages/by-id/OSCPR01MB14966B646506E0C9B81B3A4CFF5022@OSCPR01MB14966.jpnprd01.prod.outlook.com
This is a considerable improvement: the cleanup phase took less than 30s (compared to the former 200s).
However, we are talking about a 12s transaction that takes 64s overall to be replicated.
In this particular case, the replication system spends most of its time creating and deleting small files.
Would it not be possible to create just one spill file for the main transaction?
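A rough way to see where the cleanup saving comes from, written as a simulation and not as PostgreSQL code: the loop removed by the patch tries every segment number between the transaction's first and last LSN, while the patched loop visits only the segments recorded at spill time. `CleanupStats`, `fake_unlink`, `cleanup_by_range`, and `cleanup_by_list` are hypothetical names, and the boolean array stands in for the filesystem.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct CleanupStats
{
    uint64_t attempts;  /* simulated unlink() calls */
    uint64_t removed;   /* calls that actually found a file */
} CleanupStats;

/* Simulated unlink(): exists[] is indexed by (segno - base). */
static void
fake_unlink(bool *exists, uint64_t base, uint64_t segno, CleanupStats *stats)
{
    stats->attempts++;
    if (exists[segno - base])
    {
        exists[segno - base] = false;
        stats->removed++;
    }
}

/* Range walk: try every segment in [first, last], whether or not it spilled. */
static CleanupStats
cleanup_by_range(bool *exists, uint64_t first, uint64_t last)
{
    CleanupStats stats = {0, 0};

    assert(first <= last);
    for (uint64_t cur = first; cur <= last; cur++)
        fake_unlink(exists, first, cur, &stats);
    return stats;
}

/* List walk: try only the segments that were recorded when spilling. */
static CleanupStats
cleanup_by_list(bool *exists, uint64_t base,
                const uint64_t *tracked, size_t ntracked)
{
    CleanupStats stats = {0, 0};

    for (size_t i = 0; i < ntracked; i++)
        fake_unlink(exists, base, tracked[i], &stats);
    return stats;
}
```

Both walks remove the same files; they differ only in how many calls are issued for segments that never had a spill file. The per-file cost of creating and deleting remains in either case, which is the point raised above.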
On Fri, 27 Dec 2024 at 09:41, vignesh C <vignesh21@gmail.com> wrote:
On Wed, 25 Dec 2024 at 13:55, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Marc,
Thanks again for this new patch.
Unfortunately it does not compile (17.2 source):
Right, because of the reason I posted [1].
I updated the patch which did the same approach. It could pass my CI.
Let's conduct some performance tests with varying numbers of spill
files (e.g., small ones like 1, 5, and 10, and larger ones like 100,
1000, and 10,000) along with different levels of concurrent
transactions. We can then compare the results with the current HEAD.
Hi Vignesh,
I did the performance testing with the patch and compared the time
taken for cleanup of spill files on HEAD and with the patch. The
results follow (all timings are the average of 3 runs):
No. of Spill Files | Head (nano sec) | Avg Patch (nano sec) | No. of times Patch is faster than HEAD
-------------------+-----------------+----------------------+----------------------------------------
10000              | 68057452        | 442923.3333          | 153.6551518
1000               | 737519.6667     | 46049.33333          | 16.0158598
500                | 199108          | 19401.66667          | 10.26241732
100                | 12566.66667     | 3921                 | 3.20496472
10                 | 576             | 466.3333333          | 1.235167977
1                  | 60              | 56.33333333          | 1.065088757
I have made a patch to log the cleanup time for a transaction. I
have attached the test Perl script here as well.
To get the time taken to clean up the spill files, I run the Perl script and
then check the publisher log for the cleanup time of the very first
transaction.
Thanks and Regards,
Shlok Kyal
Attachments:
cleanup_logs.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9313d2b01e..de4a6f7baa 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -88,6 +88,7 @@
#include <unistd.h>
#include <sys/stat.h>
+#include <sys/time.h>
#include "access/detoast.h"
#include "access/heapam.h"
@@ -1515,6 +1516,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
bool found;
dlist_mutable_iter iter;
Size mem_freed = 0;
+ ListCell *cell;
+ struct timeval stop, start;
+ gettimeofday(&start, NULL);
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
@@ -1614,6 +1618,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
+ gettimeofday(&stop, NULL);
+ elog(LOG, "Time to Cleanup txn: %d is : %lu", txn->xid, (stop.tv_sec - start.tv_sec) * 1000000 + stop.tv_usec - start.tv_usec);
+
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
}
Hayato Kuroda kindly rebased the patch.
Attachments:
v2-0001-WIP-track-wal-segments.patch (application/mbox)
From 1f78955508dd31887f681d802c44b77eae921a30 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 25 Dec 2024 08:02:33 +0000
Subject: [PATCH v2] WIP: track wal segments
---
.../replication/logical/reorderbuffer.c | 84 +++++++++++--------
src/include/replication/reorderbuffer.h | 7 ++
2 files changed, 58 insertions(+), 33 deletions(-)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5186ad2a39..5775b5022c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -150,7 +150,7 @@ typedef struct ReorderBufferIterTXNEntry
ReorderBufferChange *change;
ReorderBufferTXN *txn;
TXNEntryFile file;
- XLogSegNo segno;
+ int restore_from;
} ReorderBufferIterTXNEntry;
typedef struct ReorderBufferIterTXNState
@@ -216,6 +216,11 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
/* GUC variable */
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
+typedef struct WalSgmtsEntry
+{
+ XLogSegNo segno;
+} WalSgmtsEntry;
+
/* ---------------------------------------
* primary reorderbuffer support routines
* ---------------------------------------
@@ -255,7 +260,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- TXNEntryFile *file, XLogSegNo *segno);
+ TXNEntryFile *file, int *restore_from);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *data);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -435,6 +440,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
/* InvalidCommandId is not zero, so set it explicitly */
txn->command_id = InvalidCommandId;
txn->output_plugin_private = NULL;
+ txn->walsgmts = NIL;
return txn;
}
@@ -1308,7 +1314,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
for (off = 0; off < state->nr_txns; off++)
{
state->entries[off].file.vfd = -1;
- state->entries[off].segno = 0;
+ state->entries[off].restore_from = 0;
}
/* allocate heap */
@@ -1336,7 +1342,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn);
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
- &state->entries[off].segno);
+ &state->entries[off].restore_from);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1366,7 +1372,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializeTXN(rb, cur_txn);
ReorderBufferRestoreChanges(rb, cur_txn,
&state->entries[off].file,
- &state->entries[off].segno);
+ &state->entries[off].restore_from);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
&cur_txn->changes);
@@ -1451,7 +1457,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
*/
rb->totalBytes += entry->txn->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
- &state->entries[off].segno))
+ &state->entries[off].restore_from))
{
/* successfully restored changes from disk */
ReorderBufferChange *next_change =
@@ -3838,6 +3844,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
Size size = txn->size;
+ MemoryContext oldcontext;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
@@ -3881,7 +3888,23 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* open segment, create it if necessary */
fd = OpenTransientFile(path,
- O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+ O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY);
+
+ if (fd < 0)
+ fd = OpenTransientFile(path,
+ O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+ else
+ {
+ WalSgmtsEntry *entry;
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ entry = palloc(sizeof(WalSgmtsEntry));
+ entry->segno = curOpenSegNo;
+
+ txn->walsgmts = lappend(txn->walsgmts, entry);
+ MemoryContextSwitchTo(oldcontext);
+ }
if (fd < 0)
ereport(ERROR,
@@ -4378,16 +4401,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- TXNEntryFile *file, XLogSegNo *segno)
+ TXNEntryFile *file, int *restore_from)
{
Size restored = 0;
- XLogSegNo last_segno;
dlist_mutable_iter cleanup_iter;
File *fd = &file->vfd;
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
/* free current entries, so we have memory for more */
dlist_foreach_modify(cleanup_iter, &txn->changes)
{
@@ -4400,9 +4419,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
txn->nentries_mem = 0;
Assert(dlist_is_empty(&txn->changes));
- XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
-
- while (restored < max_changes_in_memory && *segno <= last_segno)
+ while (restored < max_changes_in_memory &&
+ (*restore_from) < txn->walsgmts->length)
{
int readBytes;
ReorderBufferDiskChange *ondisk;
@@ -4412,19 +4430,23 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (*fd == -1)
{
char path[MAXPGPATH];
+ ListCell *lc;
+ WalSgmtsEntry *entry;
+ XLogSegNo segno;
- /* first time in */
- if (*segno == 0)
- XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
+ /* Next wal segment for the transaction */
+ lc = list_nth_cell(txn->walsgmts, *restore_from);
+ entry = (WalSgmtsEntry *) lfirst(lc);
+ segno = entry->segno;
- Assert(*segno != 0 || dlist_is_empty(&txn->changes));
+ Assert(segno != 0 || dlist_is_empty(&txn->changes));
/*
* No need to care about TLIs here, only used during a single run,
* so each LSN only maps to a specific WAL record.
*/
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
- *segno);
+ segno);
*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
@@ -4434,7 +4456,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
- (*segno)++;
+ (*restore_from)++;
continue;
}
else if (*fd < 0)
@@ -4459,7 +4481,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
{
FileClose(*fd);
*fd = -1;
- (*segno)++;
+ (*restore_from)++;
continue;
}
else if (readBytes < 0)
@@ -4690,26 +4712,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void
ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
- XLogSegNo first;
- XLogSegNo cur;
- XLogSegNo last;
-
- Assert(txn->first_lsn != InvalidXLogRecPtr);
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
- XLByteToSeg(txn->first_lsn, first, wal_segment_size);
- XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+ ListCell *cell;
/* iterate over all possible filenames, and delete them */
- for (cur = first; cur <= last; cur++)
+ foreach(cell, txn->walsgmts)
{
+ WalSgmtsEntry *entry = (WalSgmtsEntry *)lfirst(cell);
+ XLogSegNo curr_segno = entry->segno;
char path[MAXPGPATH];
- ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+ ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
if (unlink(path) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
+
+ txn->walsgmts = foreach_delete_current(txn->walsgmts, cell);
}
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 517a8e3634..fa72b040fb 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -452,6 +452,13 @@ typedef struct ReorderBufferTXN
* Private data pointer of the output plugin.
*/
void *output_plugin_private;
+
+ /*
+ * List of wal segments this txn is part of.
+ *
+ * XXX: check whether the attribute doesn't break ABI.
+ */
+ List *walsgmts;
} ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */
--
2.43.5