HashAgg: before spilling tuples, set unneeded columns to NULL.
This is a replacement for 4cad2534. Instead of projecting all tuples
going into a HashAgg, only remove unnecessary attributes when actually
spilling. This avoids the regression for the in-memory case.

Discussion: https://postgr.es/m/a2fb7dfeb4f50aa0a123e42151ee3013933cb802.camel%40j-davis.com
Backpatch-through: 13
parent 0babd10980
commit 2302302236
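
The win comes from how spilled MinimalTuples store NULLs: a NULL attribute costs only a bit in the null bitmap and takes no space in the data area, so setting columns that the aggregate's targetlist and HAVING qual never reference to NULL shrinks every tuple written to a spill tape. The standalone sketch below (not PostgreSQL code; the row layout and all names are illustrative assumptions) mimics a null bitmap plus fixed-width data area to show the size effect of masking unneeded columns.

/*
 * Standalone sketch (not PostgreSQL code): illustrates why NULLing columns
 * that the aggregate never reads shrinks spilled tuples.  Like a
 * MinimalTuple, the "serialized" form stores a null bitmap plus data only
 * for non-NULL columns, so every masked column saves its full width.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define NCOLS 8

/* hypothetical in-memory row: fixed-width values plus null flags */
typedef struct Row
{
	int64_t		values[NCOLS];
	bool		isnull[NCOLS];
} Row;

/* size once written out: one bitmap byte plus 8 bytes per non-NULL column */
static size_t
serialized_size(const Row *row)
{
	size_t		size = 1;		/* one byte of null bitmap covers 8 columns */

	for (int i = 0; i < NCOLS; i++)
		if (!row->isnull[i])
			size += sizeof(int64_t);
	return size;
}

/* set every column not present in needed_mask to NULL, as the spill path does */
static void
mask_unneeded(Row *row, uint8_t needed_mask)
{
	for (int i = 0; i < NCOLS; i++)
		if ((needed_mask & (1 << i)) == 0)
			row->isnull[i] = true;
}

int
main(void)
{
	Row			row;

	memset(&row, 0, sizeof(row));	/* all 8 columns non-NULL */
	printf("before masking: %zu bytes\n", serialized_size(&row));

	/* pretend only columns 0 and 3 appear in the tlist or HAVING qual */
	mask_unneeded(&row, (1 << 0) | (1 << 3));
	printf("after masking:  %zu bytes\n", serialized_size(&row));
	return 0;
}

With all eight columns populated the toy row serializes to 65 bytes; after masking everything except columns 0 and 3 it drops to 17 bytes, which is the same mechanism the spill path below relies on.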
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -359,6 +359,14 @@ typedef struct HashAggBatch
 	int64		input_tuples;	/* number of tuples in this batch */
 } HashAggBatch;
 
+/* used to find referenced colnos */
+typedef struct FindColsContext
+{
+	bool		is_aggref;		/* is under an aggref */
+	Bitmapset  *aggregated;		/* column references under an aggref */
+	Bitmapset  *unaggregated;	/* other column references */
+} FindColsContext;
+
 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
 static void initialize_phase(AggState *aggstate, int newphase);
 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
@@ -391,8 +399,9 @@ static void finalize_aggregates(AggState *aggstate,
 								AggStatePerAgg peragg,
 								AggStatePerGroup pergroup);
 static TupleTableSlot *project_aggregates(AggState *aggstate);
-static Bitmapset *find_unaggregated_cols(AggState *aggstate);
-static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
+static void find_cols(AggState *aggstate, Bitmapset **aggregated,
+					  Bitmapset **unaggregated);
+static bool find_cols_walker(Node *node, FindColsContext *context);
 static void build_hash_tables(AggState *aggstate);
 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
@@ -425,8 +434,8 @@ static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
 							   int used_bits, uint64 input_tuples,
 							   double hashentrysize);
-static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot,
-								uint32 hash);
+static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
+								TupleTableSlot *slot, uint32 hash);
 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
 								 int setno);
 static void hashagg_tapeinfo_init(AggState *aggstate);
@@ -1375,26 +1384,28 @@ project_aggregates(AggState *aggstate)
 }
 
 /*
- * find_unaggregated_cols
- *	  Construct a bitmapset of the column numbers of un-aggregated Vars
- *	  appearing in our targetlist and qual (HAVING clause)
+ * Walk tlist and qual to find referenced colnos, dividing them into
+ * aggregated and unaggregated sets.
  */
-static Bitmapset *
-find_unaggregated_cols(AggState *aggstate)
+static void
+find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
 {
-	Agg		   *node = (Agg *) aggstate->ss.ps.plan;
-	Bitmapset  *colnos;
+	Agg		   *agg = (Agg *) aggstate->ss.ps.plan;
+	FindColsContext context;
 
-	colnos = NULL;
-	(void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
-										 &colnos);
-	(void) find_unaggregated_cols_walker((Node *) node->plan.qual,
-										 &colnos);
-	return colnos;
+	context.is_aggref = false;
+	context.aggregated = NULL;
+	context.unaggregated = NULL;
+
+	(void) find_cols_walker((Node *) agg->plan.targetlist, &context);
+	(void) find_cols_walker((Node *) agg->plan.qual, &context);
+
+	*aggregated = context.aggregated;
+	*unaggregated = context.unaggregated;
 }
 
 static bool
-find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
+find_cols_walker(Node *node, FindColsContext *context)
 {
 	if (node == NULL)
 		return false;
@@ -1405,16 +1416,24 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
 		/* setrefs.c should have set the varno to OUTER_VAR */
 		Assert(var->varno == OUTER_VAR);
 		Assert(var->varlevelsup == 0);
-		*colnos = bms_add_member(*colnos, var->varattno);
+		if (context->is_aggref)
+			context->aggregated = bms_add_member(context->aggregated,
+												 var->varattno);
+		else
+			context->unaggregated = bms_add_member(context->unaggregated,
+												   var->varattno);
 		return false;
 	}
-	if (IsA(node, Aggref) || IsA(node, GroupingFunc))
+	if (IsA(node, Aggref))
 	{
-		/* do not descend into aggregate exprs */
+		Assert(!context->is_aggref);
+		context->is_aggref = true;
+		expression_tree_walker(node, find_cols_walker, (void *) context);
+		context->is_aggref = false;
 		return false;
 	}
-	return expression_tree_walker(node, find_unaggregated_cols_walker,
-								  (void *) colnos);
+	return expression_tree_walker(node, find_cols_walker,
+								  (void *) context);
 }
 
 /*
@@ -1532,13 +1551,27 @@ static void
 find_hash_columns(AggState *aggstate)
 {
 	Bitmapset  *base_colnos;
+	Bitmapset  *aggregated_colnos;
 	TupleDesc	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
 	List	   *outerTlist = outerPlanState(aggstate)->plan->targetlist;
 	int			numHashes = aggstate->num_hashes;
 	EState	   *estate = aggstate->ss.ps.state;
 	int			j;
 
 	/* Find Vars that will be needed in tlist and qual */
-	base_colnos = find_unaggregated_cols(aggstate);
+	find_cols(aggstate, &aggregated_colnos, &base_colnos);
+	aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
+	aggstate->max_colno_needed = 0;
+	aggstate->all_cols_needed = true;
+
+	for (int i = 0; i < scanDesc->natts; i++)
+	{
+		int			colno = i + 1;
+		if (bms_is_member(colno, aggstate->colnos_needed))
+			aggstate->max_colno_needed = colno;
+		else
+			aggstate->all_cols_needed = false;
+	}
+
 	for (j = 0; j < numHashes; ++j)
 	{
@@ -2097,7 +2130,7 @@ lookup_hash_entries(AggState *aggstate)
 								   perhash->aggnode->numGroups,
 								   aggstate->hashentrysize);
 
-			hashagg_spill_tuple(spill, slot, hash);
+			hashagg_spill_tuple(aggstate, spill, slot, hash);
 		}
 	}
 }
@@ -2619,7 +2652,7 @@ agg_refill_hash_table(AggState *aggstate)
 							 HASHAGG_READ_BUFFER_SIZE);
 	for (;;)
 	{
-		TupleTableSlot *slot = aggstate->hash_spill_slot;
+		TupleTableSlot *slot = aggstate->hash_spill_rslot;
 		MinimalTuple tuple;
 		uint32		hash;
 		bool		in_hash_table;
@@ -2655,7 +2688,7 @@ agg_refill_hash_table(AggState *aggstate)
 								ngroups_estimate, aggstate->hashentrysize);
 			}
 			/* no memory for a new group, spill */
-			hashagg_spill_tuple(&spill, slot, hash);
+			hashagg_spill_tuple(aggstate, &spill, slot, hash);
 		}
 
 		/*
@@ -2934,9 +2967,11 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
  * partition.
  */
 static Size
-hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
+hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
+					TupleTableSlot *inputslot, uint32 hash)
 {
 	LogicalTapeSet *tapeset = spill->tapeset;
+	TupleTableSlot *spillslot;
 	int			partition;
 	MinimalTuple tuple;
 	int			tapenum;
@@ -2945,8 +2980,28 @@ hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
 
 	Assert(spill->partitions != NULL);
 
-	/* XXX: may contain unnecessary attributes, should project */
-	tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+	/* spill only attributes that we actually need */
+	if (!aggstate->all_cols_needed)
+	{
+		spillslot = aggstate->hash_spill_wslot;
+		slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
+		ExecClearTuple(spillslot);
+		for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
+		{
+			if (bms_is_member(i + 1, aggstate->colnos_needed))
+			{
+				spillslot->tts_values[i] = inputslot->tts_values[i];
+				spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
+			}
+			else
+				spillslot->tts_isnull[i] = true;
+		}
+		ExecStoreVirtualTuple(spillslot);
+	}
+	else
+		spillslot = inputslot;
+
+	tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
 
 	partition = (hash & spill->mask) >> spill->shift;
 	spill->ntuples[partition]++;
@@ -3563,8 +3618,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
 													   "HashAgg meta context",
 													   ALLOCSET_DEFAULT_SIZES);
-		aggstate->hash_spill_slot = ExecInitExtraTupleSlot(estate, scanDesc,
-														   &TTSOpsMinimalTuple);
+		aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
+															&TTSOpsMinimalTuple);
+		aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
+															&TTSOpsVirtual);
 
 		/* this is an array of pointers, not structures */
 		aggstate->hash_pergroup = pergroups;
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2169,6 +2169,9 @@ typedef struct AggState
 	int			current_set;	/* The current grouping set being evaluated */
 	Bitmapset  *grouped_cols;	/* grouped cols in current projection */
 	List	   *all_grouped_cols;	/* list of all grouped cols in DESC order */
+	Bitmapset  *colnos_needed;	/* all columns needed from the outer plan */
+	int			max_colno_needed;	/* highest colno needed from outer plan */
+	bool		all_cols_needed;	/* are all cols from outer plan needed? */
 	/* These fields are for grouping set phase data */
 	int			maxsets;		/* The max number of sets in any phase */
 	AggStatePerPhase phases;	/* array of all phases */
@@ -2186,7 +2189,8 @@ typedef struct AggState
 	struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
 	struct HashAggSpill *hash_spills;	/* HashAggSpill for each grouping set,
 										 * exists only during first pass */
-	TupleTableSlot *hash_spill_slot;	/* slot for reading from spill files */
+	TupleTableSlot *hash_spill_rslot;	/* for reading spill files */
+	TupleTableSlot *hash_spill_wslot;	/* for writing spill files */
 	List	   *hash_batches;	/* hash batches remaining to be processed */
 	bool		hash_ever_spilled;	/* ever spilled during this execution? */
 	bool		hash_spill_mode;	/* we hit a limit during the current batch
@@ -2207,7 +2211,7 @@ typedef struct AggState
 										 * per-group pointers */
 
 	/* support for evaluation of agg input expressions: */
-#define FIELDNO_AGGSTATE_ALL_PERGROUPS 49
+#define FIELDNO_AGGSTATE_ALL_PERGROUPS 53
 	AggStatePerGroup *all_pergroups;	/* array of first ->pergroups, than
 										 * ->hash_pergroup */
 	ProjectionInfo *combinedproj;	/* projection machinery */
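
As a side note on the walker introduced above: find_cols_walker() makes a single pass over the targetlist and qual, flipping an is_aggref flag while it is underneath an Aggref so that Var references can be split into aggregated and unaggregated sets. The standalone sketch below (simplified stand-ins for PostgreSQL's Node, Var, Aggref and Bitmapset types; not the real executor API) shows the same pattern on a toy expression tree.

/*
 * Standalone sketch (not PostgreSQL code) of the FindColsContext idea from
 * find_cols_walker(): walk an expression tree once, recording column
 * references found inside an aggregate separately from all other references.
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum NodeTag { T_Var, T_Aggref, T_OpExpr } NodeTag;

typedef struct Node
{
	NodeTag		tag;
	int			varattno;		/* only meaningful for T_Var */
	struct Node *left;			/* children, NULL if absent */
	struct Node *right;
} Node;

typedef struct FindColsContext
{
	bool		is_aggref;		/* currently underneath an aggregate? */
	unsigned	aggregated;		/* bitmask of colnos seen under an aggregate */
	unsigned	unaggregated;	/* bitmask of all other colnos */
} FindColsContext;

static void
find_cols_walker(Node *node, FindColsContext *context)
{
	if (node == NULL)
		return;
	if (node->tag == T_Var)
	{
		if (context->is_aggref)
			context->aggregated |= 1u << node->varattno;
		else
			context->unaggregated |= 1u << node->varattno;
		return;
	}
	if (node->tag == T_Aggref)
	{
		/* descend, but remember that we are inside an aggregate */
		context->is_aggref = true;
		find_cols_walker(node->left, context);
		find_cols_walker(node->right, context);
		context->is_aggref = false;
		return;
	}
	find_cols_walker(node->left, context);
	find_cols_walker(node->right, context);
}

int
main(void)
{
	/* roughly "col1, sum(col2 + col3)" in the tlist, "col4" in the qual */
	Node		v1 = {T_Var, 1, NULL, NULL};
	Node		v2 = {T_Var, 2, NULL, NULL};
	Node		v3 = {T_Var, 3, NULL, NULL};
	Node		v4 = {T_Var, 4, NULL, NULL};
	Node		plus = {T_OpExpr, 0, &v2, &v3};
	Node		sum = {T_Aggref, 0, &plus, NULL};
	Node		tlist = {T_OpExpr, 0, &v1, &sum};
	FindColsContext ctx = {false, 0, 0};

	find_cols_walker(&tlist, &ctx);
	find_cols_walker(&v4, &ctx);	/* the HAVING qual */

	printf("aggregated mask: 0x%x, unaggregated mask: 0x%x\n",
		   ctx.aggregated, ctx.unaggregated);
	return 0;
}

For a targetlist roughly equivalent to col1, sum(col2 + col3) with a HAVING condition on col4, the sketch reports columns 2 and 3 as aggregated and columns 1 and 4 as unaggregated; find_hash_columns() then unions the two sets into colnos_needed, and the spill path NULLs every column outside that set.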