{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWorker pbscanwork =
+ (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWorker pbscanwork =
+ (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
finished = (page == InvalidBlockNumber);
}
else
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWorker pbscanwork =
+ (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWorker pbscanwork =
+ (ParallelBlockTableScanWorker) scan->rs_base.rs_private;
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
finished = (page == InvalidBlockNumber);
}
else
scan->rs_base.rs_nkeys = nkeys;
scan->rs_base.rs_flags = flags;
scan->rs_base.rs_parallel = parallel_scan;
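+ /* space for the per-worker state used by the block-based parallel scan code */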
+ scan->rs_base.rs_private =
+ palloc(sizeof(ParallelBlockTableScanWorkerData));
scan->rs_strategy = NULL; /* set in initscan */
/*
#include "access/tableam.h"
#include "access/xact.h"
#include "optimizer/plancat.h"
+#include "port/pg_bitutils.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
+/*
+ * Constants to control the behavior of block allocation to parallel workers
+ * during a parallel seqscan. Technically these values do not need to be
+ * powers of 2, but keeping them as powers of 2 makes the arithmetic cheap
+ * and the ramp-down stepping even.
+ */
+
+/* The number of I/O chunks we try to break a parallel seqscan down into */
+#define PARALLEL_SEQSCAN_NCHUNKS 2048
+/* Ramp down the allocation size once only this many chunks remain */
+#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS 64
+/* Cap the size of parallel I/O chunks to this number of blocks */
+#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 8192
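+
+/*
+ * Illustration only (assumes the default 8 kB block size): a 16 GB relation
+ * has 2,097,152 blocks, so these constants divide it into 2048 chunks of
+ * 1024 blocks each, i.e. 8 MB of consecutive I/O per chunk.
+ */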
/* GUC variables */
char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
* to set the startblock once.
*/
void
-table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_startblock_init(Relation rel,
+ ParallelBlockTableScanWorker pbscanwork,
+ ParallelBlockTableScanDesc pbscan)
{
BlockNumber sync_startpage = InvalidBlockNumber;
+ /* Reset the state we use for controlling allocation size. */
+ memset(pbscanwork, 0, sizeof(*pbscanwork));
+
+ StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
+ "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
+
+ /*
+ * We determine the chunk size based on the size of the relation. First we
+ * split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks, then round the
+ * resulting chunk size up to the next highest power of 2. This means
+ * we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS
+ * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
+ */
+ pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
+ PARALLEL_SEQSCAN_NCHUNKS, 1));
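+
+ /*
+ * For example (illustrative numbers only): a relation of 1,000,000 blocks
+ * gives 1,000,000 / PARALLEL_SEQSCAN_NCHUNKS = 488, which
+ * pg_nextpower2_32() rounds up to a chunk size of 512 blocks, i.e. roughly
+ * 1950 chunks.
+ */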
+
+ /*
+ * Ensure we don't go over the maximum chunk size with larger tables. This
+ * means we may get many more chunks than PARALLEL_SEQSCAN_NCHUNKS for
+ * larger tables. Too large a chunk size has been shown to be detrimental
+ * to synchronous scan performance.
+ */
+ pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
+ PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
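+
+ /*
+ * Illustration only (assumes the default 8 kB block size): the cap only
+ * takes effect for relations of more than about 16.8 million blocks
+ * (roughly 128 GB), which are clamped to 8192-block (64 MB) chunks.
+ */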
+
retry:
/* Grab the spinlock. */
SpinLockAcquire(&pbscan->phs_mutex);
* backend gets an InvalidBlockNumber return.
*/
BlockNumber
-table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_nextpage(Relation rel,
+ ParallelBlockTableScanWorker pbscanwork,
+ ParallelBlockTableScanDesc pbscan)
{
BlockNumber page;
uint64 nallocated;
/*
- * phs_nallocated tracks how many pages have been allocated to workers
+ * The logic below allocates block numbers to parallel workers so that
+ * each worker receives a set of consecutive block numbers to scan.
+ * Earlier versions of this simply handed the next highest block number
+ * to whichever worker called this function next, which generally meant
+ * that workers never received consecutive block numbers. Because each
+ * backend is a separate process, some operating systems failed to detect
+ * the sequential I/O pattern, which could result in poor performance from
+ * inefficient or missing readahead. To work around this issue, we now
+ * allocate a range of block numbers to each worker, and when the worker
+ * comes back for another block we give it the next one in that range
+ * until the range is exhausted. Once the worker completes its range of
+ * blocks we allocate another range for it and return the first block
+ * number from that range.
+ *
+ * We call these ranges of blocks "chunks". The initial size of these
+ * chunks is determined in table_block_parallelscan_startblock_init based
+ * on the size of the relation. Towards the end of the scan we
+ * progressively reduce the chunk size in order to divide the remaining
+ * work as evenly as possible over all the workers.
+ *
+ * Here pbscanwork is local worker memory. phsw_chunk_remaining tracks
+ * the number of blocks remaining in the current chunk; when it reaches 0
+ * we must allocate a new chunk for the worker.
+ *
+ * phs_nallocated tracks how many blocks have been allocated to workers
* already. When phs_nallocated >= rs_nblocks, all blocks have been
* allocated.
*
* wide because of that, to avoid wrapping around when rs_nblocks is close
* to 2^32.
*
- * The actual page to return is calculated by adding the counter to the
+ * The actual block to return is calculated by adding the counter to the
* starting block number, modulo nblocks.
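+ *
+ * As a concrete illustration (hypothetical numbers, and ignoring the
+ * starting block offset for simplicity): with a chunk size of 4, the
+ * first worker to arrive here is allocated blocks 0-3, a second worker
+ * gets blocks 4-7, and each then consumes its own chunk one block per
+ * call before coming back here for a new chunk.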
*/
- nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
+
+ /*
+ * First check if we have any remaining blocks in a previous chunk for
+ * this worker. We must consume all of the blocks from that chunk
+ * before we allocate a new chunk to the worker.
+ */
+ if (pbscanwork->phsw_chunk_remaining > 0)
+ {
+ /*
+ * Give the worker the next block in its chunk and update the number
+ * of blocks remaining in the chunk.
+ */
+ nallocated = ++pbscanwork->phsw_nallocated;
+ pbscanwork->phsw_chunk_remaining--;
+ }
+ else
+ {
+ /*
+ * When only PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks remain in the
+ * scan, we halve the chunk size. Since we reduce the chunk size here,
+ * we'll hit this again after doing another
+ * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks at the new size. After a
+ * few iterations of this, we'll end up doing the last few blocks with
+ * the chunk size set to 1.
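+ *
+ * For example (illustrative numbers only): with a chunk size of 64,
+ * the size drops to 32 once fewer than 64 * 64 = 4096 blocks remain
+ * unallocated, to 16 once fewer than 2048 remain, and so on until the
+ * final blocks are handed out one at a time.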
+ */
+ if (pbscanwork->phsw_chunk_size > 1 &&
+ pbscanwork->phsw_nallocated > pbscan->phs_nblocks -
+ (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
+ pbscanwork->phsw_chunk_size >>= 1;
+
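+ /*
+ * Claim a new chunk by atomically advancing the shared counter by a
+ * whole chunk's worth of blocks. pg_atomic_fetch_add_u64() returns
+ * the value the counter had before the addition, which identifies the
+ * start of the chunk we have just claimed.
+ */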
+ nallocated = pbscanwork->phsw_nallocated =
+ pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
+ pbscanwork->phsw_chunk_size);
+
+ /*
+ * Set the remaining number of blocks in this chunk so that subsequent
+ * calls from this worker continue on with this chunk until it's done.
+ */
+ pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
+ }
+
if (nallocated >= pbscan->phs_nblocks)
page = InvalidBlockNumber; /* all blocks have been allocated */
else