From 49239fdc27ac26ef0acd5500c8486f2aeccf1a39 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 7 May 2020 15:18:39 -0400 Subject: [PATCH v1 08/11] Convert progress-reporting code to a bbsink. --- src/backend/replication/Makefile | 1 + src/backend/replication/basebackup.c | 104 +------ src/backend/replication/basebackup_progress.c | 287 ++++++++++++++++++ src/include/replication/basebackup_sink.h | 8 + 4 files changed, 304 insertions(+), 96 deletions(-) create mode 100644 src/backend/replication/basebackup_progress.c diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 58b6c228bb..7de4f82882 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -18,6 +18,7 @@ OBJS = \ backup_manifest.o \ basebackup.o \ basebackup_libpq.o \ + basebackup_progress.o \ basebackup_sink.o \ basebackup_throttle.o \ repl_gram.o \ diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 6fe0da2f49..1655806f1f 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -18,7 +18,6 @@ #include "access/xlog_internal.h" /* for pg_start/stop_backup */ #include "common/file_perm.h" -#include "commands/progress.h" #include "lib/stringinfo.h" #include "miscadmin.h" #include "nodes/pg_list.h" @@ -74,7 +73,6 @@ static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf) static void perform_base_backup(basebackup_options *opt); static void parse_basebackup_options(List *options, basebackup_options *opt); static int compareWalFileNames(const ListCell *a, const ListCell *b); -static void update_basebackup_progress(int64 delta); static bool is_checksummed_file(const char *fullpath, const char *filename); /* Was the backup currently in-progress initiated in recovery mode? */ @@ -109,15 +107,6 @@ static long long int total_checksum_failures; /* Do not verify checksums. 
*/
 static bool noverify_checksums = false;
 
-/*
- * Total amount of backup data that will be streamed.
- * -1 means that the size is not estimated.
- */
-static int64 backup_total = 0;
-
-/* Amount of backup data already streamed */
-static int64 backup_streamed = 0;
-
 /*
  * Definition of one element part of an exclusion list, used for paths part
  * of checksum validation or base backups.  "name" is the name of the file
@@ -252,26 +241,14 @@ perform_base_backup(basebackup_options *opt)
 	int			datadirpathlen;
 	List	   *tablespaces = NIL;
 	bbsink	   *sink = bbsink_libpq_new();
+	bbsink	   *progress_sink;
 
 	/* Set up network throttling, if client requested it */
 	if (opt->maxrate > 0)
 		sink = bbsink_throttle_new(sink, opt->maxrate);
 
-	backup_total = 0;
-	backup_streamed = 0;
-	pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
-
-	/*
-	 * If the estimation of the total backup size is disabled, make the
-	 * backup_total column in the view return NULL by setting the parameter to
-	 * -1.
-	 */
-	if (!opt->progress)
-	{
-		backup_total = -1;
-		pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL,
-									 backup_total);
-	}
+	/* Set up progress reporting. */
+	sink = progress_sink = bbsink_progress_new(sink, opt->progress);
 
 	/* we're going to use a BufFile, so we need a ResourceOwner */
 	Assert(CurrentResourceOwner == NULL);
@@ -288,8 +265,7 @@ perform_base_backup(basebackup_options *opt)
 
 	total_checksum_failures = 0;
 
-	pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
-								 PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
+	basebackup_progress_wait_checkpoint();
 	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
 								  labelfile, &tablespaces,
 								  tblspc_map_file, opt->sendtblspcmapfile);
@@ -305,7 +281,6 @@ perform_base_backup(basebackup_options *opt)
 	{
 		ListCell   *lc;
 		tablespaceinfo *ti;
-		int			tblspc_streamed = 0;
 
 		/*
 		 * Calculate the relative path of temporary statistics directory in
@@ -330,8 +305,7 @@ perform_base_backup(basebackup_options *opt)
 		 */
 		if (opt->progress)
 		{
-			pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
-										 PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
+			basebackup_progress_estimate_backup_size();
 
 			foreach(lc, tablespaces)
 			{
@@ -343,25 +317,9 @@ perform_base_backup(basebackup_options *opt)
 				else
 					tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true,
 											   NULL);
-				backup_total += tmp->size;
 			}
 		}
 
