* vacuumlazy.c
* Concurrent ("lazy") vacuuming.
*
- * The major space usage for vacuuming is storage for the array of dead TIDs
- * that are to be removed from indexes. We want to ensure we can vacuum even
- * the very largest relations with finite memory space usage. To do that, we
- * set upper bounds on the number of TIDs we can keep track of at once.
+ * The major space usage for vacuuming is storage for the dead tuple IDs that
+ * are to be removed from indexes. We want to ensure we can vacuum even the
+ * very largest relations with finite memory space usage. To do that, we set
+ * upper bounds on the memory that can be used for keeping track of dead TIDs
+ * at once.
*
* We are willing to use at most maintenance_work_mem (or perhaps
- * autovacuum_work_mem) memory space to keep track of dead TIDs. We initially
- * allocate an array of TIDs of that size, with an upper limit that depends on
- * table size (this limit ensures we don't allocate a huge area uselessly for
- * vacuuming small tables). If the array threatens to overflow, we must call
- * lazy_vacuum to vacuum indexes (and to vacuum the pages that we've pruned).
- * This frees up the memory space dedicated to storing dead TIDs.
+ * autovacuum_work_mem) memory space to keep track of dead TIDs. If the
+ * TID store is full, we must call lazy_vacuum to vacuum indexes (and to vacuum
+ * the pages that we've pruned). This frees up the memory space dedicated to
+ * storing dead TIDs.
*
* In practice VACUUM will often complete its initial pass over the target
* heap relation without ever running out of space to store TIDs. This means
#include "access/heapam_xlog.h"
#include "access/htup_details.h"
#include "access/multixact.h"
+#include "access/tidstore.h"
#include "access/transam.h"
#include "access/visibilitymap.h"
#include "access/xloginsert.h"
* that has been processed by lazy_scan_prune. Also needed by
* lazy_vacuum_heap_rel, which marks the same LP_DEAD line pointers as
* LP_UNUSED during second heap pass.
+ *
+ * Both dead_items and dead_items_info are allocated in shared memory in
+ * parallel vacuum cases.
*/
- VacDeadItems *dead_items; /* TIDs whose index tuples we'll delete */
+ TidStore *dead_items; /* TIDs whose index tuples we'll delete */
+ VacDeadItemsInfo *dead_items_info; /* #TIDs stored and memory limit */
+
BlockNumber rel_pages; /* total number of pages */
BlockNumber scanned_pages; /* # pages examined (not skipped via VM) */
BlockNumber removed_pages; /* # pages removed by relation truncation */
static void lazy_vacuum(LVRelState *vacrel);
static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
static void lazy_vacuum_heap_rel(LVRelState *vacrel);
-static int lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
- Buffer buffer, int index, Buffer vmbuffer);
+static void lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+ Buffer buffer, OffsetNumber *offsets,
+ int num_offsets, Buffer vmbuffer);
static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
static void lazy_cleanup_all_indexes(LVRelState *vacrel);
static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
bool *lock_waiter_detected);
static void dead_items_alloc(LVRelState *vacrel, int nworkers);
+static void dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
+ int num_offsets);
+static void dead_items_reset(LVRelState *vacrel);
static void dead_items_cleanup(LVRelState *vacrel);
static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
TransactionId *visibility_cutoff_xid, bool *all_frozen);
}
/*
- * Allocate dead_items array memory using dead_items_alloc. This handles
+ * Allocate dead_items memory using dead_items_alloc. This handles
* parallel VACUUM initialization as part of allocating shared memory
* space used for dead_items. (But do a failsafe precheck first, to
* ensure that parallel VACUUM won't be attempted at all when relfrozenxid
* have collected the TIDs whose index tuples need to be removed.
*
* Finally, invokes lazy_vacuum_heap_rel to vacuum heap pages, which
- * largely consists of marking LP_DEAD items (from collected TID array)
+ * largely consists of marking LP_DEAD items (from vacrel->dead_items)
* as LP_UNUSED. This has to happen in a second, final pass over the
* heap, to preserve a basic invariant that all index AMs rely on: no
* extant index tuple can ever be allowed to contain a TID that points to
next_fsm_block_to_vacuum = 0;
bool all_visible_according_to_vm;
- VacDeadItems *dead_items = vacrel->dead_items;
+ TidStore *dead_items = vacrel->dead_items;
+ VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
Buffer vmbuffer = InvalidBuffer;
const int initprog_index[] = {
PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
- PROGRESS_VACUUM_MAX_DEAD_TUPLES
+ PROGRESS_VACUUM_MAX_DEAD_TUPLE_BYTES
};
int64 initprog_val[3];
/* Report that we're scanning the heap, advertising total # of blocks */
initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
initprog_val[1] = rel_pages;
- initprog_val[2] = dead_items->max_items;
+ initprog_val[2] = dead_items_info->max_bytes;
pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
/* Initialize for the first heap_vac_scan_next_block() call */
* dead_items TIDs, pause and do a cycle of vacuuming before we tackle
* this page.
*/
- Assert(dead_items->max_items >= MaxHeapTuplesPerPage);
- if (dead_items->max_items - dead_items->num_items < MaxHeapTuplesPerPage)
+ if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes)
{
/*
* Before beginning index vacuuming, we release any pin we may
/*
* If we didn't get the cleanup lock, we can still collect LP_DEAD
- * items in the dead_items array for later vacuuming, count live and
+ * items in the dead_items area for later vacuuming, count live and
* recently dead tuples for vacuum logging, and determine if this
* block could later be truncated. If we encounter any xid/mxids that
* require advancing the relfrozenxid/relminxid, we'll have to wait
* Like lazy_scan_noprune(), lazy_scan_prune() will count
* recently_dead_tuples and live tuples for vacuum logging, determine
* if the block can later be truncated, and accumulate the details of
- * remaining LP_DEAD line pointers on the page in the dead_items
- * array. These dead items include those pruned by lazy_scan_prune()
- * as well we line pointers previously marked LP_DEAD.
+ * remaining LP_DEAD line pointers on the page into dead_items. These
+ * dead items include those pruned by lazy_scan_prune() as well as
+ * line pointers previously marked LP_DEAD.
*/
if (got_cleanup_lock)
lazy_scan_prune(vacrel, buf, blkno, page,
* Do index vacuuming (call each index's ambulkdelete routine), then do
* related heap vacuuming
*/
- if (dead_items->num_items > 0)
+ if (dead_items_info->num_items > 0)
lazy_vacuum(vacrel);
/*
*/
if (lpdead_items > 0)
{
- VacDeadItems *dead_items = vacrel->dead_items;
- ItemPointerData tmp;
-
vacrel->lpdead_item_pages++;
- ItemPointerSetBlockNumber(&tmp, blkno);
-
- for (int i = 0; i < lpdead_items; i++)
- {
- ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
- dead_items->items[dead_items->num_items++] = tmp;
- }
-
- Assert(dead_items->num_items <= dead_items->max_items);
- pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
- dead_items->num_items);
+ dead_items_add(vacrel, blkno, deadoffsets, lpdead_items);
/*
* It was convenient to ignore LP_DEAD items in all_visible earlier on
* lazy_scan_prune, which requires a full cleanup lock. While pruning isn't
* performed here, it's quite possible that an earlier opportunistic pruning
* operation left LP_DEAD items behind. We'll at least collect any such items
- * in the dead_items array for removal from indexes.
+ * in dead_items for removal from indexes.
*
* For aggressive VACUUM callers, we may return false to indicate that a full
* cleanup lock is required for processing by lazy_scan_prune. This is only
vacrel->NewRelfrozenXid = NoFreezePageRelfrozenXid;
vacrel->NewRelminMxid = NoFreezePageRelminMxid;
- /* Save any LP_DEAD items found on the page in dead_items array */
+ /* Save any LP_DEAD items found on the page in dead_items */
if (vacrel->nindexes == 0)
{
/* Using one-pass strategy (since table has no indexes) */
}
else if (lpdead_items > 0)
{
- VacDeadItems *dead_items = vacrel->dead_items;
- ItemPointerData tmp;
-
/*
* Page has LP_DEAD items, and so any references/TIDs that remain in
* indexes will be deleted during index vacuuming (and then marked
*/
vacrel->lpdead_item_pages++;
- ItemPointerSetBlockNumber(&tmp, blkno);
-
- for (int i = 0; i < lpdead_items; i++)
- {
- ItemPointerSetOffsetNumber(&tmp, deadoffsets[i]);
- dead_items->items[dead_items->num_items++] = tmp;
- }
-
- Assert(dead_items->num_items <= dead_items->max_items);
- pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
- dead_items->num_items);
+ dead_items_add(vacrel, blkno, deadoffsets, lpdead_items);
vacrel->lpdead_items += lpdead_items;
}
if (!vacrel->do_index_vacuuming)
{
Assert(!vacrel->do_index_cleanup);
- vacrel->dead_items->num_items = 0;
+ dead_items_reset(vacrel);
return;
}
BlockNumber threshold;
Assert(vacrel->num_index_scans == 0);
- Assert(vacrel->lpdead_items == vacrel->dead_items->num_items);
+ Assert(vacrel->lpdead_items == vacrel->dead_items_info->num_items);
Assert(vacrel->do_index_vacuuming);
Assert(vacrel->do_index_cleanup);
*/
threshold = (double) vacrel->rel_pages * BYPASS_THRESHOLD_PAGES;
bypass = (vacrel->lpdead_item_pages < threshold &&
- vacrel->lpdead_items < MAXDEADITEMS(32L * 1024L * 1024L));
+ (TidStoreMemoryUsage(vacrel->dead_items) < (32L * 1024L * 1024L)));
}
if (bypass)
* Forget the LP_DEAD items that we just vacuumed (or just decided to not
* vacuum)
*/
- vacrel->dead_items->num_items = 0;
+ dead_items_reset(vacrel);
}
/*
* place).
*/
Assert(vacrel->num_index_scans > 0 ||
- vacrel->dead_items->num_items == vacrel->lpdead_items);
+ vacrel->dead_items_info->num_items == vacrel->lpdead_items);
Assert(allindexes || VacuumFailsafeActive);
/*
/*
* lazy_vacuum_heap_rel() -- second pass over the heap for two pass strategy
*
- * This routine marks LP_DEAD items in vacrel->dead_items array as LP_UNUSED.
- * Pages that never had lazy_scan_prune record LP_DEAD items are not visited
- * at all.
+ * This routine marks LP_DEAD items in vacrel->dead_items as LP_UNUSED. Pages
+ * that never had lazy_scan_prune record LP_DEAD items are not visited at all.
*
* We may also be able to truncate the line pointer array of the heap pages we
* visit. If there is a contiguous group of LP_UNUSED items at the end of the
static void
lazy_vacuum_heap_rel(LVRelState *vacrel)
{
- int index = 0;
BlockNumber vacuumed_pages = 0;
Buffer vmbuffer = InvalidBuffer;
LVSavedErrInfo saved_err_info;
+ TidStoreIter *iter;
+ TidStoreIterResult *iter_result;
Assert(vacrel->do_index_vacuuming);
Assert(vacrel->do_index_cleanup);
VACUUM_ERRCB_PHASE_VACUUM_HEAP,
InvalidBlockNumber, InvalidOffsetNumber);
- while (index < vacrel->dead_items->num_items)
+ iter = TidStoreBeginIterate(vacrel->dead_items);
+ while ((iter_result = TidStoreIterateNext(iter)) != NULL)
{
BlockNumber blkno;
Buffer buf;
vacuum_delay_point();
- blkno = ItemPointerGetBlockNumber(&vacrel->dead_items->items[index]);
+ blkno = iter_result->blkno;
vacrel->blkno = blkno;
/*
buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
vacrel->bstrategy);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
- index = lazy_vacuum_heap_page(vacrel, blkno, buf, index, vmbuffer);
+ lazy_vacuum_heap_page(vacrel, blkno, buf, iter_result->offsets,
+ iter_result->num_offsets, vmbuffer);
/* Now that we've vacuumed the page, record its available space */
page = BufferGetPage(buf);
RecordPageWithFreeSpace(vacrel->rel, blkno, freespace);
vacuumed_pages++;
}
+ TidStoreEndIterate(iter);
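+
+ /*
+ * Illustrative note, not part of this patch: we assume the iterator
+ * returns blocks in ascending block-number order (matching the tidstore
+ * implementation), so this second heap pass visits pages in the same
+ * physical order that the old sorted TID array gave us. A minimal
+ * consumer of the iteration API looks like (process_page is a stand-in):
+ *
+ *     iter = TidStoreBeginIterate(dead_items);
+ *     while ((iter_result = TidStoreIterateNext(iter)) != NULL)
+ *         process_page(iter_result->blkno, iter_result->offsets,
+ *                      iter_result->num_offsets);
+ *     TidStoreEndIterate(iter);
+ */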
vacrel->blkno = InvalidBlockNumber;
if (BufferIsValid(vmbuffer))
* We set all LP_DEAD items from the first heap pass to LP_UNUSED during
* the second heap pass. No more, no less.
*/
- Assert(index > 0);
Assert(vacrel->num_index_scans > 1 ||
- (index == vacrel->lpdead_items &&
+ (vacrel->dead_items_info->num_items == vacrel->lpdead_items &&
vacuumed_pages == vacrel->lpdead_item_pages));
ereport(DEBUG2,
(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
- vacrel->relname, (long long) index, vacuumed_pages)));
+ vacrel->relname, (long long) vacrel->dead_items_info->num_items,
+ vacuumed_pages)));
/* Revert to the previous phase information for error traceback */
restore_vacuum_error_info(vacrel, &saved_err_info);
/*
* lazy_vacuum_heap_page() -- free page's LP_DEAD items listed in the
- * vacrel->dead_items array.
+ * vacrel->dead_items store.
*
* Caller must have an exclusive buffer lock on the buffer (though a full
* cleanup lock is also acceptable). vmbuffer must be valid and already have
* a pin on blkno's visibility map page.
- *
- * index is an offset into the vacrel->dead_items array for the first listed
- * LP_DEAD item on the page. The return value is the first index immediately
- * after all LP_DEAD items for the same page in the array.
*/
-static int
+static void
lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
- int index, Buffer vmbuffer)
+ OffsetNumber *deadoffsets, int num_offsets,
+ Buffer vmbuffer)
{
- VacDeadItems *dead_items = vacrel->dead_items;
Page page = BufferGetPage(buffer);
OffsetNumber unused[MaxHeapTuplesPerPage];
int nunused = 0;
START_CRIT_SECTION();
- for (; index < dead_items->num_items; index++)
+ for (int i = 0; i < num_offsets; i++)
{
- BlockNumber tblk;
- OffsetNumber toff;
ItemId itemid;
+ OffsetNumber toff = deadoffsets[i];
- tblk = ItemPointerGetBlockNumber(&dead_items->items[index]);
- if (tblk != blkno)
- break; /* past end of tuples for this block */
- toff = ItemPointerGetOffsetNumber(&dead_items->items[index]);
itemid = PageGetItemId(page, toff);
Assert(ItemIdIsDead(itemid) && !ItemIdHasStorage(itemid));
/* Revert to the previous phase information for error traceback */
restore_vacuum_error_info(vacrel, &saved_err_info);
- return index;
}
/*
* lazy_vacuum_one_index() -- vacuum index relation.
*
* Delete all the index tuples containing a TID collected in
- * vacrel->dead_items array. Also update running statistics.
- * Exact details depend on index AM's ambulkdelete routine.
+ * vacrel->dead_items. Also update running statistics. Exact
+ * details depend on index AM's ambulkdelete routine.
*
* reltuples is the number of heap tuples to be passed to the
* bulkdelete callback. It's always assumed to be estimated.
InvalidBlockNumber, InvalidOffsetNumber);
/* Do bulk deletion */
- istat = vac_bulkdel_one_index(&ivinfo, istat, (void *) vacrel->dead_items);
+ istat = vac_bulkdel_one_index(&ivinfo, istat, (void *) vacrel->dead_items,
+ vacrel->dead_items_info);
/* Revert to the previous phase information for error traceback */
restore_vacuum_error_info(vacrel, &saved_err_info);
}
/*
- * Returns the number of dead TIDs that VACUUM should allocate space to
- * store, given a heap rel of size vacrel->rel_pages, and given current
- * maintenance_work_mem setting (or current autovacuum_work_mem setting,
- * when applicable).
- *
- * See the comments at the head of this file for rationale.
- */
-static int
-dead_items_max_items(LVRelState *vacrel)
-{
- int64 max_items;
- int vac_work_mem = AmAutoVacuumWorkerProcess() &&
- autovacuum_work_mem != -1 ?
- autovacuum_work_mem : maintenance_work_mem;
-
- if (vacrel->nindexes > 0)
- {
- BlockNumber rel_pages = vacrel->rel_pages;
-
- max_items = MAXDEADITEMS(vac_work_mem * 1024L);
- max_items = Min(max_items, INT_MAX);
- max_items = Min(max_items, MAXDEADITEMS(MaxAllocSize));
-
- /* curious coding here to ensure the multiplication can't overflow */
- if ((BlockNumber) (max_items / MaxHeapTuplesPerPage) > rel_pages)
- max_items = rel_pages * MaxHeapTuplesPerPage;
-
- /* stay sane if small maintenance_work_mem */
- max_items = Max(max_items, MaxHeapTuplesPerPage);
- }
- else
- {
- /* One-pass case only stores a single heap page's TIDs at a time */
- max_items = MaxHeapTuplesPerPage;
- }
-
- return (int) max_items;
-}
-
-/*
- * Allocate dead_items (either using palloc, or in dynamic shared memory).
- * Sets dead_items in vacrel for caller.
+ * Allocate dead_items and dead_items_info (either using palloc, or in dynamic
+ * shared memory). Sets both in vacrel for caller.
*
* Also handles parallel initialization as part of allocating dead_items in
* DSM when required.
static void
dead_items_alloc(LVRelState *vacrel, int nworkers)
{
- VacDeadItems *dead_items;
- int max_items;
-
- max_items = dead_items_max_items(vacrel);
- Assert(max_items >= MaxHeapTuplesPerPage);
+ VacDeadItemsInfo *dead_items_info;
+ int vac_work_mem = AmAutoVacuumWorkerProcess() &&
+ autovacuum_work_mem != -1 ?
+ autovacuum_work_mem : maintenance_work_mem;
/*
* Initialize state for a parallel vacuum. As of now, only one worker can
else
vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
vacrel->nindexes, nworkers,
- max_items,
+ vac_work_mem,
vacrel->verbose ? INFO : DEBUG2,
vacrel->bstrategy);
- /* If parallel mode started, dead_items space is allocated in DSM */
+ /*
+ * If parallel mode started, dead_items and dead_items_info spaces are
+ * allocated in DSM.
+ */
if (ParallelVacuumIsActive(vacrel))
{
- vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs);
+ vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs,
+ &vacrel->dead_items_info);
return;
}
}
- /* Serial VACUUM case */
- dead_items = (VacDeadItems *) palloc(vac_max_items_to_alloc_size(max_items));
- dead_items->max_items = max_items;
- dead_items->num_items = 0;
- vacrel->dead_items = dead_items;
+ /*
+ * Serial VACUUM case. Allocate both dead_items and dead_items_info
+ * locally.
+ */
+
+ dead_items_info = (VacDeadItemsInfo *) palloc(sizeof(VacDeadItemsInfo));
+ dead_items_info->max_bytes = vac_work_mem * 1024L;
+ dead_items_info->num_items = 0;
+ vacrel->dead_items_info = dead_items_info;
+
+ vacrel->dead_items = TidStoreCreateLocal(dead_items_info->max_bytes);
+}
+
+/*
+ * Add the given block number and offset numbers to dead_items.
+ */
+static void
+dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
+ int num_offsets)
+{
+ TidStore *dead_items = vacrel->dead_items;
+
+ TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets);
+ vacrel->dead_items_info->num_items += num_offsets;
+
+ /* update the memory usage report */
+ pgstat_progress_update_param(PROGRESS_VACUUM_DEAD_TUPLE_BYTES,
+ TidStoreMemoryUsage(dead_items));
+}
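+
+/*
+ * Illustrative sketch, not part of this patch: how the pieces above fit
+ * together during the first heap pass. The offsets[] array stands in for
+ * the LP_DEAD offsets that lazy_scan_prune()/lazy_scan_noprune() collected
+ * for one page; everything else is the real API used in this file:
+ *
+ *     dead_items_add(vacrel, blkno, offsets, noffsets);
+ *     if (TidStoreMemoryUsage(vacrel->dead_items) >
+ *         vacrel->dead_items_info->max_bytes)
+ *         lazy_vacuum(vacrel);   (vacuums indexes and heap, then resets)
+ */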
+
+/*
+ * Forget all collected dead items.
+ */
+static void
+dead_items_reset(LVRelState *vacrel)
+{
+ TidStore *dead_items = vacrel->dead_items;
+
+ if (ParallelVacuumIsActive(vacrel))
+ {
+ parallel_vacuum_reset_dead_items(vacrel->pvs);
+ return;
+ }
+
+ /* Recreate the tidstore with the same max_bytes limitation */
+ TidStoreDestroy(dead_items);
+ vacrel->dead_items = TidStoreCreateLocal(vacrel->dead_items_info->max_bytes);
+
+ /* Reset the counter */
+ vacrel->dead_items_info->num_items = 0;
}
/*
*
* In a parallel vacuum, we perform both index bulk deletion and index cleanup
* with parallel worker processes. Individual indexes are processed by one
- * vacuum process. ParallelVacuumState contains shared information as well as
- * the memory space for storing dead items allocated in the DSM segment. We
+ * vacuum process. ParallelVacuumState contains shared information as well as
+ * the memory space for storing dead items allocated in the DSA area. We
* launch parallel worker processes at the start of parallel index
* bulk-deletion and index cleanup and once all indexes are processed, the
* parallel worker processes exit. Each time we process indexes in parallel,
* use small integers.
*/
#define PARALLEL_VACUUM_KEY_SHARED 1
-#define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2
-#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
-#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
-#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
-#define PARALLEL_VACUUM_KEY_INDEX_STATS 6
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT 2
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3
+#define PARALLEL_VACUUM_KEY_WAL_USAGE 4
+#define PARALLEL_VACUUM_KEY_INDEX_STATS 5
/*
* Shared information among parallel workers. So this is allocated in the DSM
/* Counter for vacuuming and cleanup */
pg_atomic_uint32 idx;
+
+ /* DSA handle where the TidStore lives */
+ dsa_handle dead_items_dsa_handle;
+
+ /* DSA pointer to the shared TidStore */
+ dsa_pointer dead_items_handle;
+
+ /* Statistics of shared dead items */
+ VacDeadItemsInfo dead_items_info;
} PVShared;
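+
+/*
+ * Illustrative note, not part of this patch: PVShared carries two values for
+ * the shared TidStore because a worker must first attach to the DSA area
+ * (dead_items_dsa_handle) and then locate the store's control object within
+ * it (dead_items_handle); TidStoreAttach() consumes both, as shown in
+ * parallel_vacuum_main() below.
+ */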
/* Status used during parallel index vacuum or cleanup */
PVIndStats *indstats;
/* Shared dead items space among parallel vacuum workers */
- VacDeadItems *dead_items;
+ TidStore *dead_items;
/* Points to buffer usage area in DSM */
BufferUsage *buffer_usage;
*/
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
- int nrequested_workers, int max_items,
+ int nrequested_workers, int vac_work_mem,
int elevel, BufferAccessStrategy bstrategy)
{
ParallelVacuumState *pvs;
ParallelContext *pcxt;
PVShared *shared;
- VacDeadItems *dead_items;
+ TidStore *dead_items;
PVIndStats *indstats;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
bool *will_parallel_vacuum;
Size est_indstats_len;
Size est_shared_len;
- Size est_dead_items_len;
int nindexes_mwm = 0;
int parallel_workers = 0;
int querylen;
shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
- /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
- est_dead_items_len = vac_max_items_to_alloc_size(max_items);
- shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
- shm_toc_estimate_keys(&pcxt->estimator, 1);
-
/*
* Estimate space for BufferUsage and WalUsage --
* PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
(nindexes_mwm > 0) ?
maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
maintenance_work_mem;
+ shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
+
+ /* Prepare DSA space for dead items */
+ dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
+ LWTRANCHE_PARALLEL_VACUUM_DSA);
+ pvs->dead_items = dead_items;
+ shared->dead_items_handle = TidStoreGetHandle(dead_items);
+ shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
/* Use the same buffer size for all workers */
shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
pvs->shared = shared;
- /* Prepare the dead_items space */
- dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
- est_dead_items_len);
- dead_items->max_items = max_items;
- dead_items->num_items = 0;
- MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
- shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
- pvs->dead_items = dead_items;
-
/*
* Allocate space for each worker's BufferUsage and WalUsage; no need to
* initialize
istats[i] = NULL;
}
+ TidStoreDestroy(pvs->dead_items);
+
DestroyParallelContext(pvs->pcxt);
ExitParallelMode();
pfree(pvs);
}
-/* Returns the dead items space */
-VacDeadItems *
-parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
+/*
+ * Returns the dead items space and dead items information.
+ */
+TidStore *
+parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
{
+ *dead_items_info_p = &(pvs->shared->dead_items_info);
return pvs->dead_items;
}
+/* Forget all items in dead_items */
+void
+parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
+{
+ TidStore *dead_items = pvs->dead_items;
+ VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
+
+ /*
+ * Free the current tidstore and return allocated DSA segments to the
+ * operating system. Then we recreate the tidstore with the same max_bytes
+ * limitation we just used.
+ */
+ TidStoreDestroy(dead_items);
+ pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
+ LWTRANCHE_PARALLEL_VACUUM_DSA);
+
+ /* Update the DSA pointer for dead_items to the new one */
+ pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(pvs->dead_items));
+ pvs->shared->dead_items_handle = TidStoreGetHandle(pvs->dead_items);
+
+ /* Reset the counter */
+ dead_items_info->num_items = 0;
+}
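+
+/*
+ * Illustrative sketch, not part of this patch: the handles republished above
+ * are what let each worker find the recreated store on the next index-vacuum
+ * cycle, via the attach/detach pair used in parallel_vacuum_main():
+ *
+ *     dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
+ *                                 shared->dead_items_handle);
+ *     ... ambulkdelete() deletes index entries for the stored TIDs ...
+ *     TidStoreDetach(dead_items);
+ */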
+
/*
* Do parallel index bulk-deletion with parallel workers.
*/
switch (indstats->status)
{
case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
- istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
+ istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
+ &pvs->shared->dead_items_info);
break;
case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
istat_res = vac_cleanup_one_index(&ivinfo, istat);
Relation *indrels;
PVIndStats *indstats;
PVShared *shared;
- VacDeadItems *dead_items;
+ TidStore *dead_items;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
int nindexes;
PARALLEL_VACUUM_KEY_INDEX_STATS,
false);
- /* Set dead_items space */
- dead_items = (VacDeadItems *) shm_toc_lookup(toc,
- PARALLEL_VACUUM_KEY_DEAD_ITEMS,
- false);
+ /* Find dead_items in shared memory */
+ dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
+ shared->dead_items_handle);
/* Set cost-based vacuum delay */
VacuumUpdateCosts();
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
&wal_usage[ParallelWorkerNumber]);
+ TidStoreDetach(dead_items);
+
/* Pop the error context stack */
error_context_stack = errcallback.previous;