HashAgg: before spilling tuples, set unneeded columns to NULL.
authorJeff Davis <jdavis@postgresql.org>
Mon, 13 Jul 2020 00:48:49 +0000 (17:48 -0700)
committerJeff Davis <jdavis@postgresql.org>
Mon, 13 Jul 2020 05:59:32 +0000 (22:59 -0700)
This is a replacement for 4cad2534. Instead of projecting all tuples
going into a HashAgg, only remove unnecessary attributes when actually
spilling. This avoids the regression for the in-memory case.

Discussion: https://postgr.es/m/a2fb7dfeb4f50aa0a123e42151ee3013933cb802.camel%40j-davis.com
Backpatch-through: 13

src/backend/executor/nodeAgg.c
src/include/nodes/execnodes.h

index a20554ae65a678dc917c2f43bdf5367a5d9fc7b3..8eb1732ca884984f877f8334512288609559692e 100644 (file)
@@ -359,6 +359,14 @@ typedef struct HashAggBatch
    int64       input_tuples;   /* number of tuples in this batch */
 } HashAggBatch;
 
+/* used to find referenced colnos */
+typedef struct FindColsContext
+{
+   bool       is_aggref;       /* is under an aggref */
+   Bitmapset *aggregated;      /* column references under an aggref */
+   Bitmapset *unaggregated;    /* other column references */
+} FindColsContext;
+
 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
 static void initialize_phase(AggState *aggstate, int newphase);
 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
@@ -391,8 +399,9 @@ static void finalize_aggregates(AggState *aggstate,
                                AggStatePerAgg peragg,
                                AggStatePerGroup pergroup);
 static TupleTableSlot *project_aggregates(AggState *aggstate);
-static Bitmapset *find_unaggregated_cols(AggState *aggstate);
-static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
+static void find_cols(AggState *aggstate, Bitmapset **aggregated,
+                     Bitmapset **unaggregated);
+static bool find_cols_walker(Node *node, FindColsContext *context);
 static void build_hash_tables(AggState *aggstate);
 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
@@ -425,8 +434,8 @@ static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
                               int used_bits, uint64 input_tuples,
                               double hashentrysize);
-static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot,
-                               uint32 hash);
+static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
+                               TupleTableSlot *slot, uint32 hash);
 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
                                 int setno);
 static void hashagg_tapeinfo_init(AggState *aggstate);
@@ -1375,26 +1384,28 @@ project_aggregates(AggState *aggstate)
 }
 
 /*
- * find_unaggregated_cols
- *   Construct a bitmapset of the column numbers of un-aggregated Vars
- *   appearing in our targetlist and qual (HAVING clause)
+ * Walk tlist and qual to find referenced colnos, dividing them into
+ * aggregated and unaggregated sets.
  */
-static Bitmapset *
-find_unaggregated_cols(AggState *aggstate)
+static void
+find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
 {
-   Agg        *node = (Agg *) aggstate->ss.ps.plan;
-   Bitmapset  *colnos;
-
-   colnos = NULL;
-   (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
-                                        &colnos);
-   (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
-                                        &colnos);
-   return colnos;
+   Agg *agg = (Agg *) aggstate->ss.ps.plan;
+   FindColsContext context;
+
+   context.is_aggref = false;
+   context.aggregated = NULL;
+   context.unaggregated = NULL;
+
+   (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
+   (void) find_cols_walker((Node *) agg->plan.qual, &context);
+
+   *aggregated = context.aggregated;
+   *unaggregated = context.unaggregated;
 }
 
 static bool
-find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
+find_cols_walker(Node *node, FindColsContext *context)
 {
    if (node == NULL)
        return false;
@@ -1405,16 +1416,24 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
        /* setrefs.c should have set the varno to OUTER_VAR */
        Assert(var->varno == OUTER_VAR);
        Assert(var->varlevelsup == 0);
-       *colnos = bms_add_member(*colnos, var->varattno);
+       if (context->is_aggref)
+           context->aggregated = bms_add_member(context->aggregated,
+                                                var->varattno);
+       else
+           context->unaggregated = bms_add_member(context->unaggregated,
+                                                  var->varattno);
        return false;
    }
-   if (IsA(node, Aggref) || IsA(node, GroupingFunc))
+   if (IsA(node, Aggref))
    {
-       /* do not descend into aggregate exprs */
+       Assert(!context->is_aggref);
+       context->is_aggref = true;
+       expression_tree_walker(node, find_cols_walker, (void *) context);
+       context->is_aggref = false;
        return false;
    }
-   return expression_tree_walker(node, find_unaggregated_cols_walker,
-                                 (void *) colnos);
+   return expression_tree_walker(node, find_cols_walker,
+                                 (void *) context);
 }
 
 /*
@@ -1532,13 +1551,27 @@ static void
 find_hash_columns(AggState *aggstate)
 {
    Bitmapset  *base_colnos;
+   Bitmapset  *aggregated_colnos;
+   TupleDesc   scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
    List       *outerTlist = outerPlanState(aggstate)->plan->targetlist;
    int         numHashes = aggstate->num_hashes;
    EState     *estate = aggstate->ss.ps.state;
    int         j;
 
    /* Find Vars that will be needed in tlist and qual */