-		/* Report that we are now streaming database files as a base backup */
-		{
-			const int	index[] = {
-				PROGRESS_BASEBACKUP_PHASE,
-				PROGRESS_BASEBACKUP_BACKUP_TOTAL,
-				PROGRESS_BASEBACKUP_TBLSPC_TOTAL
-			};
-			const int64 val[] = {
-				PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP,
-				backup_total, list_length(tablespaces)
-			};
-
-			pgstat_progress_update_multi_param(3, index, val);
-		}
-
 		/* notify basebackup sink about start of backup */
 		bbsink_begin_backup(sink, startptr, starttli, tablespaces);
 
@@ -423,14 +381,9 @@ perform_base_backup(basebackup_options *opt)
 			}
 			else
 				bbsink_end_archive(sink);
-
-			tblspc_streamed++;
-			pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
-										 tblspc_streamed);
 		}
 
-		pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
-									 PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE);
+		basebackup_progress_wait_wal_archive(progress_sink);
 		endptr = do_pg_stop_backup(labelfile->data, !opt->nowait, &endtli);
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
@@ -456,8 +409,7 @@ perform_base_backup(basebackup_options *opt)
 		ListCell   *lc;
 		TimeLineID	tli;
 
-		pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
-									 PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
+		basebackup_progress_transfer_wal();
 
 		/*
 		 * I'd rather not worry about timelines here, so scan pg_wal and
@@ -605,7 +557,6 @@ perform_base_backup(basebackup_options *opt)
 			{
 				CheckXLogRemoved(segno, tli);
 				bbsink_archive_contents(sink, buf, cnt);
-				update_basebackup_progress(cnt);
 
 				len += cnt;
 
@@ -692,7 +643,7 @@ perform_base_backup(basebackup_options *opt)
 	/* clean up the resource owner we created */
 	WalSndResourceCleanup(true);
 
-	pgstat_progress_end_command();
+	basebackup_progress_done();
 }
 
 /*
@@ -936,7 +887,6 @@ sendFileWithContent(bbsink *sink, const char *filename, const char *content,
 
 	_tarWriteHeader(sink, filename, NULL, &statbuf, false);
 	bbsink_archive_contents(sink, content, len);
-	update_basebackup_progress(len);
 
 	/* Pad to a multiple of the tar block size. */
 	pad = tarPaddingBytesRequired(len);
@@ -946,7 +896,6 @@ sendFileWithContent(bbsink *sink, const char *filename, const char *content,
 
 		MemSet(buf, 0, pad);
 		bbsink_archive_contents(sink, buf, pad);
-		update_basebackup_progress(pad);
 	}
 
 	pg_checksum_update(&checksum_ctx, (uint8 *) content, len);
@@ -1575,7 +1524,6 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
 		}
 
 		bbsink_archive_contents(sink, buf, cnt);
-		update_basebackup_progress(cnt);
 
 		/* Also feed it to the checksum machinery. */
 		pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt);
@@ -1604,7 +1552,6 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
 			cnt = Min(sizeof(buf), statbuf->st_size - len);
 			bbsink_archive_contents(sink, buf, cnt);
 			pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt);
-			update_basebackup_progress(cnt);
 			len += cnt;
 		}
 	}
@@ -1619,7 +1566,6 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
 	{
 		MemSet(buf, 0, pad);
 		bbsink_archive_contents(sink, buf, pad);
-		update_basebackup_progress(pad);
 	}
 
 	FreeFile(fp);
@@ -1677,7 +1623,6 @@ _tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget,
 		}
 
 		bbsink_archive_contents(sink, h, sizeof(h));
-		update_basebackup_progress(sizeof(h));
 	}
 
 	return sizeof(h);
@@ -1698,36 +1643,3 @@ convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
 #endif
 	statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
 }
