diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 558cc88a08..8cde368a10 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -24,18 +24,9 @@ * * Lazy vacuum supports parallel execution with parallel worker processes. In * a parallel vacuum, we perform both index vacuum and index cleanup with - * parallel worker processes. Individual indexes are processed by one vacuum - * process. At the beginning of a lazy vacuum (at lazy_scan_heap) we prepare - * the parallel context and initialize the DSM segment that contains shared - * information as well as the memory space for storing dead tuples. When - * starting either index vacuum or index cleanup, we launch parallel worker - * processes. Once all indexes are processed the parallel worker processes - * exit. After that, the leader process re-initializes the parallel context - * so that it can use the same DSM for multiple passes of index vacuum and - * for performing index cleanup. For updating the index statistics, we need - * to update the system table and since updates are not allowed during - * parallel mode we update the index statistics after exiting from the - * parallel mode. + * parallel worker processes. For updating the index statistics, we need to + * update the system table and since updates are not allowed during parallel + * mode we update the index statistics after exiting from the parallel mode. * * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -143,22 +134,11 @@ */ #define PREFETCH_SIZE ((BlockNumber) 32) -/* - * DSM keys for parallel vacuum. Unlike other parallel execution code, since - * we don't need to worry about DSM keys conflicting with plan_node_id we can - * use small integers. 
- */ -#define PARALLEL_VACUUM_KEY_SHARED 1 -#define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2 -#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 -#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4 -#define PARALLEL_VACUUM_KEY_WAL_USAGE 5 - /* * Macro to check if we are in a parallel vacuum. If true, we are in the * parallel mode and the DSM segment is initialized. */ -#define ParallelVacuumIsActive(vacrel) ((vacrel)->lps != NULL) +#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvc != NULL) /* Phases of vacuum during which we report error context. */ typedef enum @@ -171,137 +151,6 @@ typedef enum VACUUM_ERRCB_PHASE_TRUNCATE } VacErrPhase; -/* - * LVDeadTuples stores the dead tuple TIDs collected during the heap scan. - * This is allocated in the DSM segment in parallel mode and in local memory - * in non-parallel mode. - */ -typedef struct LVDeadTuples -{ - int max_tuples; /* # slots allocated in array */ - int num_tuples; /* current # of entries */ - /* List of TIDs of tuples we intend to delete */ - /* NB: this list is ordered by TID address */ - ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER]; /* array of - * ItemPointerData */ -} LVDeadTuples; - -/* The dead tuple space consists of LVDeadTuples and dead tuple TIDs */ -#define SizeOfDeadTuples(cnt) \ - add_size(offsetof(LVDeadTuples, itemptrs), \ - mul_size(sizeof(ItemPointerData), cnt)) -#define MAXDEADTUPLES(max_size) \ - (((max_size) - offsetof(LVDeadTuples, itemptrs)) / sizeof(ItemPointerData)) - -/* - * Shared information among parallel workers. So this is allocated in the DSM - * segment. - */ -typedef struct LVShared -{ - /* - * Target table relid and log level. These fields are not modified during - * the lazy vacuum. - */ - Oid relid; - int elevel; - - /* - * An indication for vacuum workers to perform either index vacuum or - * index cleanup. first_time is true only if for_cleanup is true and - * bulk-deletion is not performed yet. 
- */ - bool for_cleanup; - bool first_time; - - /* - * Fields for both index vacuum and cleanup. - * - * reltuples is the total number of input heap tuples. We set either old - * live tuples in the index vacuum case or the new live tuples in the - * index cleanup case. - * - * estimated_count is true if reltuples is an estimated value. (Note that - * reltuples could be -1 in this case, indicating we have no idea.) - */ - double reltuples; - bool estimated_count; - - /* - * In single process lazy vacuum we could consume more memory during index - * vacuuming or cleanup apart from the memory for heap scanning. In - * parallel vacuum, since individual vacuum workers can consume memory - * equal to maintenance_work_mem, the new maintenance_work_mem for each - * worker is set such that the parallel operation doesn't consume more - * memory than single process lazy vacuum. - */ - int maintenance_work_mem_worker; - - /* - * Shared vacuum cost balance. During parallel vacuum, - * VacuumSharedCostBalance points to this value and it accumulates the - * balance of each parallel vacuum worker. - */ - pg_atomic_uint32 cost_balance; - - /* - * Number of active parallel workers. This is used for computing the - * minimum threshold of the vacuum cost balance before a worker sleeps for - * cost-based delay. - */ - pg_atomic_uint32 active_nworkers; - - /* - * Variables to control parallel vacuum. We have a bitmap to indicate - * which index has stats in shared memory. The set bit in the map - * indicates that the particular index supports a parallel vacuum. - */ - pg_atomic_uint32 idx; /* counter for vacuuming and clean up */ - uint32 offset; /* sizeof header incl. 
bitmap */ - bits8 bitmap[FLEXIBLE_ARRAY_MEMBER]; /* bit map of NULLs */ - - /* Shared index statistics data follows at end of struct */ -} LVShared; - -#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8)) -#define GetSharedIndStats(s) \ - ((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset)) -#define IndStatsIsNull(s, i) \ - (!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07)))) - -/* - * Struct for an index bulk-deletion statistic used for parallel vacuum. This - * is allocated in the DSM segment. - */ -typedef struct LVSharedIndStats -{ - bool updated; /* are the stats updated? */ - IndexBulkDeleteResult istat; -} LVSharedIndStats; - -/* Struct for maintaining a parallel vacuum state. */ -typedef struct LVParallelState -{ - ParallelContext *pcxt; - - /* Shared information among parallel vacuum workers */ - LVShared *lvshared; - - /* Points to buffer usage area in DSM */ - BufferUsage *buffer_usage; - - /* Points to WAL usage area in DSM */ - WalUsage *wal_usage; - - /* - * The number of indexes that support parallel index bulk-deletion and - * parallel index cleanup respectively. 
- */ - int nindexes_parallel_bulkdel; - int nindexes_parallel_cleanup; - int nindexes_parallel_condcleanup; -} LVParallelState; - typedef struct LVRelState { /* Target heap relation and its indexes */ @@ -321,7 +170,7 @@ typedef struct LVRelState /* Buffer access strategy and parallel state */ BufferAccessStrategy bstrategy; - LVParallelState *lps; + ParallelVacuumContext *pvc; /* rel's initial relfrozenxid and relminmxid */ TransactionId relfrozenxid; @@ -345,7 +194,7 @@ typedef struct LVRelState /* * State managed by lazy_scan_heap() follows */ - LVDeadTuples *dead_tuples; /* items to vacuum from indexes */ + VacDeadTuples *dead_tuples; /* items to vacuum from indexes */ BlockNumber rel_pages; /* total number of pages */ BlockNumber scanned_pages; /* number of pages we examined */ BlockNumber pinskipped_pages; /* # of pages skipped due to a pin */ @@ -416,18 +265,6 @@ static int lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, static bool lazy_check_needs_freeze(Buffer buf, bool *hastup, LVRelState *vacrel); static bool lazy_check_wraparound_failsafe(LVRelState *vacrel); -static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel); -static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel); -static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers); -static void do_parallel_processing(LVRelState *vacrel, - LVShared *lvshared); -static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, - LVShared *lvshared); -static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel, - IndexBulkDeleteResult *istat, - LVShared *lvshared, - LVSharedIndStats *shared_indstats, - LVRelState *vacrel); static void lazy_cleanup_all_indexes(LVRelState *vacrel); static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat, @@ -446,20 +283,9 @@ static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex); static void lazy_space_alloc(LVRelState *vacrel, int 
nworkers, BlockNumber relblocks); static void lazy_space_free(LVRelState *vacrel); -static bool lazy_tid_reaped(ItemPointer itemptr, void *state); -static int vac_cmp_itemptr(const void *left, const void *right); static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, TransactionId *visibility_cutoff_xid, bool *all_frozen); -static int compute_parallel_vacuum_workers(LVRelState *vacrel, - int nrequested, - bool *will_parallel_vacuum); static void update_index_statistics(LVRelState *vacrel); -static LVParallelState *begin_parallel_vacuum(LVRelState *vacrel, - BlockNumber nblocks, - int nrequested); -static void end_parallel_vacuum(LVRelState *vacrel); -static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx); -static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared); static void vacuum_error_callback(void *arg); static void update_vacuum_error_info(LVRelState *vacrel, LVSavedErrInfo *saved_vacrel, @@ -905,7 +731,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, static void lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive) { - LVDeadTuples *dead_tuples; + VacDeadTuples *dead_tuples; BlockNumber nblocks, blkno, next_unskippable_block, @@ -2039,7 +1865,7 @@ retry: */ if (lpdead_items > 0) { - LVDeadTuples *dead_tuples = vacrel->dead_tuples; + VacDeadTuples *dead_tuples = vacrel->dead_tuples; ItemPointerData tmp; Assert(!prunestate->all_visible); @@ -2251,7 +2077,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - do_parallel_lazy_vacuum_all_indexes(vacrel); + perform_parallel_index_bulkdel(vacrel->pvc, vacrel->old_live_tuples); /* * Do a postcheck to consider applying wraparound failsafe now. 
Note @@ -2404,7 +2230,7 @@ static int lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer, int tupindex, Buffer *vmbuffer) { - LVDeadTuples *dead_tuples = vacrel->dead_tuples; + VacDeadTuples *dead_tuples = vacrel->dead_tuples; Page page = BufferGetPage(buffer); OffsetNumber unused[MaxHeapTuplesPerPage]; int uncnt = 0; @@ -2625,351 +2451,6 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel) return false; } -/* - * Perform lazy_vacuum_all_indexes() steps in parallel - */ -static void -do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel) -{ - /* Tell parallel workers to do index vacuuming */ - vacrel->lps->lvshared->for_cleanup = false; - vacrel->lps->lvshared->first_time = false; - - /* - * We can only provide an approximate value of num_heap_tuples, at least - * for now. Matches serial VACUUM case. - */ - vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples; - vacrel->lps->lvshared->estimated_count = true; - - do_parallel_vacuum_or_cleanup(vacrel, - vacrel->lps->nindexes_parallel_bulkdel); -} - -/* - * Perform lazy_cleanup_all_indexes() steps in parallel - */ -static void -do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel) -{ - int nworkers; - - /* - * If parallel vacuum is active we perform index cleanup with parallel - * workers. - * - * Tell parallel workers to do index cleanup. - */ - vacrel->lps->lvshared->for_cleanup = true; - vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0); - - /* - * Now we can provide a better estimate of total number of surviving - * tuples (we assume indexes are more interested in that than in the - * number of nominally live tuples). 
- */ - vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples; - vacrel->lps->lvshared->estimated_count = - (vacrel->tupcount_pages < vacrel->rel_pages); - - /* Determine the number of parallel workers to launch */ - if (vacrel->lps->lvshared->first_time) - nworkers = vacrel->lps->nindexes_parallel_cleanup + - vacrel->lps->nindexes_parallel_condcleanup; - else - nworkers = vacrel->lps->nindexes_parallel_cleanup; - - do_parallel_vacuum_or_cleanup(vacrel, nworkers); -} - -/* - * Perform index vacuum or index cleanup with parallel workers. This function - * must be used by the parallel vacuum leader process. The caller must set - * lps->lvshared->for_cleanup to indicate whether to perform vacuum or - * cleanup. - */ -static void -do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers) -{ - LVParallelState *lps = vacrel->lps; - - Assert(!IsParallelWorker()); - Assert(ParallelVacuumIsActive(vacrel)); - Assert(vacrel->nindexes > 0); - - /* The leader process will participate */ - nworkers--; - - /* - * It is possible that parallel context is initialized with fewer workers - * than the number of indexes that need a separate worker in the current - * phase, so we need to consider it. See compute_parallel_vacuum_workers. - */ - nworkers = Min(nworkers, lps->pcxt->nworkers); - - /* Setup the shared cost-based vacuum delay and launch workers */ - if (nworkers > 0) - { - if (vacrel->num_index_scans > 0) - { - /* Reset the parallel index processing counter */ - pg_atomic_write_u32(&(lps->lvshared->idx), 0); - - /* Reinitialize the parallel context to relaunch parallel workers */ - ReinitializeParallelDSM(lps->pcxt); - } - - /* - * Set up shared cost balance and the number of active workers for - * vacuum delay. We need to do this before launching workers as - * otherwise, they might not see the updated values for these - * parameters. 
- */ - pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance); - pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0); - - /* - * The number of workers can vary between bulkdelete and cleanup - * phase. - */ - ReinitializeParallelWorkers(lps->pcxt, nworkers); - - LaunchParallelWorkers(lps->pcxt); - - if (lps->pcxt->nworkers_launched > 0) - { - /* - * Reset the local cost values for leader backend as we have - * already accumulated the remaining balance of heap. - */ - VacuumCostBalance = 0; - VacuumCostBalanceLocal = 0; - - /* Enable shared cost balance for leader backend */ - VacuumSharedCostBalance = &(lps->lvshared->cost_balance); - VacuumActiveNWorkers = &(lps->lvshared->active_nworkers); - } - - if (lps->lvshared->for_cleanup) - ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", - "launched %d parallel vacuum workers for index cleanup (planned: %d)", - lps->pcxt->nworkers_launched), - lps->pcxt->nworkers_launched, nworkers))); - else - ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", - "launched %d parallel vacuum workers for index vacuuming (planned: %d)", - lps->pcxt->nworkers_launched), - lps->pcxt->nworkers_launched, nworkers))); - } - - /* Process the indexes that can be processed by only leader process */ - do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared); - - /* - * Join as a parallel worker. The leader process alone processes all the - * indexes in the case where no workers are launched. - */ - do_parallel_processing(vacrel, lps->lvshared); - - /* - * Next, accumulate buffer and WAL usage. (This must wait for the workers - * to finish, or we might get incomplete data.) 
- */ - if (nworkers > 0) - { - /* Wait for all vacuum workers to finish */ - WaitForParallelWorkersToFinish(lps->pcxt); - - for (int i = 0; i < lps->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]); - } - - /* - * Carry the shared balance value to heap scan and disable shared costing - */ - if (VacuumSharedCostBalance) - { - VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); - VacuumSharedCostBalance = NULL; - VacuumActiveNWorkers = NULL; - } -} - -/* - * Index vacuum/cleanup routine used by the leader process and parallel - * vacuum worker processes to process the indexes in parallel. - */ -static void -do_parallel_processing(LVRelState *vacrel, LVShared *lvshared) -{ - /* - * Increment the active worker count if we are able to launch any worker. - */ - if (VacuumActiveNWorkers) - pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - - /* Loop until all indexes are vacuumed */ - for (;;) - { - int idx; - LVSharedIndStats *shared_istat; - Relation indrel; - IndexBulkDeleteResult *istat; - - /* Get an index number to process */ - idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1); - - /* Done for all indexes? */ - if (idx >= vacrel->nindexes) - break; - - /* Get the index statistics space from DSM, if any */ - shared_istat = parallel_stats_for_idx(lvshared, idx); - - /* Skip indexes not participating in parallelism */ - if (shared_istat == NULL) - continue; - - indrel = vacrel->indrels[idx]; - - /* - * Skip processing indexes that are unsafe for workers (these are - * processed in do_serial_processing_for_unsafe_indexes() by leader) - */ - if (!parallel_processing_is_safe(indrel, lvshared)) - continue; - - /* Do vacuum or cleanup of the index */ - istat = vacrel->indstats[idx]; - vacrel->indstats[idx] = parallel_process_one_index(indrel, istat, - lvshared, - shared_istat, - vacrel); - } - - /* - * We have completed the index vacuum so decrement the active worker - * count. 
- */ - if (VacuumActiveNWorkers) - pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); -} - -/* - * Perform parallel processing of indexes in leader process. - * - * Handles index vacuuming (or index cleanup) for indexes that are not - * parallel safe. It's possible that this will vary for a given index, based - * on details like whether we're performing for_cleanup processing right now. - * - * Also performs processing of smaller indexes that fell under the size cutoff - * enforced by compute_parallel_vacuum_workers(). These indexes never get a - * slot for statistics in DSM. - */ -static void -do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared) -{ - Assert(!IsParallelWorker()); - - /* - * Increment the active worker count if we are able to launch any worker. - */ - if (VacuumActiveNWorkers) - pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - - for (int idx = 0; idx < vacrel->nindexes; idx++) - { - LVSharedIndStats *shared_istat; - Relation indrel; - IndexBulkDeleteResult *istat; - - shared_istat = parallel_stats_for_idx(lvshared, idx); - indrel = vacrel->indrels[idx]; - - /* - * We're only here for the indexes that parallel workers won't - * process. Note that the shared_istat test ensures that we process - * indexes that fell under initial size cutoff. - */ - if (shared_istat != NULL && - parallel_processing_is_safe(indrel, lvshared)) - continue; - - /* Do vacuum or cleanup of the index */ - istat = vacrel->indstats[idx]; - vacrel->indstats[idx] = parallel_process_one_index(indrel, istat, - lvshared, - shared_istat, - vacrel); - } - - /* - * We have completed the index vacuum so decrement the active worker - * count. - */ - if (VacuumActiveNWorkers) - pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); -} - -/* - * Vacuum or cleanup index either by leader process or by one of the worker - * process. 
After processing the index this function copies the index - * statistics returned from ambulkdelete and amvacuumcleanup to the DSM - * segment. - */ -static IndexBulkDeleteResult * -parallel_process_one_index(Relation indrel, - IndexBulkDeleteResult *istat, - LVShared *lvshared, - LVSharedIndStats *shared_istat, - LVRelState *vacrel) -{ - IndexBulkDeleteResult *istat_res; - - /* - * Update the pointer to the corresponding bulk-deletion result if someone - * has already updated it - */ - if (shared_istat && shared_istat->updated && istat == NULL) - istat = &shared_istat->istat; - - /* Do vacuum or cleanup of the index */ - if (lvshared->for_cleanup) - istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples, - lvshared->estimated_count, vacrel); - else - istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples, - vacrel); - - /* - * Copy the index bulk-deletion result returned from ambulkdelete and - * amvacuumcleanup to the DSM segment if it's the first cycle because they - * allocate locally and it's possible that an index will be vacuumed by a - * different vacuum process the next cycle. Copying the result normally - * happens only the first time an index is vacuumed. For any additional - * vacuum pass, we directly point to the result on the DSM segment and - * pass it to vacuum index APIs so that workers can update it directly. - * - * Since all vacuum workers write the bulk-deletion result at different - * slots we can write them without locking. - */ - if (shared_istat && !shared_istat->updated && istat_res != NULL) - { - memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult)); - shared_istat->updated = true; - - /* Free the locally-allocated bulk-deletion result */ - pfree(istat_res); - - /* return the pointer to the result from shared memory */ - return &shared_istat->istat; - } - - return istat_res; -} - /* * lazy_cleanup_all_indexes() -- cleanup all indexes of relation. 
*/ @@ -3002,7 +2483,8 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - do_parallel_lazy_cleanup_all_indexes(vacrel); + perform_parallel_index_cleanup(vacrel->pvc, vacrel->new_rel_tuples, + (vacrel->tupcount_pages < vacrel->rel_pages)); } } @@ -3048,13 +2530,7 @@ lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat, InvalidBlockNumber, InvalidOffsetNumber); /* Do bulk deletion */ - istat = index_bulk_delete(&ivinfo, istat, lazy_tid_reaped, - (void *) vacrel->dead_tuples); - - ereport(elevel, - (errmsg("scanned index \"%s\" to remove %d row versions", - vacrel->indname, vacrel->dead_tuples->num_tuples), - errdetail_internal("%s", pg_rusage_show(&ru0)))); + istat = vacuum_one_index(&ivinfo, istat, vacrel->dead_tuples); /* Revert to the previous phase information for error traceback */ restore_vacuum_error_info(vacrel, &saved_err_info); @@ -3088,7 +2564,6 @@ lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat, ivinfo.report_progress = false; ivinfo.estimated_count = estimated_count; ivinfo.message_level = elevel; - ivinfo.num_heap_tuples = reltuples; ivinfo.strategy = vacrel->bstrategy; @@ -3104,24 +2579,7 @@ lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat, VACUUM_ERRCB_PHASE_INDEX_CLEANUP, InvalidBlockNumber, InvalidOffsetNumber); - istat = index_vacuum_cleanup(&ivinfo, istat); - - if (istat) - { - ereport(elevel, - (errmsg("index \"%s\" now contains %.0f row versions in %u pages", - RelationGetRelationName(indrel), - istat->num_index_tuples, - istat->num_pages), - errdetail("%.0f index row versions were removed.\n" - "%u index pages were newly deleted.\n" - "%u index pages are currently deleted, of which %u are currently reusable.\n" - "%s.", - istat->tuples_removed, - istat->pages_newly_deleted, - istat->pages_deleted, istat->pages_free, - pg_rusage_show(&ru0)))); - } + istat = cleanup_one_index(&ivinfo, istat); /* Revert to the previous phase information for 
error traceback */ restore_vacuum_error_info(vacrel, &saved_err_info); @@ -3479,9 +2937,11 @@ compute_max_dead_tuples(BlockNumber relblocks, bool hasindex) static void lazy_space_alloc(LVRelState *vacrel, int nworkers, BlockNumber nblocks) { - LVDeadTuples *dead_tuples; + VacDeadTuples *dead_tuples; long maxtuples; + maxtuples = compute_max_dead_tuples(nblocks, vacrel->nindexes > 0); + /* * Initialize state for a parallel vacuum. As of now, only one worker can * be used for an index, so we invoke parallelism only if there are at @@ -3505,16 +2965,29 @@ lazy_space_alloc(LVRelState *vacrel, int nworkers, BlockNumber nblocks) vacrel->relname))); } else - vacrel->lps = begin_parallel_vacuum(vacrel, nblocks, nworkers); + { + ParallelVacuumCtl ctl; + + /* Create parallel vacuum context */ + ctl.rel = vacrel->rel; + ctl.indrels = vacrel->indrels; + ctl.nindexes = vacrel->nindexes; + ctl.nrequested_workers = nworkers; + ctl.maxtuples = maxtuples; + ctl.elevel = elevel; + ctl.bstrategy = vacrel->bstrategy; + vacrel->pvc = begin_parallel_vacuum(&ctl); + } /* If parallel mode started, we're done */ if (ParallelVacuumIsActive(vacrel)) + { + vacrel->dead_tuples = get_vacuum_dead_tuples(vacrel->pvc); return; + } } - maxtuples = compute_max_dead_tuples(nblocks, vacrel->nindexes > 0); - - dead_tuples = (LVDeadTuples *) palloc(SizeOfDeadTuples(maxtuples)); + dead_tuples = (VacDeadTuples *) palloc(SizeOfDeadTuples(maxtuples)); dead_tuples->num_tuples = 0; dead_tuples->max_tuples = (int) maxtuples; @@ -3534,75 +3007,8 @@ lazy_space_free(LVRelState *vacrel) * End parallel mode before updating index statistics as we cannot write * during parallel mode. */ - end_parallel_vacuum(vacrel); -} - -/* - * lazy_tid_reaped() -- is a particular tid deletable? - * - * This has the right signature to be an IndexBulkDeleteCallback. - * - * Assumes dead_tuples array is in sorted order. 
- */ -static bool -lazy_tid_reaped(ItemPointer itemptr, void *state) -{ - LVDeadTuples *dead_tuples = (LVDeadTuples *) state; - int64 litem, - ritem, - item; - ItemPointer res; - - litem = itemptr_encode(&dead_tuples->itemptrs[0]); - ritem = itemptr_encode(&dead_tuples->itemptrs[dead_tuples->num_tuples - 1]); - item = itemptr_encode(itemptr); - - /* - * Doing a simple bound check before bsearch() is useful to avoid the - * extra cost of bsearch(), especially if dead tuples on the heap are - * concentrated in a certain range. Since this function is called for - * every index tuple, it pays to be really fast. - */ - if (item < litem || item > ritem) - return false; - - res = (ItemPointer) bsearch((void *) itemptr, - (void *) dead_tuples->itemptrs, - dead_tuples->num_tuples, - sizeof(ItemPointerData), - vac_cmp_itemptr); - - return (res != NULL); -} - -/* - * Comparator routines for use with qsort() and bsearch(). - */ -static int -vac_cmp_itemptr(const void *left, const void *right) -{ - BlockNumber lblk, - rblk; - OffsetNumber loff, - roff; - - lblk = ItemPointerGetBlockNumber((ItemPointer) left); - rblk = ItemPointerGetBlockNumber((ItemPointer) right); - - if (lblk < rblk) - return -1; - if (lblk > rblk) - return 1; - - loff = ItemPointerGetOffsetNumber((ItemPointer) left); - roff = ItemPointerGetOffsetNumber((ItemPointer) right); - - if (loff < roff) - return -1; - if (loff > roff) - return 1; - - return 0; + end_parallel_vacuum(vacrel->pvc, vacrel->indstats); + vacrel->pvc = NULL; } /* @@ -3725,76 +3131,6 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, return all_visible; } -/* - * Compute the number of parallel worker processes to request. Both index - * vacuum and index cleanup can be executed with parallel workers. The index - * is eligible for parallel vacuum iff its size is greater than - * min_parallel_index_scan_size as invoking workers for very small indexes - * can hurt performance. 
- * - * nrequested is the number of parallel workers that user requested. If - * nrequested is 0, we compute the parallel degree based on nindexes, that is - * the number of indexes that support parallel vacuum. This function also - * sets will_parallel_vacuum to remember indexes that participate in parallel - * vacuum. - */ -static int -compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested, - bool *will_parallel_vacuum) -{ - int nindexes_parallel = 0; - int nindexes_parallel_bulkdel = 0; - int nindexes_parallel_cleanup = 0; - int parallel_workers; - - /* - * We don't allow performing parallel operation in standalone backend or - * when parallelism is disabled. - */ - if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) - return 0; - - /* - * Compute the number of indexes that can participate in parallel vacuum. - */ - for (int idx = 0; idx < vacrel->nindexes; idx++) - { - Relation indrel = vacrel->indrels[idx]; - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - if (vacoptions == VACUUM_OPTION_NO_PARALLEL || - RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) - continue; - - will_parallel_vacuum[idx] = true; - - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) - nindexes_parallel_bulkdel++; - if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) || - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) - nindexes_parallel_cleanup++; - } - - nindexes_parallel = Max(nindexes_parallel_bulkdel, - nindexes_parallel_cleanup); - - /* The leader process takes one index */ - nindexes_parallel--; - - /* No index supports parallel vacuum */ - if (nindexes_parallel <= 0) - return 0; - - /* Compute the parallel degree */ - parallel_workers = (nrequested > 0) ? 
- Min(nrequested, nindexes_parallel) : nindexes_parallel; - - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); - - return parallel_workers; -} - /* * Update index statistics in pg_class if the statistics are accurate. */ @@ -3827,426 +3163,6 @@ update_index_statistics(LVRelState *vacrel) } } -/* - * This function prepares and returns parallel vacuum state if we can launch - * even one worker. This function is responsible for entering parallel mode, - * create a parallel context, and then initialize the DSM segment. - */ -static LVParallelState * -begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, - int nrequested) -{ - LVParallelState *lps = NULL; - Relation *indrels = vacrel->indrels; - int nindexes = vacrel->nindexes; - ParallelContext *pcxt; - LVShared *shared; - LVDeadTuples *dead_tuples; - BufferUsage *buffer_usage; - WalUsage *wal_usage; - bool *will_parallel_vacuum; - long maxtuples; - Size est_shared; - Size est_deadtuples; - int nindexes_mwm = 0; - int parallel_workers = 0; - int querylen; - - /* - * A parallel vacuum must be requested and there must be indexes on the - * relation - */ - Assert(nrequested >= 0); - Assert(nindexes > 0); - - /* - * Compute the number of parallel vacuum workers to launch - */ - will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); - parallel_workers = compute_parallel_vacuum_workers(vacrel, - nrequested, - will_parallel_vacuum); - - /* Can't perform vacuum in parallel */ - if (parallel_workers <= 0) - { - pfree(will_parallel_vacuum); - return lps; - } - - lps = (LVParallelState *) palloc0(sizeof(LVParallelState)); - - EnterParallelMode(); - pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", - parallel_workers); - Assert(pcxt->nworkers > 0); - lps->pcxt = pcxt; - - /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ - est_shared = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes))); - for 
(int idx = 0; idx < nindexes; idx++) - { - Relation indrel = indrels[idx]; - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* - * Cleanup option should be either disabled, always performing in - * parallel or conditionally performing in parallel. - */ - Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); - Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); - - /* Skip indexes that don't participate in parallel vacuum */ - if (!will_parallel_vacuum[idx]) - continue; - - if (indrel->rd_indam->amusemaintenanceworkmem) - nindexes_mwm++; - - est_shared = add_size(est_shared, sizeof(LVSharedIndStats)); - - /* - * Remember the number of indexes that support parallel operation for - * each phase. - */ - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) - lps->nindexes_parallel_bulkdel++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) - lps->nindexes_parallel_cleanup++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) - lps->nindexes_parallel_condcleanup++; - } - shm_toc_estimate_chunk(&pcxt->estimator, est_shared); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */ - maxtuples = compute_max_dead_tuples(nblocks, true); - est_deadtuples = MAXALIGN(SizeOfDeadTuples(maxtuples)); - shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* - * Estimate space for BufferUsage and WalUsage -- - * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgBufferUsage or - * pgWalUsage, so do it unconditionally. 
- */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ - - InitializeParallelDSM(pcxt); - - /* Prepare shared information */ - shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared); - MemSet(shared, 0, est_shared); - shared->relid = RelationGetRelid(vacrel->rel); - shared->elevel = elevel; - shared->maintenance_work_mem_worker = - (nindexes_mwm > 0) ? - maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : - maintenance_work_mem; - - pg_atomic_init_u32(&(shared->cost_balance), 0); - pg_atomic_init_u32(&(shared->active_nworkers), 0); - pg_atomic_init_u32(&(shared->idx), 0); - shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes))); - - /* - * Initialize variables for shared index statistics, set NULL bitmap and - * the size of stats for each index. 
- */ - memset(shared->bitmap, 0x00, BITMAPLEN(nindexes)); - for (int idx = 0; idx < nindexes; idx++) - { - if (!will_parallel_vacuum[idx]) - continue; - - /* Set NOT NULL as this index does support parallelism */ - shared->bitmap[idx >> 3] |= 1 << (idx & 0x07); - } - - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); - lps->lvshared = shared; - - /* Prepare the dead tuple space */ - dead_tuples = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples); - dead_tuples->max_tuples = maxtuples; - dead_tuples->num_tuples = 0; - MemSet(dead_tuples->itemptrs, 0, sizeof(ItemPointerData) * maxtuples); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples); - vacrel->dead_tuples = dead_tuples; - - /* - * Allocate space for each worker's BufferUsage and WalUsage; no need to - * initialize - */ - buffer_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); - lps->buffer_usage = buffer_usage; - wal_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); - lps->wal_usage = wal_usage; - - /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - sharedquery[querylen] = '\0'; - shm_toc_insert(pcxt->toc, - PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); - } - - pfree(will_parallel_vacuum); - return lps; -} - -/* - * Destroy the parallel context, and end parallel mode. - * - * Since writes are not allowed during parallel mode, copy the - * updated index statistics from DSM into local memory and then later use that - * to update the index statistics. 
One might think that we can exit from - * parallel mode, update the index statistics and then destroy parallel - * context, but that won't be safe (see ExitParallelMode). - */ -static void -end_parallel_vacuum(LVRelState *vacrel) -{ - IndexBulkDeleteResult **indstats = vacrel->indstats; - LVParallelState *lps = vacrel->lps; - int nindexes = vacrel->nindexes; - - Assert(!IsParallelWorker()); - - /* Copy the updated statistics */ - for (int idx = 0; idx < nindexes; idx++) - { - LVSharedIndStats *shared_istat; - - shared_istat = parallel_stats_for_idx(lps->lvshared, idx); - - /* - * Skip index -- it must have been processed by the leader, from - * inside do_serial_processing_for_unsafe_indexes() - */ - if (shared_istat == NULL) - continue; - - if (shared_istat->updated) - { - indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); - memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult)); - } - else - indstats[idx] = NULL; - } - - DestroyParallelContext(lps->pcxt); - ExitParallelMode(); - - /* Deactivate parallel vacuum */ - pfree(lps); - vacrel->lps = NULL; -} - -/* - * Return shared memory statistics for index at offset 'getidx', if any - * - * Returning NULL indicates that compute_parallel_vacuum_workers() determined - * that the index is a totally unsuitable target for all parallel processing - * up front. For example, the index could be < min_parallel_index_scan_size - * cutoff. 
- */ -static LVSharedIndStats * -parallel_stats_for_idx(LVShared *lvshared, int getidx) -{ - char *p; - - if (IndStatsIsNull(lvshared, getidx)) - return NULL; - - p = (char *) GetSharedIndStats(lvshared); - for (int idx = 0; idx < getidx; idx++) - { - if (IndStatsIsNull(lvshared, idx)) - continue; - - p += sizeof(LVSharedIndStats); - } - - return (LVSharedIndStats *) p; -} - -/* - * Returns false, if the given index can't participate in parallel index - * vacuum or parallel index cleanup - */ -static bool -parallel_processing_is_safe(Relation indrel, LVShared *lvshared) -{ - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* first_time must be true only if for_cleanup is true */ - Assert(lvshared->for_cleanup || !lvshared->first_time); - - if (lvshared->for_cleanup) - { - /* Skip, if the index does not support parallel cleanup */ - if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) - return false; - - /* - * Skip, if the index supports parallel cleanup conditionally, but we - * have already processed the index (for bulkdelete). See the - * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know - * when indexes support parallel cleanup conditionally. - */ - if (!lvshared->first_time && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) - return false; - } - else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0) - { - /* Skip if the index does not support parallel bulk deletion */ - return false; - } - - return true; -} - -/* - * Perform work within a launched parallel process. - * - * Since parallel vacuum workers perform only index vacuum or index cleanup, - * we don't need to report progress information. 
- */ -void -parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) -{ - Relation rel; - Relation *indrels; - LVShared *lvshared; - LVDeadTuples *dead_tuples; - BufferUsage *buffer_usage; - WalUsage *wal_usage; - int nindexes; - char *sharedquery; - LVRelState vacrel; - ErrorContextCallback errcallback; - - lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, - false); - elevel = lvshared->elevel; - - if (lvshared->for_cleanup) - elog(DEBUG1, "starting parallel vacuum worker for cleanup"); - else - elog(DEBUG1, "starting parallel vacuum worker for bulk delete"); - - /* Set debug_query_string for individual workers */ - sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - pgstat_report_activity(STATE_RUNNING, debug_query_string); - - /* - * Open table. The lock mode is the same as the leader process. It's - * okay because the lock mode does not conflict among the parallel - * workers. - */ - rel = table_open(lvshared->relid, ShareUpdateExclusiveLock); - - /* - * Open all indexes. indrels are sorted in order by OID, which should be - * matched to the leader's one. 
- */ - vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); - Assert(nindexes > 0); - - /* Set dead tuple space */ - dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc, - PARALLEL_VACUUM_KEY_DEAD_TUPLES, - false); - - /* Set cost-based vacuum delay */ - VacuumCostActive = (VacuumCostDelay > 0); - VacuumCostBalance = 0; - VacuumPageHit = 0; - VacuumPageMiss = 0; - VacuumPageDirty = 0; - VacuumCostBalanceLocal = 0; - VacuumSharedCostBalance = &(lvshared->cost_balance); - VacuumActiveNWorkers = &(lvshared->active_nworkers); - - vacrel.rel = rel; - vacrel.indrels = indrels; - vacrel.nindexes = nindexes; - /* Each parallel VACUUM worker gets its own access strategy */ - vacrel.bstrategy = GetAccessStrategy(BAS_VACUUM); - vacrel.indstats = (IndexBulkDeleteResult **) - palloc0(nindexes * sizeof(IndexBulkDeleteResult *)); - - if (lvshared->maintenance_work_mem_worker > 0) - maintenance_work_mem = lvshared->maintenance_work_mem_worker; - - /* - * Initialize vacrel for use as error callback arg by parallel worker. 
- */ - vacrel.relnamespace = get_namespace_name(RelationGetNamespace(rel)); - vacrel.relname = pstrdup(RelationGetRelationName(rel)); - vacrel.indname = NULL; - vacrel.phase = VACUUM_ERRCB_PHASE_UNKNOWN; /* Not yet processing */ - vacrel.dead_tuples = dead_tuples; - - /* Setup error traceback support for ereport() */ - errcallback.callback = vacuum_error_callback; - errcallback.arg = &vacrel; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); - - /* Process indexes to perform vacuum/cleanup */ - do_parallel_processing(&vacrel, lvshared); - - /* Report buffer/WAL usage during parallel execution */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; - - vac_close_indexes(nindexes, indrels, RowExclusiveLock); - table_close(rel, ShareUpdateExclusiveLock); - FreeAccessStrategy(vacrel.bstrategy); - pfree(vacrel.indstats); -} - /* * Error context callback for errors occurring during vacuum. 
*/ diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index bb1881f573..6b427772d5 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -25,6 +25,7 @@ #include "catalog/pg_enum.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/vacuum.h" #include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index e8504f0ae4..48f7348f91 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -59,6 +59,7 @@ OBJS = \ typecmds.o \ user.o \ vacuum.o \ + vacuumparallel.o \ variable.o \ view.o diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 5c4bc15b44..2fcb576540 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -32,6 +32,7 @@ #include "access/transam.h" #include "access/xact.h" #include "catalog/namespace.h" +#include "catalog/index.h" #include "catalog/pg_database.h" #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" @@ -51,6 +52,7 @@ #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/memutils.h" +#include "utils/pg_rusage.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -90,6 +92,9 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params); static double compute_parallel_delay(void); static VacOptValue get_vacoptval_from_boolean(DefElem *def); +static bool vac_tid_reaped(ItemPointer itemptr, void *state); +static int vac_cmp_itemptr(const void *left, const void *right); + /* * Primary entry point for manual VACUUM and ANALYZE commands * @@ -2258,3 +2263,140 @@ get_vacoptval_from_boolean(DefElem *def) { return defGetBoolean(def) ? VACOPTVALUE_ENABLED : VACOPTVALUE_DISABLED; } + +/* + * lazy_vacuum_one_index() -- vacuum index relation. 
+ * + * Delete all the index entries pointing to tuples listed in + * dead_tuples, and update running statistics. + * + * reltuples is the number of heap tuples to be passed to the + * bulkdelete callback. It's always assumed to be estimated. + * + * Returns bulk delete stats derived from input stats + */ +IndexBulkDeleteResult * +vacuum_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat, + VacDeadTuples *dead_tuples) +{ + PGRUsage ru0; + + pg_rusage_init(&ru0); + + /* Do bulk deletion */ + istat = index_bulk_delete(ivinfo, istat, vac_tid_reaped, + (void *) dead_tuples); + + ereport(ivinfo->message_level, + (errmsg("scanned index \"%s\" to remove %d row versions", + RelationGetRelationName(ivinfo->index), + dead_tuples->num_tuples), + errdetail_internal("%s", pg_rusage_show(&ru0)))); + + return istat; +} + +/* + * cleanup_one_index() -- do post-vacuum cleanup for index relation. + * + * reltuples is the number of heap tuples and estimated_count is true + * if reltuples is an estimated value. + * + * Returns bulk delete stats derived from input stats + */ +IndexBulkDeleteResult * +cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat) +{ + PGRUsage ru0; + + pg_rusage_init(&ru0); + + istat = index_vacuum_cleanup(ivinfo, istat); + + if (istat) + { + ereport(ivinfo->message_level, + (errmsg("index \"%s\" now contains %.0f row versions in %u pages", + RelationGetRelationName(ivinfo->index), + istat->num_index_tuples, + istat->num_pages), + errdetail("%.0f index row versions were removed.\n" + "%u index pages were newly deleted.\n" + "%u index pages are currently deleted, of which %u are currently reusable.\n" + "%s.", + istat->tuples_removed, + istat->pages_newly_deleted, + istat->pages_deleted, istat->pages_free, + pg_rusage_show(&ru0)))); + } + + return istat; +} + +/* + * vac_tid_reaped() -- is a particular tid deletable? + * + * This has the right signature to be an IndexBulkDeleteCallback.
+ * + * Assumes dead_tuples array is in sorted order. + */ +static bool +vac_tid_reaped(ItemPointer itemptr, void *state) +{ + VacDeadTuples *dead_tuples = (VacDeadTuples *) state; + int64 litem, + ritem, + item; + ItemPointer res; + + litem = itemptr_encode(&dead_tuples->itemptrs[0]); + ritem = itemptr_encode(&dead_tuples->itemptrs[dead_tuples->num_tuples - 1]); + item = itemptr_encode(itemptr); + + /* + * Doing a simple bound check before bsearch() is useful to avoid the + * extra cost of bsearch(), especially if dead tuples on the heap are + * concentrated in a certain range. Since this function is called for + * every index tuple, it pays to be really fast. + */ + if (item < litem || item > ritem) + return false; + + res = (ItemPointer) bsearch((void *) itemptr, + (void *) dead_tuples->itemptrs, + dead_tuples->num_tuples, + sizeof(ItemPointerData), + vac_cmp_itemptr); + + return (res != NULL); +} + +/* + * Comparator routines for use with qsort() and bsearch(). + */ +static int +vac_cmp_itemptr(const void *left, const void *right) +{ + BlockNumber lblk, + rblk; + OffsetNumber loff, + roff; + + lblk = ItemPointerGetBlockNumber((ItemPointer) left); + rblk = ItemPointerGetBlockNumber((ItemPointer) right); + + if (lblk < rblk) + return -1; + if (lblk > rblk) + return 1; + + loff = ItemPointerGetOffsetNumber((ItemPointer) left); + roff = ItemPointerGetOffsetNumber((ItemPointer) right); + + if (loff < roff) + return -1; + if (loff > roff) + return 1; + + return 0; +} diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c new file mode 100644 index 0000000000..165fa28be4 --- /dev/null +++ b/src/backend/commands/vacuumparallel.c @@ -0,0 +1,1042 @@ +/*------------------------------------------------------------------------- + * + * vacuumparallel.c + * Support routines for parallel vacuum execution. + * + * This file contains routines that are intended to support setting up, using + * and tearing down a ParallelVacuumContext. 
+ * + * In a parallel vacuum, we perform both index bulk-deletion and index cleanup + * with parallel worker processes. Individual indexes are processed by one + * vacuum process. ParallelVacuumContext contains shared information as well + * as the memory space for storing dead tuples allocated in the DSM segment. + * When starting either parallel index bulk-deletion or index cleanup, we + * launch parallel worker processes. Once all indexes are processed, the + * parallel worker processes exit. Before the next pass, the parallel context + * is re-initialized so that the same DSM can be used for multiple passes of + * index bulk-deletion and index cleanup. At the end of a parallel vacuum, + * ParallelVacuumContext is destroyed while returning index statistics so + * that we can update them after exiting from the parallel mode. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/commands/vacuumparallel.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/amapi.h" +#include "access/genam.h" +#include "access/parallel.h" +#include "access/table.h" +#include "access/transam.h" +#include "access/xact.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "miscadmin.h" +#include "optimizer/paths.h" +#include "pgstat.h" +#include "storage/bufmgr.h" +#include "storage/lmgr.h" +#include "tcop/tcopprot.h" +#include "utils/elog.h" +#include "utils/rel.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" + +/* + * DSM keys for parallel vacuum. Unlike other parallel execution code, since + * we don't need to worry about DSM keys conflicting with plan_node_id we can + * use small integers.
+ */ +#define PARALLEL_VACUUM_KEY_SHARED 1 +#define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2 +#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 +#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4 +#define PARALLEL_VACUUM_KEY_WAL_USAGE 5 +#define PARALLEL_VACUUM_KEY_INDEX_STATS 6 + +/* + * Shared information among parallel workers. So this is allocated in the DSM + * segment. + */ +typedef struct PVShared +{ + /* + * Target table relid and log level. These fields are not modified during + * the lazy vacuum. + */ + Oid relid; + int elevel; + + /* + * Fields for both index vacuum and cleanup. + * + * num_table_tuples is the total number of input heap tuples. We set either + * old live tuples in the index vacuum case or the new live tuples in the + * index cleanup case. + * + * estimated_count is true if num_table_tuples is an estimated value. (Note + * that num_table_tuples could be -1 in this case, indicating we have no idea.) + */ + double num_table_tuples; + bool estimated_count; + + /* + * In single process lazy vacuum we could consume more memory during index + * vacuuming or cleanup apart from the memory for heap scanning. In + * parallel vacuum, since individual vacuum workers can consume memory + * equal to maintenance_work_mem, the new maintenance_work_mem for each + * worker is set such that the parallel operation doesn't consume more + * memory than single process lazy vacuum. + */ + int maintenance_work_mem_worker; + + /* + * Shared vacuum cost balance. During parallel vacuum, + * VacuumSharedCostBalance points to this value and it accumulates the + * balance of each parallel vacuum worker. + */ + pg_atomic_uint32 cost_balance; + + /* + * Number of active parallel workers. This is used for computing the + * minimum threshold of the vacuum cost balance before a worker sleeps for + * cost-based delay.
+ */ + pg_atomic_uint32 active_nworkers; + + /* Counter for vacuuming and cleanup */ + pg_atomic_uint32 idx; +} PVShared; + +/* Status used during parallel index vacuum or cleanup */ +typedef enum PVIndVacStatus +{ + INDVAC_STATUS_INITIAL = 0, + INDVAC_STATUS_NEED_BULKDELETE, + INDVAC_STATUS_NEED_CLEANUP, + INDVAC_STATUS_COMPLETED, +} PVIndVacStatus; + +/* + * Struct for an index bulk-deletion statistic used for parallel vacuum. This + * is allocated in the DSM segment. + */ +typedef struct PVIndStats +{ + /* + * The following two fields are set by the leader process before executing + * parallel index vacuum or parallel index cleanup. + * + * parallel_workers_can_process is true if both leader and worker can + * process the index, otherwise only the leader can process it. This value + * is not fixed for the entire VACUUM operation. It is only fixed for + * an individual parallel index vacuum or cleanup. + */ + PVIndVacStatus status; + bool parallel_workers_can_process; + + /* + * Individual worker or leader stores the result of index vacuum or + * cleanup. + */ + bool istat_updated; /* are the stats updated? */ + IndexBulkDeleteResult istat; +} PVIndStats; + +/* + * Struct for a parallel index vacuum. */ +typedef struct PVState +{ + /* Target indexes */ + Relation *indrels; + int nindexes; + + PVIndStats *indstats; + PVShared *shared; + VacDeadTuples *dead_tuples; + BufferAccessStrategy bstrategy; + + /* Error reporting state */ + char *relnamespace; + char *relname; + char *indname; + PVIndVacStatus status; +} PVState; + +/* Struct for maintaining a parallel vacuum state.
*/ +typedef struct ParallelVacuumContext +{ + ParallelContext *pcxt; + + /* Target indexes */ + Relation *indrels; + int nindexes; + + /* Shared information among parallel vacuum workers */ + PVShared *shared; + + /* Shared index statistics among parallel vacuum workers */ + PVIndStats *indstats; + + /* Shared dead tuple space among parallel vacuum workers */ + VacDeadTuples *dead_tuples; + + /* Points to buffer usage area in DSM */ + BufferUsage *buffer_usage; + + /* Points to WAL usage area in DSM */ + WalUsage *wal_usage; + + /* + * The number of indexes that support parallel index bulk-deletion and + * parallel index cleanup respectively. + */ + int nindexes_parallel_bulkdel; + int nindexes_parallel_cleanup; + int nindexes_parallel_condcleanup; + + /* Incremented by each bulkdel or cleanup */ + int num_index_scans; + + /* Buffer access strategy used by leader process */ + BufferAccessStrategy bstrategy; +} ParallelVacuumContext; + +static int compute_parallel_vacuum_workers(Relation *indrels, int nindexes, + int nrequested); +static void set_parallel_vacuum_index_status(ParallelVacuumContext *pvc, + bool bulkdel); +static void parallel_vacuum_all_indexes(ParallelVacuumContext *pvc, bool bulkdel); +static void parallel_vacuum_indexes(PVState *pvstate); +static void serial_vacuum_unsafe_indexes(PVState *pvstate); +static void parallel_vacuum_one_index(PVState *pvstate, Relation indrel, + PVIndStats *stats); +static bool index_parallel_vacuum_is_safe(Relation indrel, int num_index_scans, + bool bulkdel); +static void parallel_index_vacuum_error_callback(void *arg); + +/* + * This function prepares and returns parallel vacuum context if we can launch + * even one worker. This function is responsible for entering parallel mode, + * create a parallel context, and then initialize the DSM segment. 
+ */ +ParallelVacuumContext * +begin_parallel_vacuum(ParallelVacuumCtl *pvctl) +{ + ParallelVacuumContext *pvc = NULL; + ParallelContext *pcxt; + PVIndStats *indstats; + PVShared *shared; + VacDeadTuples *dead_tuples; + BufferUsage *buffer_usage; + WalUsage *wal_usage; + Size est_indstats = 0; + Size est_shared = 0; + Size est_deadtuples = 0; + int nindexes_mwm = 0; + int parallel_workers = 0; + int querylen; + + /* + * A parallel vacuum must be requested and there must be indexes on the + * relation + */ + Assert(pvctl); + Assert(pvctl->nrequested_workers >= 0); + Assert(pvctl->nindexes > 0); + + /* + * Compute the number of parallel vacuum workers to launch + */ + parallel_workers = compute_parallel_vacuum_workers(pvctl->indrels, + pvctl->nindexes, + pvctl->nrequested_workers); + + /* Can't perform vacuum in parallel */ + if (parallel_workers <= 0) + return pvc; + + pvc = (ParallelVacuumContext *) palloc0(sizeof(ParallelVacuumContext)); + pvc->indrels = pvctl->indrels; + pvc->nindexes = pvctl->nindexes; + pvc->bstrategy = pvctl->bstrategy; + + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", + parallel_workers); + Assert(pcxt->nworkers > 0); + pvc->pcxt = pcxt; + + /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */ + est_indstats = mul_size(sizeof(PVIndStats), pvctl->nindexes); + shm_toc_estimate_chunk(&pcxt->estimator, est_indstats); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ + est_shared = MAXALIGN(sizeof(PVShared)); + shm_toc_estimate_chunk(&pcxt->estimator, est_shared); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */ + est_deadtuples = MAXALIGN(SizeOfDeadTuples(pvctl->maxtuples)); + shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate space for BufferUsage and WalUsage -- + * 
PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgBufferUsage or + * pgWalUsage, so do it unconditionally. + */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + InitializeParallelDSM(pcxt); + + /* Prepare index vacuum stats */ + indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats); + MemSet(indstats, 0, est_indstats); + for (int i = 0; i < pvctl->nindexes; i++) + { + Relation indrel = pvctl->indrels[i]; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* + * Cleanup option should be either disabled, always performing in + * parallel or conditionally performing in parallel. + */ + Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); + Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); + + /* Skip indexes that don't participate in parallel vacuum */ + if (vacoptions == VACUUM_OPTION_NO_PARALLEL || + RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) + continue; + + if (indrel->rd_indam->amusemaintenanceworkmem) + nindexes_mwm++; + + /* + * Remember the number of indexes that support parallel operation for + * each phase. 
+ */ + if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) + pvc->nindexes_parallel_bulkdel++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) + pvc->nindexes_parallel_cleanup++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) + pvc->nindexes_parallel_condcleanup++; + } + + shm_toc_insert(pcxt->toc,PARALLEL_VACUUM_KEY_INDEX_STATS, indstats); + pvc->indstats = indstats; + + /* Prepare shared information */ + shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared); + MemSet(shared, 0, est_shared); + shared->relid = RelationGetRelid(pvctl->rel); + shared->elevel = pvctl->elevel; + shared->maintenance_work_mem_worker = + (nindexes_mwm > 0) ? + maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : + maintenance_work_mem; + + pg_atomic_init_u32(&(shared->cost_balance), 0); + pg_atomic_init_u32(&(shared->active_nworkers), 0); + pg_atomic_init_u32(&(shared->idx), 0); + + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); + pvc->shared = shared; + + /* Prepare the dead tuple space */ + dead_tuples = (VacDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples); + dead_tuples->max_tuples = pvctl->maxtuples; + dead_tuples->num_tuples = 0; + MemSet(dead_tuples->itemptrs, 0, + sizeof(ItemPointerData) * pvctl->maxtuples); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples); + pvc->dead_tuples = dead_tuples; + + /* + * Allocate space for each worker's BufferUsage and WalUsage; no need to + * initialize + */ + buffer_usage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); + pvc->buffer_usage = buffer_usage; + wal_usage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); + pvc->wal_usage = wal_usage; + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = 
(char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + sharedquery[querylen] = '\0'; + shm_toc_insert(pcxt->toc, + PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); + } + + return pvc; +} + +/* + * Destroy the parallel context, and end parallel mode. + * + * Since writes are not allowed during parallel mode, copy updated index + * statistics from DSM into local memory so that the caller uses that to + * update the index statistics. One might think that we can exit from + * parallel mode, update the index statistics and then destroy parallel + * context, but that won't be safe (see ExitParallelMode). + */ +void +end_parallel_vacuum(ParallelVacuumContext *pvc, IndexBulkDeleteResult **indstats) +{ + /* Copy the updated statistics */ + for (int i = 0; i < pvc->nindexes; i++) + { + PVIndStats *stats = &(pvc->indstats[i]); + + if (stats->istat_updated) + { + indstats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); + memcpy(indstats[i], &stats->istat, sizeof(IndexBulkDeleteResult)); + } + else + indstats[i] = NULL; + } + + DestroyParallelContext(pvc->pcxt); + ExitParallelMode(); + + pfree(pvc); +} + +/* Returns the dead tuple space */ +VacDeadTuples * +get_vacuum_dead_tuples(ParallelVacuumContext *pvc) +{ + return pvc->dead_tuples; +} + +/* + * Do parallel index bulk-deletion with parallel workers. + */ +void +perform_parallel_index_bulkdel(ParallelVacuumContext *pvc, long num_table_tuples) +{ + /* + * We can only provide an approximate value of num_heap_tuples, at least + * for now. Matches serial VACUUM case. + */ + pvc->shared->num_table_tuples = num_table_tuples; + pvc->shared->estimated_count = true; + + parallel_vacuum_all_indexes(pvc, true); +} + +/* + * Do parallel index cleanup with parallel workers. 
+ */ +void +perform_parallel_index_cleanup(ParallelVacuumContext *pvc, long num_table_tuples, + bool estimated_count) +{ + /* + * We can provide a better estimate of total number of surviving + * tuples (we assume indexes are more interested in that than in the + * number of nominally live tuples). + */ + pvc->shared->num_table_tuples = num_table_tuples; + pvc->shared->estimated_count = estimated_count; + + parallel_vacuum_all_indexes(pvc, false); +} + +/* + * Perform work within a launched parallel process. + * + * Since parallel vacuum workers perform only index vacuum or index cleanup, + * we don't need to report progress information. + */ +void +parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) +{ + Relation rel; + Relation *indrels; + PVIndStats *indstats; + PVShared *shared; + VacDeadTuples *dead_tuples; + BufferUsage *buffer_usage; + WalUsage *wal_usage; + PVState pvstate; + int nindexes; + char *sharedquery; + ErrorContextCallback errcallback; + + shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, + false); + elog(DEBUG1, "starting parallel vacuum worker"); + + /* Set debug_query_string for individual workers */ + sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* + * Open table. The lock mode is the same as the leader process. It's + * okay because the lock mode does not conflict among the parallel + * workers. + */ + rel = table_open(shared->relid, ShareUpdateExclusiveLock); + + /* + * Open all indexes. indrels are sorted in order by OID, which should be + * matched to the leader's one. 
+ */ + vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); + Assert(nindexes > 0); + + /* Set index statistics */ + indstats = (PVIndStats *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_INDEX_STATS, + false); + + /* Set dead tuple space */ + dead_tuples = (VacDeadTuples *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_DEAD_TUPLES, + false); + + /* Set cost-based vacuum delay */ + VacuumCostActive = (VacuumCostDelay > 0); + VacuumCostBalance = 0; + VacuumPageHit = 0; + VacuumPageMiss = 0; + VacuumPageDirty = 0; + VacuumCostBalanceLocal = 0; + VacuumSharedCostBalance = &(shared->cost_balance); + VacuumActiveNWorkers = &(shared->active_nworkers); + + if (shared->maintenance_work_mem_worker > 0) + maintenance_work_mem = shared->maintenance_work_mem_worker; + + /* Parallel vacuum state and the error callback arg */ + pvstate.indrels = indrels; + pvstate.nindexes = nindexes; + pvstate.indstats = indstats; + pvstate.shared = shared; + pvstate.dead_tuples = dead_tuples; + pvstate.bstrategy = GetAccessStrategy(BAS_VACUUM); + pvstate.relnamespace = get_namespace_name(RelationGetNamespace(rel)); + pvstate.relname = pstrdup(RelationGetRelationName(rel)); + pvstate.indname = NULL; /* filled later during index vacuuming */ + + /* Setup error traceback support for ereport() */ + errcallback.callback = parallel_index_vacuum_error_callback; + errcallback.arg = &pvstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + /* Process indexes to perform bulk-deletion/cleanup */ + parallel_vacuum_indexes(&pvstate); + + /* Report buffer/WAL usage during parallel execution */ + buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); + wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + &wal_usage[ParallelWorkerNumber]); + + /* Pop the error context stack 
*/ + error_context_stack = errcallback.previous; + + vac_close_indexes(nindexes, indrels, RowExclusiveLock); + table_close(rel, ShareUpdateExclusiveLock); + FreeAccessStrategy(pvstate.bstrategy); +} + +/* + * Compute the number of parallel worker processes to request. Both index + * vacuum and index cleanup can be executed with parallel workers. The index + * is eligible for parallel vacuum iff its size is greater than + * min_parallel_index_scan_size as invoking workers for very small indexes + * can hurt performance. + * + * nrequested is the number of parallel workers that user requested. If + * nrequested is 0, we compute the parallel degree based on nindexes, that is + * the number of indexes that support parallel vacuum. + */ +static int +compute_parallel_vacuum_workers(Relation *indrels, int nindexes, int nrequested) +{ + int nindexes_parallel = 0; + int nindexes_parallel_bulkdel = 0; + int nindexes_parallel_cleanup = 0; + int parallel_workers; + + /* + * We don't allow performing parallel operation in standalone backend or + * when parallelism is disabled. 
+ */ + if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) + return 0; + + for (int i = 0; i < nindexes; i++) + { + Relation indrel = indrels[i]; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + if (vacoptions == VACUUM_OPTION_NO_PARALLEL || + RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) + continue; + + if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) + nindexes_parallel_bulkdel++; + if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) || + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) + nindexes_parallel_cleanup++; + } + + nindexes_parallel = Max(nindexes_parallel_bulkdel, + nindexes_parallel_cleanup); + + /* The leader process takes one index */ + nindexes_parallel--; + + /* No index supports parallel vacuum */ + if (nindexes_parallel <= 0) + return 0; + + /* Compute the parallel degree */ + parallel_workers = (nrequested > 0) ? + Min(nrequested, nindexes_parallel) : nindexes_parallel; + + /* Cap by max_parallel_maintenance_workers */ + parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + + return parallel_workers; +} + +static void +set_parallel_vacuum_index_status(ParallelVacuumContext *pvc, bool bulkdel) +{ + PVIndVacStatus new_status = bulkdel + ? INDVAC_STATUS_NEED_BULKDELETE + : INDVAC_STATUS_NEED_CLEANUP; + + /* Set index vacuum status and mark as parallel safe or not */ + for (int i = 0; i < pvc->nindexes; i++) + { + PVIndStats *stats = &(pvc->indstats[i]); + + Assert(stats->status == INDVAC_STATUS_INITIAL); + + stats->status = new_status; + stats->parallel_workers_can_process = + index_parallel_vacuum_is_safe(pvc->indrels[i], + pvc->num_index_scans, + bulkdel); + } +} + +/* + * Perform index vacuum or index cleanup with parallel workers. This function + * must be used by the parallel vacuum leader process. 
+ */ +static void +parallel_vacuum_all_indexes(ParallelVacuumContext *pvc, bool bulkdel) +{ + int nworkers; + PVState pvstate; + + /* Determine the number of parallel workers to launch */ + if (bulkdel) + nworkers = pvc->nindexes_parallel_bulkdel; + else + { + nworkers = pvc->nindexes_parallel_cleanup; + + /* Add conditionally parallel-aware indexes if in the first time call */ + if (pvc->num_index_scans == 0) + nworkers += pvc->nindexes_parallel_condcleanup; + } + + /* The leader process will participate */ + nworkers--; + + /* + * It is possible that parallel context is initialized with fewer workers + * than the number of indexes that need a separate worker in the current + * phase, so we need to consider it. See compute_parallel_vacuum_workers. + */ + nworkers = Min(nworkers, pvc->pcxt->nworkers); + + /* Reset the parallel index processing counter */ + pg_atomic_write_u32(&(pvc->shared->idx), 0); + + set_parallel_vacuum_index_status(pvc, bulkdel); + + /* Setup the shared cost-based vacuum delay and launch workers */ + if (nworkers > 0) + { + /* Reinitialize the parallel context to relaunch parallel workers */ + if (pvc->num_index_scans > 0) + ReinitializeParallelDSM(pvc->pcxt); + + /* + * Set up shared cost balance and the number of active workers for + * vacuum delay. We need to do this before launching workers as + * otherwise, they might not see the updated values for these + * parameters. + */ + pg_atomic_write_u32(&(pvc->shared->cost_balance), VacuumCostBalance); + pg_atomic_write_u32(&(pvc->shared->active_nworkers), 0); + + /* + * The number of workers can vary between bulkdelete and cleanup + * phase. + */ + ReinitializeParallelWorkers(pvc->pcxt, nworkers); + + LaunchParallelWorkers(pvc->pcxt); + + if (pvc->pcxt->nworkers_launched > 0) + { + /* + * Reset the local cost values for leader backend as we have + * already accumulated the remaining balance of heap. 
+ */ + VacuumCostBalance = 0; + VacuumCostBalanceLocal = 0; + + /* Enable shared cost balance for leader backend */ + VacuumSharedCostBalance = &(pvc->shared->cost_balance); + VacuumActiveNWorkers = &(pvc->shared->active_nworkers); + } + + if (bulkdel) + ereport(pvc->shared->elevel, + (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", + "launched %d parallel vacuum workers for index vacuuming (planned: %d)", + pvc->pcxt->nworkers_launched), + pvc->pcxt->nworkers_launched, nworkers))); + else + ereport(pvc->shared->elevel, + (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", + "launched %d parallel vacuum workers for index cleanup (planned: %d)", + pvc->pcxt->nworkers_launched), + pvc->pcxt->nworkers_launched, nworkers))); + } + + pvstate.indrels = pvc->indrels; + pvstate.nindexes = pvc->nindexes; + pvstate.indstats = pvc->indstats; + pvstate.shared = pvc->shared; + pvstate.dead_tuples = pvc->dead_tuples; + pvstate.bstrategy = pvc->bstrategy; + + /* Process the indexes that can be processed by only leader process */ + serial_vacuum_unsafe_indexes(&pvstate); + + /* + * Join as a parallel worker. The leader process alone processes all + * parallel-safe indexes in the case where no workers are launched. + */ + parallel_vacuum_indexes(&pvstate); + + /* + * Next, accumulate buffer and WAL usage. (This must wait for the workers + * to finish, or we might get incomplete data.) + */ + if (nworkers > 0) + { + /* Wait for all vacuum workers to finish */ + WaitForParallelWorkersToFinish(pvc->pcxt); + + for (int i = 0; i < pvc->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&pvc->buffer_usage[i], &pvc->wal_usage[i]); + } + + /* + * Reset all index status back to invalid (while checking that we have + * processed all indexes). 
+ */ + for (int i = 0; i < pvc->nindexes; i++) + { + PVIndStats *stats = &(pvc->indstats[i]); + + Assert(stats->status == INDVAC_STATUS_COMPLETED); + stats->status = INDVAC_STATUS_INITIAL; + } + + /* + * Carry the shared balance value to heap scan and disable shared costing + */ + if (VacuumSharedCostBalance) + { + VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); + VacuumSharedCostBalance = NULL; + VacuumActiveNWorkers = NULL; + } + + /* Increment the count */ + pvc->num_index_scans++; +} + + +/* + * Index vacuum/cleanup routine used by the leader process and parallel + * vacuum worker processes to process the indexes in parallel. + */ +static void +parallel_vacuum_indexes(PVState *pvstate) +{ + /* + * Increment the active worker count if we are able to launch any worker. + */ + if (VacuumActiveNWorkers) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + for (;;) + { + int idx; + PVIndStats *stats; + + /* Get an index number to process */ + idx = pg_atomic_fetch_add_u32(&(pvstate->shared->idx), 1); + + /* Done for all indexes? */ + if (idx >= pvstate->nindexes) + break; + + stats = &(pvstate->indstats[idx]); + + /* + * Parallel unsafe indexes can be processed only by leader (these are + * processed in serial_vacuum_unsafe_indexes() by leader). + */ + if (!stats->parallel_workers_can_process) + continue; + + parallel_vacuum_one_index(pvstate, pvstate->indrels[idx], stats); + } + + /* + * We have completed the index vacuum so decrement the active worker + * count. + */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * Perform parallel processing of indexes in leader process. + * + * Handles index vacuuming (or index cleanup) for indexes that are not + * parallel safe. + * + * Also performs processing of smaller indexes that fell under the size cutoff + * enforced by compute_parallel_vacuum_workers(). 
+ */ +static void +serial_vacuum_unsafe_indexes(PVState *pvstate) +{ + Assert(!IsParallelWorker()); + + /* + * Increment the active worker count if we are able to launch any worker. + */ + if (VacuumActiveNWorkers) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + for (int i = 0; i < pvstate->nindexes; i++) + { + PVIndStats *stats = &(pvstate->indstats[i]); + + /* Skip, safe indexes as they are processed by workers */ + if (stats->parallel_workers_can_process) + continue; + + parallel_vacuum_one_index(pvstate, pvstate->indrels[i], stats); + } + + /* + * We have completed the index vacuum so decrement the active worker + * count. + */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * Bulk-delete or cleanup index either by leader process or by one of the worker + * process. After processing the index this function copies the index + * statistics returned from ambulkdelete and amvacuumcleanup to the DSM + * segment. + */ +static void +parallel_vacuum_one_index(PVState *pvstate, Relation indrel, PVIndStats *stats) +{ + IndexBulkDeleteResult *istat = NULL; + IndexBulkDeleteResult *istat_res; + IndexVacuumInfo ivinfo; + + /* Get the index statistics space, if already updated */ + if (stats->istat_updated) + istat = &(stats->istat); + + ivinfo.index = indrel; + ivinfo.analyze_only = false; + ivinfo.report_progress = false; + ivinfo.message_level = pvstate->shared->elevel; + ivinfo.estimated_count = pvstate->shared->estimated_count; + ivinfo.num_heap_tuples = pvstate->shared->num_table_tuples; + ivinfo.strategy = pvstate->bstrategy; + + /* Update error traceback information */ + pvstate->indname = pstrdup(RelationGetRelationName(indrel)); + pvstate->status = stats->status; + + switch (stats->status) + { + case INDVAC_STATUS_NEED_BULKDELETE: + istat_res = vacuum_one_index(&ivinfo, istat, pvstate->dead_tuples); + break; + case INDVAC_STATUS_NEED_CLEANUP: + istat_res = cleanup_one_index(&ivinfo, istat); + break; + default: + 
elog(ERROR, "unexpected parallel index vacuum status %d", + stats->status); + } + + /* + * Copy the index bulk-deletion result returned from ambulkdelete and + * amvacuumcleanup to the DSM segment if it's the first cycle because they + * allocate locally and it's possible that an index will be vacuumed by a + * different vacuum process the next cycle. Copying the result normally + * happens only the first time an index is vacuumed. For any additional + * vacuum pass, we directly point to the result on the DSM segment and + * pass it to vacuum index APIs so that workers can update it directly. + * + * Since all vacuum workers write the bulk-deletion result at different + * slots we can write them without locking. + */ + if (!stats->istat_updated && istat_res != NULL) + { + memcpy(&(stats->istat), istat_res, sizeof(IndexBulkDeleteResult)); + stats->istat_updated = true; + pfree(istat_res); + } + + /* + * Update the status to completed. No need to lock here since each + * worker touches different indexes. + */ + stats->status = INDVAC_STATUS_COMPLETED; + + /* Reset error traceback information */ + pfree(pvstate->indname); + pvstate->indname = NULL; + pvstate->status = INDVAC_STATUS_COMPLETED; +} + +/* + * Returns false, if the given index can't participate in parallel index + * vacuum or parallel index cleanup. + */ +static bool +index_parallel_vacuum_is_safe(Relation indrel, int num_index_scans, + bool bulkdel) +{ + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* + * Check if the index is a totally unsuitable target for all parallel + * processing up front. For example, the index could be + * < min_parallel_index_scan_size cutoff. 
+ */ + if (vacoptions == VACUUM_OPTION_NO_PARALLEL || + RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) + return false; + + /* In parallel vacuum case, check if it supports parallel bulk-deletion */ + if (bulkdel) + return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0); + + /* Not safe, if the index does not support parallel cleanup */ + if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) + return false; + + /* + * Not safe, if the index supports parallel cleanup conditionally, + * but we have already processed the index (for bulkdelete). See the + * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know + * when indexes support parallel cleanup conditionally. + */ + if (num_index_scans > 0 && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) + return false; + + return true; +} + +/* + * Error context callback for errors occurring during parallel index vacuum. + */ +static void +parallel_index_vacuum_error_callback(void *arg) +{ + PVState *errinfo = arg; + + switch (errinfo->status) + { + case INDVAC_STATUS_NEED_BULKDELETE: + errcontext("while parallelly vacuuming index \"%s\" of relation \"%s.%s\"", + errinfo->indname, + errinfo->relnamespace, + errinfo->relname); + break; + case INDVAC_STATUS_NEED_CLEANUP: + errcontext("while parallelly cleanup index \"%s\" of relation \"%s.%s\"", + errinfo->indname, + errinfo->relnamespace, + errinfo->relname); + break; + case INDVAC_STATUS_INITIAL: + case INDVAC_STATUS_COMPLETED: + default: + break; + } +} diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index e63b49fc38..4a8022fee7 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -198,7 +198,6 @@ extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets); struct VacuumParams; extern void heap_vacuum_rel(Relation rel, struct VacuumParams *params, BufferAccessStrategy bstrategy); -extern void 
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); /* in heap/heapam_visibility.c */ extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot, diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 4cfd52eaf4..3dc9055715 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -15,6 +15,8 @@ #define VACUUM_H #include "access/htup.h" +#include "access/genam.h" +#include "access/parallel.h" #include "catalog/pg_class.h" #include "catalog/pg_statistic.h" #include "catalog/pg_type.h" @@ -62,6 +64,28 @@ /* value for checking vacuum flags */ #define VACUUM_OPTION_MAX_VALID_VALUE ((1 << 3) - 1) +/* Actual representation is private to vacuumparallel.c */ +typedef struct ParallelVacuumContext ParallelVacuumContext; + +/* Parameter data structure for begin_parallel_vacuum */ +typedef struct ParallelVacuumCtl +{ + /* Table and its indexes */ + Relation rel; + Relation *indrels; + int nindexes; + + /* The number of workers requested to launch */ + int nrequested_workers; + + /* The maximum dead tuples to store */ + long maxtuples; + + /* Log level and the buffer access strategy */ + int elevel; + BufferAccessStrategy bstrategy; +} ParallelVacuumCtl; + /*---------- * ANALYZE builds one of these structs for each attribute (column) that is * to be analyzed. The struct and subsidiary data are in anl_context, @@ -230,6 +254,28 @@ typedef struct VacuumParams int nworkers; } VacuumParams; +/* + * VacDeadTuples stores the dead tuple TIDs collected during the heap scan. + * This is allocated in the DSM segment in parallel mode and in local memory + * in non-parallel mode. 
+ */ +typedef struct VacDeadTuples +{ + int max_tuples; /* # slots allocated in array */ + int num_tuples; /* current # of entries */ + /* List of TIDs of tuples we intend to delete */ + /* NB: this list is ordered by TID address */ + ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER]; /* array of + * ItemPointerData */ +} VacDeadTuples; + +/* The dead tuple space consists of VacDeadTuples and dead tuple TIDs */ +#define SizeOfDeadTuples(cnt) \ + add_size(offsetof(VacDeadTuples, itemptrs), \ + mul_size(sizeof(ItemPointerData), cnt)) +#define MAXDEADTUPLES(max_size) \ + (((max_size) - offsetof(VacDeadTuples, itemptrs)) / sizeof(ItemPointerData)) + /* GUC parameters */ extern PGDLLIMPORT int default_statistics_target; /* PGDLLIMPORT for PostGIS */ extern int vacuum_freeze_min_age; @@ -282,6 +328,23 @@ extern bool vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple, extern Relation vacuum_open_relation(Oid relid, RangeVar *relation, bits32 options, bool verbose, LOCKMODE lmode); +extern IndexBulkDeleteResult *vacuum_one_index(IndexVacuumInfo *ivinfo, + IndexBulkDeleteResult *istat, + VacDeadTuples *dead_tuples); +extern IndexBulkDeleteResult *cleanup_one_index(IndexVacuumInfo *ivinfo, + IndexBulkDeleteResult *istat); + +/* in commands/vacuumparallel.c */ +extern ParallelVacuumContext *begin_parallel_vacuum(ParallelVacuumCtl *pvctl); +extern void end_parallel_vacuum(ParallelVacuumContext *pvc, + IndexBulkDeleteResult **indstats); +extern VacDeadTuples *get_vacuum_dead_tuples(ParallelVacuumContext *pvc); +extern void perform_parallel_index_bulkdel(ParallelVacuumContext *pvc, + long num_table_tuples); +extern void perform_parallel_index_cleanup(ParallelVacuumContext *pvc, + long num_table_tuples, + bool estimated_count); +extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); /* in commands/analyze.c */ extern void analyze_rel(Oid relid, RangeVar *relation, diff --git a/src/test/regress/expected/vacuum_parallel.out 
b/src/test/regress/expected/vacuum_parallel.out index ddf0ee544b..a7d8a801e0 100644 --- a/src/test/regress/expected/vacuum_parallel.out +++ b/src/test/regress/expected/vacuum_parallel.out @@ -45,5 +45,29 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table; INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i; RESET max_parallel_maintenance_workers; RESET min_parallel_index_scan_size; +CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off); +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g; +-- Create different types of indexes, i.g. having different parallelvacuumoptions. +-- Also create a small index same as above. +CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a); +CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a); +CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b); +CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a); +CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1)); +-- Parallel index vacuum for various types of indexes. +DELETE FROM parallel_vacuum_table2; +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +-- Parallel index cleanup. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +-- XXX: in order to execute index scan twice, we need about 200,000 garbage tuples +-- with the minimum maintenance_work_mem. However, it takes a long time to load. +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g; +DELETE FROM parallel_vacuum_table2; +SET maintenance_work_mem TO 1024; +-- Parallel index vacuum for various types of indexes. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +-- Parallel index cleanup. 
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +RESET maintenance_work_mem; -- Deliberately don't drop table, to get further coverage from tools like -- pg_amcheck in some testing scenarios diff --git a/src/test/regress/sql/vacuum_parallel.sql b/src/test/regress/sql/vacuum_parallel.sql index 1d23f33e39..49f4f4ce6d 100644 --- a/src/test/regress/sql/vacuum_parallel.sql +++ b/src/test/regress/sql/vacuum_parallel.sql @@ -42,5 +42,40 @@ INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i; RESET max_parallel_maintenance_workers; RESET min_parallel_index_scan_size; +CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off); +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g; + +-- Create different types of indexes, i.g. having different parallelvacuumoptions. +-- Also create a small index same as above. +CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a); +CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a); +CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b); +CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a); +CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1)); + + +-- Parallel index vacuum for various types of indexes. +DELETE FROM parallel_vacuum_table2; +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +-- Parallel index cleanup. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +-- XXX: in order to execute index scan twice, we need about 200,000 garbage tuples +-- with the minimum maintenance_work_mem. However, it takes a long time to load. +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g; + +DELETE FROM parallel_vacuum_table2; + +SET maintenance_work_mem TO 1024; + +-- Parallel index vacuum for various types of indexes. 
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +-- Parallel index cleanup. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +RESET maintenance_work_mem; + -- Deliberately don't drop table, to get further coverage from tools like -- pg_amcheck in some testing scenarios