#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
+#include "common/hashfn.h"
#include "executor/execExpr.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
+#include "lib/hyperloglog.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
+/*
+ * HyperLogLog is used for estimating the cardinality of the spilled tuples in
+ * a given partition. 5 bits corresponds to a size of about 32 bytes and a
+ * worst-case error of around 18%. That's effective enough to choose a
+ * reasonable number of partitions when recursing.
+ */
+#define HASHAGG_HLL_BIT_WIDTH 5
+
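For reference, the figures in the comment above follow from the standard HyperLogLog bounds: a bit width of b gives m = 2^b one-byte registers, with a worst-case relative error of about 1.04/sqrt(m). A standalone illustration of that arithmetic (assuming only the usual HLL error formula, not any PostgreSQL internals):

#include <math.h>
#include <stdio.h>

int
main(void)
{
    int     bwidth = 5;                 /* HASHAGG_HLL_BIT_WIDTH */
    int     nregisters = 1 << bwidth;   /* 2^5 = 32 one-byte registers */
    double  rel_error = 1.04 / sqrt((double) nregisters);

    /* prints: 32 registers, ~18.4% worst-case error */
    printf("%d registers, ~%.1f%% worst-case error\n",
           nregisters, rel_error * 100.0);
    return 0;
}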
/*
* Estimate chunk overhead as a constant 16 bytes. XXX: should this be
* improved?
int64 *ntuples; /* number of tuples in each partition */
uint32 mask; /* mask to find partition from hash value */
int shift; /* after masking, shift by this amount */
+ hyperLogLogState *hll_card; /* cardinality estimate for contents */
} HashAggSpill;
/*
LogicalTapeSet *tapeset; /* borrowed reference to tape set */
int input_tapenum; /* input partition tape */
int64 input_tuples; /* number of tuples in this batch */
+ double input_card; /* estimated group cardinality */
} HashAggBatch;
/* used to find referenced colnos */
static long hash_choose_num_buckets(double hashentrysize,
long estimated_nbuckets,
Size memory);
-static int hash_choose_num_partitions(uint64 input_groups,
+static int hash_choose_num_partitions(double input_groups,
double hashentrysize,
int used_bits,
int *log2_npartitions);
static void hashagg_reset_spill_state(AggState *aggstate);
static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
int input_tapenum, int setno,
- int64 input_tuples, int used_bits);
+ int64 input_tuples, double input_card,
+ int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
- int used_bits, uint64 input_tuples,
+ int used_bits, double input_groups,
double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *slot, uint32 hash);
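The uint64-to-double changes in these prototypes thread the new estimate through the call chain: estimateHyperLogLog() returns a double, and keeping the group estimate in floating point the whole way avoids forcing a fractional cardinality through an integer type before hash_agg_set_limits() consumes it.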
* substantially larger than the initial value.
*/
void
-hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
+hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
Size *mem_limit, uint64 *ngroups_limit,
int *num_partitions)
{
* *log2_npartitions to the log2() of the number of partitions.
*/
static int
-hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
+hash_choose_num_partitions(double input_groups, double hashentrysize,
int used_bits, int *log2_npartitions)
{
Size mem_wanted;
AggStatePerHash perhash;
HashAggSpill spill;
HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
- uint64 ngroups_estimate;
bool spill_initialized = false;
if (aggstate->hash_batches == NIL)
batch = linitial(aggstate->hash_batches);
aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
- /*
- * Estimate the number of groups for this batch as the total number of
- * tuples in its input file. Although that's a worst case, it's not bad
- * here for two reasons: (1) overestimating is better than
- * underestimating; and (2) we've already scanned the relation once, so
- * it's likely that we've already finalized many of the common values.
- */
- ngroups_estimate = batch->input_tuples;
-
- hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
+ hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
batch->used_bits, &aggstate->hash_mem_limit,
&aggstate->hash_ngroups_limit, NULL);
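The payoff of using batch->input_card here: under the old worst-case rule being removed above, a spilled partition of (hypothetically) a million tuples covering only a few thousand distinct keys would have sized its memory limits and partition fan-out for a million groups; the HLL estimate, even with ~18% error, keeps the figure at the right order of magnitude, so recursion chooses far fewer partitions.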
*/
spill_initialized = true;
hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
- ngroups_estimate, aggstate->hashentrysize);
+ batch->input_card, aggstate->hashentrysize);
}
/* no memory for a new group, spill */
hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
*/
static void
hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
- uint64 input_groups, double hashentrysize)
+ double input_groups, double hashentrysize)
{
int npartitions;
int partition_bits;
spill->partitions = palloc0(sizeof(int) * npartitions);
spill->ntuples = palloc0(sizeof(int64) * npartitions);
+ spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
spill->shift = 32 - used_bits - partition_bits;
spill->mask = (npartitions - 1) << spill->shift;
spill->npartitions = npartitions;
+
+ for (int i = 0; i < npartitions; i++)
+ initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
}
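The added bookkeeping is cheap: per the comment on HASHAGG_HLL_BIT_WIDTH, each hyperLogLogState carries about 32 bytes of register space, so a spill adds roughly npartitions * 32 bytes on top of the existing per-partition write buffers of HASHAGG_WRITE_BUFFER_SIZE (BLCKSZ, typically 8kB) each.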
/*
partition = (hash & spill->mask) >> spill->shift;
spill->ntuples[partition]++;
+ /*
+ * All hash values destined for a given partition have some bits in
+ * common, which causes bad HLL cardinality estimates. Hash the hash to
+ * get a more uniform distribution.
+ */
+ addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
+
tapenum = spill->partitions[partition];
LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
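To see why the rehash in the comment above matters: partition selection pins the high-order bits of every hash routed to a given partition, so the raw values an HLL would see are strongly correlated. A minimal standalone sketch of that effect, reusing the mask/shift arithmetic from hashagg_spill_init (the sample hash values are made up):

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    int         used_bits = 0;
    int         partition_bits = 2;     /* 4 partitions */
    int         shift = 32 - used_bits - partition_bits;
    uint32_t    mask = (uint32_t) 3 << shift;   /* 0xC0000000 */
    uint32_t    hashes[] = {0x80001234, 0x9abcdef0, 0xbfffffff};

    /*
     * All three hashes share their top two bits, so all land in
     * partition 2; fed raw to an estimator they look far less
     * distinct than they are, hence hash_bytes_uint32() first.
     */
    for (int i = 0; i < 3; i++)
        printf("hash %08x -> partition %u\n",
               (unsigned) hashes[i],
               (unsigned) ((hashes[i] & mask) >> shift));
    return 0;
}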
*/
static HashAggBatch *
hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
- int64 input_tuples, int used_bits)
+ int64 input_tuples, double input_card, int used_bits)
{
HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
batch->tapeset = tapeset;
batch->input_tapenum = tapenum;
batch->input_tuples = input_tuples;
+ batch->input_card = input_card;
return batch;
}
for (i = 0; i < spill->npartitions; i++)
{
- int tapenum = spill->partitions[i];
- HashAggBatch *new_batch;
+ int tapenum = spill->partitions[i];
+ HashAggBatch *new_batch;
+ double cardinality;
/* if the partition is empty, don't create a new batch of work */
if (spill->ntuples[i] == 0)
continue;
+ cardinality = estimateHyperLogLog(&spill->hll_card[i]);
+ freeHyperLogLog(&spill->hll_card[i]);
+
new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
tapenum, setno, spill->ntuples[i],
- used_bits);
+ cardinality, used_bits);
aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
aggstate->hash_batches_used++;
}
pfree(spill->ntuples);
+ pfree(spill->hll_card);
pfree(spill->partitions);
}