-
-/*
- * Increment the counter for the amount of data already streamed
- * by the given number of bytes, and update the progress report for
- * pg_stat_progress_basebackup.
- */
-static void
-update_basebackup_progress(int64 delta)
-{
-	const int	index[] = {
-		PROGRESS_BASEBACKUP_BACKUP_STREAMED,
-		PROGRESS_BASEBACKUP_BACKUP_TOTAL
-	};
-	int64		val[2];
-	int			nparam = 0;
-
-	backup_streamed += delta;
-	val[nparam++] = backup_streamed;
-
-	/*
-	 * Avoid overflowing past 100% or the full size.  This may make the total
-	 * size number change as we approach the end of the backup (the estimate
-	 * will always be wrong if WAL is included), but that's better than having
-	 * the done column be bigger than the total.
-	 */
-	if (backup_total > -1 && backup_streamed > backup_total)
-	{
-		backup_total = backup_streamed;
-		val[nparam++] = backup_total;
-	}
-
-	pgstat_progress_update_multi_param(nparam, index, val);
-}
diff --git a/src/backend/replication/basebackup_progress.c b/src/backend/replication/basebackup_progress.c
new file mode 100644
index 0000000000..1dcb9d8390
--- /dev/null
+++ b/src/backend/replication/basebackup_progress.c
@@ -0,0 +1,287 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_progress.c
+ *	  Basebackup sink implementing progress reporting. Data is forwarded to
+ *	  the next base backup sink in the chain and the number of bytes
+ *	  forwarded is used to update shared memory progress counters.
+ *
+ * Progress reporting requires extra callbacks that most base backup sinks
+ * don't. Rather than cramming those into the interface, we just have a few
+ * extra functions here that basebackup.c can call. (We could put the logic
+ * directly into that file as it's fairly simple, but it seems cleaner to
+ * have it all in one place.)
+ *
+ * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/basebackup_progress.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/progress.h"
+#include "miscadmin.h"
+#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_progress
+{
+	/* Common information for all types of sink. */
+	bbsink		base;
+
+	/* Are we estimating the backup size? */
+	bool		estimate_backup_size;
+
+	/*
+	 * Estimated total amount of backup data that will be streamed.
+	 * -1 means that the size is not estimated.
+	 */
+	int64		backup_total;
+
+	/* Amount of backup data already streamed */
+	int64		backup_streamed;
+
+	/* Total number of tablespaces. */
+	int			tblspc_total;
+
+	/* Number of those that have been streamed. */
+	int			tblspc_streamed;
+} bbsink_progress;
+
+static void bbsink_progress_begin_backup(bbsink *sink, XLogRecPtr startptr,
+										 TimeLineID starttli,
+										 List *tablespaces);
+static void bbsink_progress_archive_contents(bbsink *sink,
+											 const char *data, size_t len);
+static void bbsink_progress_end_archive(bbsink *sink);
+
+const bbsink_ops bbsink_progress_ops = {
+	.begin_backup = bbsink_progress_begin_backup,
+	.begin_archive = bbsink_forward_begin_archive,
+	.archive_contents = bbsink_progress_archive_contents,
+	.end_archive = bbsink_progress_end_archive,
+	.begin_manifest = bbsink_forward_begin_manifest,
+	.manifest_contents = bbsink_forward_manifest_contents,
+	.end_manifest = bbsink_forward_end_manifest,
+	.end_backup = bbsink_forward_end_backup
+};
+
+/*
+ * Create a new basebackup sink that performs progress reporting and forwards
+ * data to 'next'.  basebackup_progress_wait_wal_archive() requires this sink.
+ */
+bbsink *
+bbsink_progress_new(bbsink *next, bool estimate_backup_size)
+{
+	bbsink_progress *sink;
+
+	Assert(next != NULL);
+
+	sink = palloc0(sizeof(bbsink_progress));
+	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_progress_ops;
+	sink->base.bbs_next = next;
+
+	sink->estimate_backup_size = estimate_backup_size;
+	sink->backup_total = -1;
+	sink->backup_streamed = 0;
+
+	/*
+	 * Report that a base backup is in progress, and set the total size of
+	 * the backup to -1, which will get translated to NULL. If we're estimating
+	 * the backup size, we'll insert the real estimate when we have it.
+	 */
+	pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
+	pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL,
+								 sink->backup_total);
+
+	return &sink->base;
+}
+
+/*
+ * Progress reporting at start of backup.
+ */ +static void +bbsink_progress_begin_backup(bbsink *sink, XLogRecPtr startptr, + TimeLineID starttli, List *tablespaces) +{ + const int index[] = { + PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_BACKUP_TOTAL, + PROGRESS_BASEBACKUP_TBLSPC_TOTAL + }; + int64 val[3]; + bbsink_progress *mysink = (bbsink_progress *) sink; + + /* Save count of tablespaces. */ + mysink->tblspc_total = list_length(tablespaces); + + /* + * If the sizes of the individual tablespaces are being calculated, add + * them up to get a total size. + */ + if (mysink->estimate_backup_size) + { + ListCell *lc; + + mysink->backup_total = 0; + foreach(lc, tablespaces) + { + tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc); + + mysink->backup_total += ti->size; + } + } + + /* + * Report that we are now streaming database files as a base backup. + * Also advertise the number of tablespaces, and, if known, the estimated + * total backup size. + */ + val[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP; + val[1] = mysink->backup_total; + val[2] = mysink->tblspc_total; + pgstat_progress_update_multi_param(3, index, val); + + /* Delegate to next sink. */ + Assert(sink->bbs_next != NULL); + bbsink_begin_backup(sink->bbs_next, startptr, starttli, tablespaces); +} + +/* + * End-of archive progress reporting. + */ +static void +bbsink_progress_end_archive(bbsink *sink) +{ + bbsink_progress *mysink = (bbsink_progress *) sink; + + /* + * We assume that the end of an archive means we've reached the end of a + * tablespace. That's not ideal: we might want to decouple those two + * concepts better. + * + * If WAL is included in the backup, we'll mark the last tablespace + * complete before the last archive is complete, so we need a guard here + * to ensure that the number of tablespaces streamed doesn't exceed the + * total. 
+ */ + if (mysink->tblspc_streamed < mysink->tblspc_total) + { + mysink->tblspc_streamed++; + pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED, + mysink->tblspc_streamed); + } +} + +/* + * First pass archive contents to next sink, and then perform progress updates. + * + * Increment the counter for the amount of data already streamed + * by the given number of bytes, and update the progress report for + * pg_stat_progress_basebackup. + */ +static void +bbsink_progress_archive_contents(bbsink *sink, const char *data, size_t len) +{ + const int index[] = { + PROGRESS_BASEBACKUP_BACKUP_STREAMED, + PROGRESS_BASEBACKUP_BACKUP_TOTAL + }; + int64 val[2]; + int nparam = 0; + bbsink_progress *mysink = (bbsink_progress *) sink; + + /* First forward to next sink. */ + Assert(sink->bbs_next != NULL); + bbsink_archive_contents(sink->bbs_next, data, len); + + /* Now increment count of what was sent by length of data. */ + mysink->backup_streamed += len; + val[nparam++] = mysink->backup_streamed; + + /* + * Avoid overflowing past 100% or the full size. This may make the total + * size number change as we approach the end of the backup (the estimate + * will always be wrong if WAL is included), but that's better than having + * the done column be bigger than the total. + */ + if (mysink->backup_total > -1 && + mysink->backup_streamed > mysink->backup_total) + { + mysink->backup_total = mysink->backup_streamed; + val[nparam++] = mysink->backup_total; + } + + pgstat_progress_update_multi_param(nparam, index, val); +} + +/* + * Advertise that we are waiting for the start-of-backup checkpoint. + */ +void +basebackup_progress_wait_checkpoint(void) +{ + pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT); +} + +/* + * Advertise that we are estimating the backup size. 
+ */ +void +basebackup_progress_estimate_backup_size(void) +{ + pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE); +} + +/* + * Advertise that we are waiting for WAL archiving at end-of-backup. + */ +void +basebackup_progress_wait_wal_archive(bbsink *sink) +{ + const int index[] = { + PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_TBLSPC_STREAMED + }; + int64 val[2]; + bbsink_progress *mysink = (bbsink_progress *) sink; + + Assert(mysink->tblspc_streamed >= mysink->tblspc_total - 1); + Assert(mysink->tblspc_streamed <= mysink->tblspc_total); + Assert(sink->bbs_ops == &bbsink_progress_ops); + + /* + * We report having finished all tablespaces at this point, even if + * the archive for the main tablespace is still open, because what's + * going to be added is WAL files, not files that are really from the + * main tablespace. + */ + val[0] = PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE; + val[1] = mysink->tblspc_total = mysink->tblspc_streamed; + pgstat_progress_update_multi_param(2, index, val); +} + +/* + * Advertise that we are transferring WAL files into the final archive. + */ +void +basebackup_progress_transfer_wal(void) +{ + pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL); +} + +/* + * Advertise that we are no longer performing a backup. + */ +void +basebackup_progress_done(void) +{ + pgstat_progress_end_command(); +} diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index bc1710e2eb..bf2d71fafa 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -175,6 +175,14 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, /* Constructors for various types of sinks. 
*/
 extern bbsink *bbsink_libpq_new(void);
+extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
 extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);
 
+/* Extra interface functions for progress reporting. */
+extern void basebackup_progress_wait_checkpoint(void);
+extern void basebackup_progress_estimate_backup_size(void);
+extern void basebackup_progress_wait_wal_archive(bbsink *sink);
+extern void basebackup_progress_transfer_wal(void);
+extern void basebackup_progress_done(void);
+
 #endif
-- 
2.24.2 (Apple Git-127)