#include "access/heapam_xlog.h"
#include "access/htup_details.h"
#include "access/multixact.h"
-#include "access/parallel.h"
#include "access/transam.h"
#include "access/visibilitymap.h"
#include "access/xact.h"
*/
#define PREFETCH_SIZE ((BlockNumber) 32)
-/*
- * DSM keys for parallel vacuum. Unlike other parallel execution code, since
- * we don't need to worry about DSM keys conflicting with plan_node_id we can
- * use small integers.
- */
-#define PARALLEL_VACUUM_KEY_SHARED 1
-#define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2
-#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
-#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
-#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
-#define PARALLEL_VACUUM_KEY_INDEX_STATS 6
-
/*
* Macro to check if we are in a parallel vacuum. If true, we are in the
* parallel mode and the DSM segment is initialized.
*/
-#define ParallelVacuumIsActive(vacrel) ((vacrel)->lps != NULL)
+#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
/* Phases of vacuum during which we report error context. */
typedef enum
VACUUM_ERRCB_PHASE_TRUNCATE
} VacErrPhase;
-/*
- * Shared information among parallel workers. So this is allocated in the DSM
- * segment.
- */
-typedef struct LVShared
-{
- /*
- * Target table relid and log level. These fields are not modified during
- * the lazy vacuum.
- */
- Oid relid;
- int elevel;
-
- /*
- * Fields for both index vacuum and cleanup.
- *
- * reltuples is the total number of input heap tuples. We set either old
- * live tuples in the index vacuum case or the new live tuples in the
- * index cleanup case.
- *
- * estimated_count is true if reltuples is an estimated value. (Note that
- * reltuples could be -1 in this case, indicating we have no idea.)
- */
- double reltuples;
- bool estimated_count;
-
- /*
- * In single process lazy vacuum we could consume more memory during index
- * vacuuming or cleanup apart from the memory for heap scanning. In
- * parallel vacuum, since individual vacuum workers can consume memory
- * equal to maintenance_work_mem, the new maintenance_work_mem for each
- * worker is set such that the parallel operation doesn't consume more
- * memory than single process lazy vacuum.
- */
- int maintenance_work_mem_worker;
-
- /*
- * Shared vacuum cost balance. During parallel vacuum,
- * VacuumSharedCostBalance points to this value and it accumulates the
- * balance of each parallel vacuum worker.
- */
- pg_atomic_uint32 cost_balance;
-
- /*
- * Number of active parallel workers. This is used for computing the
- * minimum threshold of the vacuum cost balance before a worker sleeps for
- * cost-based delay.
- */
- pg_atomic_uint32 active_nworkers;
-
- /* Counter for vacuuming and cleanup */
- pg_atomic_uint32 idx;
-} LVShared;
-
-/* Status used during parallel index vacuum or cleanup */
-typedef enum LVParallelIndVacStatus
-{
- PARALLEL_INDVAC_STATUS_INITIAL = 0,
- PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
- PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
- PARALLEL_INDVAC_STATUS_COMPLETED
-} LVParallelIndVacStatus;
-
-/*
- * Struct for index vacuum statistics of an index that is used for parallel vacuum.
- * This includes the status of parallel index vacuum as well as index statistics.
- */
-typedef struct LVParallelIndStats
-{
- /*
- * The following two fields are set by leader process before executing
- * parallel index vacuum or parallel index cleanup. These fields are not
- * fixed for the entire VACUUM operation. They are only fixed for an
- * individual parallel index vacuum and cleanup.
- *
- * parallel_workers_can_process is true if both leader and worker can
- * process the index, otherwise only leader can process it.
- */
- LVParallelIndVacStatus status;
- bool parallel_workers_can_process;
-
- /*
- * Individual worker or leader stores the result of index vacuum or
- * cleanup.
- */
- bool istat_updated; /* are the stats updated? */
- IndexBulkDeleteResult istat;
-} LVParallelIndStats;
-
-/* Struct for maintaining a parallel vacuum state. */
-typedef struct LVParallelState
-{
- ParallelContext *pcxt;
-
- /* Shared information among parallel vacuum workers */
- LVShared *lvshared;
-
- /*
- * Shared index statistics among parallel vacuum workers. The array
- * element is allocated for every index, even those indexes where parallel
- * index vacuuming is unsafe or not worthwhile (e.g.,
- * will_parallel_vacuum[] is false). During parallel vacuum,
- * IndexBulkDeleteResult of each index is kept in DSM and is copied into
- * local memory at the end of parallel vacuum.
- */
- LVParallelIndStats *lvpindstats;
-
- /* Points to buffer usage area in DSM */
- BufferUsage *buffer_usage;
-
- /* Points to WAL usage area in DSM */
- WalUsage *wal_usage;
-
- /*
- * False if the index is totally unsuitable target for all parallel
- * processing. For example, the index could be <
- * min_parallel_index_scan_size cutoff.
- */
- bool *will_parallel_vacuum;
-
- /*
- * The number of indexes that support parallel index bulk-deletion and
- * parallel index cleanup respectively.
- */
- int nindexes_parallel_bulkdel;
- int nindexes_parallel_cleanup;
- int nindexes_parallel_condcleanup;
-} LVParallelState;
-
typedef struct LVRelState
{
/* Target heap relation and its indexes */
bool do_index_cleanup;
bool do_rel_truncate;
- /* Buffer access strategy and parallel state */
+ /* Buffer access strategy and parallel vacuum state */
BufferAccessStrategy bstrategy;
- LVParallelState *lps;
+ ParallelVacuumState *pvs;
/* rel's initial relfrozenxid and relminmxid */
TransactionId relfrozenxid;
LVRelState *vacrel);
static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
static void lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum);
-static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
- LVParallelIndStats *pindstats);
-static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel);
-static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
- LVShared *shared,
- LVParallelIndStats *pindstats);
static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
IndexBulkDeleteResult *istat,
double reltuples,
static void lazy_truncate_heap(LVRelState *vacrel);
static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
bool *lock_waiter_detected);
-static int dead_items_max_items(LVRelState *vacrel);
static void dead_items_alloc(LVRelState *vacrel, int nworkers);
static void dead_items_cleanup(LVRelState *vacrel);
static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
TransactionId *visibility_cutoff_xid, bool *all_frozen);
-static int parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
- bool *will_parallel_vacuum);
static void update_index_statistics(LVRelState *vacrel);
-static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested);
-static void parallel_vacuum_end(LVRelState *vacrel);
-static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
- bool vacuum);
static void vacuum_error_callback(void *arg);
static void update_vacuum_error_info(LVRelState *vacrel,
LVSavedErrInfo *saved_vacrel,
/*
* Free resources managed by dead_items_alloc. This will end parallel
- * mode when needed (it must end before we update index statistics).
+ * mode when needed (it must end before updating index statistics as we
+ * can't write in parallel mode).
*/
dead_items_cleanup(vacrel);
/* Should not end up here with no indexes */
Assert(vacrel->nindexes > 0);
- Assert(!IsParallelWorker());
Assert(vacrel->lpdead_item_pages > 0);
if (!vacrel->do_index_vacuuming)
{
bool allindexes = true;
- Assert(!IsParallelWorker());
Assert(vacrel->nindexes > 0);
Assert(vacrel->do_index_vacuuming);
Assert(vacrel->do_index_cleanup);
else
{
/* Outsource everything to parallel variant */
- parallel_vacuum_process_all_indexes(vacrel, true);
+ parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples,
+ vacrel->num_index_scans);
/*
* Do a postcheck to consider applying wraparound failsafe now. Note
return false;
}
-/*
- * Perform index vacuum or index cleanup with parallel workers. This function
- * must be used by the parallel vacuum leader process.
- */
-static void
-parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum)
-{
- LVParallelState *lps = vacrel->lps;
- LVParallelIndVacStatus new_status;
- int nworkers;
-
- Assert(!IsParallelWorker());
- Assert(ParallelVacuumIsActive(vacrel));
- Assert(vacrel->nindexes > 0);
-
- if (vacuum)
- {
- /*
- * We can only provide an approximate value of num_heap_tuples, at
- * least for now. Matches serial VACUUM case.
- */
- vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
- vacrel->lps->lvshared->estimated_count = true;
-
- new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
-
- /* Determine the number of parallel workers to launch */
- nworkers = vacrel->lps->nindexes_parallel_bulkdel;
- }
- else
- {
- /*
- * We can provide a better estimate of total number of surviving
- * tuples (we assume indexes are more interested in that than in the
- * number of nominally live tuples).
- */
- vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
- vacrel->lps->lvshared->estimated_count =
- (vacrel->tupcount_pages < vacrel->rel_pages);
-
- new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
-
- /* Determine the number of parallel workers to launch */
- nworkers = vacrel->lps->nindexes_parallel_cleanup;
-
- /* Add conditionally parallel-aware indexes if in the first time call */
- if (vacrel->num_index_scans == 0)
- nworkers += vacrel->lps->nindexes_parallel_condcleanup;
- }
-
- /* The leader process will participate */
- nworkers--;
-
- /*
- * It is possible that parallel context is initialized with fewer workers
- * than the number of indexes that need a separate worker in the current
- * phase, so we need to consider it. See
- * parallel_vacuum_compute_workers().
- */
- nworkers = Min(nworkers, lps->pcxt->nworkers);
-
- /*
- * Set index vacuum status and mark whether parallel vacuum worker can
- * process it.
- */
- for (int i = 0; i < vacrel->nindexes; i++)
- {
- LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]);
-
- Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
- pindstats->status = new_status;
- pindstats->parallel_workers_can_process =
- (lps->will_parallel_vacuum[i] &
- parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i],
- vacuum));
- }
-
- /* Reset the parallel index processing counter */
- pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
- /* Setup the shared cost-based vacuum delay and launch workers */
- if (nworkers > 0)
- {
- /* Reinitialize parallel context to relaunch parallel workers */
- if (vacrel->num_index_scans > 0)
- ReinitializeParallelDSM(lps->pcxt);
-
- /*
- * Set up shared cost balance and the number of active workers for
- * vacuum delay. We need to do this before launching workers as
- * otherwise, they might not see the updated values for these
- * parameters.
- */
- pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
- pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
-
- /*
- * The number of workers can vary between bulkdelete and cleanup
- * phase.
- */
- ReinitializeParallelWorkers(lps->pcxt, nworkers);
-
- LaunchParallelWorkers(lps->pcxt);
-
- if (lps->pcxt->nworkers_launched > 0)
- {
- /*
- * Reset the local cost values for leader backend as we have
- * already accumulated the remaining balance of heap.
- */
- VacuumCostBalance = 0;
- VacuumCostBalanceLocal = 0;
-
- /* Enable shared cost balance for leader backend */
- VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
- VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
- }
-
- if (vacuum)
- ereport(elevel,
- (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
- "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
- lps->pcxt->nworkers_launched),
- lps->pcxt->nworkers_launched, nworkers)));
- else
- ereport(elevel,
- (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
- "launched %d parallel vacuum workers for index cleanup (planned: %d)",
- lps->pcxt->nworkers_launched),
- lps->pcxt->nworkers_launched, nworkers)));
- }
-
- /* Process the indexes that can be processed by only leader process */
- parallel_vacuum_process_unsafe_indexes(vacrel);
-
- /*
- * Join as a parallel worker. The leader process alone processes all
- * parallel-safe indexes in the case where no workers are launched.
- */
- parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats);
-
- /*
- * Next, accumulate buffer and WAL usage. (This must wait for the workers
- * to finish, or we might get incomplete data.)
- */
- if (nworkers > 0)
- {
- /* Wait for all vacuum workers to finish */
- WaitForParallelWorkersToFinish(lps->pcxt);
-
- for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
- }
-
- /*
- * Reset all index status back to initial (while checking that we have
- * processed all indexes).
- */
- for (int i = 0; i < vacrel->nindexes; i++)
- {
- LVParallelIndStats *pindstats = &(lps->lvpindstats[i]);
-
- if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
- elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
- RelationGetRelationName(vacrel->indrels[i]));
-
- pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
- }
-
- /*
- * Carry the shared balance value to heap scan and disable shared costing
- */
- if (VacuumSharedCostBalance)
- {
- VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
- VacuumSharedCostBalance = NULL;
- VacuumActiveNWorkers = NULL;
- }
-}
-
-/*
- * Index vacuum/cleanup routine used by the leader process and parallel
- * vacuum worker processes to process the indexes in parallel.
- */
-static void
-parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
- LVParallelIndStats *pindstats)
-{
- /*
- * Increment the active worker count if we are able to launch any worker.
- */
- if (VacuumActiveNWorkers)
- pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
- /* Loop until all indexes are vacuumed */
- for (;;)
- {
- int idx;
- LVParallelIndStats *pis;
-
- /* Get an index number to process */
- idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);
-
- /* Done for all indexes? */
- if (idx >= vacrel->nindexes)
- break;
-
- pis = &(pindstats[idx]);
-
- /*
- * Skip processing index that is unsafe for workers or has an
- * unsuitable target for parallel index vacuum (this is processed in
- * parallel_vacuum_process_unsafe_indexes() by the leader).
- */
- if (!pis->parallel_workers_can_process)
- continue;
-
- /* Do vacuum or cleanup of the index */
- parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
- shared, pis);
- }
-
- /*
- * We have completed the index vacuum so decrement the active worker
- * count.
- */
- if (VacuumActiveNWorkers)
- pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Perform parallel processing of indexes in leader process.
- *
- * Handles index vacuuming (or index cleanup) for indexes that are not
- * parallel safe. It's possible that this will vary for a given index, based
- * on details like whether we're performing index cleanup right now.
- *
- * Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by parallel_vacuum_compute_workers().
- */
-static void
-parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel)
-{
- LVParallelState *lps = vacrel->lps;
-
- Assert(!IsParallelWorker());
-
- /*
- * Increment the active worker count if we are able to launch any worker.
- */
- if (VacuumActiveNWorkers)
- pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
- for (int idx = 0; idx < vacrel->nindexes; idx++)
- {
- LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
-
- /* Skip, indexes that are safe for workers */
- if (pindstats->parallel_workers_can_process)
- continue;
-
- /* Do vacuum or cleanup of the index */
- parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
- lps->lvshared, pindstats);
- }
-
- /*
- * We have completed the index vacuum so decrement the active worker
- * count.
- */
- if (VacuumActiveNWorkers)
- pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Vacuum or cleanup index either by leader process or by one of the worker
- * process. After processing the index this function copies the index
- * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
- * segment.
- */
-static void
-parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
- LVShared *shared, LVParallelIndStats *pindstats)
-{
- IndexBulkDeleteResult *istat = NULL;
- IndexBulkDeleteResult *istat_res;
-
- /*
- * Update the pointer to the corresponding bulk-deletion result if someone
- * has already updated it
- */
- if (pindstats->istat_updated)
- istat = &(pindstats->istat);
-
- switch (pindstats->status)
- {
- case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
- istat_res = lazy_vacuum_one_index(indrel, istat,
- shared->reltuples, vacrel);
- break;
- case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
- istat_res = lazy_cleanup_one_index(indrel, istat,
- shared->reltuples,
- shared->estimated_count,
- vacrel);
- break;
- default:
- elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
- pindstats->status,
- RelationGetRelationName(indrel));
- }
-
- /*
- * Copy the index bulk-deletion result returned from ambulkdelete and
- * amvacuumcleanup to the DSM segment if it's the first cycle because they
- * allocate locally and it's possible that an index will be vacuumed by a
- * different vacuum process the next cycle. Copying the result normally
- * happens only the first time an index is vacuumed. For any additional
- * vacuum pass, we directly point to the result on the DSM segment and
- * pass it to vacuum index APIs so that workers can update it directly.
- *
- * Since all vacuum workers write the bulk-deletion result at different
- * slots we can write them without locking.
- */
- if (!pindstats->istat_updated && istat_res != NULL)
- {
- memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
- pindstats->istat_updated = true;
-
- /* Free the locally-allocated bulk-deletion result */
- pfree(istat_res);
- }
-
- /*
- * Update the status to completed. No need to lock here since each worker
- * touches different indexes.
- */
- pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
-}
-
/*
* lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
*/
static void
lazy_cleanup_all_indexes(LVRelState *vacrel)
{
- Assert(!IsParallelWorker());
Assert(vacrel->nindexes > 0);
/* Report that we are now cleaning up indexes */
else
{
/* Outsource everything to parallel variant */
- parallel_vacuum_process_all_indexes(vacrel, false);
+ parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples,
+ vacrel->num_index_scans,
+ (vacrel->tupcount_pages < vacrel->rel_pages));
}
}
autovacuum_work_mem != -1 ?
autovacuum_work_mem : maintenance_work_mem;
- Assert(!IsParallelWorker());
-
if (vacrel->nindexes > 0)
{
BlockNumber rel_pages = vacrel->rel_pages;
VacDeadItems *dead_items;
int max_items;
+ max_items = dead_items_max_items(vacrel);
+ Assert(max_items >= MaxHeapTuplesPerPage);
+
/*
* Initialize state for a parallel vacuum. As of now, only one worker can
* be used for an index, so we invoke parallelism only if there are at
vacrel->relname)));
}
else
- parallel_vacuum_begin(vacrel, nworkers);
+ vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
+ vacrel->nindexes, nworkers,
+ max_items, elevel,
+ vacrel->bstrategy);
- /* If parallel mode started, vacrel->dead_items allocated in DSM */
+ /* If parallel mode started, dead_items space is allocated in DSM */
if (ParallelVacuumIsActive(vacrel))
+ {
+ vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs);
return;
+ }
}
/* Serial VACUUM case */
- max_items = dead_items_max_items(vacrel);
dead_items = (VacDeadItems *) palloc(vac_max_items_to_alloc_size(max_items));
dead_items->max_items = max_items;
dead_items->num_items = 0;
return;
}
- /*
- * End parallel mode before updating index statistics as we cannot write
- * during parallel mode.
- */
- parallel_vacuum_end(vacrel);
+ /* End parallel mode */
+ parallel_vacuum_end(vacrel->pvs, vacrel->indstats);
+ vacrel->pvs = NULL;
}
/*
return all_visible;
}
-/*
- * Compute the number of parallel worker processes to request. Both index
- * vacuum and index cleanup can be executed with parallel workers. The index
- * is eligible for parallel vacuum iff its size is greater than
- * min_parallel_index_scan_size as invoking workers for very small indexes
- * can hurt performance.
- *
- * nrequested is the number of parallel workers that user requested. If
- * nrequested is 0, we compute the parallel degree based on nindexes, that is
- * the number of indexes that support parallel vacuum. This function also
- * sets will_parallel_vacuum to remember indexes that participate in parallel
- * vacuum.
- */
-static int
-parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
- bool *will_parallel_vacuum)
-{
- int nindexes_parallel = 0;
- int nindexes_parallel_bulkdel = 0;
- int nindexes_parallel_cleanup = 0;
- int parallel_workers;
-
- /*
- * We don't allow performing parallel operation in standalone backend or
- * when parallelism is disabled.
- */
- if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
- return 0;
-
- /*
- * Compute the number of indexes that can participate in parallel vacuum.
- */
- for (int idx = 0; idx < vacrel->nindexes; idx++)
- {
- Relation indrel = vacrel->indrels[idx];
- uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
- /* Skip index that is not a suitable target for parallel index vacuum */
- if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
- RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
- continue;
-
- will_parallel_vacuum[idx] = true;
-
- if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
- nindexes_parallel_bulkdel++;
- if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
- ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
- nindexes_parallel_cleanup++;
- }
-
- nindexes_parallel = Max(nindexes_parallel_bulkdel,
- nindexes_parallel_cleanup);
-
- /* The leader process takes one index */
- nindexes_parallel--;
-
- /* No index supports parallel vacuum */
- if (nindexes_parallel <= 0)
- return 0;
-
- /* Compute the parallel degree */
- parallel_workers = (nrequested > 0) ?
- Min(nrequested, nindexes_parallel) : nindexes_parallel;
-
- /* Cap by max_parallel_maintenance_workers */
- parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
-
- return parallel_workers;
-}
-
/*
* Update index statistics in pg_class if the statistics are accurate.
*/
}
/*
- * Try to enter parallel mode and create a parallel context. Then initialize
- * shared memory state.
- *
- * On success (when we can launch one or more workers), will set dead_items and
- * lps in vacrel for caller. A set lps in vacrel state indicates that parallel
- * VACUUM is currently active.
- */
-static void
-parallel_vacuum_begin(LVRelState *vacrel, int nrequested)
-{
- LVParallelState *lps;
- Relation *indrels = vacrel->indrels;
- int nindexes = vacrel->nindexes;
- ParallelContext *pcxt;
- LVShared *shared;
- VacDeadItems *dead_items;
- LVParallelIndStats *pindstats;
- BufferUsage *buffer_usage;
- WalUsage *wal_usage;
- bool *will_parallel_vacuum;
- int max_items;
- Size est_pindstats_len;
- Size est_shared_len;
- Size est_dead_items_len;
- int nindexes_mwm = 0;
- int parallel_workers = 0;
- int querylen;
-
- /*
- * A parallel vacuum must be requested and there must be indexes on the
- * relation
- */
- Assert(nrequested >= 0);
- Assert(nindexes > 0);
-
- /*
- * Compute the number of parallel vacuum workers to launch
- */
- will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
- parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested,
- will_parallel_vacuum);
- if (parallel_workers <= 0)
- {
- /* Can't perform vacuum in parallel -- lps not set in vacrel */
- pfree(will_parallel_vacuum);
- return;
- }
-
- lps = (LVParallelState *) palloc0(sizeof(LVParallelState));
-
- EnterParallelMode();
- pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
- parallel_workers);
- Assert(pcxt->nworkers > 0);
- lps->pcxt = pcxt;
- lps->will_parallel_vacuum = will_parallel_vacuum;
-
- /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */
- est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
- shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
- shm_toc_estimate_keys(&pcxt->estimator, 1);
-
- /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
- est_shared_len = sizeof(LVShared);
- shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
- shm_toc_estimate_keys(&pcxt->estimator, 1);
-
- /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
- max_items = dead_items_max_items(vacrel);
- est_dead_items_len = vac_max_items_to_alloc_size(max_items);
- shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
- shm_toc_estimate_keys(&pcxt->estimator, 1);
-
- /*
- * Estimate space for BufferUsage and WalUsage --
- * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
- *
- * If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgBufferUsage or
- * pgWalUsage, so do it unconditionally.
- */
- shm_toc_estimate_chunk(&pcxt->estimator,
- mul_size(sizeof(BufferUsage), pcxt->nworkers));
- shm_toc_estimate_keys(&pcxt->estimator, 1);
- shm_toc_estimate_chunk(&pcxt->estimator,
- mul_size(sizeof(WalUsage), pcxt->nworkers));
- shm_toc_estimate_keys(&pcxt->estimator, 1);
-
- /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
- if (debug_query_string)
- {
- querylen = strlen(debug_query_string);
- shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
- shm_toc_estimate_keys(&pcxt->estimator, 1);
- }
- else
- querylen = 0; /* keep compiler quiet */
-
- InitializeParallelDSM(pcxt);
-
- /* Prepare index vacuum stats */
- pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
- for (int idx = 0; idx < nindexes; idx++)
- {
- Relation indrel = indrels[idx];
- uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
- /*
- * Cleanup option should be either disabled, always performing in
- * parallel or conditionally performing in parallel.
- */
- Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
- ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
- Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
-
- if (!will_parallel_vacuum[idx])
- continue;
-
- if (indrel->rd_indam->amusemaintenanceworkmem)
- nindexes_mwm++;
-
- /*
- * Remember the number of indexes that support parallel operation for
- * each phase.
- */
- if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
- lps->nindexes_parallel_bulkdel++;
- if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
- lps->nindexes_parallel_cleanup++;
- if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
- lps->nindexes_parallel_condcleanup++;
- }
- shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);
- lps->lvpindstats = pindstats;
-
- /* Prepare shared information */
- shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
- MemSet(shared, 0, est_shared_len);
- shared->relid = RelationGetRelid(vacrel->rel);
- shared->elevel = elevel;
- shared->maintenance_work_mem_worker =
- (nindexes_mwm > 0) ?
- maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
- maintenance_work_mem;
-
- pg_atomic_init_u32(&(shared->cost_balance), 0);
- pg_atomic_init_u32(&(shared->active_nworkers), 0);
- pg_atomic_init_u32(&(shared->idx), 0);
-
- shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
- lps->lvshared = shared;
-
- /* Prepare the dead_items space */
- dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
- est_dead_items_len);
- dead_items->max_items = max_items;
- dead_items->num_items = 0;
- MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
- shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
-
- /*
- * Allocate space for each worker's BufferUsage and WalUsage; no need to
- * initialize
- */
- buffer_usage = shm_toc_allocate(pcxt->toc,
- mul_size(sizeof(BufferUsage), pcxt->nworkers));
- shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
- lps->buffer_usage = buffer_usage;
- wal_usage = shm_toc_allocate(pcxt->toc,
- mul_size(sizeof(WalUsage), pcxt->nworkers));
- shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
- lps->wal_usage = wal_usage;
-
- /* Store query string for workers */
- if (debug_query_string)
- {
- char *sharedquery;
-
- sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
- memcpy(sharedquery, debug_query_string, querylen + 1);
- sharedquery[querylen] = '\0';
- shm_toc_insert(pcxt->toc,
- PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
- }
-
- /* Success -- set dead_items and lps in leader's vacrel state */
- vacrel->dead_items = dead_items;
- vacrel->lps = lps;
-}
-
-/*
- * Destroy the parallel context, and end parallel mode.
- *
- * Since writes are not allowed during parallel mode, copy the
- * updated index statistics from DSM into local memory and then later use that
- * to update the index statistics. One might think that we can exit from
- * parallel mode, update the index statistics and then destroy parallel
- * context, but that won't be safe (see ExitParallelMode).
- */
-static void
-parallel_vacuum_end(LVRelState *vacrel)
-{
- IndexBulkDeleteResult **indstats = vacrel->indstats;
- LVParallelState *lps = vacrel->lps;
- int nindexes = vacrel->nindexes;
-
- Assert(!IsParallelWorker());
-
- /* Copy the updated statistics */
- for (int idx = 0; idx < nindexes; idx++)
- {
- LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
-
- if (pindstats->istat_updated)
- {
- indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
- memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
- }
- else
- indstats[idx] = NULL;
- }
-
- DestroyParallelContext(lps->pcxt);
- ExitParallelMode();
-
- /* Deactivate parallel vacuum */
- pfree(lps->will_parallel_vacuum);
- pfree(lps);
- vacrel->lps = NULL;
-}
-
-/*
- * Returns false, if the given index can't participate in the next execution of
- * parallel index vacuum or parallel index cleanup.
- */
-static bool
-parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
- bool vacuum)
-{
- uint8 vacoptions;
-
- vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
- /* In parallel vacuum case, check if it supports parallel bulk-deletion */
- if (vacuum)
- return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
-
- /* Not safe, if the index does not support parallel cleanup */
- if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
- ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
- return false;
-
- /*
- * Not safe, if the index supports parallel cleanup conditionally, but we
- * have already processed the index (for bulkdelete). We do this to avoid
- * the need to invoke workers when parallel index cleanup doesn't need to
- * scan the index. See the comments for option
- * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
- * parallel cleanup conditionally.
- */
- if (vacrel->num_index_scans > 0 &&
- ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
- return false;
-
- return true;
-}
-
-/*
- * Perform work within a launched parallel process.
- *
- * Since parallel vacuum workers perform only index vacuum or index cleanup,
- * we don't need to report progress information.
- */
-void
-parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
-{
- Relation rel;
- Relation *indrels;
- LVParallelIndStats *lvpindstats;
- LVShared *lvshared;
- VacDeadItems *dead_items;
- BufferUsage *buffer_usage;
- WalUsage *wal_usage;
- int nindexes;
- char *sharedquery;
- LVRelState vacrel;
- ErrorContextCallback errcallback;
-
- /*
- * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
- * don't support parallel vacuum for autovacuum as of now.
- */
- Assert(MyProc->statusFlags == PROC_IN_VACUUM);
-
- lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
- false);
- elevel = lvshared->elevel;
-
- elog(DEBUG1, "starting parallel vacuum worker");
-
- /* Set debug_query_string for individual workers */
- sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
- debug_query_string = sharedquery;
- pgstat_report_activity(STATE_RUNNING, debug_query_string);
-
- /*
- * Open table. The lock mode is the same as the leader process. It's
- * okay because the lock mode does not conflict among the parallel
- * workers.
- */
- rel = table_open(lvshared->relid, ShareUpdateExclusiveLock);
-
- /*
- * Open all indexes. indrels are sorted in order by OID, which should be
- * matched to the leader's one.
- */
- vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
- Assert(nindexes > 0);
-
- /* Set index statistics */
- lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
- PARALLEL_VACUUM_KEY_INDEX_STATS,
- false);
-
- /* Set dead_items space (set as worker's vacrel dead_items below) */
- dead_items = (VacDeadItems *) shm_toc_lookup(toc,
- PARALLEL_VACUUM_KEY_DEAD_ITEMS,
- false);
-
- /* Set cost-based vacuum delay */
- VacuumCostActive = (VacuumCostDelay > 0);
- VacuumCostBalance = 0;
- VacuumPageHit = 0;
- VacuumPageMiss = 0;
- VacuumPageDirty = 0;
- VacuumCostBalanceLocal = 0;
- VacuumSharedCostBalance = &(lvshared->cost_balance);
- VacuumActiveNWorkers = &(lvshared->active_nworkers);
-
- vacrel.rel = rel;
- vacrel.indrels = indrels;
- vacrel.nindexes = nindexes;
- /* Each parallel VACUUM worker gets its own access strategy */
- vacrel.bstrategy = GetAccessStrategy(BAS_VACUUM);
- vacrel.indstats = (IndexBulkDeleteResult **)
- palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
-
- if (lvshared->maintenance_work_mem_worker > 0)
- maintenance_work_mem = lvshared->maintenance_work_mem_worker;
-
- /*
- * Initialize vacrel for use as error callback arg by parallel worker.
- */
- vacrel.relnamespace = get_namespace_name(RelationGetNamespace(rel));
- vacrel.relname = pstrdup(RelationGetRelationName(rel));
- vacrel.indname = NULL;
- vacrel.phase = VACUUM_ERRCB_PHASE_UNKNOWN; /* Not yet processing */
- vacrel.dead_items = dead_items;
-
- /* Setup error traceback support for ereport() */
- errcallback.callback = vacuum_error_callback;
- errcallback.arg = &vacrel;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
- /* Prepare to track buffer usage during parallel execution */
- InstrStartParallelQuery();
-
- /* Process indexes to perform vacuum/cleanup */
- parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats);
-
- /* Report buffer/WAL usage during parallel execution */
- buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
- wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
- InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
- &wal_usage[ParallelWorkerNumber]);
-
- /* Pop the error context stack */
- error_context_stack = errcallback.previous;
-
- vac_close_indexes(nindexes, indrels, RowExclusiveLock);
- table_close(rel, ShareUpdateExclusiveLock);
- FreeAccessStrategy(vacrel.bstrategy);
- pfree(vacrel.indstats);
-}
-
-/*
- * Error context callback for errors occurring during vacuum.
+/*
+ * Error context callback for errors occurring during vacuum. The error
+ * context messages for index phases should match the messages set in parallel
+ * vacuum. If you change this function for those phases, change
+ * parallel_vacuum_error_callback() as well.
*/
static void
vacuum_error_callback(void *arg)
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * vacuumparallel.c
+ * Support routines for parallel vacuum execution.
+ *
+ * This file contains routines that are intended to support setting up, using,
+ * and tearing down a ParallelVacuumState.
+ *
+ * In a parallel vacuum, we perform both index bulk deletion and index cleanup
+ * with parallel worker processes. Individual indexes are processed by one
+ * vacuum process. ParallelVacuumState contains shared information as well as
+ * the memory space for storing dead items allocated in the DSM segment. We
+ * launch parallel worker processes at the start of parallel index
+ * bulk-deletion and index cleanup, and once all indexes are processed, the
+ * parallel worker processes exit. Each time we process indexes in parallel,
+ * the parallel context is re-initialized so that the same DSM can be used for
+ * multiple passes of index bulk-deletion and index cleanup.
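+ *
+ * A simplified sketch of a caller's lifecycle, mirroring how lazy vacuum
+ * uses these routines elsewhere in this patch (variable names are
+ * illustrative):
+ *
+ *   pvs = parallel_vacuum_init(rel, indrels, nindexes, nrequested,
+ *                              max_items, elevel, bstrategy);
+ *   dead_items = parallel_vacuum_get_dead_items(pvs);
+ *   ... accumulate dead items, then for each index vacuum cycle ...
+ *   parallel_vacuum_bulkdel_all_indexes(pvs, old_live_tuples,
+ *                                       num_index_scans);
+ *   ... after the final heap pass ...
+ *   parallel_vacuum_cleanup_all_indexes(pvs, new_rel_tuples,
+ *                                       num_index_scans, estimated_count);
+ *   parallel_vacuum_end(pvs, indstats);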
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/commands/vacuumparallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/amapi.h"
+#include "access/table.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "optimizer/paths.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+
+/*
+ * DSM keys for parallel vacuum. Unlike other parallel execution code, since
+ * we don't need to worry about DSM keys conflicting with plan_node_id we can
+ * use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED 1
+#define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS 6
+
+/*
+ * Shared information among parallel workers. So this is allocated in the DSM
+ * segment.
+ */
+typedef struct PVShared
+{
+ /*
+ * Target table relid and log level. These fields are not modified during
+ * the parallel vacuum.
+ */
+ Oid relid;
+ int elevel;
+
+ /*
+ * Fields for both index vacuum and cleanup.
+ *
+ * reltuples is the total number of input heap tuples. We set either old
+ * live tuples in the index vacuum case or the new live tuples in the
+ * index cleanup case.
+ *
+ * estimated_count is true if reltuples is an estimated value. (Note that
+ * reltuples could be -1 in this case, indicating we have no idea.)
+ */
+ double reltuples;
+ bool estimated_count;
+
+ /*
+ * In single process vacuum we could consume more memory during index
+ * vacuuming or cleanup apart from the memory for heap scanning. In
+ * parallel vacuum, since individual vacuum workers can consume memory
+ * equal to maintenance_work_mem, the new maintenance_work_mem for each
+ * worker is set such that the parallel operation doesn't consume more
+ * memory than single process vacuum.
+ */
+ int maintenance_work_mem_worker;
+
+ /*
+ * Shared vacuum cost balance. During parallel vacuum,
+ * VacuumSharedCostBalance points to this value and it accumulates the
+ * balance of each parallel vacuum worker.
+ */
+ pg_atomic_uint32 cost_balance;
+
+ /*
+ * Number of active parallel workers. This is used for computing the
+ * minimum threshold of the vacuum cost balance before a worker sleeps for
+ * cost-based delay.
+ */
+ pg_atomic_uint32 active_nworkers;
+
+ /*
+ * Counter for vacuuming and cleanup. Each process claims the next index
+ * to work on by atomically fetch-and-adding this counter (see
+ * parallel_vacuum_process_safe_indexes()).
+ */
+ pg_atomic_uint32 idx;
+} PVShared;
+
+/* Status used during parallel index vacuum or cleanup */
+typedef enum PVIndVacStatus
+{
+ PARALLEL_INDVAC_STATUS_INITIAL = 0,
+ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
+ PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
+ PARALLEL_INDVAC_STATUS_COMPLETED
+} PVIndVacStatus;
+
+/*
+ * Struct for index vacuum statistics of an index that is used for parallel vacuum.
+ * This includes the status of parallel index vacuum as well as index statistics.
+ */
+typedef struct PVIndStats
+{
+ /*
+ * The following two fields are set by the leader process before executing
+ * parallel index vacuum or parallel index cleanup. These fields are not
+ * fixed for the entire VACUUM operation. They are only fixed for an
+ * individual parallel index vacuum and cleanup.
+ *
+ * parallel_workers_can_process is true if both the leader and workers can
+ * process the index; otherwise only the leader can process it.
+ */
+ PVIndVacStatus status;
+ bool parallel_workers_can_process;
+
+ /*
+ * Individual worker or leader stores the result of index vacuum or
+ * cleanup.
+ */
+ bool istat_updated; /* are the stats updated? */
+ IndexBulkDeleteResult istat;
+} PVIndStats;
+
+/* Struct for maintaining a parallel vacuum state. */
+typedef struct ParallelVacuumState
+{
+ /* NULL for worker processes */
+ ParallelContext *pcxt;
+
+ /* Target indexes */
+ Relation *indrels;
+ int nindexes;
+
+ /* Shared information among parallel vacuum workers */
+ PVShared *shared;
+
+ /*
+ * Shared index statistics among parallel vacuum workers. The array
+ * element is allocated for every index, even those indexes where parallel
+ * index vacuuming is unsafe or not worthwhile (e.g.,
+ * will_parallel_vacuum[] is false). During parallel vacuum,
+ * IndexBulkDeleteResult of each index is kept in DSM and is copied into
+ * local memory at the end of parallel vacuum.
+ */
+ PVIndStats *indstats;
+
+ /* Shared dead items space among parallel vacuum workers */
+ VacDeadItems *dead_items;
+
+ /* Points to buffer usage area in DSM */
+ BufferUsage *buffer_usage;
+
+ /* Points to WAL usage area in DSM */
+ WalUsage *wal_usage;
+
+ /*
+ * False if the index is a totally unsuitable target for all parallel
+ * processing. For example, the index could be smaller than the
+ * min_parallel_index_scan_size cutoff.
+ */
+ bool *will_parallel_vacuum;
+
+ /*
+ * The number of indexes that support parallel index bulk-deletion and
+ * parallel index cleanup respectively.
+ */
+ int nindexes_parallel_bulkdel;
+ int nindexes_parallel_cleanup;
+ int nindexes_parallel_condcleanup;
+
+ /* Buffer access strategy used by leader process */
+ BufferAccessStrategy bstrategy;
+
+ /*
+ * Error reporting state. The error callback is set only for worker
+ * processes during parallel index vacuum.
+ */
+ char *relnamespace;
+ char *relname;
+ char *indname;
+ PVIndVacStatus status;
+} ParallelVacuumState;
+
+static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
+ bool *will_parallel_vacuum);
+static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
+ bool vacuum);
+static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
+static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
+static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
+ PVIndStats *indstats);
+static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
+ bool vacuum);
+static void parallel_vacuum_error_callback(void *arg);
+
+/*
+ * Try to enter parallel mode and create a parallel context. Then initialize
+ * shared memory state.
+ *
+ * On success, return parallel vacuum state. Otherwise return NULL.
+ */
+ParallelVacuumState *
+parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
+ int nrequested_workers, int max_items,
+ int elevel, BufferAccessStrategy bstrategy)
+{
+ ParallelVacuumState *pvs;
+ ParallelContext *pcxt;
+ PVShared *shared;
+ VacDeadItems *dead_items;
+ PVIndStats *indstats;
+ BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
+ bool *will_parallel_vacuum;
+ Size est_indstats_len;
+ Size est_shared_len;
+ Size est_dead_items_len;
+ int nindexes_mwm = 0;
+ int parallel_workers = 0;
+ int querylen;
+
+ /*
+ * A parallel vacuum must be requested and there must be indexes on the
+ * relation
+ */
+ Assert(nrequested_workers >= 0);
+ Assert(nindexes > 0);
+
+ /*
+ * Compute the number of parallel vacuum workers to launch
+ */
+ will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
+ parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
+ nrequested_workers,
+ will_parallel_vacuum);
+ if (parallel_workers <= 0)
+ {
+ /* Can't perform vacuum in parallel -- return NULL */
+ pfree(will_parallel_vacuum);
+ return NULL;
+ }
+
+ pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
+ pvs->indrels = indrels;
+ pvs->nindexes = nindexes;
+ pvs->will_parallel_vacuum = will_parallel_vacuum;
+ pvs->bstrategy = bstrategy;
+
+ EnterParallelMode();
+ pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
+ parallel_workers);
+ Assert(pcxt->nworkers > 0);
+ pvs->pcxt = pcxt;
+
+ /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
+ est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
+ shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+ est_shared_len = sizeof(PVShared);
+ shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
+ est_dead_items_len = vac_max_items_to_alloc_size(max_items);
+ shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /*
+ * Estimate space for BufferUsage and WalUsage --
+ * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
+ *
+ * If there are no extensions loaded that care, we could skip this. We
+ * have no way of knowing whether anyone's looking at pgBufferUsage or
+ * pgWalUsage, so do it unconditionally.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
+ if (debug_query_string)
+ {
+ querylen = strlen(debug_query_string);
+ shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+ else
+ querylen = 0; /* keep compiler quiet */
+
+ InitializeParallelDSM(pcxt);
+
+ /* Prepare index vacuum stats */
+ indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
+ for (int i = 0; i < nindexes; i++)
+ {
+ Relation indrel = indrels[i];
+ uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+ /*
+ * Cleanup option should be either disabled, always performing in
+ * parallel or conditionally performing in parallel.
+ */
+ Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+ Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+ if (!will_parallel_vacuum[i])
+ continue;
+
+ if (indrel->rd_indam->amusemaintenanceworkmem)
+ nindexes_mwm++;
+
+ /*
+ * Remember the number of indexes that support parallel operation for
+ * each phase.
+ */
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+ pvs->nindexes_parallel_bulkdel++;
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+ pvs->nindexes_parallel_cleanup++;
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+ pvs->nindexes_parallel_condcleanup++;
+ }
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
+ pvs->indstats = indstats;
+
+ /* Prepare shared information */
+ shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
+ MemSet(shared, 0, est_shared_len);
+ shared->relid = RelationGetRelid(rel);
+ shared->elevel = elevel;
+ shared->maintenance_work_mem_worker =
+ (nindexes_mwm > 0) ?
+ maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
+ maintenance_work_mem;
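+
+ /*
+ * Illustrative example (numbers not from this patch): with
+ * maintenance_work_mem = 1GB, parallel_workers = 4, and two indexes whose
+ * AMs use maintenance_work_mem, each gets 1GB / Min(4, 2) = 512MB, so at
+ * most two concurrent consumers stay within the single-process budget.
+ */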
+
+ pg_atomic_init_u32(&(shared->cost_balance), 0);
+ pg_atomic_init_u32(&(shared->active_nworkers), 0);
+ pg_atomic_init_u32(&(shared->idx), 0);
+
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+ pvs->shared = shared;
+
+ /* Prepare the dead_items space */
+ dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
+ est_dead_items_len);
+ dead_items->max_items = max_items;
+ dead_items->num_items = 0;
+ MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
+ pvs->dead_items = dead_items;
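+
+ /*
+ * This DSM-resident array is what parallel_vacuum_get_dead_items() hands
+ * back to the caller; the leader and workers later read the same TIDs
+ * during index bulk-deletion.
+ */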
+
+ /*
+ * Allocate space for each worker's BufferUsage and WalUsage; no need to
+ * initialize
+ */
+ buffer_usage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
+ pvs->buffer_usage = buffer_usage;
+ wal_usage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
+ pvs->wal_usage = wal_usage;
+
+ /* Store query string for workers */
+ if (debug_query_string)
+ {
+ char *sharedquery;
+
+ sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+ memcpy(sharedquery, debug_query_string, querylen + 1);
+ sharedquery[querylen] = '\0';
+ shm_toc_insert(pcxt->toc,
+ PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+ }
+
+ /* Success -- return parallel vacuum state */
+ return pvs;
+}
+
+/*
+ * Destroy the parallel context, and end parallel mode.
+ *
+ * Since writes are not allowed during parallel mode, copy the
+ * updated index statistics from DSM into local memory and then later use that
+ * to update the index statistics. One might think that we can exit from
+ * parallel mode, update the index statistics and then destroy parallel
+ * context, but that won't be safe (see ExitParallelMode).
+ */
+void
+parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
+{
+ Assert(!IsParallelWorker());
+
+ /* Copy the updated statistics */
+ for (int i = 0; i < pvs->nindexes; i++)
+ {
+ PVIndStats *indstats = &(pvs->indstats[i]);
+
+ if (indstats->istat_updated)
+ {
+ istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
+ }
+ else
+ istats[i] = NULL;
+ }
+
+ DestroyParallelContext(pvs->pcxt);
+ ExitParallelMode();
+
+ pfree(pvs->will_parallel_vacuum);
+ pfree(pvs);
+}
+
+/* Returns the dead items space */
+VacDeadItems *
+parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
+{
+ return pvs->dead_items;
+}
+
+/*
+ * Do parallel index bulk-deletion with parallel workers.
+ */
+void
+parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
+ int num_index_scans)
+{
+ Assert(!IsParallelWorker());
+
+ /*
+ * We can only provide an approximate value of num_heap_tuples, at least
+ * for now.
+ */
+ pvs->shared->reltuples = num_table_tuples;
+ pvs->shared->estimated_count = true;
+
+ parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
+}
+
+/*
+ * Do parallel index cleanup with parallel workers.
+ */
+void
+parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
+ int num_index_scans, bool estimated_count)
+{
+ Assert(!IsParallelWorker());
+
+ /*
+ * We can provide a better estimate of total number of surviving tuples
+ * (we assume indexes are more interested in that than in the number of
+ * nominally live tuples).
+ */
+ pvs->shared->reltuples = num_table_tuples;
+ pvs->shared->estimated_count = estimated_count;
+
+ parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
+}
+
+/*
+ * Compute the number of parallel worker processes to request. Both index
+ * vacuum and index cleanup can be executed with parallel workers.
+ * The index is eligible for parallel vacuum iff its size is greater than
+ * min_parallel_index_scan_size as invoking workers for very small indexes
+ * can hurt performance.
+ *
+ * nrequested is the number of parallel workers that the user requested. If
+ * nrequested is 0, we compute the parallel degree based on nindexes, that is
+ * the number of indexes that support parallel vacuum. This function also
+ * sets will_parallel_vacuum to remember indexes that participate in parallel
+ * vacuum.
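+ *
+ * For example (an illustrative scenario, not from the original comment):
+ * with two bulkdel-capable and three cleanup-capable indexes, we plan for
+ * Max(2, 3) - 1 = 2 workers, since the leader itself takes one index; the
+ * result is further capped by nrequested and
+ * max_parallel_maintenance_workers.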
+ */
+static int
+parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
+ bool *will_parallel_vacuum)
+{
+ int nindexes_parallel = 0;
+ int nindexes_parallel_bulkdel = 0;
+ int nindexes_parallel_cleanup = 0;
+ int parallel_workers;
+
+ /*
+ * We don't allow performing parallel operation in standalone backend or
+ * when parallelism is disabled.
+ */
+ if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+ return 0;
+
+ /*
+ * Compute the number of indexes that can participate in parallel vacuum.
+ */
+ for (int i = 0; i < nindexes; i++)
+ {
+ Relation indrel = indrels[i];
+ uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+ /* Skip index that is not a suitable target for parallel index vacuum */
+ if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+ RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+ continue;
+
+ will_parallel_vacuum[i] = true;
+
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+ nindexes_parallel_bulkdel++;
+ if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+ nindexes_parallel_cleanup++;
+ }
+
+ nindexes_parallel = Max(nindexes_parallel_bulkdel,
+ nindexes_parallel_cleanup);
+
+ /* The leader process takes one index */
+ nindexes_parallel--;
+
+ /* No index supports parallel vacuum */
+ if (nindexes_parallel <= 0)
+ return 0;
+
+ /* Compute the parallel degree */
+ parallel_workers = (nrequested > 0) ?
+ Min(nrequested, nindexes_parallel) : nindexes_parallel;
+
+ /* Cap by max_parallel_maintenance_workers */
+ parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+ return parallel_workers;
+}
+
+/*
+ * Perform index vacuum or index cleanup with parallel workers. This function
+ * must be used by the parallel vacuum leader process.
+ */
+static void
+parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
+ bool vacuum)
+{
+ int nworkers;
+ PVIndVacStatus new_status;
+
+ Assert(!IsParallelWorker());
+
+ if (vacuum)
+ {
+ new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
+
+ /* Determine the number of parallel workers to launch */
+ nworkers = pvs->nindexes_parallel_bulkdel;
+ }
+ else
+ {
+ new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
+
+ /* Determine the number of parallel workers to launch */
+ nworkers = pvs->nindexes_parallel_cleanup;
+
+ /* Add conditionally parallel-aware indexes on the first call */
+ if (num_index_scans == 0)
+ nworkers += pvs->nindexes_parallel_condcleanup;
+ }
+
+ /* The leader process will participate */
+ nworkers--;
+
+ /*
+ * It is possible that parallel context is initialized with fewer workers
+ * than the number of indexes that need a separate worker in the current
+ * phase, so we need to consider it. See
+ * parallel_vacuum_compute_workers().
+ */
+ nworkers = Min(nworkers, pvs->pcxt->nworkers);
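+
+ /*
+ * For instance (hypothetical numbers): three bulkdel-capable indexes plan
+ * 3 - 1 = 2 workers above, but if the parallel context was capped at one
+ * worker (say, by max_parallel_maintenance_workers), we launch
+ * Min(2, 1) = 1.
+ */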
+
+ /*
+ * Set index vacuum status and mark whether parallel vacuum worker can
+ * process it.
+ */
+ for (int i = 0; i < pvs->nindexes; i++)
+ {
+ PVIndStats *indstats = &(pvs->indstats[i]);
+
+ Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
+ indstats->status = new_status;
+ indstats->parallel_workers_can_process =
+ (pvs->will_parallel_vacuum[i] &&
+ parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
+ num_index_scans,
+ vacuum));
+ }
+
+ /* Reset the parallel index processing counter */
+ pg_atomic_write_u32(&(pvs->shared->idx), 0);
+
+ /* Setup the shared cost-based vacuum delay and launch workers */
+ if (nworkers > 0)
+ {
+ /* Reinitialize parallel context to relaunch parallel workers */
+ if (num_index_scans > 0)
+ ReinitializeParallelDSM(pvs->pcxt);
+
+ /*
+ * Set up shared cost balance and the number of active workers for
+ * vacuum delay. We need to do this before launching workers as
+ * otherwise, they might not see the updated values for these
+ * parameters.
+ */
+ pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+ pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+ /*
+ * The number of workers can vary between bulkdelete and cleanup
+ * phase.
+ */
+ ReinitializeParallelWorkers(pvs->pcxt, nworkers);
+
+ LaunchParallelWorkers(pvs->pcxt);
+
+ if (pvs->pcxt->nworkers_launched > 0)
+ {
+ /*
+ * Reset the local cost values for leader backend as we have
+ * already accumulated the remaining balance of heap.
+ */
+ VacuumCostBalance = 0;
+ VacuumCostBalanceLocal = 0;
+
+ /* Enable shared cost balance for leader backend */
+ VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+ VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+ }
+
+ if (vacuum)
+ ereport(pvs->shared->elevel,
+ (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+ "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+ pvs->pcxt->nworkers_launched),
+ pvs->pcxt->nworkers_launched, nworkers)));
+ else
+ ereport(pvs->shared->elevel,
+ (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+ "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+ pvs->pcxt->nworkers_launched),
+ pvs->pcxt->nworkers_launched, nworkers)));
+ }
+
+ /* Vacuum the indexes that can be processed by only leader process */
+ parallel_vacuum_process_unsafe_indexes(pvs);
+
+ /*
+ * Join as a parallel worker. The leader process alone processes all
+ * parallel-safe indexes if no workers are launched.
+ */
+ parallel_vacuum_process_safe_indexes(pvs);
+
+ /*
+ * Next, accumulate buffer and WAL usage. (This must wait for the workers
+ * to finish, or we might get incomplete data.)
+ */
+ if (nworkers > 0)
+ {
+ /* Wait for all vacuum workers to finish */
+ WaitForParallelWorkersToFinish(pvs->pcxt);
+
+ for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+ InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+ }
+
+ /*
+ * Reset all index statuses back to initial (while checking that we
+ * have vacuumed all indexes).
+ */
+ for (int i = 0; i < pvs->nindexes; i++)
+ {
+ PVIndStats *indstats = &(pvs->indstats[i]);
+
+ if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
+ elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
+ RelationGetRelationName(pvs->indrels[i]));
+
+ indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
+ }
+
+ /*
+ * Carry the shared balance value back to the heap scan and disable
+ * shared costing.
+ */
+ if (VacuumSharedCostBalance)
+ {
+ VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+ VacuumSharedCostBalance = NULL;
+ VacuumActiveNWorkers = NULL;
+ }
+}
+
+/*
+ * Index vacuum/cleanup routine used by the leader process and parallel
+ * vacuum worker processes to vacuum the indexes in parallel.
+ */
+static void
+parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
+{
+ /*
+ * Increment the active worker count if we are able to launch any worker.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+ /* Loop until all indexes are vacuumed */
+ for (;;)
+ {
+ int idx;
+ PVIndStats *indstats;
+
+ /* Atomically claim the next index to process */
+ idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
+
+ /* Done for all indexes? */
+ if (idx >= pvs->nindexes)
+ break;
+
+ indstats = &(pvs->indstats[idx]);
+
+ /*
+ * Skip indexes that are unsafe for workers or are unsuitable targets
+ * for parallel index vacuum (these are vacuumed by the leader in
+ * parallel_vacuum_process_unsafe_indexes()).
+ */
+ if (!indstats->parallel_workers_can_process)
+ continue;
+
+ /* Do vacuum or cleanup of the index */
+ parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
+ }
+
+ /*
+ * We have completed the index vacuum so decrement the active worker
+ * count.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Process indexes in the leader process.
+ *
+ * Handles index vacuuming (or index cleanup) for indexes that are not
+ * parallel safe. Whether an index is parallel safe can vary, based on
+ * details like whether we're performing index cleanup right now.
+ *
+ * Also vacuums smaller indexes that fell under the size cutoff enforced
+ * by parallel_vacuum_compute_workers().
+ */
+static void
+parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
+{
+ Assert(!IsParallelWorker());
+
+ /*
+ * Increment the active worker count if we are able to launch any worker.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+ for (int i = 0; i < pvs->nindexes; i++)
+ {
+ PVIndStats *indstats = &(pvs->indstats[i]);
+
+ /* Skip indexes that are safe for workers */
+ if (indstats->parallel_workers_can_process)
+ continue;
+
+ /* Do vacuum or cleanup of the index */
+ parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
+ }
+
+ /*
+ * We have completed the index vacuum so decrement the active worker
+ * count.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Vacuum or cleanup an index, either in the leader process or in one of
+ * the worker processes. After vacuuming the index, this function copies
+ * the index statistics returned from ambulkdelete and amvacuumcleanup
+ * into the DSM segment.
+ */
+static void
+parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
+ PVIndStats *indstats)
+{
+ IndexBulkDeleteResult *istat = NULL;
+ IndexBulkDeleteResult *istat_res;
+ IndexVacuumInfo ivinfo;
+
+ /*
+ * Point to the bulk-deletion result stored in the DSM segment if a
+ * previous pass has already filled it in.
+ */
+ if (indstats->istat_updated)
+ istat = &(indstats->istat);
+
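+ /* Fill in the information needed by the index AM's vacuum callbacks */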
+ ivinfo.index = indrel;
+ ivinfo.analyze_only = false;
+ ivinfo.report_progress = false;
+ ivinfo.message_level = pvs->shared->elevel;
+ ivinfo.estimated_count = pvs->shared->estimated_count;
+ ivinfo.num_heap_tuples = pvs->shared->reltuples;
+ ivinfo.strategy = pvs->bstrategy;
+
+ /* Update error traceback information */
+ pvs->indname = pstrdup(RelationGetRelationName(indrel));
+ pvs->status = indstats->status;
+
+ switch (indstats->status)
+ {
+ case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+ istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
+ break;
+ case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+ istat_res = vac_cleanup_one_index(&ivinfo, istat);
+ break;
+ default:
+ elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
+ indstats->status,
+ RelationGetRelationName(indrel));
+ }
+
+ /*
+ * Copy the index bulk-deletion result returned from ambulkdelete and
+ * amvacuumcleanup into the DSM segment if this is the first cycle,
+ * because the result is allocated locally and the index might be
+ * vacuumed by a different process in the next cycle. Copying the
+ * result normally happens only the first time an index is vacuumed.
+ * For any additional vacuum pass, we directly point to the result in
+ * the DSM segment and pass it to the index vacuum APIs so that workers
+ * can update it in place.
+ *
+ * Since all vacuum workers write bulk-deletion results into different
+ * slots, we can write them without locking.
+ */
+ if (!indstats->istat_updated && istat_res != NULL)
+ {
+ memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+ indstats->istat_updated = true;
+
+ /* Free the locally-allocated bulk-deletion result */
+ pfree(istat_res);
+ }
+
+ /*
+ * Update the status to completed. No need to lock here since each worker
+ * touches different indexes.
+ */
+ indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
+
+ /* Reset error traceback information */
+ pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
+ pfree(pvs->indname);
+ pvs->indname = NULL;
+}
+
+/*
+ * Returns false if the given index can't participate in the next execution
+ * of parallel index vacuum or parallel index cleanup.
+ */
+static bool
+parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
+ bool vacuum)
+{
+ uint8 vacoptions;
+
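+ /* Fetch the parallel-vacuum options declared by the index AM */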
+ vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+ /* For bulk-deletion, check whether the index supports doing it in parallel */
+ if (vacuum)
+ return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
+
+ /* Not safe if the index does not support parallel cleanup */
+ if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+ return false;
+
+ /*
+ * Not safe if the index supports parallel cleanup only conditionally
+ * and we have already processed it (for bulkdelete). We do this to
+ * avoid invoking workers when parallel index cleanup doesn't need to
+ * scan the index. See the comments for
+ * VACUUM_OPTION_PARALLEL_COND_CLEANUP for when indexes support
+ * parallel cleanup conditionally.
+ */
+ if (num_index_scans > 0 &&
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+ return false;
+
+ return true;
+}
+
+/*
+ * Perform work within a launched parallel process.
+ *
+ * Since parallel vacuum workers perform only index vacuum or index cleanup,
+ * we don't need to report progress information.
+ */
+void
+parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+ ParallelVacuumState pvs;
+ Relation rel;
+ Relation *indrels;
+ PVIndStats *indstats;
+ PVShared *shared;
+ VacDeadItems *dead_items;
+ BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
+ int nindexes;
+ char *sharedquery;
+ ErrorContextCallback errcallback;
+
+ /*
+ * A parallel vacuum worker must have only the PROC_IN_VACUUM flag,
+ * since we don't currently support parallel vacuum for autovacuum.
+ */
+ Assert(MyProc->statusFlags == PROC_IN_VACUUM);
+
+ elog(DEBUG1, "starting parallel vacuum worker");
+
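+ /* Look up the shared state set up by the leader */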
+ shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
+
+ /* Set debug_query_string for individual workers */
+ sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+ debug_query_string = sharedquery;
+ pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+ /*
+ * Open the table. We use the same lock mode as the leader process;
+ * that's okay because this lock mode does not conflict among parallel
+ * workers.
+ */
+ rel = table_open(shared->relid, ShareUpdateExclusiveLock);
+
+ /*
+ * Open all indexes. indrels is sorted in OID order, which should match
+ * the leader's ordering.
+ */
+ vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
+ Assert(nindexes > 0);
+
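+ /* Apply the per-worker memory limit computed by the leader, if any */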
+ if (shared->maintenance_work_mem_worker > 0)
+ maintenance_work_mem = shared->maintenance_work_mem_worker;
+
+ /* Set index statistics */
+ indstats = (PVIndStats *) shm_toc_lookup(toc,
+ PARALLEL_VACUUM_KEY_INDEX_STATS,
+ false);
+
+ /* Set dead_items space */
+ dead_items = (VacDeadItems *) shm_toc_lookup(toc,
+ PARALLEL_VACUUM_KEY_DEAD_ITEMS,
+ false);
+
+ /* Set cost-based vacuum delay */
+ VacuumCostActive = (VacuumCostDelay > 0);
+ VacuumCostBalance = 0;
+ VacuumPageHit = 0;
+ VacuumPageMiss = 0;
+ VacuumPageDirty = 0;
+ VacuumCostBalanceLocal = 0;
+ VacuumSharedCostBalance = &(shared->cost_balance);
+ VacuumActiveNWorkers = &(shared->active_nworkers);
+
+ /* Set parallel vacuum state */
+ pvs.indrels = indrels;
+ pvs.nindexes = nindexes;
+ pvs.indstats = indstats;
+ pvs.shared = shared;
+ pvs.dead_items = dead_items;
+ pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
+ pvs.relname = pstrdup(RelationGetRelationName(rel));
+
+ /* These fields will be filled during index vacuum or cleanup */
+ pvs.indname = NULL;
+ pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
+
+ /* Each parallel VACUUM worker gets its own access strategy */
+ pvs.bstrategy = GetAccessStrategy(BAS_VACUUM);
+
+ /* Setup error traceback support for ereport() */
+ errcallback.callback = parallel_vacuum_error_callback;
+ errcallback.arg = &pvs;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Prepare to track buffer usage during parallel execution */
+ InstrStartParallelQuery();
+
+ /* Process indexes to perform vacuum/cleanup */
+ parallel_vacuum_process_safe_indexes(&pvs);
+
+ /* Report buffer/WAL usage during parallel execution */
+ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
+ wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+ &wal_usage[ParallelWorkerNumber]);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
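+ /* Close relations and free the buffer access strategy */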
+ vac_close_indexes(nindexes, indrels, RowExclusiveLock);
+ table_close(rel, ShareUpdateExclusiveLock);
+ FreeAccessStrategy(pvs.bstrategy);
+}
+
+/*
+ * Error context callback for errors occurring during parallel index vacuum.
+ * The error context messages should match the messages set in the lazy vacuum
+ * error context. If you change this function, change vacuum_error_callback()
+ * as well.
+ */
+static void
+parallel_vacuum_error_callback(void *arg)
+{
+ ParallelVacuumState *errinfo = arg;
+
+ switch (errinfo->status)
+ {
+ case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+ errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
+ errinfo->indname,
+ errinfo->relnamespace,
+ errinfo->relname);
+ break;
+ case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+ errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
+ errinfo->indname,
+ errinfo->relnamespace,
+ errinfo->relname);
+ break;
+ case PARALLEL_INDVAC_STATUS_INITIAL:
+ case PARALLEL_INDVAC_STATUS_COMPLETED:
+ default:
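+ /* Not currently processing an index; add no context */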
+ return;
+ }
+}