From 187e47acc12b4983a13c9c4aad7fbc66f92db0f6 Mon Sep 17 00:00:00 2001 From: Amul Sul Date: Wed, 5 Nov 2025 15:40:36 +0530 Subject: [PATCH v6 4/8] pg_waldump: Add support for archived WAL decoding. pg_waldump can now accept the path to a tar archive containing WAL files and decode them. This feature was added primarily for pg_verifybackup, which previously disabled WAL parsing for tar-formatted backups. Note that this patch requires that the WAL files within the archive be in sequential order; an error will be reported otherwise. The next patch is planned to remove this restriction. --- doc/src/sgml/ref/pg_waldump.sgml | 8 +- src/bin/pg_waldump/Makefile | 7 +- src/bin/pg_waldump/archive_waldump.c | 577 +++++++++++++++++++++++++++ src/bin/pg_waldump/meson.build | 4 +- src/bin/pg_waldump/pg_waldump.c | 217 +++++++--- src/bin/pg_waldump/pg_waldump.h | 36 +- src/bin/pg_waldump/t/001_basic.pl | 84 +++- src/tools/pgindent/typedefs.list | 3 + 8 files changed, 860 insertions(+), 76 deletions(-) create mode 100644 src/bin/pg_waldump/archive_waldump.c diff --git a/doc/src/sgml/ref/pg_waldump.sgml b/doc/src/sgml/ref/pg_waldump.sgml index ce23add5577..d004bb0f67e 100644 --- a/doc/src/sgml/ref/pg_waldump.sgml +++ b/doc/src/sgml/ref/pg_waldump.sgml @@ -141,13 +141,17 @@ PostgreSQL documentation - Specifies a directory to search for WAL segment files or a - directory with a pg_wal subdirectory that + Specifies a tar archive or a directory to search for WAL segment files + or a directory with a pg_wal subdirectory that contains such files. The default is to search in the current directory, the pg_wal subdirectory of the current directory, and the pg_wal subdirectory of PGDATA. + + If a tar archive is provided, its WAL segment files must be in + sequential order; otherwise, an error will be reported. 
+ diff --git a/src/bin/pg_waldump/Makefile b/src/bin/pg_waldump/Makefile index 4c1ee649501..05ac5763a57 100644 --- a/src/bin/pg_waldump/Makefile +++ b/src/bin/pg_waldump/Makefile @@ -3,6 +3,9 @@ PGFILEDESC = "pg_waldump - decode and display WAL" PGAPPICON=win32 +# make these available to TAP test scripts +export TAR + subdir = src/bin/pg_waldump top_builddir = ../../.. include $(top_builddir)/src/Makefile.global @@ -12,11 +15,13 @@ OBJS = \ $(WIN32RES) \ compat.o \ pg_waldump.o \ + archive_waldump.o \ rmgrdesc.o \ xlogreader.o \ xlogstats.o -override CPPFLAGS := -DFRONTEND $(CPPFLAGS) +override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS) +LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils RMGRDESCSOURCES = $(sort $(notdir $(wildcard $(top_srcdir)/src/backend/access/rmgrdesc/*desc*.c))) RMGRDESCOBJS = $(patsubst %.c,%.o,$(RMGRDESCSOURCES)) diff --git a/src/bin/pg_waldump/archive_waldump.c b/src/bin/pg_waldump/archive_waldump.c new file mode 100644 index 00000000000..2830c89a7be --- /dev/null +++ b/src/bin/pg_waldump/archive_waldump.c @@ -0,0 +1,577 @@ +/*------------------------------------------------------------------------- + * + * archive_waldump.c + * A generic facility for reading WAL data from tar archives via archive + * streamer. + * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_waldump/archive_waldump.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "common/hashfn.h" +#include "common/logging.h" +#include "fe_utils/simple_list.h" +#include "pg_waldump.h" + +/* + * How many bytes should we try to read from a file at once? 
+ */ +#define READ_CHUNK_SIZE (128 * 1024) + +/* Structure for storing the WAL segment data from the archive */ +typedef struct ArchivedWALEntry +{ + uint32 status; /* hash status */ + XLogSegNo segno; /* hash key: WAL segment number */ + TimeLineID timeline; /* timeline of this wal file */ + + StringInfoData buf; + bool tmpseg_exists; /* spill file exists? */ + + int total_read; /* total read of this WAL segment, including + * buffered and temporarily written data */ +} ArchivedWALEntry; + +#define SH_PREFIX ArchivedWAL +#define SH_ELEMENT_TYPE ArchivedWALEntry +#define SH_KEY_TYPE XLogSegNo +#define SH_KEY segno +#define SH_HASH_KEY(tb, key) murmurhash64((uint64) key) +#define SH_EQUAL(tb, a, b) (a == b) +#define SH_GET_HASH(tb, a) a->hash +#define SH_SCOPE static inline +#define SH_RAW_ALLOCATOR pg_malloc0 +#define SH_DECLARE +#define SH_DEFINE +#include "lib/simplehash.h" + +static ArchivedWAL_hash *ArchivedWAL_HTAB = NULL; + +typedef struct astreamer_waldump +{ + astreamer base; + XLogDumpPrivate *privateInfo; +} astreamer_waldump; + +static int read_archive_file(XLogDumpPrivate *privateInfo, Size count); +static ArchivedWALEntry *get_archive_wal_entry(XLogSegNo segno, + XLogDumpPrivate *privateInfo); + +static astreamer *astreamer_waldump_new(XLogDumpPrivate *privateInfo); +static void astreamer_waldump_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_waldump_finalize(astreamer *streamer); +static void astreamer_waldump_free(astreamer *streamer); + +static bool member_is_wal_file(astreamer_waldump *mystreamer, + astreamer_member *member, + XLogSegNo *curSegNo, + TimeLineID *curTimeline); + +static const astreamer_ops astreamer_waldump_ops = { + .content = astreamer_waldump_content, + .finalize = astreamer_waldump_finalize, + .free = astreamer_waldump_free +}; + +/* + * Returns true if the given file is a tar archive and outputs its compression + * algorithm. 
+ */ +bool +is_archive_file(const char *fname, pg_compress_algorithm *compression) +{ + int fname_len = strlen(fname); + pg_compress_algorithm compress_algo; + + /* Now, check the compression type of the tar */ + if (fname_len > 4 && + strcmp(fname + fname_len - 4, ".tar") == 0) + compress_algo = PG_COMPRESSION_NONE; + else if (fname_len > 4 && + strcmp(fname + fname_len - 4, ".tgz") == 0) + compress_algo = PG_COMPRESSION_GZIP; + else if (fname_len > 7 && + strcmp(fname + fname_len - 7, ".tar.gz") == 0) + compress_algo = PG_COMPRESSION_GZIP; + else if (fname_len > 8 && + strcmp(fname + fname_len - 8, ".tar.lz4") == 0) + compress_algo = PG_COMPRESSION_LZ4; + else if (fname_len > 8 && + strcmp(fname + fname_len - 8, ".tar.zst") == 0) + compress_algo = PG_COMPRESSION_ZSTD; + else + return false; + + *compression = compress_algo; + + return true; +} + +/* + * Initializes the tar archive reader to read WAL files from the archive, + * creates a hash table to store them, performs quick existence checks for WAL + * entries in the archive and retrieves the WAL segment size, and sets up + * filtering criteria for relevant entries. + */ +void +init_archive_reader(XLogDumpPrivate *privateInfo, const char *waldir, + pg_compress_algorithm compression) +{ + int fd; + astreamer *streamer; + ArchivedWALEntry *entry = NULL; + XLogLongPageHeader longhdr; + + /* Open tar archive and store its file descriptor */ + fd = open_file_in_directory(waldir, privateInfo->archive_name); + + if (fd < 0) + pg_fatal("could not open file \"%s\"", privateInfo->archive_name); + + privateInfo->archive_fd = fd; + + streamer = astreamer_waldump_new(privateInfo); + + /* Before that we must parse the tar archive. */ + streamer = astreamer_tar_parser_new(streamer); + + /* Before that we must decompress, if archive is compressed. 
*/ + if (compression == PG_COMPRESSION_GZIP) + streamer = astreamer_gzip_decompressor_new(streamer); + else if (compression == PG_COMPRESSION_LZ4) + streamer = astreamer_lz4_decompressor_new(streamer); + else if (compression == PG_COMPRESSION_ZSTD) + streamer = astreamer_zstd_decompressor_new(streamer); + + privateInfo->archive_streamer = streamer; + + /* Hash table storing WAL entries read from the archive */ + ArchivedWAL_HTAB = ArchivedWAL_create(16, NULL); + + /* + * Verify that the archive contains valid WAL files and fetch WAL segment + * size + */ + while (entry == NULL || entry->buf.len < XLOG_BLCKSZ) + { + if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0) + pg_fatal("could not find WAL in \"%s\" archive", + privateInfo->archive_name); + + entry = privateInfo->cur_wal; + } + + /* Set WalSegSz if WAL data is successfully read */ + longhdr = (XLogLongPageHeader) entry->buf.data; + + WalSegSz = longhdr->xlp_seg_size; + + if (!IsValidWalSegSize(WalSegSz)) + { + pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)", + "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)", + WalSegSz), + privateInfo->archive_name, WalSegSz); + pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB."); + exit(1); + } + + /* + * With the WAL segment size available, we can now initialize the + * dependent start and end segment numbers. + */ + XLByteToSeg(privateInfo->startptr, privateInfo->startSegNo, WalSegSz); + XLByteToSeg(privateInfo->endptr, privateInfo->endSegNo, WalSegSz); +} + +/* + * Release the archive streamer chain and close the archive file. + */ +void +free_archive_reader(XLogDumpPrivate *privateInfo) +{ + /* + * NB: Normally, astreamer_finalize() is called before astreamer_free() to + * flush any remaining buffered data or to ensure the end of the tar + * archive is reached. 
However, when decoding a WAL file, once we hit the + * end LSN, any remaining WAL data in the buffer or the tar archive's + * unreached end can be safely ignored. + */ + astreamer_free(privateInfo->archive_streamer); + + /* Close the file. */ + if (close(privateInfo->archive_fd) != 0) + pg_log_error("could not close file \"%s\": %m", + privateInfo->archive_name); +} + +/* + * Copies count bytes of WAL starting at targetPagePtr into readBuff; if the + * data is not buffered yet, fetches more from the tar archive via astreamer. + */ +int +read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr, + Size count, char *readBuff) +{ + char *p = readBuff; + Size nbytes = count; + XLogRecPtr recptr = targetPagePtr; + XLogSegNo segno; + ArchivedWALEntry *entry; + + XLByteToSeg(targetPagePtr, segno, WalSegSz); + entry = get_archive_wal_entry(segno, privateInfo); + + while (nbytes > 0) + { + char *buf = entry->buf.data; + int len = entry->buf.len; + + /* WAL record range that the buffer contains */ + XLogRecPtr endPtr; + XLogRecPtr startPtr; + + XLogSegNoOffsetToRecPtr(entry->segno, entry->total_read, + WalSegSz, endPtr); + startPtr = endPtr - len; + + Assert((endPtr - startPtr) == len); + + /* + * pg_waldump never asks for the same WAL bytes more than once, so if we're + * now being asked for data beyond the end of what we've already read, + * that means none of the data we currently have in the buffer will + * ever be consulted again. So, we can discard the existing buffer + * contents and start over. + */ + if (recptr >= endPtr) + { + len = 0; + + /* Discard the buffered data */ + resetStringInfo(&entry->buf); + } + + if (len > 0 && recptr > startPtr) + { + int skipBytes = 0; + + /* + * The required offset is not at the start of the buffer, so skip + * bytes until reaching the desired offset of the target page. + */ + skipBytes = recptr - startPtr; + + buf += skipBytes; + len -= skipBytes; + } + + if (len > 0) + { + int readBytes = len >= nbytes ? 
nbytes : len; + + /* Ensure reading correct WAL record */ + Assert(recptr >= startPtr && recptr < endPtr); + + memcpy(p, buf, readBytes); + + /* Update state for read */ + nbytes -= readBytes; + p += readBytes; + recptr += readBytes; + } + else + { + /* + * Fetch more data; raise an error if it's not the current segment + * being read by the archive streamer or if reading of the + * archived file has finished. + */ + if (privateInfo->cur_wal != entry || + read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0) + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, privateInfo->timeline, entry->segno, + WalSegSz); + pg_fatal("could not read file \"%s\" from archive \"%s\": read %lld of %lld", + fname, privateInfo->archive_name, + (long long int) (count - nbytes), + (long long int) count); + } + } + } + + /* + * Should have either successfully read all the requested bytes or + * reported a failure before this point. + */ + Assert(nbytes == 0); + + /* + * NB: We return the fixed value provided as input. We could instead + * return a boolean, since we either successfully read the WAL page or + * raise an error, but the caller expects this value to be returned. The + * routine that reads WAL pages from the physical WAL file follows the + * same convention. + */ + return count; +} + +/* + * Reads up to count bytes of the archive file and passes them to the archive + * streamer for decompression and parsing; returns the byte count read. + */ +static int +read_archive_file(XLogDumpPrivate *privateInfo, Size count) +{ + int rc; + char *buffer; + + buffer = pg_malloc(count); + + rc = read(privateInfo->archive_fd, buffer, count); + if (rc < 0) + pg_fatal("could not read file \"%s\": %m", + privateInfo->archive_name); + + /* + * Decompress (if required), and then parse the previously read contents + * of the tar file. 
+ */ + if (rc > 0) + astreamer_content(privateInfo->archive_streamer, NULL, + buffer, rc, ASTREAMER_UNKNOWN); + pg_free(buffer); + + return rc; +} + +/* + * Returns the archived WAL entry from the hash table if it exists. Otherwise, + * keeps reading the archive until the streamer reaches that segment; fails if + * the archive ends first or an in-range segment arrives out of order. + */ +static ArchivedWALEntry * +get_archive_wal_entry(XLogSegNo segno, XLogDumpPrivate *privateInfo) +{ + ArchivedWALEntry *entry = NULL; + char fname[MAXFNAMELEN]; + + /* Search hash table */ + entry = ArchivedWAL_lookup(ArchivedWAL_HTAB, segno); + + if (entry != NULL) + return entry; + + /* Segment not seen yet: keep feeding the streamer until it appears */ + while (1) + { + entry = privateInfo->cur_wal; + + /* Fetch more data */ + if (entry == NULL || entry->buf.len == 0) + { + if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0) + break; /* archive file ended */ + } + + /* + * Either we are here for the first time, or the archive streamer is + * reading a non-WAL file or an irrelevant WAL file. + */ + if (entry == NULL) + continue; + + /* Found the required entry */ + if (entry->segno == segno) + return entry; + + /* + * Ignore if the timeline is different or the current segment lies + * outside the requested start/end segment range. + */ + if (privateInfo->timeline != entry->timeline || + privateInfo->startSegNo > entry->segno || + privateInfo->endSegNo < entry->segno) + { + privateInfo->cur_wal = NULL; + continue; + } + + /* WAL segments must be archived in order */ + pg_log_error("WAL files are not archived in sequential order"); + pg_log_error_detail("Expecting segment number " UINT64_FORMAT " but found " UINT64_FORMAT ".", + segno, entry->segno); + exit(1); + } + + /* Requested WAL segment not found */ + XLogFileName(fname, privateInfo->timeline, segno, WalSegSz); + pg_fatal("could not find file \"%s\" in archive", fname); +} + +/* + * Create an astreamer that can read WAL from tar file. 
+ */ +static astreamer * +astreamer_waldump_new(XLogDumpPrivate *privateInfo) +{ + astreamer_waldump *streamer; + + streamer = palloc0(sizeof(astreamer_waldump)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_waldump_ops; + + streamer->privateInfo = privateInfo; + + return &streamer->base; +} + +/* + * Main entry point of the archive streamer for reading WAL data from a tar + * file. If a member is identified as a valid WAL file, a hash entry is created + * for it, and its contents are copied into that entry's buffer, making them + * accessible to the decoding routine. + */ +static void +astreamer_waldump_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_waldump *mystreamer = (astreamer_waldump *) streamer; + XLogDumpPrivate *privateInfo = mystreamer->privateInfo; + + Assert(context != ASTREAMER_UNKNOWN); + + switch (context) + { + case ASTREAMER_MEMBER_HEADER: + { + XLogSegNo segno; + TimeLineID timeline; + ArchivedWALEntry *entry; + bool found; + + pg_log_debug("reading \"%s\"", member->pathname); + + if (!member_is_wal_file(mystreamer, member, + &segno, &timeline)) + break; + + entry = ArchivedWAL_insert(ArchivedWAL_HTAB, segno, &found); + + /* + * Shouldn't happen, but if it does, simply ignore the + * duplicate WAL file. + */ + if (found) + { + pg_log_warning("ignoring duplicate WAL file found in archive: \"%s\"", + member->pathname); + break; + } + + initStringInfo(&entry->buf); + entry->timeline = timeline; + entry->total_read = 0; + + privateInfo->cur_wal = entry; + } + break; + + case ASTREAMER_MEMBER_CONTENTS: + if (privateInfo->cur_wal) + { + appendBinaryStringInfo(&privateInfo->cur_wal->buf, data, len); + privateInfo->cur_wal->total_read += len; + } + break; + + case ASTREAMER_MEMBER_TRAILER: + privateInfo->cur_wal = NULL; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + break; + + default: + /* Shouldn't happen. 
*/ + pg_fatal("unexpected state while parsing tar file"); + } +} + +/* + * End-of-stream processing for a astreamer_waldump stream. + */ +static void +astreamer_waldump_finalize(astreamer *streamer) +{ + Assert(streamer->bbs_next == NULL); +} + +/* + * Free memory associated with a astreamer_waldump stream. + */ +static void +astreamer_waldump_free(astreamer *streamer) +{ + Assert(streamer->bbs_next == NULL); + pfree(streamer); +} + +/* + * Returns true if the archive member name matches the WAL naming format. If + * successful, it also outputs the WAL segment number, and timeline. + */ +static bool +member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member, + XLogSegNo *curSegNo, TimeLineID *curTimeline) +{ + int pathlen; + XLogSegNo segNo; + TimeLineID timeline; + char *fname; + + /* We are only interested in normal files. */ + if (member->is_directory || member->is_link) + return false; + + pathlen = strlen(member->pathname); + if (pathlen < XLOG_FNAME_LEN) + return false; + + /* WAL file could be with full path */ + fname = member->pathname + (pathlen - XLOG_FNAME_LEN); + if (!IsXLogFileName(fname)) + return false; + + /* + * XXX: On some systems (e.g., OpenBSD), the tar utility includes + * PaxHeaders when creating an archive. These are special entries that + * store extended metadata for the file entry immediately following them, + * and they share the exact same name as that file. 
+ */ + if (strstr(member->pathname, "PaxHeaders.")) + return false; + + /* Parse position from file */ + XLogFromFileName(fname, &timeline, &segNo, WalSegSz); + + *curSegNo = segNo; + *curTimeline = timeline; + + return true; +} diff --git a/src/bin/pg_waldump/meson.build b/src/bin/pg_waldump/meson.build index 937e0d68841..da00746587c 100644 --- a/src/bin/pg_waldump/meson.build +++ b/src/bin/pg_waldump/meson.build @@ -3,6 +3,7 @@ pg_waldump_sources = files( 'compat.c', 'pg_waldump.c', + 'archive_waldump.c', 'rmgrdesc.c', ) @@ -18,7 +19,7 @@ endif pg_waldump = executable('pg_waldump', pg_waldump_sources, - dependencies: [frontend_code, lz4, zstd], + dependencies: [frontend_code, lz4, zstd, libpq], c_args: ['-DFRONTEND'], # needed for xlogreader et al kwargs: default_bin_args, ) @@ -29,6 +30,7 @@ tests += { 'sd': meson.current_source_dir(), 'bd': meson.current_build_dir(), 'tap': { + 'env': {'TAR': tar.found() ? tar.full_path() : ''}, 'tests': [ 't/001_basic.pl', 't/002_save_fullpage.pl', diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 0dc28ea360c..7425d386d0c 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -177,7 +177,7 @@ split_path(const char *path, char **dir, char **fname) * * return a read only fd */ -static int +int open_file_in_directory(const char *directory, const char *fname) { int fd = -1; @@ -436,6 +436,44 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, return count; } +/* + * pg_waldump's XLogReaderRoutine->segment_open callback to support dumping WAL + * files from tar archives. + */ +static void +TarWALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p) +{ + /* No action needed */ +} + +/* + * pg_waldump's XLogReaderRoutine->segment_close callback. 
+ */ +static void +TarWALDumpCloseSegment(XLogReaderState *state) +{ + /* No action needed */ +} + +/* + * pg_waldump's XLogReaderRoutine->page_read callback to support dumping WAL + * files from tar archives. + */ +static int +TarWALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetPtr, char *readBuff) +{ + XLogDumpPrivate *private = state->private_data; + int count = required_read_len(private, targetPagePtr, reqLen); + + if (private->endptr_reached) + return -1; + + /* Read the WAL page from the archive streamer */ + return read_archive_wal_page(private, targetPagePtr, count, readBuff); +} + /* * Boolean to return whether the given WAL record matches a specific relation * and optionally block. @@ -773,8 +811,8 @@ usage(void) printf(_(" -F, --fork=FORK only show records that modify blocks in fork FORK;\n" " valid names are main, fsm, vm, init\n")); printf(_(" -n, --limit=N number of records to display\n")); - printf(_(" -p, --path=PATH directory in which to find WAL segment files or a\n" - " directory with a ./pg_wal that contains such files\n" + printf(_(" -p, --path=PATH tar archive or a directory in which to find WAL segment files or\n" + " a directory with a ./pg_wal that contains such files\n" " (default: current directory, ./pg_wal, $PGDATA/pg_wal)\n")); printf(_(" -q, --quiet do not print any output, except for errors\n")); printf(_(" -r, --rmgr=RMGR only show records generated by resource manager RMGR;\n" @@ -806,7 +844,10 @@ main(int argc, char **argv) XLogRecord *record; XLogRecPtr first_record; char *waldir = NULL; + char *walpath = NULL; char *errormsg; + bool is_archive = false; + pg_compress_algorithm compression; static struct option long_options[] = { {"bkp-details", no_argument, NULL, 'b'}, @@ -938,7 +979,7 @@ main(int argc, char **argv) } break; case 'p': - waldir = pg_strdup(optarg); + walpath = pg_strdup(optarg); break; case 'q': config.quiet = true; @@ -1102,10 +1143,27 @@ main(int argc, char **argv) 
goto bad_argument; } - if (waldir != NULL) + if (walpath != NULL) { + /* validate path points to tar archive */ + if (is_archive_file(walpath, &compression)) + { + char *fname = NULL; + + split_path(walpath, &waldir, &fname); + + /* + * A NULL WAL directory indicates that the archive file is located + * in the current working directory of the pg_waldump execution + */ + if (waldir == NULL) + waldir = pg_strdup("."); + + private.archive_name = fname; + is_archive = true; + } /* validate path points to directory */ - if (!verify_directory(waldir)) + else if (!verify_directory(walpath)) { - pg_log_error("could not open directory \"%s\": %m", waldir); + pg_log_error("could not open directory \"%s\": %m", walpath); goto bad_argument; @@ -1123,6 +1181,17 @@ main(int argc, char **argv) int fd; XLogSegNo segno; + /* + * If a tar archive is passed using the --path option, all other + * arguments become unnecessary. + */ + if (is_archive) + { + pg_log_error("unnecessary command-line arguments specified with tar archive (first is \"%s\")", + argv[optind]); + goto bad_argument; + } + split_path(argv[optind], &directory, &fname); if (waldir == NULL && directory != NULL) @@ -1133,69 +1202,78 @@ main(int argc, char **argv) pg_fatal("could not open directory \"%s\": %m", waldir); } - waldir = identify_target_directory(waldir, fname); - fd = open_file_in_directory(waldir, fname); - if (fd < 0) - pg_fatal("could not open file \"%s\"", fname); - close(fd); - - /* parse position from file */ - XLogFromFileName(fname, &private.timeline, &segno, WalSegSz); - - if (!XLogRecPtrIsValid(private.startptr)) - XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, private.startptr); - else if (!XLByteInSeg(private.startptr, segno, WalSegSz)) + if (fname != NULL && is_archive_file(fname, &compression)) { - pg_log_error("start WAL location %X/%08X is not inside file \"%s\"", - LSN_FORMAT_ARGS(private.startptr), - fname); - goto bad_argument; + waldir = walpath ? 
pg_strdup(walpath) : pg_strdup("."); + private.archive_name = fname; + is_archive = true; } - - /* no second file specified, set end position */ - if (!(optind + 1 < argc) && !XLogRecPtrIsValid(private.endptr)) - XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, private.endptr); - - /* parse ENDSEG if passed */ - if (optind + 1 < argc) + else { - XLogSegNo endsegno; - - /* ignore directory, already have that */ - split_path(argv[optind + 1], &directory, &fname); - + waldir = identify_target_directory(waldir, fname); fd = open_file_in_directory(waldir, fname); if (fd < 0) pg_fatal("could not open file \"%s\"", fname); close(fd); /* parse position from file */ - XLogFromFileName(fname, &private.timeline, &endsegno, WalSegSz); + XLogFromFileName(fname, &private.timeline, &segno, WalSegSz); - if (endsegno < segno) - pg_fatal("ENDSEG %s is before STARTSEG %s", - argv[optind + 1], argv[optind]); + if (!XLogRecPtrIsValid(private.startptr)) + XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, private.startptr); + else if (!XLByteInSeg(private.startptr, segno, WalSegSz)) + { + pg_log_error("start WAL location %X/%08X is not inside file \"%s\"", + LSN_FORMAT_ARGS(private.startptr), + fname); + goto bad_argument; + } - if (!XLogRecPtrIsValid(private.endptr)) - XLogSegNoOffsetToRecPtr(endsegno + 1, 0, WalSegSz, - private.endptr); + /* no second file specified, set end position */ + if (!(optind + 1 < argc) && !XLogRecPtrIsValid(private.endptr)) + XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, private.endptr); - /* set segno to endsegno for check of --end */ - segno = endsegno; - } + /* parse ENDSEG if passed */ + if (optind + 1 < argc) + { + XLogSegNo endsegno; + /* ignore directory, already have that */ + split_path(argv[optind + 1], &directory, &fname); - if (!XLByteInSeg(private.endptr, segno, WalSegSz) && - private.endptr != (segno + 1) * WalSegSz) - { - pg_log_error("end WAL location %X/%08X is not inside file \"%s\"", - LSN_FORMAT_ARGS(private.endptr), - argv[argc - 1]); - goto 
bad_argument; + fd = open_file_in_directory(waldir, fname); + if (fd < 0) + pg_fatal("could not open file \"%s\"", fname); + close(fd); + + /* parse position from file */ + XLogFromFileName(fname, &private.timeline, &endsegno, WalSegSz); + + if (endsegno < segno) + pg_fatal("ENDSEG %s is before STARTSEG %s", + argv[optind + 1], argv[optind]); + + if (!XLogRecPtrIsValid(private.endptr)) + XLogSegNoOffsetToRecPtr(endsegno + 1, 0, WalSegSz, + private.endptr); + + /* set segno to endsegno for check of --end */ + segno = endsegno; + } + + + if (!XLByteInSeg(private.endptr, segno, WalSegSz) && + private.endptr != (segno + 1) * WalSegSz) + { + pg_log_error("end WAL location %X/%08X is not inside file \"%s\"", + LSN_FORMAT_ARGS(private.endptr), + argv[argc - 1]); + goto bad_argument; + } } } - else - waldir = identify_target_directory(waldir, NULL); + else if (!is_archive) + waldir = identify_target_directory(walpath, NULL); /* we don't know what to print */ if (!XLogRecPtrIsValid(private.startptr)) @@ -1207,12 +1285,30 @@ main(int argc, char **argv) /* done with argument parsing, do the actual work */ /* we have everything we need, start reading */ - xlogreader_state = - XLogReaderAllocate(WalSegSz, waldir, - XL_ROUTINE(.page_read = WALDumpReadPage, - .segment_open = WALDumpOpenSegment, - .segment_close = WALDumpCloseSegment), - &private); + if (is_archive) + { + /* Set up for reading tar file */ + init_archive_reader(&private, waldir, compression); + + /* Routine to decode WAL files in tar archive */ + xlogreader_state = + XLogReaderAllocate(WalSegSz, waldir, + XL_ROUTINE(.page_read = TarWALDumpReadPage, + .segment_open = TarWALDumpOpenSegment, + .segment_close = TarWALDumpCloseSegment), + &private); + } + else + { + /* Routine to decode WAL files */ + xlogreader_state = + XLogReaderAllocate(WalSegSz, waldir, + XL_ROUTINE(.page_read = WALDumpReadPage, + .segment_open = WALDumpOpenSegment, + .segment_close = WALDumpCloseSegment), + &private); + } + if (!xlogreader_state) 
pg_fatal("out of memory while allocating a WAL reading processor"); @@ -1321,6 +1417,9 @@ main(int argc, char **argv) XLogReaderFree(xlogreader_state); + if (is_archive) + free_archive_reader(&private); + return EXIT_SUCCESS; bad_argument: diff --git a/src/bin/pg_waldump/pg_waldump.h b/src/bin/pg_waldump/pg_waldump.h index 9e62b64ead5..54758c3548a 100644 --- a/src/bin/pg_waldump/pg_waldump.h +++ b/src/bin/pg_waldump/pg_waldump.h @@ -12,9 +12,13 @@ #define PG_WALDUMP_H #include "access/xlogdefs.h" +#include "fe_utils/astreamer.h" extern int WalSegSz; +/* Forward declaration */ +struct ArchivedWALEntry; + /* Contains the necessary information to drive WAL decoding */ typedef struct XLogDumpPrivate { @@ -22,6 +26,36 @@ typedef struct XLogDumpPrivate XLogRecPtr startptr; XLogRecPtr endptr; bool endptr_reached; + + /* Fields required to read WAL from archive */ + char *archive_name; /* Tar archive name */ + int archive_fd; /* File descriptor for the open tar file */ + + astreamer *archive_streamer; + + /* What the archive streamer is currently reading */ + struct ArchivedWALEntry *cur_wal; + + /* + * Although these values can be easily derived from startptr and endptr, + * doing so repeatedly for each archived member would be inefficient, as + * it would involve recalculating and filtering out irrelevant WAL + * segments. 
+ */ + XLogSegNo startSegNo; + XLogSegNo endSegNo; } XLogDumpPrivate; -#endif /* end of PG_WALDUMP_H */ +extern int open_file_in_directory(const char *directory, const char *fname); + +extern bool is_archive_file(const char *fname, + pg_compress_algorithm *compression); +extern void init_archive_reader(XLogDumpPrivate *privateInfo, + const char *waldir, + pg_compress_algorithm compression); +extern void free_archive_reader(XLogDumpPrivate *privateInfo); +extern int read_archive_wal_page(XLogDumpPrivate *privateInfo, + XLogRecPtr targetPagePtr, + Size count, char *readBuff); + +#endif /* end of PG_WALDUMP_H */ diff --git a/src/bin/pg_waldump/t/001_basic.pl b/src/bin/pg_waldump/t/001_basic.pl index 1b712e8d74d..443126a9ce6 100644 --- a/src/bin/pg_waldump/t/001_basic.pl +++ b/src/bin/pg_waldump/t/001_basic.pl @@ -3,10 +3,13 @@ use strict; use warnings FATAL => 'all'; +use Cwd; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +my $tar = $ENV{TAR}; + program_help_ok('pg_waldump'); program_version_ok('pg_waldump'); program_options_handling_ok('pg_waldump'); @@ -235,7 +238,7 @@ command_like( sub test_pg_waldump { local $Test::Builder::Level = $Test::Builder::Level + 1; - my @opts = @_; + my ($path, @opts) = @_; my ($stdout, $stderr); @@ -243,6 +246,7 @@ sub test_pg_waldump 'pg_waldump', '--start' => $start_lsn, '--end' => $end_lsn, + '--path' => $path, @opts ], '>' => \$stdout, @@ -254,11 +258,50 @@ sub test_pg_waldump return @lines; } -my @lines; +# Create a tar archive, sorting the file order +sub generate_archive +{ + my ($archive, $directory, $compression_flags) = @_; + + my @files; + opendir my $dh, $directory or die "opendir: $!"; + while (my $entry = readdir $dh) { + # Skip '.' and '..' + next if $entry eq '.' 
|| $entry eq '..'; + push @files, $entry; + } + closedir $dh; + + @files = sort @files; + + # move into the WAL directory before archiving files + my $cwd = getcwd; + chdir($directory) || die "chdir: $!"; + command_ok([$tar, $compression_flags, $archive, @files]); + chdir($cwd) || die "chdir: $!"; +} + +my $tmp_dir = PostgreSQL::Test::Utils::tempdir_short(); my @scenario = ( { - 'path' => $node->data_dir + 'path' => $node->data_dir, + 'is_archive' => 0, + 'enabled' => 1 + }, + { + 'path' => "$tmp_dir/pg_wal.tar", + 'compression_method' => 'none', + 'compression_flags' => '-cf', + 'is_archive' => 1, + 'enabled' => 1 + }, + { + 'path' => "$tmp_dir/pg_wal.tar.gz", + 'compression_method' => 'gzip', + 'compression_flags' => '-czf', + 'is_archive' => 1, + 'enabled' => check_pg_config("#define HAVE_LIBZ 1") }); for my $scenario (@scenario) @@ -267,6 +310,19 @@ for my $scenario (@scenario) SKIP: { + skip "tar command is not available", 3 + if $scenario->{'is_archive'} && (!defined $tar || $tar eq ''); + skip "$scenario->{'compression_method'} compression not supported by this build", 3 + if !$scenario->{'enabled'} && $scenario->{'is_archive'}; + + # create pg_wal archive + if ($scenario->{'is_archive'}) + { + generate_archive($path, + $node->data_dir . 
'/pg_wal', + $scenario->{'compression_flags'}); + } + command_fails_like( [ 'pg_waldump', '--path' => $path ], qr/error: no start WAL location given/, @@ -298,38 +354,42 @@ for my $scenario (@scenario) qr/error: error in WAL record at/, 'errors are shown with --quiet'); - @lines = test_pg_waldump('--path' => $path); + my @lines; + @lines = test_pg_waldump($path); is(grep(!/^rmgr: \w/, @lines), 0, 'all output lines are rmgr lines'); - @lines = test_pg_waldump('--path' => $path, '--limit' => 6); + @lines = test_pg_waldump($path, '--limit' => 6); is(@lines, 6, 'limit option observed'); - @lines = test_pg_waldump('--path' => $path, '--fullpage'); + @lines = test_pg_waldump($path, '--fullpage'); is(grep(!/^rmgr:.*\bFPW\b/, @lines), 0, 'all output lines are FPW'); - @lines = test_pg_waldump('--path' => $path, '--stats'); + @lines = test_pg_waldump($path, '--stats'); like($lines[0], qr/WAL statistics/, "statistics on stdout"); is(grep(/^rmgr:/, @lines), 0, 'no rmgr lines output'); - @lines = test_pg_waldump('--path' => $path, '--stats=record'); + @lines = test_pg_waldump($path, '--stats=record'); like($lines[0], qr/WAL statistics/, "statistics on stdout"); is(grep(/^rmgr:/, @lines), 0, 'no rmgr lines output'); - @lines = test_pg_waldump('--path' => $path, '--rmgr' => 'Btree'); + @lines = test_pg_waldump($path, '--rmgr' => 'Btree'); is(grep(!/^rmgr: Btree/, @lines), 0, 'only Btree lines'); - @lines = test_pg_waldump('--path' => $path, '--fork' => 'init'); + @lines = test_pg_waldump($path, '--fork' => 'init'); is(grep(!/fork init/, @lines), 0, 'only init fork lines'); - @lines = test_pg_waldump('--path' => $path, + @lines = test_pg_waldump($path, '--relation' => "$default_ts_oid/$postgres_db_oid/$rel_t1_oid"); is(grep(!/rel $default_ts_oid\/$postgres_db_oid\/$rel_t1_oid/, @lines), 0, 'only lines for selected relation'); - @lines = test_pg_waldump('--path' => $path, + @lines = test_pg_waldump($path, '--relation' => "$default_ts_oid/$postgres_db_oid/$rel_i1a_oid", '--block' 
=> 1); is(grep(!/\bblk 1\b/, @lines), 0, 'only lines for selected block'); + + # Cleanup. + unlink $path if $scenario->{'is_archive'}; } } diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 23bce72ae64..0c8d6bfa3e1 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -139,6 +139,8 @@ ArchiveOpts ArchiveShutdownCB ArchiveStartupCB ArchiveStreamState +ArchivedWALEntry +ArchivedWAL_hash ArchiverOutput ArchiverStage ArrayAnalyzeExtraData @@ -3461,6 +3463,7 @@ astreamer_recovery_injector astreamer_tar_archiver astreamer_tar_parser astreamer_verify +astreamer_waldump astreamer_zstd_frame auth_password_hook_typ autovac_table -- 2.47.1