diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index bfb1ea0d25..16deb328bb 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -153,6 +153,7 @@ #define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4 #define PARALLEL_VACUUM_KEY_WAL_USAGE 5 +#define PARALLEL_VACUUM_KEY_INDEX_STATS 6 /* * Macro to check if we are in a parallel vacuum. If true, we are in the @@ -206,14 +207,6 @@ typedef struct LVShared Oid relid; int elevel; - /* - * An indication for vacuum workers to perform either index vacuum or - * index cleanup. first_time is true only if for_cleanup is true and - * bulk-deletion is not performed yet. - */ - bool for_cleanup; - bool first_time; - /* * Fields for both index vacuum and cleanup. * @@ -251,23 +244,18 @@ typedef struct LVShared */ pg_atomic_uint32 active_nworkers; - /* - * Variables to control parallel vacuum. We have a bitmap to indicate - * which index has stats in shared memory. The set bit in the map - * indicates that the particular index supports a parallel vacuum. - */ - pg_atomic_uint32 idx; /* counter for vacuuming and clean up */ - uint32 offset; /* sizeof header incl.
bitmap */ - bits8 bitmap[FLEXIBLE_ARRAY_MEMBER]; /* bit map of NULLs */ - - /* Shared index statistics data follows at end of struct */ + /* Counter for vacuuming and cleanup */ + pg_atomic_uint32 idx; } LVShared; -#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8)) -#define GetSharedIndStats(s) \ - ((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset)) -#define IndStatsIsNull(s, i) \ - (!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07)))) +/* Status used during parallel index vacuum or cleanup */ +typedef enum LVIndVacStatus +{ + INDVAC_STATUS_INITIAL = 0, + INDVAC_STATUS_NEED_BULKDELETE, + INDVAC_STATUS_NEED_CLEANUP, + INDVAC_STATUS_COMPLETED, +} LVIndVacStatus; /* * Struct for an index bulk-deletion statistic used for parallel vacuum. This @@ -275,7 +263,15 @@ typedef struct LVShared */ typedef struct LVSharedIndStats { - bool updated; /* are the stats updated? */ + LVIndVacStatus status; + + /* + * True if both leader and worker can process the index, otherwise only + * leader can process it. + */ + bool parallel_safe; + + bool istat_updated; /* are the stats updated?
*/ IndexBulkDeleteResult istat; } LVSharedIndStats; @@ -287,6 +283,9 @@ typedef struct LVParallelState /* Shared information among parallel vacuum workers */ LVShared *lvshared; + /* Shared index statistics among parallel vacuum workers */ + LVSharedIndStats *lvsharedindstats; + /* Points to buffer usage area in DSM */ BufferUsage *buffer_usage; @@ -416,18 +415,14 @@ static int lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, static bool lazy_check_needs_freeze(Buffer buf, bool *hastup, LVRelState *vacrel); static bool lazy_check_wraparound_failsafe(LVRelState *vacrel); -static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel); -static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel); -static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers); -static void do_parallel_processing(LVRelState *vacrel, - LVShared *lvshared); -static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, - LVShared *lvshared); -static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel, - IndexBulkDeleteResult *istat, - LVShared *lvshared, - LVSharedIndStats *shared_indstats, - LVRelState *vacrel); +static void parallel_lazy_vacuum_or_cleanup_all_indexes(LVRelState *vacrel, bool vacuum); +static void prepare_parallel_index_processing(LVRelState *vacrel, bool vacuum); +static void lazy_serial_process_indexes(LVRelState *vacrel); +static void lazy_parallel_process_indexes(LVRelState *vacrel, LVShared *lvshared, + LVSharedIndStats *indstats); +static void lazy_parallel_process_one_index(LVRelState *vacrel, Relation indrel, + LVShared *lvshared, + LVSharedIndStats *stats); static void lazy_cleanup_all_indexes(LVRelState *vacrel); static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat, @@ -450,16 +445,14 @@ static bool lazy_tid_reaped(ItemPointer itemptr, void *state); static int vac_cmp_itemptr(const void *left, const void *right); static bool
heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, TransactionId *visibility_cutoff_xid, bool *all_frozen); -static int compute_parallel_vacuum_workers(LVRelState *vacrel, - int nrequested, - bool *will_parallel_vacuum); +static int compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested); static void update_index_statistics(LVRelState *vacrel); static LVParallelState *begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, int nrequested); static void end_parallel_vacuum(LVRelState *vacrel); -static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx); -static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared); +static bool parallel_processing_is_safe(LVRelState *vacrel, Relation indrel, + bool vacuum); static void vacuum_error_callback(void *arg); static void update_vacuum_error_info(LVRelState *vacrel, LVSavedErrInfo *saved_vacrel, @@ -2251,7 +2244,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - do_parallel_lazy_vacuum_all_indexes(vacrel); + parallel_lazy_vacuum_or_cleanup_all_indexes(vacrel, true); /* * Do a postcheck to consider applying wraparound failsafe now. Note @@ -2625,78 +2618,32 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel) return false; } -/* - * Perform lazy_vacuum_all_indexes() steps in parallel - */ -static void -do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel) -{ - /* Tell parallel workers to do index vacuuming */ - vacrel->lps->lvshared->for_cleanup = false; - vacrel->lps->lvshared->first_time = false; - - /* - * We can only provide an approximate value of num_heap_tuples, at least - * for now. Matches serial VACUUM case.
- */ - vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples; - vacrel->lps->lvshared->estimated_count = true; - - do_parallel_vacuum_or_cleanup(vacrel, - vacrel->lps->nindexes_parallel_bulkdel); -} - -/* - * Perform lazy_cleanup_all_indexes() steps in parallel - */ -static void -do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel) -{ - int nworkers; - - /* - * If parallel vacuum is active we perform index cleanup with parallel - * workers. - * - * Tell parallel workers to do index cleanup. - */ - vacrel->lps->lvshared->for_cleanup = true; - vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0); - - /* - * Now we can provide a better estimate of total number of surviving - * tuples (we assume indexes are more interested in that than in the - * number of nominally live tuples). - */ - vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples; - vacrel->lps->lvshared->estimated_count = - (vacrel->tupcount_pages < vacrel->rel_pages); - - /* Determine the number of parallel workers to launch */ - if (vacrel->lps->lvshared->first_time) - nworkers = vacrel->lps->nindexes_parallel_cleanup + - vacrel->lps->nindexes_parallel_condcleanup; - else - nworkers = vacrel->lps->nindexes_parallel_cleanup; - - do_parallel_vacuum_or_cleanup(vacrel, nworkers); -} - /* * Perform index vacuum or index cleanup with parallel workers. This function - * must be used by the parallel vacuum leader process. The caller must set - * lps->lvshared->for_cleanup to indicate whether to perform vacuum or - * cleanup. + * must be used by the parallel vacuum leader process.
*/ static void -do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers) +parallel_lazy_vacuum_or_cleanup_all_indexes(LVRelState *vacrel, bool vacuum) { LVParallelState *lps = vacrel->lps; + int nworkers; Assert(!IsParallelWorker()); Assert(ParallelVacuumIsActive(vacrel)); Assert(vacrel->nindexes > 0); + /* Determine the number of parallel workers to launch */ + if (vacuum) + nworkers = lps->nindexes_parallel_bulkdel; + else + { + nworkers = lps->nindexes_parallel_cleanup; + + /* Add conditionally parallel-aware indexes on the first call */ + if (vacrel->num_index_scans == 0) + nworkers += lps->nindexes_parallel_condcleanup; + } + /* The leader process will participate */ nworkers--; @@ -2707,17 +2654,18 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers) */ nworkers = Min(nworkers, lps->pcxt->nworkers); + /* Set data required for parallel index vacuum or cleanup */ + prepare_parallel_index_processing(vacrel, vacuum); + + /* Reset the parallel index processing counter */ + pg_atomic_write_u32(&(lps->lvshared->idx), 0); + /* Setup the shared cost-based vacuum delay and launch workers */ if (nworkers > 0) { + /* Reinitialize the parallel context to relaunch parallel workers */ if (vacrel->num_index_scans > 0) - { - /* Reset the parallel index processing counter */ - pg_atomic_write_u32(&(lps->lvshared->idx), 0); - - /* Reinitialize the parallel context to relaunch parallel workers */ ReinitializeParallelDSM(lps->pcxt); - } /* * Set up shared cost balance and the number of active workers for @@ -2750,28 +2698,28 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers) VacuumActiveNWorkers = &(lps->lvshared->active_nworkers); } - if (lps->lvshared->for_cleanup) + if (vacuum) ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", - "launched %d parallel vacuum workers for index cleanup (planned: %d)", + (errmsg(ngettext("launched %d parallel vacuum worker
for index vacuuming (planned: %d)", + "launched %d parallel vacuum workers for index vacuuming (planned: %d)", lps->pcxt->nworkers_launched), lps->pcxt->nworkers_launched, nworkers))); else ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", - "launched %d parallel vacuum workers for index vacuuming (planned: %d)", + (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", + "launched %d parallel vacuum workers for index cleanup (planned: %d)", lps->pcxt->nworkers_launched), lps->pcxt->nworkers_launched, nworkers))); } /* Process the indexes that can be processed by only leader process */ - do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared); + lazy_serial_process_indexes(vacrel); /* - * Join as a parallel worker. The leader process alone processes all the - * indexes in the case where no workers are launched. + * Join as a parallel worker. The leader process alone processes all + * parallel-safe indexes in the case where no workers are launched. */ - do_parallel_processing(vacrel, lps->lvshared); + lazy_parallel_process_indexes(vacrel, lps->lvshared, lps->lvsharedindstats); /* * Next, accumulate buffer and WAL usage. (This must wait for the workers @@ -2786,6 +2734,18 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers) InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]); } + /* + * Reset all index status back to initial (while checking that we have + * processed all indexes).
+ */ + for (int i = 0; i < vacrel->nindexes; i++) + { + LVSharedIndStats *stats = &(lps->lvsharedindstats[i]); + + Assert(stats->status == INDVAC_STATUS_COMPLETED); + stats->status = INDVAC_STATUS_INITIAL; + } + /* * Carry the shared balance value to heap scan and disable shared costing */ @@ -2797,12 +2757,62 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers) } } + +/* + * This function prepares the shared data for parallel index vacuum or cleanup, + * and sets the index vacuum status accordingly. + */ +static void +prepare_parallel_index_processing(LVRelState *vacrel, bool vacuum) +{ + LVIndVacStatus next_status; + + if (vacuum) + { + /* + * We can only provide an approximate value of num_heap_tuples, at least + * for now. Matches serial VACUUM case. + */ + vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples; + vacrel->lps->lvshared->estimated_count = true; + + next_status = INDVAC_STATUS_NEED_BULKDELETE; + } + else + { + /* + * We can provide a better estimate of total number of surviving + * tuples (we assume indexes are more interested in that than in the + * number of nominally live tuples). + */ + vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples; + vacrel->lps->lvshared->estimated_count = + (vacrel->tupcount_pages < vacrel->rel_pages); + + next_status = INDVAC_STATUS_NEED_CLEANUP; + } + + /* Set index vacuum status and mark as parallel safe or not */ + for (int i = 0; i < vacrel->nindexes; i++) + { + LVSharedIndStats *stats = &(vacrel->lps->lvsharedindstats[i]); + + Assert(stats->status == INDVAC_STATUS_INITIAL); + + stats->status = next_status; + stats->parallel_safe = parallel_processing_is_safe(vacrel, + vacrel->indrels[i], + vacuum); + } +} + /* * Index vacuum/cleanup routine used by the leader process and parallel * vacuum worker processes to process the indexes in parallel.
*/ static void -do_parallel_processing(LVRelState *vacrel, LVShared *lvshared) +lazy_parallel_process_indexes(LVRelState *vacrel, LVShared *lvshared, + LVSharedIndStats *indstats) { /* * Increment the active worker count if we are able to launch any worker. @@ -2810,13 +2820,10 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared) if (VacuumActiveNWorkers) pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - /* Loop until all indexes are vacuumed */ for (;;) { - int idx; - LVSharedIndStats *shared_istat; - Relation indrel; - IndexBulkDeleteResult *istat; + int idx; + LVSharedIndStats *stats; /* Get an index number to process */ idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1); @@ -2825,28 +2832,17 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared) if (idx >= vacrel->nindexes) break; - /* Get the index statistics space from DSM, if any */ - shared_istat = parallel_stats_for_idx(lvshared, idx); - - /* Skip indexes not participating in parallelism */ - if (shared_istat == NULL) - continue; - - indrel = vacrel->indrels[idx]; + stats = &(indstats[idx]); /* - * Skip processing indexes that are unsafe for workers (these are - * processed in do_serial_processing_for_unsafe_indexes() by leader) + * Parallel unsafe indexes can be processed only by leader (these are + * processed in lazy_serial_process_indexes() by leader). */ - if (!parallel_processing_is_safe(indrel, lvshared)) + if (!stats->parallel_safe) continue; - /* Do vacuum or cleanup of the index */ - istat = vacrel->indstats[idx]; - vacrel->indstats[idx] = parallel_process_one_index(indrel, istat, - lvshared, - shared_istat, - vacrel); + lazy_parallel_process_one_index(vacrel, vacrel->indrels[idx], + lvshared, stats); } /* @@ -2861,16 +2857,16 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared) * Perform parallel processing of indexes in leader process. * * Handles index vacuuming (or index cleanup) for indexes that are not - * parallel safe.
It's possible that this will vary for a given index, based - * on details like whether we're performing for_cleanup processing right now. + * parallel safe. * * Also performs processing of smaller indexes that fell under the size cutoff - * enforced by compute_parallel_vacuum_workers(). These indexes never get a - * slot for statistics in DSM. + * enforced by compute_parallel_vacuum_workers(). */ static void -do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared) +lazy_serial_process_indexes(LVRelState *vacrel) { + LVParallelState *lps = vacrel->lps; + Assert(!IsParallelWorker()); /* @@ -2879,30 +2875,16 @@ do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared) if (VacuumActiveNWorkers) pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - for (int idx = 0; idx < vacrel->nindexes; idx++) + for (int i = 0; i < vacrel->nindexes; i++) { - LVSharedIndStats *shared_istat; - Relation indrel; - IndexBulkDeleteResult *istat; - - shared_istat = parallel_stats_for_idx(lvshared, idx); - indrel = vacrel->indrels[idx]; + LVSharedIndStats *stats = &(lps->lvsharedindstats[i]); - /* - * We're only here for the indexes that parallel workers won't - * process. Note that the shared_istat test ensures that we process - * indexes that fell under initial size cutoff. - */ - if (shared_istat != NULL && - parallel_processing_is_safe(indrel, lvshared)) + /* Skip parallel-safe indexes, as they are processed by workers */ + if (stats->parallel_safe) continue; - /* Do vacuum or cleanup of the index */ - istat = vacrel->indstats[idx]; - vacrel->indstats[idx] = parallel_process_one_index(indrel, istat, - lvshared, - shared_istat, - vacrel); + lazy_parallel_process_one_index(vacrel, vacrel->indrels[i], + lps->lvshared, stats); } /* @@ -2919,29 +2901,33 @@ do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared) * statistics returned from ambulkdelete and amvacuumcleanup to the DSM * segment.
*/ -static IndexBulkDeleteResult * -parallel_process_one_index(Relation indrel, - IndexBulkDeleteResult *istat, - LVShared *lvshared, - LVSharedIndStats *shared_istat, - LVRelState *vacrel) +static void +lazy_parallel_process_one_index(LVRelState *vacrel, Relation indrel, LVShared *lvshared, + LVSharedIndStats *stats) { + IndexBulkDeleteResult *istat = NULL; IndexBulkDeleteResult *istat_res; - /* - * Update the pointer to the corresponding bulk-deletion result if someone - * has already updated it - */ - if (shared_istat && shared_istat->updated && istat == NULL) - istat = &shared_istat->istat; + /* Get the index statistics space, if already updated */ + if (stats->istat_updated) + istat = &(stats->istat); - /* Do vacuum or cleanup of the index */ - if (lvshared->for_cleanup) - istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples, - lvshared->estimated_count, vacrel); - else - istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples, - vacrel); + switch (stats->status) + { + case INDVAC_STATUS_NEED_BULKDELETE: + istat_res = lazy_vacuum_one_index(indrel, istat, + lvshared->reltuples, vacrel); + break; + case INDVAC_STATUS_NEED_CLEANUP: + istat_res = lazy_cleanup_one_index(indrel, istat, + lvshared->reltuples, + lvshared->estimated_count, + vacrel); + break; + default: + elog(ERROR, "unexpected parallel vacuum index status %d", + stats->status); + } /* * Copy the index bulk-deletion result returned from ambulkdelete and @@ -2955,19 +2941,18 @@ parallel_process_one_index(Relation indrel, * Since all vacuum workers write the bulk-deletion result at different * slots we can write them without locking.
*/ - if (shared_istat && !shared_istat->updated && istat_res != NULL) + if (!stats->istat_updated && istat_res != NULL) { - memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult)); - shared_istat->updated = true; - - /* Free the locally-allocated bulk-deletion result */ + memcpy(&(stats->istat), istat_res, sizeof(IndexBulkDeleteResult)); + stats->istat_updated = true; pfree(istat_res); - - /* return the pointer to the result from shared memory */ - return &shared_istat->istat; } - return istat_res; + /* + * Update the status to completed. No need to lock here since each + * worker touches different indexes. + */ + stats->status = INDVAC_STATUS_COMPLETED; } /* @@ -3002,7 +2987,7 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - do_parallel_lazy_cleanup_all_indexes(vacrel); + parallel_lazy_vacuum_or_cleanup_all_indexes(vacrel, false); } } @@ -3734,13 +3719,10 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, * * nrequested is the number of parallel workers that user requested. If * nrequested is 0, we compute the parallel degree based on nindexes, that is - * the number of indexes that support parallel vacuum. This function also - * sets will_parallel_vacuum to remember indexes that participate in parallel - * vacuum. + * the number of indexes that support parallel vacuum.
*/ static int -compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested, - bool *will_parallel_vacuum) +compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested) { int nindexes_parallel = 0; int nindexes_parallel_bulkdel = 0; @@ -3766,8 +3748,6 @@ compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested, RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) continue; - will_parallel_vacuum[idx] = true; - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) nindexes_parallel_bulkdel++; if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) || @@ -3840,14 +3820,15 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, Relation *indrels = vacrel->indrels; int nindexes = vacrel->nindexes; ParallelContext *pcxt; + LVSharedIndStats *indstats; LVShared *shared; LVDeadTuples *dead_tuples; BufferUsage *buffer_usage; WalUsage *wal_usage; - bool *will_parallel_vacuum; long maxtuples; - Size est_shared; - Size est_deadtuples; + Size est_indstats = 0; + Size est_shared = 0; + Size est_deadtuples = 0; int nindexes_mwm = 0; int parallel_workers = 0; int querylen; @@ -3862,17 +3843,11 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, /* * Compute the number of parallel vacuum workers to launch */ - will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); - parallel_workers = compute_parallel_vacuum_workers(vacrel, - nrequested, - will_parallel_vacuum); + parallel_workers = compute_parallel_vacuum_workers(vacrel, nrequested); /* Can't perform vacuum in parallel */ if (parallel_workers <= 0) - { - pfree(will_parallel_vacuum); return lps; - } lps = (LVParallelState *) palloc0(sizeof(LVParallelState)); @@ -3882,41 +3857,13 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, Assert(pcxt->nworkers > 0); lps->pcxt = pcxt; - /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ - est_shared = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes))); - for (int idx = 0; idx <
nindexes; idx++) - { - Relation indrel = indrels[idx]; - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* - * Cleanup option should be either disabled, always performing in - * parallel or conditionally performing in parallel. - */ - Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); - Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); - - /* Skip indexes that don't participate in parallel vacuum */ - if (!will_parallel_vacuum[idx]) - continue; - - if (indrel->rd_indam->amusemaintenanceworkmem) - nindexes_mwm++; - - est_shared = add_size(est_shared, sizeof(LVSharedIndStats)); + /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */ + est_indstats = mul_size(sizeof(LVSharedIndStats), nindexes); + shm_toc_estimate_chunk(&pcxt->estimator, est_indstats); + shm_toc_estimate_keys(&pcxt->estimator, 1); - /* - * Remember the number of indexes that support parallel operation for - * each phase.
- */ - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) - lps->nindexes_parallel_bulkdel++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) - lps->nindexes_parallel_cleanup++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) - lps->nindexes_parallel_condcleanup++; - } + /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ + est_shared = MAXALIGN(sizeof(LVShared)); shm_toc_estimate_chunk(&pcxt->estimator, est_shared); shm_toc_estimate_keys(&pcxt->estimator, 1); @@ -3953,6 +3900,45 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, InitializeParallelDSM(pcxt); + /* Prepare index vacuum stats */ + indstats = (LVSharedIndStats *) shm_toc_allocate(pcxt->toc, est_indstats); + MemSet(indstats, 0, est_indstats); + for (int idx = 0; idx < nindexes; idx++) + { + Relation indrel = indrels[idx]; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* + * Cleanup option should be either disabled, always performing in + * parallel or conditionally performing in parallel. + */ + Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); + Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); + + /* Skip indexes that don't participate in parallel vacuum */ + if (vacoptions == VACUUM_OPTION_NO_PARALLEL || + RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) + continue; + + if (indrel->rd_indam->amusemaintenanceworkmem) + nindexes_mwm++; + + /* + * Remember the number of indexes that support parallel operation for + * each phase.
+ */ + if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) + lps->nindexes_parallel_bulkdel++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) + lps->nindexes_parallel_cleanup++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) + lps->nindexes_parallel_condcleanup++; + } + + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats); + lps->lvsharedindstats = indstats; + /* Prepare shared information */ shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared); MemSet(shared, 0, est_shared); @@ -3966,21 +3952,6 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, pg_atomic_init_u32(&(shared->cost_balance), 0); pg_atomic_init_u32(&(shared->active_nworkers), 0); pg_atomic_init_u32(&(shared->idx), 0); - shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes))); - - /* - * Initialize variables for shared index statistics, set NULL bitmap and - * the size of stats for each index. - */ - memset(shared->bitmap, 0x00, BITMAPLEN(nindexes)); - for (int idx = 0; idx < nindexes; idx++) - { - if (!will_parallel_vacuum[idx]) - continue; - - /* Set NOT NULL as this index does support parallelism */ - shared->bitmap[idx >> 3] |= 1 << (idx & 0x07); - } shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); lps->lvshared = shared; @@ -4018,7 +3989,6 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); } - pfree(will_parallel_vacuum); return lps; } @@ -4043,21 +4013,12 @@ end_parallel_vacuum(LVRelState *vacrel) /* Copy the updated statistics */ for (int idx = 0; idx < nindexes; idx++) { - LVSharedIndStats *shared_istat; - - shared_istat = parallel_stats_for_idx(lps->lvshared, idx); + LVSharedIndStats *stats = &(lps->lvsharedindstats[idx]); - /* - * Skip index -- it must have been processed by the leader, from - * inside do_serial_processing_for_unsafe_indexes() - */ - if (shared_istat == NULL) - continue; - - if
(stats->istat_updated) { indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); - memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult)); + memcpy(indstats[idx], &stats->istat, sizeof(IndexBulkDeleteResult)); } else indstats[idx] = NULL; @@ -4071,68 +4032,42 @@ end_parallel_vacuum(LVRelState *vacrel) vacrel->lps = NULL; } -/* - * Return shared memory statistics for index at offset 'getidx', if any - * - * Returning NULL indicates that compute_parallel_vacuum_workers() determined - * that the index is a totally unsuitable target for all parallel processing - * up front. For example, the index could be < min_parallel_index_scan_size - * cutoff. - */ -static LVSharedIndStats * -parallel_stats_for_idx(LVShared *lvshared, int getidx) -{ - char *p; - - if (IndStatsIsNull(lvshared, getidx)) - return NULL; - - p = (char *) GetSharedIndStats(lvshared); - for (int idx = 0; idx < getidx; idx++) - { - if (IndStatsIsNull(lvshared, idx)) - continue; - - p += sizeof(LVSharedIndStats); - } - - return (LVSharedIndStats *) p; -} - /* * Returns false, if the given index can't participate in parallel index - * vacuum or parallel index cleanup + * vacuum or parallel index cleanup. */ static bool -parallel_processing_is_safe(Relation indrel, LVShared *lvshared) +parallel_processing_is_safe(LVRelState *vacrel, Relation indrel, bool vacuum) { - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* + * Check if the index is a totally unsuitable target for all parallel + * processing up front. For example, the index could be + * < min_parallel_index_scan_size cutoff.
+ */ + if (vacoptions == VACUUM_OPTION_NO_PARALLEL || + RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) + return false; - /* first_time must be true only if for_cleanup is true */ - Assert(lvshared->for_cleanup || !lvshared->first_time); + /* In parallel vacuum case, check if it supports parallel bulk-deletion */ + if (vacuum) + return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0); - if (lvshared->for_cleanup) - { - /* Skip, if the index does not support parallel cleanup */ - if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) - return true; + /* Not safe, if the index does not support parallel cleanup */ + if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) + return false; - /* - * Skip, if the index supports parallel cleanup conditionally, but we - * have already processed the index (for bulkdelete). See the - * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know - * when indexes support parallel cleanup conditionally. - */ - if (!lvshared->first_time && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) - return false; - } - else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0) - { - /* Skip if the index does not support parallel bulk deletion */ + /* + * Not safe, if the index supports parallel cleanup conditionally, + * but we have already processed the index (for bulkdelete). See the + * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know + * when indexes support parallel cleanup conditionally.
+ */ + if (vacrel->num_index_scans > 0 && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) return false; - } return true; } @@ -4148,6 +4083,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) { Relation rel; Relation *indrels; + LVSharedIndStats *lvindstats; LVShared *lvshared; LVDeadTuples *dead_tuples; BufferUsage *buffer_usage; @@ -4161,10 +4097,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) false); elevel = lvshared->elevel; - if (lvshared->for_cleanup) - elog(DEBUG1, "starting parallel vacuum worker for cleanup"); - else - elog(DEBUG1, "starting parallel vacuum worker for bulk delete"); + elog(DEBUG1, "starting parallel vacuum worker"); /* Set debug_query_string for individual workers */ sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); @@ -4185,6 +4118,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); Assert(nindexes > 0); + /* Set index statistics */ + lvindstats = (LVSharedIndStats *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_INDEX_STATS, + false); + /* Set dead tuple space */ dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, @@ -4230,7 +4168,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) InstrStartParallelQuery(); /* Process indexes to perform vacuum/cleanup */ - do_parallel_processing(&vacrel, lvshared); + lazy_parallel_process_indexes(&vacrel, lvshared, lvindstats); /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); diff --git a/src/test/regress/expected/vacuum_parallel.out b/src/test/regress/expected/vacuum_parallel.out index ddf0ee544b..a07f5b2b73 100644 --- a/src/test/regress/expected/vacuum_parallel.out +++ b/src/test/regress/expected/vacuum_parallel.out @@ -45,5 +45,33 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table; INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1,
10000) i; RESET max_parallel_maintenance_workers; RESET min_parallel_index_scan_size; +SET max_parallel_maintenance_workers TO 4; +SET min_parallel_index_scan_size TO '128kB'; +CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off); +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g; +-- Create different types of indexes, e.g. having different parallelvacuumoptions. +-- Also create a small index same as above. +CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a); +CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a); +CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b); +CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a); +CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1)); +DELETE FROM parallel_vacuum_table2; +-- Parallel index vacuum for various types of indexes. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +-- Parallel index cleanup. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +-- XXX: in order to execute index scan twice, we need about 200,000 garbage tuples +-- with the minimum maintenance_work_mem. However, it takes a long time to load. +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g; +DELETE FROM parallel_vacuum_table2; +SET maintenance_work_mem TO 1024; +-- Parallel index vacuum for various types of indexes. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; +RESET maintenance_work_mem; +RESET max_parallel_maintenance_workers; +RESET min_parallel_index_scan_size; -- Deliberately don't drop table, to get further coverage from tools like -- pg_amcheck in some testing scenarios diff --git a/src/test/regress/sql/vacuum_parallel.sql b/src/test/regress/sql/vacuum_parallel.sql index 1d23f33e39..49f4f4ce6d 100644 --- a/src/test/regress/sql/vacuum_parallel.sql +++ b/src/test/regress/sql/vacuum_parallel.sql @@ -42,5 +42,45 @@ INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i; RESET max_parallel_maintenance_workers; RESET min_parallel_index_scan_size; +SET max_parallel_maintenance_workers TO 4; +SET min_parallel_index_scan_size TO '128kB'; + +CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off); +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g; + +-- Create different types of indexes, e.g. having different parallelvacuumoptions. +-- Also create a small index same as above. +CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a); +CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a); +CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b); +CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a); +CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1)); + +DELETE FROM parallel_vacuum_table2; + +-- Parallel index vacuum for various types of indexes.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +-- Parallel index cleanup. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +-- XXX: in order to execute index scan twice, we need about 200,000 garbage tuples +-- with the minimum maintenance_work_mem. However, it takes a long time to load. +INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g; + +DELETE FROM parallel_vacuum_table2; + +SET maintenance_work_mem TO 1024; + +-- Parallel index vacuum for various types of indexes. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +-- Parallel index cleanup. +VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2; + +RESET maintenance_work_mem; +RESET max_parallel_maintenance_workers; +RESET min_parallel_index_scan_size; + -- Deliberately don't drop table, to get further coverage from tools like -- pg_amcheck in some testing scenarios