From 26efdeeecaf701b59d18f8c75954e28cfee5c0c8 Mon Sep 17 00:00:00 2001
From: Thomas Munro
Date: Sat, 9 Mar 2024 16:54:56 +1300
Subject: [PATCH v6 2/3] Use vectored I/O for bulk writes.

bulk_write.c was originally designed with the goal of being able to use
the new vectored write APIs, but couldn't initially because the vectored
variant of smgrextend() didn't exist yet. Now that smgrwritev() can also
handle extension, we can use it here to get wide write system calls.

Reviewed-by: Heikki Linnakangas
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 96 ++++++++++++++++++++-------
 1 file changed, 73 insertions(+), 23 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c3..848c3054f5 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrwritev() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -45,6 +45,12 @@
 
 #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
 
+/*
+ * How many blocks to send to smgrwritev() at a time.  Arbitrary value for
+ * now.
+ */
+#define MAX_BLOCKS_PER_WRITE ((128 * 1024) / BLCKSZ)
+
 static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
 
 typedef struct PendingWrite
@@ -225,35 +231,79 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[MAX_BLOCKS_PER_WRITE];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/*
+		 * If we have to write pages nonsequentially, fill in the space with
+		 * zeroes until we come back and overwrite.  This is not logically
+		 * necessary on standard Unix filesystems (unwritten space will read
+		 * as zeroes anyway), but it should help to avoid fragmentation.  The
+		 * dummy pages aren't WAL-logged though.
+		 */
+		while (blkno > bulkstate->pages_written)
+		{
+			/* don't set checksum for all-zero page */
+			smgrextend(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written++,
+					   &zero_buffer,
+					   true);
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(lengthof(pages),
+							  bulkstate->pages_written - blkno);
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = lengthof(pages);
+		}
+
+		/* Find as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
+		}
+		else
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
-- 
2.39.3 (Apple Git-146)