HashAgg: use better cardinality estimate for recursive spilling.
author     Jeff Davis <jdavis@postgresql.org>
           Wed, 29 Jul 2020 06:15:47 +0000 (23:15 -0700)
committer  Jeff Davis <jdavis@postgresql.org>
           Wed, 29 Jul 2020 06:16:28 +0000 (23:16 -0700)
Use HyperLogLog to estimate the group cardinality in a spilled
partition. This estimate is used to choose the number of partitions if
we recurse.

The previous behavior was to use the number of tuples in a spilled
partition as the estimate for the number of groups, which led to
overpartitioning. That could cause the number of batches to be much
higher than expected (with each batch being very small), which made it
harder to interpret EXPLAIN ANALYZE results.

Reviewed-by: Peter Geoghegan
Discussion: https://postgr.es/m/a856635f9284bc36f7a77d02f47bbb6aaf7b59b3.camel@j-davis.com
Backpatch-through: 13

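(A rough illustration of the effect, with made-up numbers rather than planner output: suppose a spilled partition holds one million tuples but only about ten thousand distinct groups, with hashentrysize around 200 bytes and 4MB of hash memory. Treating every tuple as a group asks for memory for roughly 200MB worth of groups, so the recursion is split across many partitions, most of which turn out nearly empty; the HyperLogLog estimate of roughly ten thousand groups, give or take ~18%, asks for only about 2MB, and the batch can usually be finished without spilling again.)
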
src/backend/executor/nodeAgg.c
src/include/executor/nodeAgg.h

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 586509c50b27e9bc76af4a8a295b9ef9189c6be6..02a9165c6941e6bb191d61452889d6318d8675bb 100644
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "common/hashfn.h"
 #include "executor/execExpr.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
+#include "lib/hyperloglog.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
 #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
 
+/*
+ * HyperLogLog is used for estimating the cardinality of the spilled tuples in
+ * a given partition. 5 bits corresponds to a size of about 32 bytes and a
+ * worst-case error of around 18%. That's effective enough to choose a
+ * reasonable number of partitions when recursing.
+ */
+#define HASHAGG_HLL_BIT_WIDTH 5
+
 /*
  * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
  * improved?
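
The figures in the new HASHAGG_HLL_BIT_WIDTH comment follow from the standard HyperLogLog sizing math (a back-of-the-envelope check, not part of the patch): with a bit width of b = 5 the sketch keeps m = 2^5 = 32 registers of roughly one byte each, i.e. about 32 bytes of state, and the expected relative error is about 1.04 / sqrt(m) = 1.04 / sqrt(32) ≈ 18.4%.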
@@ -339,6 +349,7 @@ typedef struct HashAggSpill
    int64      *ntuples;        /* number of tuples in each partition */
    uint32      mask;           /* mask to find partition from hash value */
    int         shift;          /* after masking, shift by this amount */
+   hyperLogLogState *hll_card; /* cardinality estimate for contents */
 } HashAggSpill;
 
 /*
@@ -357,6 +368,7 @@ typedef struct HashAggBatch
    LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
    int         input_tapenum;  /* input partition tape */
    int64       input_tuples;   /* number of tuples in this batch */
+   double      input_card;     /* estimated group cardinality */
 } HashAggBatch;
 
 /* used to find referenced colnos */
@@ -411,7 +423,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
 static long hash_choose_num_buckets(double hashentrysize,
                                    long estimated_nbuckets,
                                    Size memory);
-static int hash_choose_num_partitions(uint64 input_groups,
+static int hash_choose_num_partitions(double input_groups,
                                       double hashentrysize,
                                       int used_bits,
                                       int *log2_npartittions);
@@ -432,10 +444,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate);
 static void hashagg_reset_spill_state(AggState *aggstate);
 static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
                                       int input_tapenum, int setno,
-                                      int64 input_tuples, int used_bits);
+                                      int64 input_tuples, double input_card,
+                                      int used_bits);
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
-                              int used_bits, uint64 input_tuples,
+                              int used_bits, double input_groups,
                               double hashentrysize);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
                                TupleTableSlot *slot, uint32 hash);
@@ -1777,7 +1790,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
  * substantially larger than the initial value.
  */
 void
-hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
+hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
                    Size *mem_limit, uint64 *ngroups_limit,
                    int *num_partitions)
 {
@@ -1969,7 +1982,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
  * *log2_npartitions to the log2() of the number of partitions.
  */
 static int
-hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
+hash_choose_num_partitions(double input_groups, double hashentrysize,
                           int used_bits, int *log2_npartitions)
 {
    Size        mem_wanted;
@@ -2574,7 +2587,6 @@ agg_refill_hash_table(AggState *aggstate)
    AggStatePerHash perhash;
    HashAggSpill spill;
    HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-   uint64      ngroups_estimate;
    bool        spill_initialized = false;
 
    if (aggstate->hash_batches == NIL)
@@ -2583,16 +2595,7 @@ agg_refill_hash_table(AggState *aggstate)
    batch = linitial(aggstate->hash_batches);
    aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
 
-   /*
-    * Estimate the number of groups for this batch as the total number of
-    * tuples in its input file. Although that's a worst case, it's not bad
-    * here for two reasons: (1) overestimating is better than
-    * underestimating; and (2) we've already scanned the relation once, so
-    * it's likely that we've already finalized many of the common values.
-    */
-   ngroups_estimate = batch->input_tuples;
-
-   hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
+   hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
                        batch->used_bits, &aggstate->hash_mem_limit,
                        &aggstate->hash_ngroups_limit, NULL);
 
@@ -2678,7 +2681,7 @@ agg_refill_hash_table(AggState *aggstate)
                 */
                spill_initialized = true;
                hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
-                                  ngroups_estimate, aggstate->hashentrysize);
+                                  batch->input_card, aggstate->hashentrysize);
            }
            /* no memory for a new group, spill */
            hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
@@ -2936,7 +2939,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
  */
 static void
 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
-                  uint64 input_groups, double hashentrysize)
+                  double input_groups, double hashentrysize)
 {
    int         npartitions;
    int         partition_bits;
@@ -2946,6 +2949,7 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 
    spill->partitions = palloc0(sizeof(int) * npartitions);
    spill->ntuples = palloc0(sizeof(int64) * npartitions);
+   spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
 
    hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
 
@@ -2953,6 +2957,9 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
    spill->shift = 32 - used_bits - partition_bits;
    spill->mask = (npartitions - 1) << spill->shift;
    spill->npartitions = npartitions;
+
+   for (int i = 0; i < npartitions; i++)
+       initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
 }
 
 /*
@@ -3001,6 +3008,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
    partition = (hash & spill->mask) >> spill->shift;
    spill->ntuples[partition]++;
 
+   /*
+    * All hash values destined for a given partition have some bits in
+    * common, which causes bad HLL cardinality estimates. Hash the hash to
+    * get a more uniform distribution.
+    */
+   addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
+
    tapenum = spill->partitions[partition];
 
    LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
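
Why the extra hash is needed (a note on the reasoning; the register-index detail is an assumption about lib/hyperloglog.c rather than something stated in the patch): the partition number is taken from the high-order bits of the hash. For example, with used_bits = 0 and four partitions, shift is 30 and mask is 0xC0000000, so every hash routed to partition 2 begins with the bits '10'. The HyperLogLog register index and zero-run count are derived from those same leading bits, so the shared prefix would concentrate a partition's tuples into a fraction of the registers and bias the estimate; hash_bytes_uint32() rehashes the value so that the bits fed to the sketch are uniformly distributed again.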
@@ -3023,7 +3037,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
  */
 static HashAggBatch *
 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
-                 int64 input_tuples, int used_bits)
+                 int64 input_tuples, double input_card, int used_bits)
 {
    HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
@@ -3032,6 +3046,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
    batch->tapeset = tapeset;
    batch->input_tapenum = tapenum;
    batch->input_tuples = input_tuples;
+   batch->input_card = input_card;
 
    return batch;
 }
@@ -3135,21 +3150,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 
    for (i = 0; i < spill->npartitions; i++)
    {
-       int         tapenum = spill->partitions[i];
-       HashAggBatch *new_batch;
+       int              tapenum = spill->partitions[i];
+       HashAggBatch    *new_batch;
+       double           cardinality;
 
        /* if the partition is empty, don't create a new batch of work */
        if (spill->ntuples[i] == 0)
            continue;
 
+       cardinality = estimateHyperLogLog(&spill->hll_card[i]);
+       freeHyperLogLog(&spill->hll_card[i]);
+
        new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
                                      tapenum, setno, spill->ntuples[i],
-                                     used_bits);
+                                     cardinality, used_bits);
        aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
        aggstate->hash_batches_used++;
    }
 
    pfree(spill->ntuples);
+   pfree(spill->hll_card);
    pfree(spill->partitions);
 }
 
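For readers unfamiliar with the HyperLogLog support in lib/hyperloglog.h, below is a minimal sketch of the per-partition lifecycle the patch follows, written as a standalone helper (an illustration assuming a backend compilation context; the helper is hypothetical, not code from the patch):

	#include "postgres.h"
	#include "common/hashfn.h"		/* hash_bytes_uint32() */
	#include "lib/hyperloglog.h"	/* hyperLogLogState and friends */

	/* Estimate the number of distinct groups among ntuples spilled hash values. */
	static double
	estimate_spilled_group_cardinality(const uint32 *hashes, int ntuples)
	{
		hyperLogLogState hll;
		double		cardinality;

		/* 5-bit width: 2^5 = 32 registers, ~18% worst-case error */
		initHyperLogLog(&hll, 5);

		for (int i = 0; i < ntuples; i++)
		{
			/* rehash so the bits fed to the sketch look uniform */
			addHyperLogLog(&hll, hash_bytes_uint32(hashes[i]));
		}

		cardinality = estimateHyperLogLog(&hll);
		freeHyperLogLog(&hll);

		return cardinality;
	}

In the patch itself the state lives in HashAggSpill (one hyperLogLogState per output partition), is fed from hashagg_spill_tuple(), and is estimated and freed in hashagg_spill_finish(), which hands the result to hashagg_batch_new() as input_card.
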
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index bb0805abe091a31ae4594cf0dbbcf46bb759b24d..b955169538444c0f72d36d336c5b7089b0cf94d1 100644
@@ -320,7 +320,7 @@ extern void ExecReScanAgg(AggState *node);
 
 extern Size hash_agg_entry_size(int numTrans, Size tupleWidth,
                                Size transitionSpace);
-extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
+extern void hash_agg_set_limits(double hashentrysize, double input_groups,
                                int used_bits, Size *mem_limit,
                                uint64 *ngroups_limit, int *num_partitions);