-   base_colnos = find_unaggregated_cols(aggstate);
+   find_cols(aggstate, &aggregated_colnos, &base_colnos);
+   aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
+   aggstate->max_colno_needed = 0;
+   aggstate->all_cols_needed = true;
+
+   for (int i = 0; i < scanDesc->natts; i++)
+   {
+       int colno = i + 1;
+       if (bms_is_member(colno, aggstate->colnos_needed))
+           aggstate->max_colno_needed = colno;
+       else
+           aggstate->all_cols_needed = false;
+   }
 
    for (j = 0; j < numHashes; ++j)
    {
@@ -2097,7 +2130,7 @@ lookup_hash_entries(AggState *aggstate)
                                   perhash->aggnode->numGroups,
                                   aggstate->hashentrysize);
 
-           hashagg_spill_tuple(spill, slot, hash);
+           hashagg_spill_tuple(aggstate, spill, slot, hash);
        }
    }
 }
@@ -2619,7 +2652,7 @@ agg_refill_hash_table(AggState *aggstate)
                             HASHAGG_READ_BUFFER_SIZE);
    for (;;)
    {
-       TupleTableSlot *slot = aggstate->hash_spill_slot;
+       TupleTableSlot *slot = aggstate->hash_spill_rslot;
        MinimalTuple tuple;
        uint32      hash;
        bool        in_hash_table;
@@ -2655,7 +2688,7 @@ agg_refill_hash_table(AggState *aggstate)
                                   ngroups_estimate, aggstate->hashentrysize);
            }
            /* no memory for a new group, spill */
-           hashagg_spill_tuple(&spill, slot, hash);
+           hashagg_spill_tuple(aggstate, &spill, slot, hash);
        }
 
        /*
@@ -2934,9 +2967,11 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
  * partition.
  */
 static Size
-hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
+hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
+                   TupleTableSlot *inputslot, uint32 hash)
 {
    LogicalTapeSet *tapeset = spill->tapeset;
+   TupleTableSlot *spillslot;
    int         partition;
    MinimalTuple tuple;
    int         tapenum;
@@ -2945,8 +2980,28 @@ hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
 
    Assert(spill->partitions != NULL);
 
-   /* XXX: may contain unnecessary attributes, should project */
-   tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+   /* spill only attributes that we actually need */
+   if (!aggstate->all_cols_needed)
+   {
+       spillslot = aggstate->hash_spill_wslot;
+       slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
+       ExecClearTuple(spillslot);
+       for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
+       {
+           if (bms_is_member(i + 1, aggstate->colnos_needed))
+           {
+               spillslot->tts_values[i] = inputslot->tts_values[i];
+               spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
+           }
+           else
+               spillslot->tts_isnull[i] = true;
+       }
+       ExecStoreVirtualTuple(spillslot);
+   }
+   else
+       spillslot = inputslot;
+
+   tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
 
    partition = (hash & spill->mask) >> spill->shift;
    spill->ntuples[partition]++;
@@ -3563,8 +3618,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
        aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
                                                       "HashAgg meta context",
                                                       ALLOCSET_DEFAULT_SIZES);
-       aggstate->hash_spill_slot = ExecInitExtraTupleSlot(estate, scanDesc,
-                                                          &TTSOpsMinimalTuple);
+       aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
+                                                           &TTSOpsMinimalTuple);
+       aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
+                                                           &TTSOpsVirtual);
 
        /* this is an array of pointers, not structures */
        aggstate->hash_pergroup = pergroups;
index 0187989fd19f37c85eebca2320f8e2473631f3ec..6f96b31fb438303f74911706f9a37f9b0196e9c6 100644 (file)
@@ -2169,6 +2169,9 @@ typedef struct AggState
    int         current_set;    /* The current grouping set being evaluated */
    Bitmapset  *grouped_cols;   /* grouped cols in current projection */
    List       *all_grouped_cols;   /* list of all grouped cols in DESC order */
+   Bitmapset  *colnos_needed;  /* all columns needed from the outer plan */
+   int         max_colno_needed;   /* highest colno needed from outer plan */
+   bool        all_cols_needed;    /* are all cols from outer plan needed? */
    /* These fields are for grouping set phase data */
    int         maxsets;        /* The max number of sets in any phase */
    AggStatePerPhase phases;    /* array of all phases */
@@ -2186,7 +2189,8 @@ typedef struct AggState
    struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
    struct HashAggSpill *hash_spills;   /* HashAggSpill for each grouping set,
                                         * exists only during first pass */
-   TupleTableSlot *hash_spill_slot;    /* slot for reading from spill files */
+   TupleTableSlot *hash_spill_rslot;   /* for reading spill files */
+   TupleTableSlot *hash_spill_wslot;   /* for writing spill files */
    List       *hash_batches;   /* hash batches remaining to be processed */
    bool        hash_ever_spilled;  /* ever spilled during this execution? */
    bool        hash_spill_mode;    /* we hit a limit during the current batch
@@ -2207,7 +2211,7 @@ typedef struct AggState
                                         * per-group pointers */
 
    /* support for evaluation of agg input expressions: */
-#define FIELDNO_AGGSTATE_ALL_PERGROUPS 49
+#define FIELDNO_AGGSTATE_ALL_PERGROUPS 53
    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, than
                                         * ->hash_pergroup */
    ProjectionInfo *combinedproj;   /* projection machinery */