Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeAgg.c
4 : * Routines to handle aggregate nodes.
5 : *
6 : * ExecAgg normally evaluates each aggregate in the following steps:
7 : *
8 : * transvalue = initcond
9 : * foreach input_tuple do
10 : * transvalue = transfunc(transvalue, input_value(s))
11 : * result = finalfunc(transvalue, direct_argument(s))
12 : *
13 : * If a finalfunc is not supplied then the result is just the ending
14 : * value of transvalue.
15 : *
16 : * Other behaviors can be selected by the "aggsplit" mode, which exists
17 : * to support partial aggregation. It is possible to:
18 : * * Skip running the finalfunc, so that the output is always the
19 : * final transvalue state.
20 : * * Substitute the combinefunc for the transfunc, so that transvalue
21 : * states (propagated up from a child partial-aggregation step) are merged
22 : * rather than processing raw input rows. (The statements below about
23 : * the transfunc apply equally to the combinefunc, when it's selected.)
24 : * * Apply the serializefunc to the output values (this only makes sense
25 : * when skipping the finalfunc, since the serializefunc works on the
26 : * transvalue data type).
27 : * * Apply the deserializefunc to the input values (this only makes sense
28 : * when using the combinefunc, for similar reasons).
29 : * It is the planner's responsibility to connect up Agg nodes using these
30 : * alternate behaviors in a way that makes sense, with partial aggregation
31 : * results being fed to nodes that expect them.
32 : *
33 : * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34 : * input tuples and eliminate duplicates (if required) before performing
35 : * the above-depicted process. (However, we don't do that for ordered-set
36 : * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37 : * so far as this module is concerned.) Note that partial aggregation
38 : * is not supported in these cases, since we couldn't ensure global
39 : * ordering or distinctness of the inputs.
40 : *
41 : * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42 : * then the first non-NULL input_value is assigned directly to transvalue,
43 : * and transfunc isn't applied until the second non-NULL input_value.
44 : * The agg's first input type and transtype must be the same in this case!
45 : *
46 : * If transfunc is marked "strict" then NULL input_values are skipped,
47 : * keeping the previous transvalue. If transfunc is not strict then it
48 : * is called for every input tuple and must deal with NULL initcond
49 : * or NULL input_values for itself.
50 : *
51 : * If finalfunc is marked "strict" then it is not called when the
52 : * ending transvalue is NULL, instead a NULL result is created
53 : * automatically (this is just the usual handling of strict functions,
54 : * of course). A non-strict finalfunc can make its own choice of
55 : * what to return for a NULL ending transvalue.
56 : *
57 : * Ordered-set aggregates are treated specially in one other way: we
58 : * evaluate any "direct" arguments and pass them to the finalfunc along
59 : * with the transition value.
60 : *
61 : * A finalfunc can have additional arguments beyond the transvalue and
62 : * any "direct" arguments, corresponding to the input arguments of the
63 : * aggregate. These are always just passed as NULL. Such arguments may be
64 : * needed to allow resolution of a polymorphic aggregate's result type.
65 : *
66 : * We compute aggregate input expressions and run the transition functions
67 : * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68 : * once per input tuple, so when the transvalue datatype is
69 : * pass-by-reference, we have to be careful to copy it into a longer-lived
70 : * memory context, and free the prior value to avoid memory leakage. We
71 : * store transvalues in another set of econtexts, aggstate->aggcontexts
72 : * (one per grouping set, see below), which are also used for the hashtable
73 : * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74 : * reset, at group boundaries so that aggregate transition functions can
75 : * register shutdown callbacks via AggRegisterCallback.
76 : *
77 : * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78 : * run finalize functions and compute the output tuple; this context can be
79 : * reset once per output tuple.
80 : *
81 : * The executor's AggState node is passed as the fmgr "context" value in
82 : * all transfunc and finalfunc calls. It is not recommended that the
83 : * transition functions look at the AggState node directly, but they can
84 : * use AggCheckCallContext() to verify that they are being called by
85 : * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86 : * transition function might want to know this is so that it can avoid
87 : * palloc'ing a fixed-size pass-by-ref transition value on every call:
88 : * it can instead just scribble on and return its left input. Ordinarily
89 : * it is completely forbidden for functions to modify pass-by-ref inputs,
90 : * but in the aggregate case we know the left input is either the initial
91 : * transition value or a previous function result, and in either case its
92 : * value need not be preserved. See int8inc() for an example. Notice that
93 : * advance_transition_function() is coded to avoid a data copy step when
94 : * the previous transition value pointer is returned. It is also possible
95 : * to avoid repeated data copying when the transition value is an expanded
96 : * object: to do that, the transition function must take care to return
97 : * an expanded object that is in a child context of the memory context
98 : * returned by AggCheckCallContext(). Also, some transition functions want
99 : * to store working state in addition to the nominal transition value; they
100 : * can use the memory context returned by AggCheckCallContext() to do that.
101 : *
102 : * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103 : * AggState is available as context in earlier releases (back to 8.1),
104 : * but direct examination of the node is needed to use it before 9.0.
105 : *
106 : * As of 9.4, aggregate transition functions can also use AggGetAggref()
107 : * to get hold of the Aggref expression node for their aggregate call.
108 : * This is mainly intended for ordered-set aggregates, which are not
109 : * supported as window functions. (A regular aggregate function would
110 : * need some fallback logic to use this, since there's no Aggref node
111 : * for a window function.)
112 : *
113 : * Grouping sets:
114 : *
115 : * A list of grouping sets which is structurally equivalent to a ROLLUP
116 : * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117 : * ordered data. We do this by keeping a separate set of transition values
118 : * for each grouping set being concurrently processed; for each input tuple
119 : * we update them all, and on group boundaries we reset those states
120 : * (starting at the front of the list) whose grouping values have changed
121 : * (the list of grouping sets is ordered from most specific to least
122 : * specific).
123 : *
124 : * Where more complex grouping sets are used, we break them down into
125 : * "phases", where each phase has a different sort order (except phase 0
126 : * which is reserved for hashing). During each phase but the last, the
127 : * input tuples are additionally stored in a tuplesort which is keyed to the
128 : * next phase's sort order; during each phase but the first, the input
129 : * tuples are drawn from the previously sorted data. (The sorting of the
130 : * data for the first phase is handled by the planner, as it might be
131 : * satisfied by underlying nodes.)
132 : *
133 : * Hashing can be mixed with sorted grouping. To do this, we have an
134 : * AGG_MIXED strategy that populates the hashtables during the first sorted
135 : * phase, and switches to reading them out after completing all sort phases.
136 : * We can also support AGG_HASHED with multiple hash tables and no sorting
137 : * at all.
138 : *
139 : * From the perspective of aggregate transition and final functions, the
140 : * only issue regarding grouping sets is this: a single call site (flinfo)
141 : * of an aggregate function may be used for updating several different
142 : * transition values in turn. So the function must not cache in the flinfo
143 : * anything which logically belongs as part of the transition value (most
144 : * importantly, the memory context in which the transition value exists).
145 : * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146 : * sensitive to the grouping set for which the aggregate function is
147 : * currently being called.
148 : *
149 : * Plan structure:
150 : *
151 : * What we get from the planner is actually one "real" Agg node which is
152 : * part of the plan tree proper, but which optionally has an additional list
153 : * of Agg nodes hung off the side via the "chain" field. This is because an
154 : * Agg node happens to be a convenient representation of all the data we
155 : * need for grouping sets.
156 : *
157 : * For many purposes, we treat the "real" node as if it were just the first
158 : * node in the chain. The chain must be ordered such that hashed entries
159 : * come before sorted/plain entries; the real node is marked AGG_MIXED if
160 : * there are both types present (in which case the real node describes one
161 : * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162 : * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163 : * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164 : * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165 : * chained nodes.
166 : *
167 : * We collect all hashed nodes into a single "phase", numbered 0, and create
168 : * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169 : * Phase 0 is allocated even if there are no hashes, but remains unused in
170 : * that case.
171 : *
172 : * AGG_HASHED nodes actually refer to only a single grouping set each,
173 : * because for each hashed grouping we need a separate grpColIdx and
174 : * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175 : * grouping sets that share a sort order. Each AGG_SORTED node other than
176 : * the first one has an associated Sort node which describes the sort order
177 : * to be used; the first sorted node takes its input from the outer subtree,
178 : * which the planner has already arranged to provide ordered data.
179 : *
180 : * Memory and ExprContext usage:
181 : *
182 : * Because we're accumulating aggregate values across input rows, we need to
183 : * use more memory contexts than just simple input/output tuple contexts.
184 : * In fact, for a rollup, we need a separate context for each grouping set
185 : * so that we can reset the inner (finer-grained) aggregates on their group
186 : * boundaries while continuing to accumulate values for outer
187 : * (coarser-grained) groupings. On top of this, we might be simultaneously
188 : * populating hashtables; however, we only need one context for all the
189 : * hashtables.
190 : *
191 : * So we create an array, aggcontexts, with an ExprContext for each grouping
192 : * set in the largest rollup that we're going to process, and use the
193 : * per-tuple memory context of those ExprContexts to store the aggregate
194 : * transition values. hashcontext is the single context created to support
195 : * all hash tables.
196 : *
197 : *
198 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
199 : * Portions Copyright (c) 1994, Regents of the University of California
200 : *
201 : * IDENTIFICATION
202 : * src/backend/executor/nodeAgg.c
203 : *
204 : *-------------------------------------------------------------------------
205 : */
206 :
207 : #include "postgres.h"
208 :
209 : #include "access/htup_details.h"
210 : #include "catalog/objectaccess.h"
211 : #include "catalog/pg_aggregate.h"
212 : #include "catalog/pg_proc.h"
213 : #include "catalog/pg_type.h"
214 : #include "executor/executor.h"
215 : #include "executor/nodeAgg.h"
216 : #include "miscadmin.h"
217 : #include "nodes/makefuncs.h"
218 : #include "nodes/nodeFuncs.h"
219 : #include "optimizer/clauses.h"
220 : #include "optimizer/tlist.h"
221 : #include "parser/parse_agg.h"
222 : #include "parser/parse_coerce.h"
223 : #include "utils/acl.h"
224 : #include "utils/builtins.h"
225 : #include "utils/lsyscache.h"
226 : #include "utils/memutils.h"
227 : #include "utils/syscache.h"
228 : #include "utils/tuplesort.h"
229 : #include "utils/datum.h"
230 :
231 :
232 : /*
233 : * AggStatePerTransData - per aggregate state value information
234 : *
235 : * Working state for updating the aggregate's state value, by calling the
236 : * transition function with an input row. This struct does not store the
237 : * information needed to produce the final aggregate result from the transition
238 : * state, that's stored in AggStatePerAggData instead. This separation allows
239 : * multiple aggregate results to be produced from a single state value.
240 : */
241 : typedef struct AggStatePerTransData
242 : {
243 : /*
244 : * These values are set up during ExecInitAgg() and do not change
245 : * thereafter:
246 : */
247 :
248 : /*
249 : * Link to an Aggref expr this state value is for.
250 : *
251 : * There can be multiple Aggref's sharing the same state value, as long as
252 : * the inputs and transition function are identical. This points to the
253 : * first one of them.
254 : */
255 : Aggref *aggref;
256 :
257 : /*
258 : * Nominal number of arguments for aggregate function. For plain aggs,
259 : * this excludes any ORDER BY expressions. For ordered-set aggs, this
260 : * counts both the direct and aggregated (ORDER BY) arguments.
261 : */
262 : int numArguments;
263 :
264 : /*
265 : * Number of aggregated input columns. This includes ORDER BY expressions
266 : * in both the plain-agg and ordered-set cases. Ordered-set direct args
267 : * are not counted, though.
268 : */
269 : int numInputs;
270 :
271 : /* offset of input columns in AggState->evalslot */
272 : int inputoff;
273 :
274 : /*
275 : * Number of aggregated input columns to pass to the transfn. This
276 : * includes the ORDER BY columns for ordered-set aggs, but not for plain
277 : * aggs. (This doesn't count the transition state value!)
278 : */
279 : int numTransInputs;
280 :
281 : /* Oid of the state transition or combine function */
282 : Oid transfn_oid;
283 :
284 : /* Oid of the serialization function or InvalidOid */
285 : Oid serialfn_oid;
286 :
287 : /* Oid of the deserialization function or InvalidOid */
288 : Oid deserialfn_oid;
289 :
290 : /* Oid of state value's datatype */
291 : Oid aggtranstype;
292 :
293 : /* ExprStates of the FILTER and argument expressions. */
294 : ExprState *aggfilter; /* state of FILTER expression, if any */
295 : List *aggdirectargs; /* states of direct-argument expressions */
296 :
297 : /*
298 : * fmgr lookup data for transition function or combine function. Note in
299 : * particular that the fn_strict flag is kept here.
300 : */
301 : FmgrInfo transfn;
302 :
303 : /* fmgr lookup data for serialization function */
304 : FmgrInfo serialfn;
305 :
306 : /* fmgr lookup data for deserialization function */
307 : FmgrInfo deserialfn;
308 :
309 : /* Input collation derived for aggregate */
310 : Oid aggCollation;
311 :
312 : /* number of sorting columns */
313 : int numSortCols;
314 :
315 : /* number of sorting columns to consider in DISTINCT comparisons */
316 : /* (this is either zero or the same as numSortCols) */
317 : int numDistinctCols;
318 :
319 : /* deconstructed sorting information (arrays of length numSortCols) */
320 : AttrNumber *sortColIdx;
321 : Oid *sortOperators;
322 : Oid *sortCollations;
323 : bool *sortNullsFirst;
324 :
325 : /*
326 : * fmgr lookup data for input columns' equality operators --- only
327 : * set/used when aggregate has DISTINCT flag. Note that these are in
328 : * order of sort column index, not parameter index.
329 : */
330 : FmgrInfo *equalfns; /* array of length numDistinctCols */
331 :
332 : /*
333 : * initial value from pg_aggregate entry
334 : */
335 : Datum initValue;
336 : bool initValueIsNull;
337 :
338 : /*
339 : * We need the len and byval info for the agg's input and transition data
340 : * types in order to know how to copy/delete values.
341 : *
342 : * Note that the info for the input type is used only when handling
343 : * DISTINCT aggs with just one argument, so there is only one input type.
344 : */
345 : int16 inputtypeLen,
346 : transtypeLen;
347 : bool inputtypeByVal,
348 : transtypeByVal;
349 :
350 : /*
351 : * Stuff for evaluation of aggregate inputs in cases where the aggregate
352 : * requires sorted input. The arguments themselves will be evaluated via
353 : * AggState->evalslot/evalproj for all aggregates at once, but we only
354 : * want to sort the relevant columns for individual aggregates.
355 : */
356 : TupleDesc sortdesc; /* descriptor of input tuples */
357 :
358 : /*
359 : * Slots for holding the evaluated input arguments. These are set up
360 : * during ExecInitAgg() and then used for each input row requiring
361 : * processing besides what's done in AggState->evalproj.
362 : */
363 : TupleTableSlot *sortslot; /* current input tuple */
364 : TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */
365 :
366 : /*
367 : * These values are working state that is initialized at the start of an
368 : * input tuple group and updated for each input tuple.
369 : *
370 : * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input
371 : * values straight to the transition function. If it's DISTINCT or
372 : * requires ORDER BY, we pass the input values into a Tuplesort object;
373 : * then at completion of the input tuple group, we scan the sorted values,
374 : * eliminate duplicates if needed, and run the transition function on the
375 : * rest.
376 : *
377 : * We need a separate tuplesort for each grouping set.
378 : */
379 :
380 : Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */
381 :
382 : /*
383 : * This field is a pre-initialized FunctionCallInfo struct used for
384 : * calling this aggregate's transfn. We save a few cycles per row by not
385 : * re-initializing the unchanging fields; which isn't much, but it seems
386 : * worth the extra space consumption.
387 : */
388 : FunctionCallInfoData transfn_fcinfo;
389 :
390 : /* Likewise for serialization and deserialization functions */
391 : FunctionCallInfoData serialfn_fcinfo;
392 :
393 : FunctionCallInfoData deserialfn_fcinfo;
394 : } AggStatePerTransData;
395 :
396 : /*
397 : * AggStatePerAggData - per-aggregate information
398 : *
399 : * This contains the information needed to call the final function, to produce
400 : * a final aggregate result from the state value. If there are multiple
401 : * identical Aggrefs in the query, they can all share the same per-agg data.
402 : *
403 : * These values are set up during ExecInitAgg() and do not change thereafter.
404 : */
405 : typedef struct AggStatePerAggData
406 : {
407 : /*
408 : * Link to an Aggref expr this state value is for.
409 : *
410 : * There can be multiple identical Aggref's sharing the same per-agg. This
411 : * points to the first one of them.
412 : */
413 : Aggref *aggref;
414 :
415 : /* index to the state value which this agg should use */
416 : int transno;
417 :
418 : /* Optional Oid of final function (may be InvalidOid) */
419 : Oid finalfn_oid;
420 :
421 : /*
422 : * fmgr lookup data for final function --- only valid when finalfn_oid oid
423 : * is not InvalidOid.
424 : */
425 : FmgrInfo finalfn;
426 :
427 : /*
428 : * Number of arguments to pass to the finalfn. This is always at least 1
429 : * (the transition state value) plus any ordered-set direct args. If the
430 : * finalfn wants extra args then we pass nulls corresponding to the
431 : * aggregated input columns.
432 : */
433 : int numFinalArgs;
434 :
435 : /*
436 : * We need the len and byval info for the agg's result data type in order
437 : * to know how to copy/delete values.
438 : */
439 : int16 resulttypeLen;
440 : bool resulttypeByVal;
441 :
442 : } AggStatePerAggData;
443 :
444 : /*
445 : * AggStatePerGroupData - per-aggregate-per-group working state
446 : *
447 : * These values are working state that is initialized at the start of
448 : * an input tuple group and updated for each input tuple.
449 : *
450 : * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these
451 : * structs (pointed to by aggstate->pergroup); we re-use the array for
452 : * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the
453 : * hash table contains an array of these structs for each tuple group.
454 : *
455 : * Logically, the sortstate field belongs in this struct, but we do not
456 : * keep it here for space reasons: we don't support DISTINCT aggregates
457 : * in AGG_HASHED mode, so there's no reason to use up a pointer field
458 : * in every entry of the hashtable.
459 : */
460 : typedef struct AggStatePerGroupData
461 : {
462 : Datum transValue; /* current transition value */
463 : bool transValueIsNull;
464 :
465 : bool noTransValue; /* true if transValue not set yet */
466 :
467 : /*
468 : * Note: noTransValue initially has the same value as transValueIsNull,
469 : * and if true both are cleared to false at the same time. They are not
470 : * the same though: if transfn later returns a NULL, we want to keep that
471 : * NULL and not auto-replace it with a later input value. Only the first
472 : * non-NULL input will be auto-substituted.
473 : */
474 : } AggStatePerGroupData;
475 :
476 : /*
477 : * AggStatePerPhaseData - per-grouping-set-phase state
478 : *
479 : * Grouping sets are divided into "phases", where a single phase can be
480 : * processed in one pass over the input. If there is more than one phase, then
481 : * at the end of input from the current phase, state is reset and another pass
482 : * taken over the data which has been re-sorted in the mean time.
483 : *
484 : * Accordingly, each phase specifies a list of grouping sets and group clause
485 : * information, plus each phase after the first also has a sort order.
486 : */
487 : typedef struct AggStatePerPhaseData
488 : {
489 : AggStrategy aggstrategy; /* strategy for this phase */
490 : int numsets; /* number of grouping sets (or 0) */
491 : int *gset_lengths; /* lengths of grouping sets */
492 : Bitmapset **grouped_cols; /* column groupings for rollup */
493 : FmgrInfo *eqfunctions; /* per-grouping-field equality fns */
494 : Agg *aggnode; /* Agg node for phase data */
495 : Sort *sortnode; /* Sort node for input ordering for phase */
496 : } AggStatePerPhaseData;
497 :
498 : /*
499 : * AggStatePerHashData - per-hashtable state
500 : *
501 : * When doing grouping sets with hashing, we have one of these for each
502 : * grouping set. (When doing hashing without grouping sets, we have just one of
503 : * them.)
504 : */
505 : typedef struct AggStatePerHashData
506 : {
507 : TupleHashTable hashtable; /* hash table with one entry per group */
508 : TupleHashIterator hashiter; /* for iterating through hash table */
509 : TupleTableSlot *hashslot; /* slot for loading hash table */
510 : FmgrInfo *hashfunctions; /* per-grouping-field hash fns */
511 : FmgrInfo *eqfunctions; /* per-grouping-field equality fns */
512 : int numCols; /* number of hash key columns */
513 : int numhashGrpCols; /* number of columns in hash table */
514 : int largestGrpColIdx; /* largest col required for hashing */
515 : AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */
516 : AttrNumber *hashGrpColIdxHash; /* indices in hashtbl tuples */
517 : Agg *aggnode; /* original Agg node, for numGroups etc. */
518 : } AggStatePerHashData;
519 :
520 :
521 : static void select_current_set(AggState *aggstate, int setno, bool is_hash);
522 : static void initialize_phase(AggState *aggstate, int newphase);
523 : static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
524 : static void initialize_aggregates(AggState *aggstate,
525 : AggStatePerGroup pergroup,
526 : int numReset);
527 : static void advance_transition_function(AggState *aggstate,
528 : AggStatePerTrans pertrans,
529 : AggStatePerGroup pergroupstate);
530 : static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup,
531 : AggStatePerGroup *pergroups);
532 : static void advance_combine_function(AggState *aggstate,
533 : AggStatePerTrans pertrans,
534 : AggStatePerGroup pergroupstate);
535 : static void combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
536 : static void process_ordered_aggregate_single(AggState *aggstate,
537 : AggStatePerTrans pertrans,
538 : AggStatePerGroup pergroupstate);
539 : static void process_ordered_aggregate_multi(AggState *aggstate,
540 : AggStatePerTrans pertrans,
541 : AggStatePerGroup pergroupstate);
542 : static void finalize_aggregate(AggState *aggstate,
543 : AggStatePerAgg peragg,
544 : AggStatePerGroup pergroupstate,
545 : Datum *resultVal, bool *resultIsNull);
546 : static void finalize_partialaggregate(AggState *aggstate,
547 : AggStatePerAgg peragg,
548 : AggStatePerGroup pergroupstate,
549 : Datum *resultVal, bool *resultIsNull);
550 : static void prepare_projection_slot(AggState *aggstate,
551 : TupleTableSlot *slot,
552 : int currentSet);
553 : static void finalize_aggregates(AggState *aggstate,
554 : AggStatePerAgg peragg,
555 : AggStatePerGroup pergroup);
556 : static TupleTableSlot *project_aggregates(AggState *aggstate);
557 : static Bitmapset *find_unaggregated_cols(AggState *aggstate);
558 : static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
559 : static void build_hash_table(AggState *aggstate);
560 : static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
561 : static AggStatePerGroup *lookup_hash_entries(AggState *aggstate);
562 : static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
563 : static void agg_fill_hash_table(AggState *aggstate);
564 : static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
565 : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
566 : static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
567 : AggState *aggstate, EState *estate,
568 : Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
569 : Oid aggserialfn, Oid aggdeserialfn,
570 : Datum initValue, bool initValueIsNull,
571 : Oid *inputTypes, int numArguments);
572 : static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
573 : int lastaggno, List **same_input_transnos);
574 : static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
575 : Oid aggtransfn, Oid aggtranstype,
576 : Oid aggserialfn, Oid aggdeserialfn,
577 : Datum initValue, bool initValueIsNull,
578 : List *transnos);
579 :
580 :
581 : /*
582 : * Select the current grouping set; affects current_set and
583 : * curaggcontext.
584 : */
585 : static void
586 9660450 : select_current_set(AggState *aggstate, int setno, bool is_hash)
587 : {
588 9660450 : if (is_hash)
589 347859 : aggstate->curaggcontext = aggstate->hashcontext;
590 : else
591 9312591 : aggstate->curaggcontext = aggstate->aggcontexts[setno];
592 :
593 9660450 : aggstate->current_set = setno;
594 9660450 : }
595 :
596 : /*
597 : * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
598 : * current_phase + 1. Juggle the tuplesorts accordingly.
599 : *
600 : * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
601 : * case, so when entering phase 0, all we need to do is drop open sorts.
602 : */
603 : static void
604 3285 : initialize_phase(AggState *aggstate, int newphase)
605 : {
606 3285 : Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
607 :
608 : /*
609 : * Whatever the previous state, we're now done with whatever input
610 : * tuplesort was in use.
611 : */
612 3285 : if (aggstate->sort_in)
613 : {
614 6 : tuplesort_end(aggstate->sort_in);
615 6 : aggstate->sort_in = NULL;
616 : }
617 :
618 3285 : if (newphase <= 1)
619 : {
620 : /*
621 : * Discard any existing output tuplesort.
622 : */
623 3263 : if (aggstate->sort_out)
624 : {
625 1 : tuplesort_end(aggstate->sort_out);
626 1 : aggstate->sort_out = NULL;
627 : }
628 : }
629 : else
630 : {
631 : /*
632 : * The old output tuplesort becomes the new input one, and this is the
633 : * right time to actually sort it.
634 : */
635 22 : aggstate->sort_in = aggstate->sort_out;
636 22 : aggstate->sort_out = NULL;
637 22 : Assert(aggstate->sort_in);
638 22 : tuplesort_performsort(aggstate->sort_in);
639 : }
640 :
641 : /*
642 : * If this isn't the last phase, we need to sort appropriately for the
643 : * next phase in sequence.
644 : */
645 3285 : if (newphase > 0 && newphase < aggstate->numphases - 1)
646 : {
647 26 : Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
648 26 : PlanState *outerNode = outerPlanState(aggstate);
649 26 : TupleDesc tupDesc = ExecGetResultType(outerNode);
650 :
651 26 : aggstate->sort_out = tuplesort_begin_heap(tupDesc,
652 : sortnode->numCols,
653 : sortnode->sortColIdx,
654 : sortnode->sortOperators,
655 : sortnode->collations,
656 : sortnode->nullsFirst,
657 : work_mem,
658 : false);
659 : }
660 :
661 3285 : aggstate->current_phase = newphase;
662 3285 : aggstate->phase = &aggstate->phases[newphase];
663 3285 : }
664 :
665 : /*
666 : * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
667 : * populated by the previous phase. Copy it to the sorter for the next phase
668 : * if any.
669 : *
670 : * Callers cannot rely on memory for tuple in returned slot remaining valid
671 : * past any subsequently fetched tuple.
672 : */
673 : static TupleTableSlot *
674 9537553 : fetch_input_tuple(AggState *aggstate)
675 : {
676 : TupleTableSlot *slot;
677 :
678 9537553 : if (aggstate->sort_in)
679 : {
680 : /* make sure we check for interrupts in either path through here */
681 5123 : CHECK_FOR_INTERRUPTS();
682 5123 : if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
683 : aggstate->sort_slot, NULL))
684 22 : return NULL;
685 5101 : slot = aggstate->sort_slot;
686 : }
687 : else
688 9532430 : slot = ExecProcNode(outerPlanState(aggstate));
689 :
690 9537530 : if (!TupIsNull(slot) && aggstate->sort_out)
691 5101 : tuplesort_puttupleslot(aggstate->sort_out, slot);
692 :
693 9537530 : return slot;
694 : }
695 :
696 : /*
697 : * (Re)Initialize an individual aggregate.
698 : *
699 : * This function handles only one grouping set, already set in
700 : * aggstate->current_set.
701 : *
702 : * When called, CurrentMemoryContext should be the per-query context.
703 : */
704 : static void
705 4104 : initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
706 : AggStatePerGroup pergroupstate)
707 : {
708 : /*
709 : * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
710 : */
711 4104 : if (pertrans->numSortCols > 0)
712 : {
713 : /*
714 : * In case of rescan, maybe there could be an uncompleted sort
715 : * operation? Clean it up if so.
716 : */
717 204 : if (pertrans->sortstates[aggstate->current_set])
718 0 : tuplesort_end(pertrans->sortstates[aggstate->current_set]);
719 :
720 :
721 : /*
722 : * We use a plain Datum sorter when there's a single input column;
723 : * otherwise sort the full tuple. (See comments for
724 : * process_ordered_aggregate_single.)
725 : */
726 204 : if (pertrans->numInputs == 1)
727 : {
728 124 : Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
729 :
730 248 : pertrans->sortstates[aggstate->current_set] =
731 496 : tuplesort_begin_datum(attr->atttypid,
732 124 : pertrans->sortOperators[0],
733 124 : pertrans->sortCollations[0],
734 124 : pertrans->sortNullsFirst[0],
735 : work_mem, false);
736 : }
737 : else
738 160 : pertrans->sortstates[aggstate->current_set] =
739 80 : tuplesort_begin_heap(pertrans->sortdesc,
740 : pertrans->numSortCols,
741 : pertrans->sortColIdx,
742 : pertrans->sortOperators,
743 : pertrans->sortCollations,
744 : pertrans->sortNullsFirst,
745 : work_mem, false);
746 : }
747 :
748 : /*
749 : * (Re)set transValue to the initial value.
750 : *
751 : * Note that when the initial value is pass-by-ref, we must copy it (into
752 : * the aggcontext) since we will pfree the transValue later.
753 : */
754 4104 : if (pertrans->initValueIsNull)
755 2153 : pergroupstate->transValue = pertrans->initValue;
756 : else
757 : {
758 : MemoryContext oldContext;
759 :
760 1951 : oldContext = MemoryContextSwitchTo(
761 1951 : aggstate->curaggcontext->ecxt_per_tuple_memory);
762 3902 : pergroupstate->transValue = datumCopy(pertrans->initValue,
763 1951 : pertrans->transtypeByVal,
764 1951 : pertrans->transtypeLen);
765 1951 : MemoryContextSwitchTo(oldContext);
766 : }
767 4104 : pergroupstate->transValueIsNull = pertrans->initValueIsNull;
768 :
769 : /*
770 : * If the initial value for the transition state doesn't exist in the
771 : * pg_aggregate table then we will let the first non-NULL value returned
772 : * from the outer procNode become the initial value. (This is useful for
773 : * aggregates like max() and min().) The noTransValue flag signals that we
774 : * still need to do this.
775 : */
776 4104 : pergroupstate->noTransValue = pertrans->initValueIsNull;
777 4104 : }
778 :
779 : /*
780 : * Initialize all aggregate transition states for a new group of input values.
781 : *
782 : * If there are multiple grouping sets, we initialize only the first numReset
783 : * of them (the grouping sets are ordered so that the most specific one, which
784 : * is reset most often, is first). As a convenience, if numReset is 0, we
785 : * reinitialize all sets. numReset is -1 to initialize a hashtable entry, in
786 : * which case the caller must have used select_current_set appropriately.
787 : *
788 : * When called, CurrentMemoryContext should be the per-query context.
789 : */
790 : static void
791 7435 : initialize_aggregates(AggState *aggstate,
792 : AggStatePerGroup pergroup,
793 : int numReset)
794 : {
795 : int transno;
796 7435 : int numGroupingSets = Max(aggstate->phase->numsets, 1);
797 7435 : int setno = 0;
798 7435 : int numTrans = aggstate->numtrans;
799 7435 : AggStatePerTrans transstates = aggstate->pertrans;
800 :
801 7435 : if (numReset == 0)
802 0 : numReset = numGroupingSets;
803 :
804 11295 : for (transno = 0; transno < numTrans; transno++)
805 : {
806 3860 : AggStatePerTrans pertrans = &transstates[transno];
807 :
808 3860 : if (numReset < 0)
809 : {
810 : AggStatePerGroup pergroupstate;
811 :
812 1004 : pergroupstate = &pergroup[transno];
813 :
814 1004 : initialize_aggregate(aggstate, pertrans, pergroupstate);
815 : }
816 : else
817 : {
818 5956 : for (setno = 0; setno < numReset; setno++)
819 : {
820 : AggStatePerGroup pergroupstate;
821 :
822 3100 : pergroupstate = &pergroup[transno + (setno * numTrans)];
823 :
824 3100 : select_current_set(aggstate, setno, false);
825 :
826 3100 : initialize_aggregate(aggstate, pertrans, pergroupstate);
827 : }
828 : }
829 : }
830 7435 : }
831 :
832 : /*
833 : * Given new input value(s), advance the transition function of one aggregate
834 : * state within one grouping set only (already set in aggstate->current_set)
835 : *
836 : * The new values (and null flags) have been preloaded into argument positions
837 : * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
838 : * pass to the transition function. We also expect that the static fields of
839 : * the fcinfo are already initialized; that was done by ExecInitAgg().
840 : *
841 : * It doesn't matter which memory context this is called in.
842 : */
843 : static void
844 9420784 : advance_transition_function(AggState *aggstate,
845 : AggStatePerTrans pertrans,
846 : AggStatePerGroup pergroupstate)
847 : {
848 9420784 : FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
849 : MemoryContext oldContext;
850 : Datum newVal;
851 :
852 9420784 : if (pertrans->transfn.fn_strict)
853 : {
854 : /*
855 : * For a strict transfn, nothing happens when there's a NULL input; we
856 : * just keep the prior transValue.
857 : */
858 9013297 : int numTransInputs = pertrans->numTransInputs;
859 : int i;
860 :
861 9150039 : for (i = 1; i <= numTransInputs; i++)
862 : {
863 136795 : if (fcinfo->argnull[i])
864 53 : return;
865 : }
866 9013244 : if (pergroupstate->noTransValue)
867 : {
868 : /*
869 : * transValue has not been initialized. This is the first non-NULL
870 : * input value. We use it as the initial value for transValue. (We
871 : * already checked that the agg's input type is binary-compatible
872 : * with its transtype, so straight copy here is OK.)
873 : *
874 : * We must copy the datum into aggcontext if it is pass-by-ref. We
875 : * do not need to pfree the old transValue, since it's NULL.
876 : */
877 385 : oldContext = MemoryContextSwitchTo(
878 385 : aggstate->curaggcontext->ecxt_per_tuple_memory);
879 770 : pergroupstate->transValue = datumCopy(fcinfo->arg[1],
880 385 : pertrans->transtypeByVal,
881 385 : pertrans->transtypeLen);
882 385 : pergroupstate->transValueIsNull = false;
883 385 : pergroupstate->noTransValue = false;
884 385 : MemoryContextSwitchTo(oldContext);
885 385 : return;
886 : }
887 9012859 : if (pergroupstate->transValueIsNull)
888 : {
889 : /*
890 : * Don't call a strict function with NULL inputs. Note it is
891 : * possible to get here despite the above tests, if the transfn is
892 : * strict *and* returned a NULL on a prior cycle. If that happens
893 : * we will propagate the NULL all the way to the end.
894 : */
895 0 : return;
896 : }
897 : }
898 :
899 : /* We run the transition functions in per-input-tuple memory context */
900 9420346 : oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
901 :
902 : /* set up aggstate->curpertrans for AggGetAggref() */
903 9420346 : aggstate->curpertrans = pertrans;
904 :
905 : /*
906 : * OK to call the transition function
907 : */
908 9420346 : fcinfo->arg[0] = pergroupstate->transValue;
909 9420346 : fcinfo->argnull[0] = pergroupstate->transValueIsNull;
910 9420346 : fcinfo->isnull = false; /* just in case transfn doesn't set it */
911 :
912 9420346 : newVal = FunctionCallInvoke(fcinfo);
913 :
914 9420339 : aggstate->curpertrans = NULL;
915 :
916 : /*
917 : * If pass-by-ref datatype, must copy the new value into aggcontext and
918 : * free the prior transValue. But if transfn returned a pointer to its
919 : * first input, we don't need to do anything. Also, if transfn returned a
920 : * pointer to a R/W expanded object that is already a child of the
921 : * aggcontext, assume we can adopt that value without copying it.
922 : */
923 18477322 : if (!pertrans->transtypeByVal &&
924 9056983 : DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
925 : {
926 1338 : if (!fcinfo->isnull)
927 : {
928 1338 : MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
929 1344 : if (DatumIsReadWriteExpandedObject(newVal,
930 : false,
931 366 : pertrans->transtypeLen) &&
932 6 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
933 : /* do nothing */ ;
934 : else
935 2672 : newVal = datumCopy(newVal,
936 1336 : pertrans->transtypeByVal,
937 1336 : pertrans->transtypeLen);
938 : }
939 1338 : if (!pergroupstate->transValueIsNull)
940 : {
941 732 : if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
942 : false,
943 : pertrans->transtypeLen))
944 0 : DeleteExpandedObject(pergroupstate->transValue);
945 : else
946 732 : pfree(DatumGetPointer(pergroupstate->transValue));
947 : }
948 : }
949 :
950 9420339 : pergroupstate->transValue = newVal;
951 9420339 : pergroupstate->transValueIsNull = fcinfo->isnull;
952 :
953 9420339 : MemoryContextSwitchTo(oldContext);
954 : }
955 :
956 : /*
957 : * Advance each aggregate transition state for one input tuple. The input
958 : * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
959 : * accessible to ExecEvalExpr.
960 : *
961 : * We have two sets of transition states to handle: one for sorted aggregation
962 : * and one for hashed; we do them both here, to avoid multiple evaluation of
963 : * the inputs.
964 : *
965 : * When called, CurrentMemoryContext should be the per-query context.
966 : */
967 : static void
968 9534947 : advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGroup *pergroups)
969 : {
970 : int transno;
971 9534947 : int setno = 0;
972 9534947 : int numGroupingSets = Max(aggstate->phase->numsets, 1);
973 9534947 : int numHashes = aggstate->num_hashes;
974 9534947 : int numTrans = aggstate->numtrans;
975 9534947 : TupleTableSlot *slot = aggstate->evalslot;
976 :
977 : /* compute input for all aggregates */
978 9534947 : if (aggstate->evalproj)
979 9534947 : aggstate->evalslot = ExecProject(aggstate->evalproj);
980 :
981 18969600 : for (transno = 0; transno < numTrans; transno++)
982 : {
983 9434660 : AggStatePerTrans pertrans = &aggstate->pertrans[transno];
984 9434660 : ExprState *filter = pertrans->aggfilter;
985 9434660 : int numTransInputs = pertrans->numTransInputs;
986 : int i;
987 9434660 : int inputoff = pertrans->inputoff;
988 :
989 : /* Skip anything FILTERed out */
990 9434660 : if (filter)
991 : {
992 : Datum res;
993 : bool isnull;
994 :
995 43025 : res = ExecEvalExprSwitchContext(filter, aggstate->tmpcontext,
996 : &isnull);
997 43025 : if (isnull || !DatumGetBool(res))
998 32899 : continue;
999 : }
1000 :
1001 9401761 : if (pertrans->numSortCols > 0)
1002 : {
1003 : /* DISTINCT and/or ORDER BY case */
1004 7712 : Assert(slot->tts_nvalid >= (pertrans->numInputs + inputoff));
1005 7712 : Assert(!pergroups);
1006 :
1007 : /*
1008 : * If the transfn is strict, we want to check for nullity before
1009 : * storing the row in the sorter, to save space if there are a lot
1010 : * of nulls. Note that we must only check numTransInputs columns,
1011 : * not numInputs, since nullity in columns used only for sorting
1012 : * is not relevant here.
1013 : */
1014 7712 : if (pertrans->transfn.fn_strict)
1015 : {
1016 6876 : for (i = 0; i < numTransInputs; i++)
1017 : {
1018 3462 : if (slot->tts_isnull[i + inputoff])
1019 6 : break;
1020 : }
1021 3420 : if (i < numTransInputs)
1022 6 : continue;
1023 : }
1024 :
1025 16452 : for (setno = 0; setno < numGroupingSets; setno++)
1026 : {
1027 : /* OK, put the tuple into the tuplesort object */
1028 8746 : if (pertrans->numInputs == 1)
1029 16978 : tuplesort_putdatum(pertrans->sortstates[setno],
1030 8489 : slot->tts_values[inputoff],
1031 8489 : slot->tts_isnull[inputoff]);
1032 : else
1033 : {
1034 : /*
1035 : * Copy slot contents, starting from inputoff, into sort
1036 : * slot.
1037 : */
1038 257 : ExecClearTuple(pertrans->sortslot);
1039 771 : memcpy(pertrans->sortslot->tts_values,
1040 514 : &slot->tts_values[inputoff],
1041 257 : pertrans->numInputs * sizeof(Datum));
1042 771 : memcpy(pertrans->sortslot->tts_isnull,
1043 514 : &slot->tts_isnull[inputoff],
1044 257 : pertrans->numInputs * sizeof(bool));
1045 257 : pertrans->sortslot->tts_nvalid = pertrans->numInputs;
1046 257 : ExecStoreVirtualTuple(pertrans->sortslot);
1047 257 : tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot);
1048 : }
1049 : }
1050 : }
1051 : else
1052 : {
1053 : /* We can apply the transition function immediately */
1054 9394049 : FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1055 :
1056 : /* Load values into fcinfo */
1057 : /* Start from 1, since the 0th arg will be the transition value */
1058 9394049 : Assert(slot->tts_nvalid >= (numTransInputs + inputoff));
1059 :
1060 9964256 : for (i = 0; i < numTransInputs; i++)
1061 : {
1062 570207 : fcinfo->arg[i + 1] = slot->tts_values[i + inputoff];
1063 570207 : fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff];
1064 : }
1065 :
1066 9394049 : if (pergroup)
1067 : {
1068 : /* advance transition states for ordered grouping */
1069 :
1070 18597476 : for (setno = 0; setno < numGroupingSets; setno++)
1071 : {
1072 : AggStatePerGroup pergroupstate;
1073 :
1074 9304256 : select_current_set(aggstate, setno, false);
1075 :
1076 9304256 : pergroupstate = &pergroup[transno + (setno * numTrans)];
1077 :
1078 9304256 : advance_transition_function(aggstate, pertrans, pergroupstate);
1079 : }
1080 : }
1081 :
1082 9394042 : if (pergroups)
1083 : {
1084 : /* advance transition states for hashed grouping */
1085 :
1086 219984 : for (setno = 0; setno < numHashes; setno++)
1087 : {
1088 : AggStatePerGroup pergroupstate;
1089 :
1090 115085 : select_current_set(aggstate, setno, true);
1091 :
1092 115085 : pergroupstate = &pergroups[setno][transno];
1093 :
1094 115085 : advance_transition_function(aggstate, pertrans, pergroupstate);
1095 : }
1096 : }
1097 : }
1098 : }
1099 9534940 : }
1100 :
1101 : /*
1102 : * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE
1103 : * mode. The principal difference is that here we may need to apply the
1104 : * deserialization function before running the transfn (which, in this mode,
1105 : * is actually the aggregate's combinefn). Also, we know we don't need to
1106 : * handle FILTER, DISTINCT, ORDER BY, or grouping sets.
1107 : */
1108 : static void
1109 90 : combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
1110 : {
1111 : int transno;
1112 90 : int numTrans = aggstate->numtrans;
1113 : TupleTableSlot *slot;
1114 :
1115 : /* combine not supported with grouping sets */
1116 90 : Assert(aggstate->phase->numsets <= 1);
1117 :
1118 : /* compute input for all aggregates */
1119 90 : slot = ExecProject(aggstate->evalproj);
1120 :
1121 179 : for (transno = 0; transno < numTrans; transno++)
1122 : {
1123 89 : AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1124 89 : AggStatePerGroup pergroupstate = &pergroup[transno];
1125 89 : FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1126 89 : int inputoff = pertrans->inputoff;
1127 :
1128 89 : Assert(slot->tts_nvalid > inputoff);
1129 :
1130 : /*
1131 : * deserialfn_oid will be set if we must deserialize the input state
1132 : * before calling the combine function
1133 : */
1134 89 : if (OidIsValid(pertrans->deserialfn_oid))
1135 : {
1136 : /* Don't call a strict deserialization function with NULL input */
1137 0 : if (pertrans->deserialfn.fn_strict && slot->tts_isnull[inputoff])
1138 : {
1139 0 : fcinfo->arg[1] = slot->tts_values[inputoff];
1140 0 : fcinfo->argnull[1] = slot->tts_isnull[inputoff];
1141 : }
1142 : else
1143 : {
1144 0 : FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo;
1145 : MemoryContext oldContext;
1146 :
1147 0 : dsinfo->arg[0] = slot->tts_values[inputoff];
1148 0 : dsinfo->argnull[0] = slot->tts_isnull[inputoff];
1149 : /* Dummy second argument for type-safety reasons */
1150 0 : dsinfo->arg[1] = PointerGetDatum(NULL);
1151 0 : dsinfo->argnull[1] = false;
1152 :
1153 : /*
1154 : * We run the deserialization functions in per-input-tuple
1155 : * memory context.
1156 : */
1157 0 : oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
1158 :
1159 0 : fcinfo->arg[1] = FunctionCallInvoke(dsinfo);
1160 0 : fcinfo->argnull[1] = dsinfo->isnull;
1161 :
1162 0 : MemoryContextSwitchTo(oldContext);
1163 : }
1164 : }
1165 : else
1166 : {
1167 89 : fcinfo->arg[1] = slot->tts_values[inputoff];
1168 89 : fcinfo->argnull[1] = slot->tts_isnull[inputoff];
1169 : }
1170 :
1171 89 : advance_combine_function(aggstate, pertrans, pergroupstate);
1172 : }
1173 90 : }
1174 :
1175 : /*
1176 : * Perform combination of states between 2 aggregate states. Effectively this
1177 : * 'adds' two states together by whichever logic is defined in the aggregate
1178 : * function's combine function.
1179 : *
1180 : * Note that in this case transfn is set to the combination function. This
1181 : * perhaps should be changed to avoid confusion, but one field is ok for now
1182 : * as they'll never be needed at the same time.
1183 : */
1184 : static void
1185 89 : advance_combine_function(AggState *aggstate,
1186 : AggStatePerTrans pertrans,
1187 : AggStatePerGroup pergroupstate)
1188 : {
1189 89 : FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1190 : MemoryContext oldContext;
1191 : Datum newVal;
1192 :
1193 89 : if (pertrans->transfn.fn_strict)
1194 : {
1195 : /* if we're asked to merge to a NULL state, then do nothing */
1196 89 : if (fcinfo->argnull[1])
1197 0 : return;
1198 :
1199 89 : if (pergroupstate->noTransValue)
1200 : {
1201 : /*
1202 : * transValue has not yet been initialized. If pass-by-ref
1203 : * datatype we must copy the combining state value into
1204 : * aggcontext.
1205 : */
1206 0 : if (!pertrans->transtypeByVal)
1207 : {
1208 0 : oldContext = MemoryContextSwitchTo(
1209 0 : aggstate->curaggcontext->ecxt_per_tuple_memory);
1210 0 : pergroupstate->transValue = datumCopy(fcinfo->arg[1],
1211 0 : pertrans->transtypeByVal,
1212 0 : pertrans->transtypeLen);
1213 0 : MemoryContextSwitchTo(oldContext);
1214 : }
1215 : else
1216 0 : pergroupstate->transValue = fcinfo->arg[1];
1217 :
1218 0 : pergroupstate->transValueIsNull = false;
1219 0 : pergroupstate->noTransValue = false;
1220 0 : return;
1221 : }
1222 : }
1223 :
1224 : /* We run the combine functions in per-input-tuple memory context */
1225 89 : oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
1226 :
1227 : /* set up aggstate->curpertrans for AggGetAggref() */
1228 89 : aggstate->curpertrans = pertrans;
1229 :
1230 : /*
1231 : * OK to call the combine function
1232 : */
1233 89 : fcinfo->arg[0] = pergroupstate->transValue;
1234 89 : fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1235 89 : fcinfo->isnull = false; /* just in case combine func doesn't set it */
1236 :
1237 89 : newVal = FunctionCallInvoke(fcinfo);
1238 :
1239 89 : aggstate->curpertrans = NULL;
1240 :
1241 : /*
1242 : * If pass-by-ref datatype, must copy the new value into aggcontext and
1243 : * free the prior transValue. But if the combine function returned a
1244 : * pointer to its first input, we don't need to do anything. Also, if the
1245 : * combine function returned a pointer to a R/W expanded object that is
1246 : * already a child of the aggcontext, assume we can adopt that value
1247 : * without copying it.
1248 : */
1249 178 : if (!pertrans->transtypeByVal &&
1250 89 : DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
1251 : {
1252 89 : if (!fcinfo->isnull)
1253 : {
1254 89 : MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
1255 89 : if (DatumIsReadWriteExpandedObject(newVal,
1256 : false,
1257 0 : pertrans->transtypeLen) &&
1258 0 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
1259 : /* do nothing */ ;
1260 : else
1261 178 : newVal = datumCopy(newVal,
1262 89 : pertrans->transtypeByVal,
1263 89 : pertrans->transtypeLen);
1264 : }
1265 89 : if (!pergroupstate->transValueIsNull)
1266 : {
1267 89 : if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
1268 : false,
1269 : pertrans->transtypeLen))
1270 0 : DeleteExpandedObject(pergroupstate->transValue);
1271 : else
1272 89 : pfree(DatumGetPointer(pergroupstate->transValue));
1273 : }
1274 : }
1275 :
1276 89 : pergroupstate->transValue = newVal;
1277 89 : pergroupstate->transValueIsNull = fcinfo->isnull;
1278 :
1279 89 : MemoryContextSwitchTo(oldContext);
1280 : }
1281 :
1282 :
1283 : /*
1284 : * Run the transition function for a DISTINCT or ORDER BY aggregate
1285 : * with only one input. This is called after we have completed
1286 : * entering all the input values into the sort object. We complete the
1287 : * sort, read out the values in sorted order, and run the transition
1288 : * function on each value (applying DISTINCT if appropriate).
1289 : *
1290 : * Note that the strictness of the transition function was checked when
1291 : * entering the values into the sort, so we don't check it again here;
1292 : * we just apply standard SQL DISTINCT logic.
1293 : *
1294 : * The one-input case is handled separately from the multi-input case
1295 : * for performance reasons: for single by-value inputs, such as the
1296 : * common case of count(distinct id), the tuplesort_getdatum code path
1297 : * is around 300% faster. (The speedup for by-reference types is less
1298 : * but still noticeable.)
1299 : *
1300 : * This function handles only one grouping set (already set in
1301 : * aggstate->current_set).
1302 : *
1303 : * When called, CurrentMemoryContext should be the per-query context.
1304 : */
1305 : static void
1306 124 : process_ordered_aggregate_single(AggState *aggstate,
1307 : AggStatePerTrans pertrans,
1308 : AggStatePerGroup pergroupstate)
1309 : {
1310 124 : Datum oldVal = (Datum) 0;
1311 124 : bool oldIsNull = true;
1312 124 : bool haveOldVal = false;
1313 124 : MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
1314 : MemoryContext oldContext;
1315 124 : bool isDistinct = (pertrans->numDistinctCols > 0);
1316 124 : Datum newAbbrevVal = (Datum) 0;
1317 124 : Datum oldAbbrevVal = (Datum) 0;
1318 124 : FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1319 : Datum *newVal;
1320 : bool *isNull;
1321 :
1322 124 : Assert(pertrans->numDistinctCols < 2);
1323 :
1324 124 : tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
1325 :
1326 : /* Load the column into argument 1 (arg 0 will be transition value) */
1327 124 : newVal = fcinfo->arg + 1;
1328 124 : isNull = fcinfo->argnull + 1;
1329 :
1330 : /*
1331 : * Note: if input type is pass-by-ref, the datums returned by the sort are
1332 : * freshly palloc'd in the per-query context, so we must be careful to
1333 : * pfree them when they are no longer needed.
1334 : */
1335 :
1336 8737 : while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
1337 : true, newVal, isNull, &newAbbrevVal))
1338 : {
1339 : /*
1340 : * Clear and select the working context for evaluation of the equality
1341 : * function and transition function.
1342 : */
1343 8489 : MemoryContextReset(workcontext);
1344 8489 : oldContext = MemoryContextSwitchTo(workcontext);
1345 :
1346 : /*
1347 : * If DISTINCT mode, and not distinct from prior, skip it.
1348 : *
1349 : * Note: we assume equality functions don't care about collation.
1350 : */
1351 8489 : if (isDistinct &&
1352 8360 : haveOldVal &&
1353 1 : ((oldIsNull && *isNull) ||
1354 16715 : (!oldIsNull && !*isNull &&
1355 16712 : oldAbbrevVal == newAbbrevVal &&
1356 8356 : DatumGetBool(FunctionCall2(&pertrans->equalfns[0],
1357 : oldVal, *newVal)))))
1358 : {
1359 : /* equal to prior, so forget this one */
1360 14470 : if (!pertrans->inputtypeByVal && !*isNull)
1361 1150 : pfree(DatumGetPointer(*newVal));
1362 : }
1363 : else
1364 : {
1365 1254 : advance_transition_function(aggstate, pertrans, pergroupstate);
1366 : /* forget the old value, if any */
1367 1254 : if (!oldIsNull && !pertrans->inputtypeByVal)
1368 1062 : pfree(DatumGetPointer(oldVal));
1369 : /* and remember the new one for subsequent equality checks */
1370 1254 : oldVal = *newVal;
1371 1254 : oldAbbrevVal = newAbbrevVal;
1372 1254 : oldIsNull = *isNull;
1373 1254 : haveOldVal = true;
1374 : }
1375 :
1376 8489 : MemoryContextSwitchTo(oldContext);
1377 : }
1378 :
1379 124 : if (!oldIsNull && !pertrans->inputtypeByVal)
1380 10 : pfree(DatumGetPointer(oldVal));
1381 :
1382 124 : tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1383 124 : pertrans->sortstates[aggstate->current_set] = NULL;
1384 124 : }
1385 :
1386 : /*
1387 : * Run the transition function for a DISTINCT or ORDER BY aggregate
1388 : * with more than one input. This is called after we have completed
1389 : * entering all the input values into the sort object. We complete the
1390 : * sort, read out the values in sorted order, and run the transition
1391 : * function on each value (applying DISTINCT if appropriate).
1392 : *
1393 : * This function handles only one grouping set (already set in
1394 : * aggstate->current_set).
1395 : *
1396 : * When called, CurrentMemoryContext should be the per-query context.
1397 : */
1398 : static void
1399 80 : process_ordered_aggregate_multi(AggState *aggstate,
1400 : AggStatePerTrans pertrans,
1401 : AggStatePerGroup pergroupstate)
1402 : {
1403 80 : MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
1404 80 : FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1405 80 : TupleTableSlot *slot1 = pertrans->sortslot;
1406 80 : TupleTableSlot *slot2 = pertrans->uniqslot;
1407 80 : int numTransInputs = pertrans->numTransInputs;
1408 80 : int numDistinctCols = pertrans->numDistinctCols;
1409 80 : Datum newAbbrevVal = (Datum) 0;
1410 80 : Datum oldAbbrevVal = (Datum) 0;
1411 80 : bool haveOldValue = false;
1412 : int i;
1413 :
1414 80 : tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
1415 :
1416 80 : ExecClearTuple(slot1);
1417 80 : if (slot2)
1418 14 : ExecClearTuple(slot2);
1419 :
1420 417 : while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
1421 : true, true, slot1, &newAbbrevVal))
1422 : {
1423 257 : CHECK_FOR_INTERRUPTS();
1424 :
1425 : /*
1426 : * Extract the first numTransInputs columns as datums to pass to the
1427 : * transfn. (This will help execTuplesMatch too, so we do it
1428 : * immediately.)
1429 : */
1430 257 : slot_getsomeattrs(slot1, numTransInputs);
1431 :
1432 257 : if (numDistinctCols == 0 ||
1433 104 : !haveOldValue ||
1434 202 : newAbbrevVal != oldAbbrevVal ||
1435 98 : !execTuplesMatch(slot1, slot2,
1436 : numDistinctCols,
1437 : pertrans->sortColIdx,
1438 : pertrans->equalfns,
1439 : workcontext))
1440 : {
1441 : /* Load values into fcinfo */
1442 : /* Start from 1, since the 0th arg will be the transition value */
1443 599 : for (i = 0; i < numTransInputs; i++)
1444 : {
1445 410 : fcinfo->arg[i + 1] = slot1->tts_values[i];
1446 410 : fcinfo->argnull[i + 1] = slot1->tts_isnull[i];
1447 : }
1448 :
1449 189 : advance_transition_function(aggstate, pertrans, pergroupstate);
1450 :
1451 189 : if (numDistinctCols > 0)
1452 : {
1453 : /* swap the slot pointers to retain the current tuple */
1454 50 : TupleTableSlot *tmpslot = slot2;
1455 :
1456 50 : slot2 = slot1;
1457 50 : slot1 = tmpslot;
1458 : /* avoid execTuplesMatch() calls by reusing abbreviated keys */
1459 50 : oldAbbrevVal = newAbbrevVal;
1460 50 : haveOldValue = true;
1461 : }
1462 : }
1463 :
1464 : /* Reset context each time, unless execTuplesMatch did it for us */
1465 257 : if (numDistinctCols == 0)
1466 139 : MemoryContextReset(workcontext);
1467 :
1468 257 : ExecClearTuple(slot1);
1469 : }
1470 :
1471 80 : if (slot2)
1472 14 : ExecClearTuple(slot2);
1473 :
1474 80 : tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1475 80 : pertrans->sortstates[aggstate->current_set] = NULL;
1476 80 : }
1477 :
1478 : /*
1479 : * Compute the final value of one aggregate.
1480 : *
1481 : * This function handles only one grouping set (already set in
1482 : * aggstate->current_set).
1483 : *
1484 : * The finalfunction will be run, and the result delivered, in the
1485 : * output-tuple context; caller's CurrentMemoryContext does not matter.
1486 : *
1487 : * The finalfn uses the state as set in the transno. This also might be
1488 : * being used by another aggregate function, so it's important that we do
1489 : * nothing destructive here.
1490 : */
1491 : static void
1492 4314 : finalize_aggregate(AggState *aggstate,
1493 : AggStatePerAgg peragg,
1494 : AggStatePerGroup pergroupstate,
1495 : Datum *resultVal, bool *resultIsNull)
1496 : {
1497 : FunctionCallInfoData fcinfo;
1498 4314 : bool anynull = false;
1499 : MemoryContext oldContext;
1500 : int i;
1501 : ListCell *lc;
1502 4314 : AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1503 :
1504 4314 : oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1505 :
1506 : /*
1507 : * Evaluate any direct arguments. We do this even if there's no finalfn
1508 : * (which is unlikely anyway), so that side-effects happen as expected.
1509 : * The direct arguments go into arg positions 1 and up, leaving position 0
1510 : * for the transition state value.
1511 : */
1512 4314 : i = 1;
1513 4446 : foreach(lc, pertrans->aggdirectargs)
1514 : {
1515 132 : ExprState *expr = (ExprState *) lfirst(lc);
1516 :
1517 132 : fcinfo.arg[i] = ExecEvalExpr(expr,
1518 : aggstate->ss.ps.ps_ExprContext,
1519 : &fcinfo.argnull[i]);
1520 132 : anynull |= fcinfo.argnull[i];
1521 132 : i++;
1522 : }
1523 :
1524 : /*
1525 : * Apply the agg's finalfn if one is provided, else return transValue.
1526 : */
1527 4314 : if (OidIsValid(peragg->finalfn_oid))
1528 : {
1529 1201 : int numFinalArgs = peragg->numFinalArgs;
1530 :
1531 : /* set up aggstate->curpertrans for AggGetAggref() */
1532 1201 : aggstate->curpertrans = pertrans;
1533 :
1534 1201 : InitFunctionCallInfoData(fcinfo, &peragg->finalfn,
1535 : numFinalArgs,
1536 : pertrans->aggCollation,
1537 : (void *) aggstate, NULL);
1538 :
1539 : /* Fill in the transition state value */
1540 1201 : fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1541 : pergroupstate->transValueIsNull,
1542 : pertrans->transtypeLen);
1543 1201 : fcinfo.argnull[0] = pergroupstate->transValueIsNull;
1544 1201 : anynull |= pergroupstate->transValueIsNull;
1545 :
1546 : /* Fill any remaining argument positions with nulls */
1547 2127 : for (; i < numFinalArgs; i++)
1548 : {
1549 926 : fcinfo.arg[i] = (Datum) 0;
1550 926 : fcinfo.argnull[i] = true;
1551 926 : anynull = true;
1552 : }
1553 :
1554 1201 : if (fcinfo.flinfo->fn_strict && anynull)
1555 : {
1556 : /* don't call a strict function with NULL inputs */
1557 0 : *resultVal = (Datum) 0;
1558 0 : *resultIsNull = true;
1559 : }
1560 : else
1561 : {
1562 1201 : *resultVal = FunctionCallInvoke(&fcinfo);
1563 1201 : *resultIsNull = fcinfo.isnull;
1564 : }
1565 1201 : aggstate->curpertrans = NULL;
1566 : }
1567 : else
1568 : {
1569 : /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1570 3113 : *resultVal = pergroupstate->transValue;
1571 3113 : *resultIsNull = pergroupstate->transValueIsNull;
1572 : }
1573 :
1574 : /*
1575 : * If result is pass-by-ref, make sure it is in the right context.
1576 : */
1577 8188 : if (!peragg->resulttypeByVal && !*resultIsNull &&
1578 3874 : !MemoryContextContains(CurrentMemoryContext,
1579 3874 : DatumGetPointer(*resultVal)))
1580 5514 : *resultVal = datumCopy(*resultVal,
1581 2757 : peragg->resulttypeByVal,
1582 2757 : peragg->resulttypeLen);
1583 :
1584 4314 : MemoryContextSwitchTo(oldContext);
1585 4314 : }
1586 :
1587 : /*
1588 : * Compute the output value of one partial aggregate.
1589 : *
1590 : * The serialization function will be run, and the result delivered, in the
1591 : * output-tuple context; caller's CurrentMemoryContext does not matter.
1592 : */
1593 : static void
1594 89 : finalize_partialaggregate(AggState *aggstate,
1595 : AggStatePerAgg peragg,
1596 : AggStatePerGroup pergroupstate,
1597 : Datum *resultVal, bool *resultIsNull)
1598 : {
1599 89 : AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1600 : MemoryContext oldContext;
1601 :
1602 89 : oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1603 :
1604 : /*
1605 : * serialfn_oid will be set if we must serialize the transvalue before
1606 : * returning it
1607 : */
1608 89 : if (OidIsValid(pertrans->serialfn_oid))
1609 : {
1610 : /* Don't call a strict serialization function with NULL input. */
1611 0 : if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1612 : {
1613 0 : *resultVal = (Datum) 0;
1614 0 : *resultIsNull = true;
1615 : }
1616 : else
1617 : {
1618 0 : FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
1619 :
1620 0 : fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1621 : pergroupstate->transValueIsNull,
1622 : pertrans->transtypeLen);
1623 0 : fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1624 :
1625 0 : *resultVal = FunctionCallInvoke(fcinfo);
1626 0 : *resultIsNull = fcinfo->isnull;
1627 : }
1628 : }
1629 : else
1630 : {
1631 : /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1632 89 : *resultVal = pergroupstate->transValue;
1633 89 : *resultIsNull = pergroupstate->transValueIsNull;
1634 : }
1635 :
1636 : /* If result is pass-by-ref, make sure it is in the right context. */
1637 178 : if (!peragg->resulttypeByVal && !*resultIsNull &&
1638 89 : !MemoryContextContains(CurrentMemoryContext,
1639 89 : DatumGetPointer(*resultVal)))
1640 178 : *resultVal = datumCopy(*resultVal,
1641 89 : peragg->resulttypeByVal,
1642 89 : peragg->resulttypeLen);
1643 :
1644 89 : MemoryContextSwitchTo(oldContext);
1645 89 : }
1646 :
1647 : /*
1648 : * Prepare to finalize and project based on the specified representative tuple
1649 : * slot and grouping set.
1650 : *
1651 : * In the specified tuple slot, force to null all attributes that should be
1652 : * read as null in the context of the current grouping set. Also stash the
1653 : * current group bitmap where GroupingExpr can get at it.
1654 : *
1655 : * This relies on three conditions:
1656 : *
1657 : * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1658 : * only reference it in evaluations, which will only access individual
1659 : * attributes.
1660 : *
1661 : * 2) No system columns are going to need to be nulled. (If a system column is
1662 : * referenced in a group clause, it is actually projected in the outer plan
1663 : * tlist.)
1664 : *
1665 : * 3) Within a given phase, we never need to recover the value of an attribute
1666 : * once it has been set to null.
1667 : *
1668 : * Poking into the slot this way is a bit ugly, but the consensus is that the
1669 : * alternative was worse.
1670 : */
1671 : static void
1672 7682 : prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1673 : {
1674 7682 : if (aggstate->phase->grouped_cols)
1675 : {
1676 5410 : Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1677 :
1678 5410 : aggstate->grouped_cols = grouped_cols;
1679 :
1680 5410 : if (slot->tts_isempty)
1681 : {
1682 : /*
1683 : * Force all values to be NULL if working on an empty input tuple
1684 : * (i.e. an empty grouping set for which no input rows were
1685 : * supplied).
1686 : */
1687 8 : ExecStoreAllNullTuple(slot);
1688 : }
1689 5402 : else if (aggstate->all_grouped_cols)
1690 : {
1691 : ListCell *lc;
1692 :
1693 : /* all_grouped_cols is arranged in desc order */
1694 5394 : slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1695 :
1696 12128 : foreach(lc, aggstate->all_grouped_cols)
1697 : {
1698 6734 : int attnum = lfirst_int(lc);
1699 :
1700 6734 : if (!bms_is_member(attnum, grouped_cols))
1701 641 : slot->tts_isnull[attnum - 1] = true;
1702 : }
1703 : }
1704 : }
1705 7682 : }
1706 :
1707 : /*
1708 : * Compute the final value of all aggregates for one group.
1709 : *
1710 : * This function handles only one grouping set at a time, which the caller must
1711 : * have selected. It's also the caller's responsibility to adjust the supplied
1712 : * pergroup parameter to point to the current set's transvalues.
1713 : *
1714 : * Results are stored in the output econtext aggvalues/aggnulls.
1715 : */
1716 : static void
1717 7682 : finalize_aggregates(AggState *aggstate,
1718 : AggStatePerAgg peraggs,
1719 : AggStatePerGroup pergroup)
1720 : {
1721 7682 : ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1722 7682 : Datum *aggvalues = econtext->ecxt_aggvalues;
1723 7682 : bool *aggnulls = econtext->ecxt_aggnulls;
1724 : int aggno;
1725 : int transno;
1726 :
1727 : /*
1728 : * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1729 : * inputs and run the transition functions.
1730 : */
1731 12078 : for (transno = 0; transno < aggstate->numtrans; transno++)
1732 : {
1733 4396 : AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1734 : AggStatePerGroup pergroupstate;
1735 :
1736 4396 : pergroupstate = &pergroup[transno];
1737 :
1738 4396 : if (pertrans->numSortCols > 0)
1739 : {
1740 204 : Assert(aggstate->aggstrategy != AGG_HASHED &&
1741 : aggstate->aggstrategy != AGG_MIXED);
1742 :
1743 204 : if (pertrans->numInputs == 1)
1744 124 : process_ordered_aggregate_single(aggstate,
1745 : pertrans,
1746 : pergroupstate);
1747 : else
1748 80 : process_ordered_aggregate_multi(aggstate,
1749 : pertrans,
1750 : pergroupstate);
1751 : }
1752 : }
1753 :
1754 : /*
1755 : * Run the final functions.
1756 : */
1757 12085 : for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1758 : {
1759 4403 : AggStatePerAgg peragg = &peraggs[aggno];
1760 4403 : int transno = peragg->transno;
1761 : AggStatePerGroup pergroupstate;
1762 :
1763 4403 : pergroupstate = &pergroup[transno];
1764 :
1765 4403 : if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1766 178 : finalize_partialaggregate(aggstate, peragg, pergroupstate,
1767 89 : &aggvalues[aggno], &aggnulls[aggno]);
1768 : else
1769 8628 : finalize_aggregate(aggstate, peragg, pergroupstate,
1770 4314 : &aggvalues[aggno], &aggnulls[aggno]);
1771 : }
1772 7682 : }
1773 :
1774 : /*
1775 : * Project the result of a group (whose aggs have already been calculated by
1776 : * finalize_aggregates). Returns the result slot, or NULL if no row is
1777 : * projected (suppressed by qual).
1778 : */
1779 : static TupleTableSlot *
1780 7682 : project_aggregates(AggState *aggstate)
1781 : {
1782 7682 : ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1783 :
1784 : /*
1785 : * Check the qual (HAVING clause); if the group does not match, ignore it.
1786 : */
1787 7682 : if (ExecQual(aggstate->ss.ps.qual, econtext))
1788 : {
1789 : /*
1790 : * Form and return projection tuple using the aggregate results and
1791 : * the representative input tuple.
1792 : */
1793 7616 : return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1794 : }
1795 : else
1796 66 : InstrCountFiltered1(aggstate, 1);
1797 :
1798 66 : return NULL;
1799 : }
1800 :
1801 : /*
1802 : * find_unaggregated_cols
1803 : * Construct a bitmapset of the column numbers of un-aggregated Vars
1804 : * appearing in our targetlist and qual (HAVING clause)
1805 : */
1806 : static Bitmapset *
1807 282 : find_unaggregated_cols(AggState *aggstate)
1808 : {
1809 282 : Agg *node = (Agg *) aggstate->ss.ps.plan;
1810 : Bitmapset *colnos;
1811 :
1812 282 : colnos = NULL;
1813 282 : (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1814 : &colnos);
1815 282 : (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1816 : &colnos);
1817 282 : return colnos;
1818 : }
1819 :
1820 : static bool
1821 2228 : find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1822 : {
1823 2228 : if (node == NULL)
1824 274 : return false;
1825 1954 : if (IsA(node, Var))
1826 : {
1827 462 : Var *var = (Var *) node;
1828 :
1829 : /* setrefs.c should have set the varno to OUTER_VAR */
1830 462 : Assert(var->varno == OUTER_VAR);
1831 462 : Assert(var->varlevelsup == 0);
1832 462 : *colnos = bms_add_member(*colnos, var->varattno);
1833 462 : return false;
1834 : }
1835 1492 : if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1836 : {
1837 : /* do not descend into aggregate exprs */
1838 288 : return false;
1839 : }
1840 1204 : return expression_tree_walker(node, find_unaggregated_cols_walker,
1841 : (void *) colnos);
1842 : }
1843 :
1844 : /*
1845 : * Initialize the hash table(s) to empty.
1846 : *
1847 : * To implement hashed aggregation, we need a hashtable that stores a
1848 : * representative tuple and an array of AggStatePerGroup structs for each
1849 : * distinct set of GROUP BY column values. We compute the hash key from the
1850 : * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1851 : * for each entry.
1852 : *
1853 : * We have a separate hashtable and associated perhash data structure for each
1854 : * grouping set for which we're doing hashing.
1855 : *
1856 : * The hash tables always live in the hashcontext's per-tuple memory context
1857 : * (there is only one of these for all tables together, since they are all
1858 : * reset at the same time).
1859 : */
1860 : static void
1861 400 : build_hash_table(AggState *aggstate)
1862 : {
1863 400 : MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1864 : Size additionalsize;
1865 : int i;
1866 :
1867 400 : Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1868 :
1869 400 : additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1870 :
1871 846 : for (i = 0; i < aggstate->num_hashes; ++i)
1872 : {
1873 446 : AggStatePerHash perhash = &aggstate->perhash[i];
1874 :
1875 446 : Assert(perhash->aggnode->numGroups > 0);
1876 :
1877 1338 : perhash->hashtable = BuildTupleHashTable(perhash->numCols,
1878 : perhash->hashGrpColIdxHash,
1879 : perhash->eqfunctions,
1880 : perhash->hashfunctions,
1881 446 : perhash->aggnode->numGroups,
1882 : additionalsize,
1883 446 : aggstate->hashcontext->ecxt_per_tuple_memory,
1884 : tmpmem,
1885 446 : DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1886 : }
1887 400 : }
1888 :
1889 : /*
1890 : * Compute columns that actually need to be stored in hashtable entries. The
1891 : * incoming tuples from the child plan node will contain grouping columns,
1892 : * other columns referenced in our targetlist and qual, columns used to
1893 : * compute the aggregate functions, and perhaps just junk columns we don't use
1894 : * at all. Only columns of the first two types need to be stored in the
1895 : * hashtable, and getting rid of the others can make the table entries
1896 : * significantly smaller. The hashtable only contains the relevant columns,
1897 : * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1898 : * into the format of the normal input descriptor.
1899 : *
1900 : * Additional columns, in addition to the columns grouped by, come from two
1901 : * sources: Firstly functionally dependent columns that we don't need to group
1902 : * by themselves, and secondly ctids for row-marks.
1903 : *
1904 : * To eliminate duplicates, we build a bitmapset of the needed columns, and
1905 : * then build an array of the columns included in the hashtable. Note that
1906 : * the array is preserved over ExecReScanAgg, so we allocate it in the
1907 : * per-query context (unlike the hash table itself).
1908 : */
1909 : static void
1910 282 : find_hash_columns(AggState *aggstate)
1911 : {
1912 : Bitmapset *base_colnos;
1913 282 : List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1914 282 : int numHashes = aggstate->num_hashes;
1915 : int j;
1916 :
1917 : /* Find Vars that will be needed in tlist and qual */
1918 282 : base_colnos = find_unaggregated_cols(aggstate);
1919 :
1920 604 : for (j = 0; j < numHashes; ++j)
1921 : {
1922 322 : AggStatePerHash perhash = &aggstate->perhash[j];
1923 322 : Bitmapset *colnos = bms_copy(base_colnos);
1924 322 : AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1925 322 : List *hashTlist = NIL;
1926 : TupleDesc hashDesc;
1927 : int i;
1928 :
1929 322 : perhash->largestGrpColIdx = 0;
1930 :
1931 : /*
1932 : * If we're doing grouping sets, then some Vars might be referenced in
1933 : * tlist/qual for the benefit of other grouping sets, but not needed
1934 : * when hashing; i.e. prepare_projection_slot will null them out, so
1935 : * there'd be no point storing them. Use prepare_projection_slot's
1936 : * logic to determine which.
1937 : */
1938 322 : if (aggstate->phases[0].grouped_cols)
1939 : {
1940 322 : Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1941 : ListCell *lc;
1942 :
1943 876 : foreach(lc, aggstate->all_grouped_cols)
1944 : {
1945 554 : int attnum = lfirst_int(lc);
1946 :
1947 554 : if (!bms_is_member(attnum, grouped_cols))
1948 132 : colnos = bms_del_member(colnos, attnum);
1949 : }
1950 : }
1951 : /* Add in all the grouping columns */
1952 744 : for (i = 0; i < perhash->numCols; i++)
1953 422 : colnos = bms_add_member(colnos, grpColIdx[i]);
1954 :
1955 322 : perhash->hashGrpColIdxInput =
1956 322 : palloc(bms_num_members(colnos) * sizeof(AttrNumber));
1957 322 : perhash->hashGrpColIdxHash =
1958 322 : palloc(perhash->numCols * sizeof(AttrNumber));
1959 :
1960 : /*
1961 : * First build mapping for columns directly hashed. These are the
1962 : * first, because they'll be accessed when computing hash values and
1963 : * comparing tuples for exact matches. We also build simple mapping
1964 : * for execGrouping, so it knows where to find the to-be-hashed /
1965 : * compared columns in the input.
1966 : */
1967 744 : for (i = 0; i < perhash->numCols; i++)
1968 : {
1969 422 : perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1970 422 : perhash->hashGrpColIdxHash[i] = i + 1;
1971 422 : perhash->numhashGrpCols++;
1972 : /* delete already mapped columns */
1973 422 : bms_del_member(colnos, grpColIdx[i]);
1974 : }
1975 :
1976 : /* and add the remaining columns */
1977 690 : while ((i = bms_first_member(colnos)) >= 0)
1978 : {
1979 46 : perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1980 46 : perhash->numhashGrpCols++;
1981 : }
1982 :
1983 : /* and build a tuple descriptor for the hashtable */
1984 790 : for (i = 0; i < perhash->numhashGrpCols; i++)
1985 : {
1986 468 : int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1987 :
1988 468 : hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1989 468 : perhash->largestGrpColIdx =
1990 468 : Max(varNumber + 1, perhash->largestGrpColIdx);
1991 : }
1992 :
1993 322 : hashDesc = ExecTypeFromTL(hashTlist, false);
1994 322 : ExecSetSlotDescriptor(perhash->hashslot, hashDesc);
1995 :
1996 322 : list_free(hashTlist);
1997 322 : bms_free(colnos);
1998 : }
1999 :
2000 282 : bms_free(base_colnos);
2001 282 : }
2002 :
2003 : /*
2004 : * Estimate per-hash-table-entry overhead for the planner.
2005 : *
2006 : * Note that the estimate does not include space for pass-by-reference
2007 : * transition data values, nor for the representative tuple of each group.
2008 : * Nor does this account of the target fill-factor and growth policy of the
2009 : * hash table.
2010 : */
2011 : Size
2012 512 : hash_agg_entry_size(int numAggs)
2013 : {
2014 : Size entrysize;
2015 :
2016 : /* This must match build_hash_table */
2017 512 : entrysize = sizeof(TupleHashEntryData) +
2018 : numAggs * sizeof(AggStatePerGroupData);
2019 512 : entrysize = MAXALIGN(entrysize);
2020 :
2021 512 : return entrysize;
2022 : }
2023 :
2024 : /*
2025 : * Find or create a hashtable entry for the tuple group containing the current
2026 : * tuple (already set in tmpcontext's outertuple slot), in the current grouping
2027 : * set (which the caller must have selected - note that initialize_aggregate
2028 : * depends on this).
2029 : *
2030 : * When called, CurrentMemoryContext should be the per-query context.
2031 : */
2032 : static TupleHashEntryData *
2033 232060 : lookup_hash_entry(AggState *aggstate)
2034 : {
2035 232060 : TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
2036 232060 : AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
2037 232060 : TupleTableSlot *hashslot = perhash->hashslot;
2038 : TupleHashEntryData *entry;
2039 : bool isnew;
2040 : int i;
2041 :
2042 : /* transfer just the needed columns into hashslot */
2043 232060 : slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
2044 232060 : ExecClearTuple(hashslot);
2045 :
2046 571837 : for (i = 0; i < perhash->numhashGrpCols; i++)
2047 : {
2048 339777 : int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2049 :
2050 339777 : hashslot->tts_values[i] = inputslot->tts_values[varNumber];
2051 339777 : hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
2052 : }
2053 232060 : ExecStoreVirtualTuple(hashslot);
2054 :
2055 : /* find or create the hashtable entry using the filtered tuple */
2056 232060 : entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
2057 :
2058 232060 : if (isnew)
2059 : {
2060 4779 : entry->additional = (AggStatePerGroup)
2061 4779 : MemoryContextAlloc(perhash->hashtable->tablecxt,
2062 4779 : sizeof(AggStatePerGroupData) * aggstate->numtrans);
2063 : /* initialize aggregates for new tuple group */
2064 4779 : initialize_aggregates(aggstate, (AggStatePerGroup) entry->additional,
2065 : -1);
2066 : }
2067 :
2068 232060 : return entry;
2069 : }
2070 :
2071 : /*
2072 : * Look up hash entries for the current tuple in all hashed grouping sets,
2073 : * returning an array of pergroup pointers suitable for advance_aggregates.
2074 : *
2075 : * Be aware that lookup_hash_entry can reset the tmpcontext.
2076 : */
2077 : static AggStatePerGroup *
2078 221972 : lookup_hash_entries(AggState *aggstate)
2079 : {
2080 221972 : int numHashes = aggstate->num_hashes;
2081 221972 : AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2082 : int setno;
2083 :
2084 454032 : for (setno = 0; setno < numHashes; setno++)
2085 : {
2086 232060 : select_current_set(aggstate, setno, true);
2087 232060 : pergroup[setno] = lookup_hash_entry(aggstate)->additional;
2088 : }
2089 :
2090 221972 : return pergroup;
2091 : }
2092 :
2093 : /*
2094 : * ExecAgg -
2095 : *
2096 : * ExecAgg receives tuples from its outer subplan and aggregates over
2097 : * the appropriate attribute for each aggregate function use (Aggref
2098 : * node) appearing in the targetlist or qual of the node. The number
2099 : * of tuples to aggregate over depends on whether grouped or plain
2100 : * aggregation is selected. In grouped aggregation, we produce a result
2101 : * row for each group; in plain aggregation there's a single result row
2102 : * for the whole query. In either case, the value of each aggregate is
2103 : * stored in the expression context to be used when ExecProject evaluates
2104 : * the result tuple.
2105 : */
2106 : static TupleTableSlot *
2107 10117 : ExecAgg(PlanState *pstate)
2108 : {
2109 10117 : AggState *node = castNode(AggState, pstate);
2110 10117 : TupleTableSlot *result = NULL;
2111 :
2112 10117 : CHECK_FOR_INTERRUPTS();
2113 :
2114 10117 : if (!node->agg_done)
2115 : {
2116 : /* Dispatch based on strategy */
2117 8115 : switch (node->phase->aggstrategy)
2118 : {
2119 : case AGG_HASHED:
2120 5101 : if (!node->table_filled)
2121 338 : agg_fill_hash_table(node);
2122 : /* FALLTHROUGH */
2123 : case AGG_MIXED:
2124 5232 : result = agg_retrieve_hash_table(node);
2125 5232 : break;
2126 : case AGG_PLAIN:
2127 : case AGG_SORTED:
2128 2883 : result = agg_retrieve_direct(node);
2129 2875 : break;
2130 : }
2131 :
2132 8107 : if (!TupIsNull(result))
2133 7616 : return result;
2134 : }
2135 :
2136 2493 : return NULL;
2137 : }
2138 :
2139 : /*
2140 : * ExecAgg for non-hashed case
 *
 * Each call returns at most one output row: the function loops over groups
 * (and grouping sets, and phases) until project_aggregates() hands back a
 * row that passes the qual, and returns that slot.  It returns NULL once
 * there are no more groups, or immediately when a grouped aggregate without
 * grouping sets gets no input at all.
 *
 * For an AGG_MIXED node, the hash tables are fed from here while phase 1
 * input is being read (via lookup_hash_entries); when all sorted phases are
 * finished, control switches to phase 0 and the call is handed over to
 * agg_retrieve_hash_table().
2141 : */
2142 : static TupleTableSlot *
2143 2883 : agg_retrieve_direct(AggState *aggstate)
2144 : {
2145 2883 : Agg *node = aggstate->phase->aggnode;
2146 : ExprContext *econtext;
2147 : ExprContext *tmpcontext;
2148 : AggStatePerAgg peragg;
2149 : AggStatePerGroup pergroup;
2150 2883 : AggStatePerGroup *hash_pergroups = NULL;
2151 : TupleTableSlot *outerslot;
2152 : TupleTableSlot *firstSlot;
2153 : TupleTableSlot *result;
2154 2883 : bool hasGroupingSets = aggstate->phase->numsets > 0;
2155 2883 : int numGroupingSets = Max(aggstate->phase->numsets, 1);
2156 : int currentSet;
2157 : int nextSetSize;
2158 : int numReset;
2159 : int i;
2160 :
2161 : /*
2162 : * get state info from node
2163 : *
2164 : * econtext is the per-output-tuple expression context
2165 : *
2166 : * tmpcontext is the per-input-tuple expression context
2167 : */
2168 2883 : econtext = aggstate->ss.ps.ps_ExprContext;
2169 2883 : tmpcontext = aggstate->tmpcontext;
2170 :
2171 2883 : peragg = aggstate->peragg;
2172 2883 : pergroup = aggstate->pergroup;
2173 2883 : firstSlot = aggstate->ss.ss_ScanTupleSlot;
2174 :
2175 : /*
2176 : * We loop retrieving groups until we find one matching
2177 : * aggstate->ss.ps.qual
2178 : *
2179 : * For grouping sets, we have the invariant that aggstate->projected_set
2180 : * is either -1 (initial call) or the index (starting from 0) in
2181 : * gset_lengths for the group we just completed (either by projecting a
2182 : * row or by discarding it in the qual).
2183 : */
2184 5816 : while (!aggstate->agg_done)
2185 : {
2186 : /*
2187 : * Clear the per-output-tuple context for each group, as well as
2188 : * aggcontext (which contains any pass-by-ref transvalues of the old
2189 : * group). Some aggregate functions store working state in child
2190 : * contexts; those now get reset automatically without us needing to
2191 : * do anything special.
2192 : *
2193 : * We use ReScanExprContext not just ResetExprContext because we want
2194 : * any registered shutdown callbacks to be called. That allows
2195 : * aggregate functions to ensure they've cleaned up any non-memory
2196 : * resources.
2197 : */
2198 2929 : ReScanExprContext(econtext);
2199 :
2200 : /*
2201 : * Determine how many grouping sets need to be reset at this boundary.
2202 : */
2203 3695 : if (aggstate->projected_set >= 0 &&
2204 766 : aggstate->projected_set < numGroupingSets)
2205 765 : numReset = aggstate->projected_set + 1;
2206 : else
2207 2164 : numReset = numGroupingSets;
2208 :
2209 : /*
2210 : * numReset can change on a phase boundary, but that's OK; we want to
2211 : * reset the contexts used in _this_ phase, and later, after possibly
2212 : * changing phase, initialize the right number of aggregates for the
2213 : * _new_ phase.
2214 : */
2215 :
2216 6251 : for (i = 0; i < numReset; i++)
2217 : {
2218 3322 : ReScanExprContext(aggstate->aggcontexts[i]);
2219 : }
2220 :
2221 : /*
2222 : * Check if input is complete and there are no more groups to project
2223 : * in this phase; move to next phase or mark as done.
2224 : */
2225 3125 : if (aggstate->input_done == true &&
2226 196 : aggstate->projected_set >= (numGroupingSets - 1))
2227 : {
2228 90 : if (aggstate->current_phase < aggstate->numphases - 1)
2229 : {
2230 22 : initialize_phase(aggstate, aggstate->current_phase + 1);
2231 22 : aggstate->input_done = false;
2232 22 : aggstate->projected_set = -1;
2233 22 : numGroupingSets = Max(aggstate->phase->numsets, 1);
2234 22 : node = aggstate->phase->aggnode;
2235 22 : numReset = numGroupingSets;
2236 : }
2237 68 : else if (aggstate->aggstrategy == AGG_MIXED)
2238 : {
2239 : /*
2240 : * Mixed mode; we've output all the grouped stuff and have
2241 : * full hashtables, so switch to outputting those.
2242 : */
2243 11 : initialize_phase(aggstate, 0);
2244 11 : aggstate->table_filled = true;
2245 11 : ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2246 : &aggstate->perhash[0].hashiter);
2247 11 : select_current_set(aggstate, 0, true);
2248 11 : return agg_retrieve_hash_table(aggstate);
2249 : }
2250 : else
2251 : {
2252 57 : aggstate->agg_done = true;
2253 57 : break;
2254 : }
2255 : }
2256 :
2257 : /*
2258 : * Get the number of columns in the next grouping set after the last
2259 : * projected one (if any). This is the number of columns to compare to
2260 : * see if we reached the boundary of that set too.
2261 : */
2262 3537 : if (aggstate->projected_set >= 0 &&
2263 676 : aggstate->projected_set < (numGroupingSets - 1))
2264 379 : nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2265 : else
2266 2482 : nextSetSize = 0;
2267 :
2268 : /*----------
2269 : * If a subgroup for the current grouping set is present, project it.
2270 : *
2271 : * We have a new group if:
2272 : * - we're out of input but haven't projected all grouping sets
2273 : * (checked above)
2274 : * OR
2275 : * - we already projected a row that wasn't from the last grouping
2276 : * set
2277 : * AND
2278 : * - the next grouping set has at least one grouping column (since
2279 : * empty grouping sets project only once input is exhausted)
2280 : * AND
2281 : * - the previous and pending rows differ on the grouping columns
2282 : * of the next grouping set
2283 : *----------
2284 : */
2285 5616 : if (aggstate->input_done ||
2286 3496 : (node->aggstrategy != AGG_PLAIN &&
2287 1311 : aggstate->projected_set != -1 &&
2288 843 : aggstate->projected_set < (numGroupingSets - 1) &&
2289 192 : nextSetSize > 0 &&
2290 384 : !execTuplesMatch(econtext->ecxt_outertuple,
2291 : tmpcontext->ecxt_outertuple,
2292 : nextSetSize,
2293 : node->grpColIdx,
2294 192 : aggstate->phase->eqfunctions,
2295 : tmpcontext->ecxt_per_tuple_memory)))
2296 : {
2297 187 : aggstate->projected_set += 1;
2298 :
2299 187 : Assert(aggstate->projected_set < numGroupingSets);
2300 187 : Assert(nextSetSize > 0 || aggstate->input_done);
2301 : }
2302 : else
2303 : {
2304 : /*
2305 : * We no longer care what group we just projected, the next
2306 : * projection will always be the first (or only) grouping set
2307 : * (unless the input proves to be empty).
2308 : */
2309 2674 : aggstate->projected_set = 0;
2310 :
2311 : /*
2312 : * If we don't already have the first tuple of the new group,
2313 : * fetch it from the outer plan.
2314 : */
2315 2674 : if (aggstate->grp_firstTuple == NULL)
2316 : {
2317 2185 : outerslot = fetch_input_tuple(aggstate);
2318 2185 : if (!TupIsNull(outerslot))
2319 : {
2320 : /*
2321 : * Make a copy of the first input tuple; we will use this
2322 : * for comparisons (in group mode) and for projection.
2323 : */
2324 2037 : aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
2325 : }
2326 : else
2327 : {
2328 : /* outer plan produced no tuples at all */
2329 148 : if (hasGroupingSets)
2330 : {
2331 : /*
2332 : * If there was no input at all, we need to project
2333 : * rows only if there are grouping sets of size 0.
2334 : * Note that this implies that there can't be any
2335 : * references to ungrouped Vars, which would otherwise
2336 : * cause issues with the empty output slot.
2337 : *
2338 : * XXX: This is no longer true, we currently deal with
2339 : * this in prepare_projection_slot(), which stores an
 * all-NULL tuple into an empty slot.
2340 : */
2341 9 : aggstate->input_done = true;
2342 :
2343 22 : while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2344 : {
2345 5 : aggstate->projected_set += 1;
2346 5 : if (aggstate->projected_set >= numGroupingSets)
2347 : {
2348 : /*
2349 : * We can't set agg_done here because we might
2350 : * have more phases to do, even though the
2351 : * input is empty. So we need to restart the
2352 : * whole outer loop.
2353 : */
2354 1 : break;
2355 : }
2356 : }
2357 :
2358 9 : if (aggstate->projected_set >= numGroupingSets)
2359 1 : continue;
2360 : }
2361 : else
2362 : {
2363 139 : aggstate->agg_done = true;
2364 : /* If we are grouping, we should produce no tuples too */
2365 139 : if (node->aggstrategy != AGG_PLAIN)
2366 17 : return NULL;
2367 : }
2368 : }
2369 : }
2370 :
2371 : /*
2372 : * Initialize working state for a new input tuple group.
2373 : */
2374 2656 : initialize_aggregates(aggstate, pergroup, numReset);
2375 :
2376 2656 : if (aggstate->grp_firstTuple != NULL)
2377 : {
2378 : /*
2379 : * Store the copied first input tuple in the tuple table slot
2380 : * reserved for it. The tuple will be deleted when it is
2381 : * cleared from the slot.
2382 : */
2383 2526 : ExecStoreTuple(aggstate->grp_firstTuple,
2384 : firstSlot,
2385 : InvalidBuffer,
2386 : true);
2387 2526 : aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2388 :
2389 : /* set up for first advance_aggregates call */
2390 2526 : tmpcontext->ecxt_outertuple = firstSlot;
2391 :
2392 : /*
2393 : * Process each outer-plan tuple, and then fetch the next one,
2394 : * until we exhaust the outer plan or cross a group boundary.
2395 : */
2396 : for (;;)
2397 : {
2398 : /*
2399 : * During phase 1 only of a mixed agg, we need to update
2400 : * hashtables as well in advance_aggregates.
2401 : */
2402 9321147 : if (aggstate->aggstrategy == AGG_MIXED &&
2403 4041 : aggstate->current_phase == 1)
2404 : {
2405 4041 : hash_pergroups = lookup_hash_entries(aggstate);
2406 : }
2407 : else
2408 9313065 : hash_pergroups = NULL;
2409 :
2410 9317106 : if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2411 89 : combine_aggregates(aggstate, pergroup);
2412 : else
2413 9317017 : advance_aggregates(aggstate, pergroup, hash_pergroups);
2414 :
2415 : /* Reset per-input-tuple context after each tuple */
2416 9317099 : ResetExprContext(tmpcontext);
2417 :
2418 9317099 : outerslot = fetch_input_tuple(aggstate);
2419 9317098 : if (TupIsNull(outerslot))
2420 : {
2421 : /* no more outer-plan tuples available */
2422 2028 : if (hasGroupingSets)
2423 : {
2424 81 : aggstate->input_done = true;
2425 81 : break;
2426 : }
2427 : else
2428 : {
2429 1947 : aggstate->agg_done = true;
2430 1947 : break;
2431 : }
2432 : }
2433 : /* set up for next advance_aggregates call */
2434 9315070 : tmpcontext->ecxt_outertuple = outerslot;
2435 :
2436 : /*
2437 : * If we are grouping, check whether we've crossed a group
2438 : * boundary.
2439 : */
2440 9315070 : if (node->aggstrategy != AGG_PLAIN)
2441 : {
2442 165438 : if (!execTuplesMatch(firstSlot,
2443 : outerslot,
2444 : node->numCols,
2445 : node->grpColIdx,
2446 82719 : aggstate->phase->eqfunctions,
2447 : tmpcontext->ecxt_per_tuple_memory))
2448 : {
2449 490 : aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
2450 490 : break;
2451 : }
2452 : }
2453 9314580 : }
2454 : }
2455 :
2456 : /*
2457 : * Use the representative input tuple for any references to
2458 : * non-aggregated input columns in aggregate direct args, the node
2459 : * qual, and the tlist. (If we are not grouping, and there are no
2460 : * input rows at all, we will come here with an empty firstSlot
2461 : * ... but if not grouping, there can't be any references to
2462 : * non-aggregated input columns, so no problem.)
2463 : */
2464 2648 : econtext->ecxt_outertuple = firstSlot;
2465 : }
2466 :
2467 2835 : Assert(aggstate->projected_set >= 0);
2468 :
2469 2835 : currentSet = aggstate->projected_set;
2470 :
2471 2835 : prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2472 :
2473 2835 : select_current_set(aggstate, currentSet, false);
2474 :
2475 5670 : finalize_aggregates(aggstate,
2476 : peragg,
2477 5670 : pergroup + (currentSet * aggstate->numtrans));
2478 :
2479 : /*
2480 : * If there's no row to project right now, we must continue rather
2481 : * than returning a null since there might be more groups.
2482 : */
2483 2835 : result = project_aggregates(aggstate);
2484 2835 : if (result)
2485 2786 : return result;
2486 : }
2487 :
2488 : /* No more groups */
2489 61 : return NULL;
2490 : }
2491 :
2492 : /*
2493 : * ExecAgg for hashed case: read input and build hash table
 *
 * Pulls tuples with fetch_input_tuple() until it hands back an empty slot.
 * For every tuple, the per-group hash entries are looked up (or created) by
 * lookup_hash_entries() and the transition states are then advanced: with
 * combine_aggregates() when the aggsplit mode asks for combining, otherwise
 * with advance_aggregates().
 *
 * On return, aggstate->table_filled is true, the current set has been
 * switched to 0, and the iterator of perhash[0] has been reset so that
 * retrieval starts with the first hash table.  Nothing is returned.
2494 : */
2495 : static void
2496 338 : agg_fill_hash_table(AggState *aggstate)
2497 : {
2498 : TupleTableSlot *outerslot;
2499 338 : ExprContext *tmpcontext = aggstate->tmpcontext;
2500 :
2501 : /*
2502 : * Process each outer-plan tuple, and then fetch the next one, until we
2503 : * exhaust the outer plan.
2504 : */
2505 : for (;;)
2506 : {
2507 : AggStatePerGroup *pergroups;
2508 :
2509 218269 : outerslot = fetch_input_tuple(aggstate);
2510 218269 : if (TupIsNull(outerslot))
2511 : break;
2512 :
2513 : /* set up for lookup_hash_entries and advance_aggregates */
2514 217931 : tmpcontext->ecxt_outertuple = outerslot;
2515 :
2516 : /* Find or build hashtable entries */
2517 217931 : pergroups = lookup_hash_entries(aggstate);
2518 :
2519 : /* Advance the aggregates; the combine path is handed only pergroups[0] */
2520 217931 : if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2521 1 : combine_aggregates(aggstate, pergroups[0]);
2522 : else
2523 217930 : advance_aggregates(aggstate, NULL, pergroups);
2524 :
2525 : /*
2526 : * Reset per-input-tuple context after each tuple, but note that the
2527 : * hash lookups do this too.  (This is the same context as the local
 * tmpcontext initialized at the top of the function.)
2528 : */
2529 217931 : ResetExprContext(aggstate->tmpcontext);
2530 217931 : }
2531 :
2532 338 : aggstate->table_filled = true;
2533 : /* Initialize to walk the first hash table */
2534 338 : select_current_set(aggstate, 0, true);
2535 338 : ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2536 : &aggstate->perhash[0].hashiter);
2537 338 : }
2538 :
2539 : /*
2540 : * ExecAgg for hashed case: retrieving groups from hash table
 *
 * Scans the hash table belonging to aggstate->current_set.  When
 * ScanTupleHashTable() returns NULL, the scan moves on to the next perhash
 * entry (resetting its iterator) or, if there is none, sets agg_done and
 * returns NULL.
 *
 * For each entry found, the stored first tuple is unpacked into the scan
 * slot, the aggregates are finalized from the entry's per-group state
 * (entry->additional), and project_aggregates() is called.  A non-NULL
 * projection is returned to the caller; otherwise the scan continues with
 * the next entry.
2541 : */
2542 : static TupleTableSlot *
2543 5243 : agg_retrieve_hash_table(AggState *aggstate)
2544 : {
2545 : ExprContext *econtext;
2546 : AggStatePerAgg peragg;
2547 : AggStatePerGroup pergroup;
2548 : TupleHashEntryData *entry;
2549 : TupleTableSlot *firstSlot;
2550 : TupleTableSlot *result;
2551 : AggStatePerHash perhash;
2552 :
2553 : /*
2554 : * get state info from node.
2555 : *
2556 : * econtext is the per-output-tuple expression context.
2557 : */
2558 5243 : econtext = aggstate->ss.ps.ps_ExprContext;
2559 5243 : peragg = aggstate->peragg;
2560 5243 : firstSlot = aggstate->ss.ss_ScanTupleSlot;
2561 :
2562 : /*
2563 : * Note that perhash (and therefore anything accessed through it) can
2564 : * change inside the loop, as we change between grouping sets.
2565 : */
2566 5243 : perhash = &aggstate->perhash[aggstate->current_set];
2567 :
2568 : /*
2569 : * We loop retrieving groups until we find one satisfying
2570 : * aggstate->ss.ps.qual
2571 : */
2572 10527 : while (!aggstate->agg_done)
2573 : {
2574 5284 : TupleTableSlot *hashslot = perhash->hashslot;
2575 : int i;
2576 :
2577 5284 : CHECK_FOR_INTERRUPTS();
2578 :
2579 : /*
2580 : * Find the next entry in the hash table
2581 : */
2582 5284 : entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2583 5284 : if (entry == NULL)
2584 : {
2585 437 : int nextset = aggstate->current_set + 1;
2586 :
2587 437 : if (nextset < aggstate->num_hashes)
2588 : {
2589 : /*
2590 : * Switch to next grouping set, reinitialize, and restart the
2591 : * loop.  perhash must be re-fetched because current_set has
 * just changed; hashslot is re-read at the top of the loop.
2592 : */
2593 24 : select_current_set(aggstate, nextset, true);
2594 :
2595 24 : perhash = &aggstate->perhash[aggstate->current_set];
2596 :
2597 24 : ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2598 :
2599 24 : continue;
2600 : }
2601 : else
2602 : {
2603 : /* No more hashtables, so done */
2604 413 : aggstate->agg_done = TRUE;
2605 413 : return NULL;
2606 : }
2607 : }
2608 :
2609 : /*
2610 : * Clear the per-output-tuple context for each group
2611 : *
2612 : * We intentionally don't use ReScanExprContext here; if any aggs have
2613 : * registered shutdown callbacks, they mustn't be called yet, since we
2614 : * might not be done with that agg.
2615 : */
2616 4847 : ResetExprContext(econtext);
2617 :
2618 : /*
2619 : * Transform representative tuple back into one with the right
2620 : * columns.
 *
 * The entry's first tuple is loaded into hashslot and fully
 * deformed.  firstSlot is then cleared, every one of its columns is
 * marked null, and the numhashGrpCols hashed columns are copied in
 * at the positions given by hashGrpColIdxInput[] (minus one, to get
 * an array index).  Columns not copied therefore stay null.
2621 : */
2622 4847 : ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2623 4847 : slot_getallattrs(hashslot);
2624 :
2625 4847 : ExecClearTuple(firstSlot);
2626 4847 : memset(firstSlot->tts_isnull, true,
2627 4847 : firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2628 :
2629 10190 : for (i = 0; i < perhash->numhashGrpCols; i++)
2630 : {
2631 5343 : int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2632 :
2633 5343 : firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2634 5343 : firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2635 : }
2636 4847 : ExecStoreVirtualTuple(firstSlot);
2637 :
2638 4847 : pergroup = (AggStatePerGroup) entry->additional;
2639 :
2640 : /*
2641 : * Use the representative input tuple for any references to
2642 : * non-aggregated input columns in the qual and tlist.
2643 : */
2644 4847 : econtext->ecxt_outertuple = firstSlot;
2645 :
2646 4847 : prepare_projection_slot(aggstate,
2647 : econtext->ecxt_outertuple,
2648 : aggstate->current_set);
2649 :
2650 4847 : finalize_aggregates(aggstate, peragg, pergroup);
2651 :
2652 4847 : result = project_aggregates(aggstate);
2653 4847 : if (result)
2654 4830 : return result;
2655 : }
2656 :
2657 : /* No more groups (reached when agg_done is found set at the loop test) */
2658 0 : return NULL;
2659 : }
2660 :
2661 : /* -----------------
2662 : * ExecInitAgg
2663 : *
2664 : * Creates the run-time information for the agg node produced by the
2665 : * planner and initializes its outer subtree.
2666 : *
 * node is the Agg plan node, estate is the executor state the new node is
 * built in, and eflags are the executor flags (EXEC_FLAG_BACKWARD and
 * EXEC_FLAG_MARK are asserted to be unset; EXEC_FLAG_REWIND is cleared
 * before initializing the child when the strategy is AGG_HASHED).
 *
 * Returns the newly built AggState.
 *
2667 : * -----------------
2668 : */
2669 : AggState *
2670 2664 : ExecInitAgg(Agg *node, EState *estate, int eflags)
2671 : {
2672 : AggState *aggstate;
2673 : AggStatePerAgg peraggs;
2674 : AggStatePerTrans pertransstates;
2675 : Plan *outerPlan;
2676 : ExprContext *econtext;
2677 : int numaggs,
2678 : transno,
2679 : aggno;
2680 : int phase;
2681 : int phaseidx;
2682 : List *combined_inputeval;
2683 : ListCell *l;
2684 2664 : Bitmapset *all_grouped_cols = NULL;
2685 2664 : int numGroupingSets = 1;
2686 : int numPhases;
2687 : int numHashes;
2688 : int column_offset;
2689 2664 : int i = 0;
2690 2664 : int j = 0;
2691 5064 : bool use_hashing = (node->aggstrategy == AGG_HASHED ||
2692 2400 : node->aggstrategy == AGG_MIXED);
2693 :
2694 : /* check for unsupported flags */
2695 2664 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2696 :
2697 : /*
2698 : * create state structure
2699 : */
2700 2664 : aggstate = makeNode(AggState);
2701 2664 : aggstate->ss.ps.plan = (Plan *) node;
2702 2664 : aggstate->ss.ps.state = estate;
2703 2664 : aggstate->ss.ps.ExecProcNode = ExecAgg;
2704 :
2705 2664 : aggstate->aggs = NIL;
2706 2664 : aggstate->numaggs = 0;
2707 2664 : aggstate->numtrans = 0;
2708 2664 : aggstate->aggstrategy = node->aggstrategy;
2709 2664 : aggstate->aggsplit = node->aggsplit;
2710 2664 : aggstate->maxsets = 0;
2711 2664 : aggstate->projected_set = -1;
2712 2664 : aggstate->current_set = 0;
2713 2664 : aggstate->peragg = NULL;
2714 2664 : aggstate->pertrans = NULL;
2715 2664 : aggstate->curpertrans = NULL;
2716 2664 : aggstate->input_done = false;
2717 2664 : aggstate->agg_done = false;
2718 2664 : aggstate->pergroup = NULL;
2719 2664 : aggstate->grp_firstTuple = NULL;
2720 2664 : aggstate->sort_in = NULL;
2721 2664 : aggstate->sort_out = NULL;
2722 :
2723 : /*
2724 : * phases[0] always exists, but is dummy in sorted/plain mode
 * (which is why the non-hashing case starts out with two phases: the
 * dummy phase 0 plus phase 1).
2725 : */
2726 2664 : numPhases = (use_hashing ? 1 : 2);
2727 2664 : numHashes = (use_hashing ? 1 : 0);
2728 :
2729 : /*
2730 : * Calculate the maximum number of grouping sets in any phase; this
2731 : * determines the size of some allocations. Also calculate the number of
2732 : * phases, since all hashed/mixed nodes contribute to only a single phase.
2733 : */
2734 2664 : if (node->groupingSets)
2735 : {
2736 88 : numGroupingSets = list_length(node->groupingSets);
2737 :
2738 170 : foreach(l, node->chain)
2739 : {
2740 82 : Agg *agg = lfirst(l);
2741 :
2742 82 : numGroupingSets = Max(numGroupingSets,
2743 : list_length(agg->groupingSets));
2744 :
2745 : /*
2746 : * additional AGG_HASHED aggs become part of phase 0, but all
2747 : * others add an extra phase.
2748 : */
2749 82 : if (agg->aggstrategy != AGG_HASHED)
2750 42 : ++numPhases;
2751 : else
2752 40 : ++numHashes;
2753 : }
2754 : }
2755 :
2756 2664 : aggstate->maxsets = numGroupingSets;
2757 2664 : aggstate->numphases = numPhases;
2758 :
2759 2664 : aggstate->aggcontexts = (ExprContext **)
2760 2664 : palloc0(sizeof(ExprContext *) * numGroupingSets);
2761 :
2762 : /*
2763 : * Create expression contexts. We need three or more, one for
2764 : * per-input-tuple processing, one for per-output-tuple processing, one
2765 : * for all the hashtables, and one for each grouping set. The per-tuple
2766 : * memory context of the per-grouping-set ExprContexts (aggcontexts)
2767 : * replaces the standalone memory context formerly used to hold transition
2768 : * values. We cheat a little by using ExecAssignExprContext() to build
2769 : * all of them.
2770 : *
2771 : * NOTE: the details of what is stored in aggcontexts and what is stored
2772 : * in the regular per-query memory context are driven by a simple
2773 : * decision: we want to reset the aggcontext at group boundaries (if not
2774 : * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
 *
 * The order of the calls below matters: after each
 * ExecAssignExprContext() call the context found in ss.ps.ps_ExprContext
 * is copied out (into tmpcontext, aggcontexts[] and, when hashing,
 * hashcontext).  The context installed by the final call is left in
 * ps_ExprContext and is the one later used for the aggregate results.
2775 : */
2776 2664 : ExecAssignExprContext(estate, &aggstate->ss.ps);
2777 2664 : aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2778 :
2779 5433 : for (i = 0; i < numGroupingSets; ++i)
2780 : {
2781 2769 : ExecAssignExprContext(estate, &aggstate->ss.ps);
2782 2769 : aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2783 : }
2784 :
2785 2664 : if (use_hashing)
2786 : {
2787 282 : ExecAssignExprContext(estate, &aggstate->ss.ps);
2788 282 : aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2789 : }
2790 :
2791 2664 : ExecAssignExprContext(estate, &aggstate->ss.ps);
2792 :
2793 : /*
2794 : * tuple table initialization.
2795 : *
2796 : * For hashtables, we create some additional slots below.
2797 : */
2798 2664 : ExecInitScanTupleSlot(estate, &aggstate->ss);
2799 2664 : ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
2800 2664 : aggstate->sort_slot = ExecInitExtraTupleSlot(estate);
2801 :
2802 : /*
2803 : * initialize child expressions
2804 : *
2805 : * We rely on the parser to have checked that no aggs contain other agg
2806 : * calls in their arguments. This would make no sense under SQL semantics
2807 : * (and it's forbidden by the spec). Because it is true, we don't need to
2808 : * worry about evaluating the aggs in any particular order.
2809 : *
2810 : * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2811 : * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
2812 : * in the targetlist are found during ExecAssignProjectionInfo, below.
2813 : */
2814 2664 : aggstate->ss.ps.qual =
2815 2664 : ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2816 :
2817 : /*
2818 : * Initialize child nodes.
2819 : *
2820 : * If we are doing a hashed aggregation then the child plan does not need
2821 : * to handle REWIND efficiently; see ExecReScanAgg.
2822 : */
2823 2664 : if (node->aggstrategy == AGG_HASHED)
2824 264 : eflags &= ~EXEC_FLAG_REWIND;
2825 2664 : outerPlan = outerPlan(node);
2826 2664 : outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2827 :
2828 : /*
2829 : * initialize source tuple type.
 *
 * When there is a chain of further Agg nodes, sort_slot is given the
 * same descriptor as the scan slot.
2830 : */
2831 2664 : ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
2832 2664 : if (node->chain)
2833 47 : ExecSetSlotDescriptor(aggstate->sort_slot,
2834 47 : aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
2835 :
2836 : /*
2837 : * Initialize result tuple type and projection info.
2838 : */
2839 2664 : ExecAssignResultTypeFromTL(&aggstate->ss.ps);
2840 2664 : ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2841 :
2842 : /*
2843 : * We should now have found all Aggrefs in the targetlist and quals.
2844 : */
2845 2664 : numaggs = aggstate->numaggs;
2846 2664 : Assert(numaggs == list_length(aggstate->aggs));
2847 2664 : if (numaggs <= 0)
2848 : {
2849 : /*
2850 : * This is not an error condition: we might be using the Agg node just
2851 : * to do hash-based grouping. Even in the regular case,
2852 : * constant-expression simplification could optimize away all of the
2853 : * Aggrefs in the targetlist and qual. So keep going, but force local
2854 : * copy of numaggs positive so that palloc()s below don't choke.
2855 : */
2856 134 : numaggs = 1;
2857 : }
2858 :
2859 : /*
2860 : * For each phase, prepare grouping set data and fmgr lookup data for
2861 : * compare functions. Accumulate all_grouped_cols in passing.
2862 : */
2863 2664 : aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2864 :
2865 2664 : aggstate->num_hashes = numHashes;
2866 2664 : if (numHashes)
2867 : {
2868 282 : aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2869 282 : aggstate->phases[0].numsets = 0;
2870 282 : aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2871 282 : aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2872 : }
2873 :
2874 2664 : phase = 0;
2875 5410 : for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2876 : {
2877 : Agg *aggnode;
2878 : Sort *sortnode;
2879 :
/* phaseidx 0 is the top-level node itself; later indexes walk node->chain */
2880 2746 : if (phaseidx > 0)
2881 : {
2882 82 : aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2883 82 : sortnode = castNode(Sort, aggnode->plan.lefttree);
2884 : }
2885 : else
2886 : {
2887 2664 : aggnode = node;
2888 2664 : sortnode = NULL;
2889 : }
2890 :
2891 2746 : Assert(phase <= 1 || sortnode);
2892 :
2893 2746 : if (aggnode->aggstrategy == AGG_HASHED
2894 2442 : || aggnode->aggstrategy == AGG_MIXED)
2895 : {
2896 322 : AggStatePerPhase phasedata = &aggstate->phases[0];
2897 : AggStatePerHash perhash;
2898 322 : Bitmapset *cols = NULL;
2899 :
2900 322 : Assert(phase == 0);
2901 322 : i = phasedata->numsets++;
2902 322 : perhash = &aggstate->perhash[i];
2903 :
2904 : /* phase 0 always points to the "real" Agg in the hash case */
2905 322 : phasedata->aggnode = node;
2906 322 : phasedata->aggstrategy = node->aggstrategy;
2907 :
2908 : /* but the actual Agg node representing this hash is saved here */
2909 322 : perhash->aggnode = aggnode;
2910 :
2911 322 : phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2912 :
2913 744 : for (j = 0; j < aggnode->numCols; ++j)
2914 422 : cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2915 :
2916 322 : phasedata->grouped_cols[i] = cols;
2917 :
2918 322 : all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2919 322 : continue;
2920 : }
2921 : else
2922 : {
2923 2424 : AggStatePerPhase phasedata = &aggstate->phases[++phase];
2924 : int num_sets;
2925 :
2926 2424 : phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2927 :
2928 2424 : if (num_sets)
2929 : {
2930 100 : phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2931 100 : phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2932 :
2933 100 : i = 0;
2934 313 : foreach(l, aggnode->groupingSets)
2935 : {
2936 213 : int current_length = list_length(lfirst(l));
2937 213 : Bitmapset *cols = NULL;
2938 :
2939 : /* planner forces this to be correct */
2940 415 : for (j = 0; j < current_length; ++j)
2941 202 : cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2942 :
2943 213 : phasedata->grouped_cols[i] = cols;
2944 213 : phasedata->gset_lengths[i] = current_length;
2945 :
2946 213 : ++i;
2947 : }
2948 :
/* only the first grouping set's columns are merged in here */
2949 100 : all_grouped_cols = bms_add_members(all_grouped_cols,
2950 100 : phasedata->grouped_cols[0]);
2951 : }
2952 : else
2953 : {
2954 2324 : Assert(phaseidx == 0);
2955 :
2956 2324 : phasedata->gset_lengths = NULL;
2957 2324 : phasedata->grouped_cols = NULL;
2958 : }
2959 :
2960 : /*
2961 : * If we are grouping, precompute fmgr lookup data for inner loop.
2962 : */
2963 2424 : if (aggnode->aggstrategy == AGG_SORTED)
2964 : {
2965 189 : Assert(aggnode->numCols > 0);
2966 :
2967 189 : phasedata->eqfunctions =
2968 189 : execTuplesMatchPrepare(aggnode->numCols,
2969 : aggnode->grpOperators);
2970 : }
2971 :
2972 2424 : phasedata->aggnode = aggnode;
2973 2424 : phasedata->aggstrategy = aggnode->aggstrategy;
2974 2424 : phasedata->sortnode = sortnode;
2975 : }
2976 : }
2977 :
2978 : /*
2979 : * Convert all_grouped_cols to a descending-order list.
 * (Each member is prepended to the list with lcons_int.)
2980 : */
2981 2664 : i = -1;
2982 5844 : while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
2983 516 : aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
2984 :
2985 : /*
2986 : * Set up aggregate-result storage in the output expr context, and also
2987 : * allocate my private per-agg working storage
2988 : */
2989 2664 : econtext = aggstate->ss.ps.ps_ExprContext;
2990 2664 : econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
2991 2664 : econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
2992 :
2993 2664 : peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
2994 2664 : pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
2995 :
2996 2664 : aggstate->peragg = peraggs;
2997 2664 : aggstate->pertrans = pertransstates;
2998 :
2999 : /*
3000 : * Hashing can only appear in the initial phase.
3001 : */
3002 2664 : if (use_hashing)
3003 : {
3004 604 : for (i = 0; i < numHashes; ++i)
3005 : {
3006 322 : aggstate->perhash[i].hashslot = ExecInitExtraTupleSlot(estate);
3007 :
3008 966 : execTuplesHashPrepare(aggstate->perhash[i].numCols,
3009 322 : aggstate->perhash[i].aggnode->grpOperators,
3010 322 : &aggstate->perhash[i].eqfunctions,
3011 322 : &aggstate->perhash[i].hashfunctions);
3012 : }
3013 :
3014 : /* this is an array of pointers, not structures */
3015 282 : aggstate->hash_pergroup = palloc0(sizeof(AggStatePerGroup) * numHashes);
3016 :
3017 282 : find_hash_columns(aggstate);
3018 282 : build_hash_table(aggstate);
3019 282 : aggstate->table_filled = false;
3020 : }
3021 :
/*
 * Every strategy other than pure AGG_HASHED also gets a flat, zeroed
 * per-group array holding numaggs * numGroupingSets entries.
 */
3022 2664 : if (node->aggstrategy != AGG_HASHED)
3023 : {
3024 : AggStatePerGroup pergroup;
3025 :
3026 2400 : pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3027 : * numaggs
3028 2400 : * numGroupingSets);
3029 :
3030 2400 : aggstate->pergroup = pergroup;
3031 : }
3032 :
3033 : /*
3034 : * Initialize current phase-dependent values to initial phase. The initial
3035 : * phase is 1 (first sort pass) for all strategies that use sorting (if
3036 : * hashing is being done too, then phase 0 is processed last); but if only
3037 : * hashing is being done, then phase 0 is all there is.
3038 : */
3039 2664 : if (node->aggstrategy == AGG_HASHED)
3040 : {
3041 264 : aggstate->current_phase = 0;
3042 264 : initialize_phase(aggstate, 0);
3043 264 : select_current_set(aggstate, 0, true);
3044 : }
3045 : else
3046 : {
3047 2400 : aggstate->current_phase = 1;
3048 2400 : initialize_phase(aggstate, 1);
3049 2400 : select_current_set(aggstate, 0, false);
3050 : }
3051 :
3052 : /* -----------------
3053 : * Perform lookups of aggregate function info, and initialize the
3054 : * unchanging fields of the per-agg and per-trans data.
3055 : *
3056 : * We try to optimize by detecting duplicate aggregate functions so that
3057 : * their state and final values are re-used, rather than needlessly being
3058 : * re-calculated independently. We also detect aggregates that are not
3059 : * the same, but which can share the same transition state.
3060 : *
3061 : * Scenarios:
3062 : *
3063 : * 1. An aggregate function appears more than once in query:
3064 : *
3065 : * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
3066 : *
3067 : * Since the aggregates are identical, we only need to calculate
3068 : * the result once. Both aggregates will share the same 'aggno'
3069 : * value.
3070 : *
3071 : * 2. Two different aggregate functions appear in the query, but the
3072 : * aggregates have the same transition function and initial value, but
3073 : * different final function:
3074 : *
3075 : * SELECT SUM(x), AVG(x) FROM ...
3076 : *
3077 : * In this case we must create a new peragg for the varying aggregate,
3078 : * and need to call the final functions separately, but can share the
3079 : * same transition state.
3080 : *
3081 : * For either of these optimizations to be valid, the aggregate's
3082 : * arguments must be the same, including any modifiers such as ORDER BY,
3083 : * DISTINCT and FILTER, and they mustn't contain any volatile functions.
3084 : * -----------------
3085 : */
3086 2664 : aggno = -1;
3087 2664 : transno = -1;
3088 5432 : foreach(l, aggstate->aggs)
3089 : {
3090 2768 : AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
3091 2768 : Aggref *aggref = aggrefstate->aggref;
3092 : AggStatePerAgg peragg;
3093 : AggStatePerTrans pertrans;
3094 : int existing_aggno;
3095 : int existing_transno;
3096 : List *same_input_transnos;
3097 : Oid inputTypes[FUNC_MAX_ARGS];
3098 : int numArguments;
3099 : int numDirectArgs;
3100 : HeapTuple aggTuple;
3101 : Form_pg_aggregate aggform;
3102 : AclResult aclresult;
3103 : Oid transfn_oid,
3104 : finalfn_oid;
3105 : Oid serialfn_oid,
3106 : deserialfn_oid;
3107 : Expr *finalfnexpr;
3108 : Oid aggtranstype;
3109 : Datum textInitVal;
3110 : Datum initValue;
3111 : bool initValueIsNull;
3112 :
3113 : /* Planner should have assigned aggregate to correct level */
3114 2768 : Assert(aggref->agglevelsup == 0);
3115 : /* ... and the split mode should match */
3116 2768 : Assert(aggref->aggsplit == aggstate->aggsplit);
3117 :
3118 : /* 1. Check for already processed aggs which can be re-used */
3119 2768 : existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
3120 : &same_input_transnos);
3121 2768 : if (existing_aggno != -1)
3122 : {
3123 : /*
3124 : * Existing compatible agg found, so just point the Aggref to the
3125 : * same per-agg struct.  The pg_aggregate row has not been
 * fetched yet at this point, so there is nothing to release.
3126 : */
3127 23 : aggrefstate->aggno = existing_aggno;
3128 23 : continue;
3129 : }
3130 :
3131 : /* Mark Aggref state node with assigned index in the result array */
3132 2745 : peragg = &peraggs[++aggno];
3133 2745 : peragg->aggref = aggref;
3134 2745 : aggrefstate->aggno = aggno;
3135 :
3136 : /* Fetch the pg_aggregate row */
3137 2745 : aggTuple = SearchSysCache1(AGGFNOID,
3138 : ObjectIdGetDatum(aggref->aggfnoid));
3139 2745 : if (!HeapTupleIsValid(aggTuple))
3140 0 : elog(ERROR, "cache lookup failed for aggregate %u",
3141 : aggref->aggfnoid);
3142 2745 : aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3143 :
3144 : /* Check permission to call aggregate function */
3145 2745 : aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3146 : ACL_EXECUTE);
3147 2745 : if (aclresult != ACLCHECK_OK)
3148 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
3149 0 : get_func_name(aggref->aggfnoid));
3150 2745 : InvokeFunctionExecuteHook(aggref->aggfnoid);
3151 :
3152 : /* planner recorded transition state type in the Aggref itself */
3153 2745 : aggtranstype = aggref->aggtranstype;
3154 2745 : Assert(OidIsValid(aggtranstype));
3155 :
3156 : /*
3157 : * If this aggregation is performing state combines, then instead of
3158 : * using the transition function, we'll use the combine function
3159 : */
3160 2745 : if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3161 : {
3162 20 : transfn_oid = aggform->aggcombinefn;
3163 :
3164 : /* If not set then the planner messed up */
3165 20 : if (!OidIsValid(transfn_oid))
3166 0 : elog(ERROR, "combinefn not set for aggregate function");
3167 : }
3168 : else
3169 2725 : transfn_oid = aggform->aggtransfn;
3170 :
3171 : /* Final function only required if we're finalizing the aggregates */
3172 2745 : if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3173 81 : peragg->finalfn_oid = finalfn_oid = InvalidOid;
3174 : else
3175 2664 : peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3176 :
3177 2745 : serialfn_oid = InvalidOid;
3178 2745 : deserialfn_oid = InvalidOid;
3179 :
3180 : /*
3181 : * Check if serialization/deserialization is required. We only do it
3182 : * for aggregates that have transtype INTERNAL.
3183 : */
3184 2745 : if (aggtranstype == INTERNALOID)
3185 : {
3186 : /*
3187 : * The planner should only have generated a serialize agg node if
3188 : * every aggregate with an INTERNAL state has a serialization
3189 : * function. Verify that.
3190 : */
3191 1437 : if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3192 : {
3193 : /* serialization only valid when not running finalfn */
3194 0 : Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3195 :
3196 0 : if (!OidIsValid(aggform->aggserialfn))
3197 0 : elog(ERROR, "serialfunc not provided for serialization aggregation");
3198 0 : serialfn_oid = aggform->aggserialfn;
3199 : }
3200 :
3201 : /* Likewise for deserialization functions */
3202 1437 : if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3203 : {
3204 : /* deserialization only valid when combining states */
3205 0 : Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3206 :
3207 0 : if (!OidIsValid(aggform->aggdeserialfn))
3208 0 : elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3209 0 : deserialfn_oid = aggform->aggdeserialfn;
3210 : }
3211 : }
3212 :
3213 : /*
 * Check that aggregate owner has permission to call component fns:
 * the transition (or combine) function always, and the final,
 * serialization and deserialization functions when their OIDs are
 * valid.
 */
3214 : {
3215 : HeapTuple procTuple;
3216 : Oid aggOwner;
3217 :
3218 2745 : procTuple = SearchSysCache1(PROCOID,
3219 : ObjectIdGetDatum(aggref->aggfnoid));
3220 2745 : if (!HeapTupleIsValid(procTuple))
3221 0 : elog(ERROR, "cache lookup failed for function %u",
3222 : aggref->aggfnoid);
3223 2745 : aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3224 2745 : ReleaseSysCache(procTuple);
3225 :
3226 2745 : aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3227 : ACL_EXECUTE);
3228 2745 : if (aclresult != ACLCHECK_OK)
3229 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
3230 0 : get_func_name(transfn_oid));
3231 2745 : InvokeFunctionExecuteHook(transfn_oid);
3232 2745 : if (OidIsValid(finalfn_oid))
3233 : {
3234 1495 : aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3235 : ACL_EXECUTE);
3236 1495 : if (aclresult != ACLCHECK_OK)
3237 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
3238 0 : get_func_name(finalfn_oid));
3239 1495 : InvokeFunctionExecuteHook(finalfn_oid);
3240 : }
3241 2745 : if (OidIsValid(serialfn_oid))
3242 : {
3243 0 : aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3244 : ACL_EXECUTE);
3245 0 : if (aclresult != ACLCHECK_OK)
3246 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
3247 0 : get_func_name(serialfn_oid));
3248 0 : InvokeFunctionExecuteHook(serialfn_oid);
3249 : }
3250 2745 : if (OidIsValid(deserialfn_oid))
3251 : {
3252 0 : aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3253 : ACL_EXECUTE);
3254 0 : if (aclresult != ACLCHECK_OK)
3255 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
3256 0 : get_func_name(deserialfn_oid));
3257 0 : InvokeFunctionExecuteHook(deserialfn_oid);
3258 : }
3259 : }
3260 :
3261 : /*
3262 : * Get actual datatypes of the (nominal) aggregate inputs. These
3263 : * could be different from the agg's declared input types, when the
3264 : * agg accepts ANY or a polymorphic type.
3265 : */
3266 2745 : numArguments = get_aggregate_argtypes(aggref, inputTypes);
3267 :
3268 : /* Count the "direct" arguments, if any */
3269 2745 : numDirectArgs = list_length(aggref->aggdirectargs);
3270 :
3271 : /* Detect how many arguments to pass to the finalfn */
3272 2745 : if (aggform->aggfinalextra)
3273 1125 : peragg->numFinalArgs = numArguments + 1;
3274 : else
3275 1620 : peragg->numFinalArgs = numDirectArgs + 1;
3276 :
3277 : /*
3278 : * build expression trees using actual argument & result types for the
3279 : * finalfn, if it exists and is required.
3280 : */
3281 2745 : if (OidIsValid(finalfn_oid))
3282 : {
3283 1495 : build_aggregate_finalfn_expr(inputTypes,
3284 : peragg->numFinalArgs,
3285 : aggtranstype,
3286 : aggref->aggtype,
3287 : aggref->inputcollid,
3288 : finalfn_oid,
3289 : &finalfnexpr);
3290 1495 : fmgr_info(finalfn_oid, &peragg->finalfn);
3291 1495 : fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3292 : }
3293 :
3294 : /* get info about the output value's datatype */
3295 2745 : get_typlenbyval(aggref->aggtype,
3296 : &peragg->resulttypeLen,
3297 : &peragg->resulttypeByVal);
3298 :
3299 : /*
3300 : * initval is potentially null, so don't try to access it as a struct
3301 : * field. Must do it the hard way with SysCacheGetAttr.
3302 : */
3303 2745 : textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3304 : Anum_pg_aggregate_agginitval,
3305 : &initValueIsNull);
3306 2745 : if (initValueIsNull)
3307 1765 : initValue = (Datum) 0;
3308 : else
3309 980 : initValue = GetAggInitVal(textInitVal, aggtranstype);
3310 :
3311 : /*
3312 : * 2. Build working state for invoking the transition function, or
3313 : * look up previously initialized working state, if we can share it.
3314 : *
3315 : * find_compatible_peragg() already collected a list of per-Trans's
3316 : * with the same inputs. Check if any of them have the same transition
3317 : * function and initial value.
3318 : */
3319 2745 : existing_transno = find_compatible_pertrans(aggstate, aggref,
3320 : transfn_oid, aggtranstype,
3321 : serialfn_oid, deserialfn_oid,
3322 : initValue, initValueIsNull,
3323 : same_input_transnos);
3324 2745 : if (existing_transno != -1)
3325 : {
3326 : /*
3327 : * Existing compatible trans found, so just point the 'peragg' to
3328 : * the same per-trans struct.
3329 : */
3330 7 : pertrans = &pertransstates[existing_transno];
3331 7 : peragg->transno = existing_transno;
3332 : }
3333 : else
3334 : {
3335 2738 : pertrans = &pertransstates[++transno];
3336 2738 : build_pertrans_for_aggref(pertrans, aggstate, estate,
3337 : aggref, transfn_oid, aggtranstype,
3338 : serialfn_oid, deserialfn_oid,
3339 : initValue, initValueIsNull,
3340 : inputTypes, numArguments);
3341 2738 : peragg->transno = transno;
3342 : }
3343 2745 : ReleaseSysCache(aggTuple);
3344 : }
3345 :
3346 : /*
3347 : * Update numaggs to match the number of unique aggregates found. Also set
3348 : * numtrans to the number of unique transition states found.
3349 : */
3350 2664 : aggstate->numaggs = aggno + 1;
3351 2664 : aggstate->numtrans = transno + 1;
3352 :
3353 : /*
3354 : * Build a single projection computing the aggregate arguments for all
3355 : * aggregates at once, that's considerably faster than doing it separately
3356 : * for each.
3357 : *
3358 : * First create a targetlist combining the targetlist of all the
3359 : * transitions.
3360 : */
3361 2664 : combined_inputeval = NIL;
3362 2664 : column_offset = 0;
3363 5402 : for (transno = 0; transno < aggstate->numtrans; transno++)
3364 : {
3365 2738 : AggStatePerTrans pertrans = &pertransstates[transno];
3366 : ListCell *arg;
3367 :
/* remember where this transition's inputs start in the combined slot */
3368 2738 : pertrans->inputoff = column_offset;
3369 :
3370 : /*
3371 : * Adjust resno in the copied target entries, to point into the
3372 : * combined slot.
3373 : */
3374 5044 : foreach(arg, pertrans->aggref->args)
3375 : {
3376 2306 : TargetEntry *source_tle = lfirst_node(TargetEntry, arg);
3377 : TargetEntry *tle;
3378 :
3379 2306 : tle = flatCopyTargetEntry(source_tle);
3380 2306 : tle->resno += column_offset;
3381 :
3382 2306 : combined_inputeval = lappend(combined_inputeval, tle);
3383 : }
3384 :
3385 2738 : column_offset += list_length(pertrans->aggref->args);
3386 : }
3387 :
3388 : /* and then create a projection for that targetlist */
3389 2664 : aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false);
3390 2664 : aggstate->evalslot = ExecInitExtraTupleSlot(estate);
3391 2664 : aggstate->evalproj = ExecBuildProjectionInfo(combined_inputeval,
3392 : aggstate->tmpcontext,
3393 : aggstate->evalslot,
3394 : &aggstate->ss.ps,
3395 : NULL);
3396 2664 : ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc);
3397 :
3398 2664 : return aggstate;
3399 : }
3400 :
3401 : /*
3402 : * Build the state needed to calculate a state value for an aggregate.
3403 : *
3404 : * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
3405 : * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
3406 : * of the arguments could be calculated from 'aggref', but the caller has
3407 : * calculated them already, so might as well pass them.
3408 : */
3409 : static void
3410 2738 : build_pertrans_for_aggref(AggStatePerTrans pertrans,
3411 : AggState *aggstate, EState *estate,
3412 : Aggref *aggref,
3413 : Oid aggtransfn, Oid aggtranstype,
3414 : Oid aggserialfn, Oid aggdeserialfn,
3415 : Datum initValue, bool initValueIsNull,
3416 : Oid *inputTypes, int numArguments)
3417 : {
3418 2738 : int numGroupingSets = Max(aggstate->maxsets, 1);
3419 2738 : Expr *serialfnexpr = NULL;
3420 2738 : Expr *deserialfnexpr = NULL;
3421 : ListCell *lc;
3422 : int numInputs;
3423 : int numDirectArgs;
3424 : List *sortlist;
3425 : int numSortCols;
3426 : int numDistinctCols;
3427 : int naggs;
3428 : int i;
3429 :
3430 : /* Begin filling in the pertrans data */
3431 2738 : pertrans->aggref = aggref;
3432 2738 : pertrans->aggCollation = aggref->inputcollid;
3433 2738 : pertrans->transfn_oid = aggtransfn;
3434 2738 : pertrans->serialfn_oid = aggserialfn;
3435 2738 : pertrans->deserialfn_oid = aggdeserialfn;
3436 2738 : pertrans->initValue = initValue;
3437 2738 : pertrans->initValueIsNull = initValueIsNull;
3438 :
3439 : /* Count the "direct" arguments, if any */
3440 2738 : numDirectArgs = list_length(aggref->aggdirectargs);
3441 :
3442 : /* Count the number of aggregated input columns */
3443 2738 : pertrans->numInputs = numInputs = list_length(aggref->args);
3444 :
3445 2738 : pertrans->aggtranstype = aggtranstype;
3446 :
3447 : /* Detect how many arguments to pass to the transfn */
3448 2738 : if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3449 28 : pertrans->numTransInputs = numInputs;
3450 : else
3451 2710 : pertrans->numTransInputs = numArguments;
3452 :
3453 : /*
3454 : * When combining states, we have no use at all for the aggregate
3455 : * function's transfn. Instead we use the combinefn. In this case, the
3456 : * transfn and transfn_oid fields of pertrans refer to the combine
3457 : * function rather than the transition function.
3458 : */
3459 2738 : if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3460 : {
3461 : Expr *combinefnexpr;
3462 :
3463 20 : build_aggregate_combinefn_expr(aggtranstype,
3464 : aggref->inputcollid,
3465 : aggtransfn,
3466 : &combinefnexpr);
3467 20 : fmgr_info(aggtransfn, &pertrans->transfn);
3468 20 : fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
3469 :
3470 20 : InitFunctionCallInfoData(pertrans->transfn_fcinfo,
3471 : &pertrans->transfn,
3472 : 2,
3473 : pertrans->aggCollation,
3474 : (void *) aggstate, NULL);
3475 :
3476 : /*
3477 : * Ensure that a combine function to combine INTERNAL states is not
3478 : * strict. This should have been checked during CREATE AGGREGATE, but
3479 : * the strict property could have been changed since then.
3480 : */
3481 20 : if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
3482 0 : ereport(ERROR,
3483 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3484 : errmsg("combine function for aggregate %u must be declared as STRICT",
3485 : aggref->aggfnoid)));
3486 : }
3487 : else
3488 : {
3489 : Expr *transfnexpr;
3490 :
3491 : /*
3492 : * Set up infrastructure for calling the transfn. Note that invtrans
3493 : * is not needed here.
3494 : */
3495 5436 : build_aggregate_transfn_expr(inputTypes,
3496 : numArguments,
3497 : numDirectArgs,
3498 2718 : aggref->aggvariadic,
3499 : aggtranstype,
3500 : aggref->inputcollid,
3501 : aggtransfn,
3502 : InvalidOid,
3503 : &transfnexpr,
3504 : NULL);
3505 2718 : fmgr_info(aggtransfn, &pertrans->transfn);
3506 2718 : fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
3507 :
3508 2718 : InitFunctionCallInfoData(pertrans->transfn_fcinfo,
3509 : &pertrans->transfn,
3510 : pertrans->numTransInputs + 1,
3511 : pertrans->aggCollation,
3512 : (void *) aggstate, NULL);
3513 :
3514 : /*
3515 : * If the transfn is strict and the initval is NULL, make sure input
3516 : * type and transtype are the same (or at least binary-compatible), so
3517 : * that it's OK to use the first aggregated input value as the initial
3518 : * transValue. This should have been checked at agg definition time,
3519 : * but we must check again in case the transfn's strictness property
3520 : * has been changed.
3521 : */
3522 2718 : if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3523 : {
3524 418 : if (numArguments <= numDirectArgs ||
3525 209 : !IsBinaryCoercible(inputTypes[numDirectArgs],
3526 : aggtranstype))
3527 0 : ereport(ERROR,
3528 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3529 : errmsg("aggregate %u needs to have compatible input type and transition type",
3530 : aggref->aggfnoid)));
3531 : }
3532 : }
3533 :
3534 : /* get info about the state value's datatype */
3535 2738 : get_typlenbyval(aggtranstype,
3536 : &pertrans->transtypeLen,
3537 : &pertrans->transtypeByVal);
3538 :
3539 2738 : if (OidIsValid(aggserialfn))
3540 : {
3541 0 : build_aggregate_serialfn_expr(aggserialfn,
3542 : &serialfnexpr);
3543 0 : fmgr_info(aggserialfn, &pertrans->serialfn);
3544 0 : fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
3545 :
3546 0 : InitFunctionCallInfoData(pertrans->serialfn_fcinfo,
3547 : &pertrans->serialfn,
3548 : 1,
3549 : InvalidOid,
3550 : (void *) aggstate, NULL);
3551 : }
3552 :
3553 2738 : if (OidIsValid(aggdeserialfn))
3554 : {
3555 0 : build_aggregate_deserialfn_expr(aggdeserialfn,
3556 : &deserialfnexpr);
3557 0 : fmgr_info(aggdeserialfn, &pertrans->deserialfn);
3558 0 : fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
3559 :
3560 0 : InitFunctionCallInfoData(pertrans->deserialfn_fcinfo,
3561 : &pertrans->deserialfn,
3562 : 2,
3563 : InvalidOid,
3564 : (void *) aggstate, NULL);
3565 :
3566 : }
3567 :
3568 : /* Initialize the input and FILTER expressions */
3569 2738 : naggs = aggstate->numaggs;
3570 2738 : pertrans->aggfilter = ExecInitExpr(aggref->aggfilter,
3571 : (PlanState *) aggstate);
3572 2738 : pertrans->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3573 : (PlanState *) aggstate);
3574 :
3575 : /*
3576 : * Complain if the aggregate's arguments contain any aggregates; nested
3577 : * agg functions are semantically nonsensical. (This should have been
3578 : * caught earlier, but we defend against it here anyway.)
3579 : */
3580 2738 : if (naggs != aggstate->numaggs)
3581 0 : ereport(ERROR,
3582 : (errcode(ERRCODE_GROUPING_ERROR),
3583 : errmsg("aggregate function calls cannot be nested")));
3584 :
3585 : /*
3586 : * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
3587 : * have a list of SortGroupClause nodes; fish out the data in them and
3588 : * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
3589 : * however; the agg's transfn and finalfn are responsible for that.
3590 : *
3591 : * Note that by construction, if there is a DISTINCT clause then the ORDER
3592 : * BY clause is a prefix of it (see transformDistinctClause).
3593 : */
3594 2738 : if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3595 : {
3596 28 : sortlist = NIL;
3597 28 : numSortCols = numDistinctCols = 0;
3598 : }
3599 2710 : else if (aggref->aggdistinct)
3600 : {
3601 36 : sortlist = aggref->aggdistinct;
3602 36 : numSortCols = numDistinctCols = list_length(sortlist);
3603 36 : Assert(numSortCols >= list_length(aggref->aggorder));
3604 : }
3605 : else
3606 : {
3607 2674 : sortlist = aggref->aggorder;
3608 2674 : numSortCols = list_length(sortlist);
3609 2674 : numDistinctCols = 0;
3610 : }
3611 :
3612 2738 : pertrans->numSortCols = numSortCols;
3613 2738 : pertrans->numDistinctCols = numDistinctCols;
3614 :
3615 2738 : if (numSortCols > 0)
3616 : {
3617 : /*
3618 : * Get a tupledesc and slot corresponding to the aggregated inputs
3619 : * (including sort expressions) of the agg.
3620 : */
3621 99 : pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
3622 99 : pertrans->sortslot = ExecInitExtraTupleSlot(estate);
3623 99 : ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc);
3624 :
3625 : /*
3626 : * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3627 : * (yet)
3628 : */
3629 99 : Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
3630 :
3631 : /* If we have only one input, we need its len/byval info. */
3632 99 : if (numInputs == 1)
3633 : {
3634 30 : get_typlenbyval(inputTypes[numDirectArgs],
3635 : &pertrans->inputtypeLen,
3636 : &pertrans->inputtypeByVal);
3637 : }
3638 69 : else if (numDistinctCols > 0)
3639 : {
3640 : /* we will need an extra slot to store prior values */
3641 14 : pertrans->uniqslot = ExecInitExtraTupleSlot(estate);
3642 14 : ExecSetSlotDescriptor(pertrans->uniqslot,
3643 : pertrans->sortdesc);
3644 : }
3645 :
3646 : /* Extract the sort information for use later */
3647 99 : pertrans->sortColIdx =
3648 99 : (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
3649 99 : pertrans->sortOperators =
3650 99 : (Oid *) palloc(numSortCols * sizeof(Oid));
3651 99 : pertrans->sortCollations =
3652 99 : (Oid *) palloc(numSortCols * sizeof(Oid));
3653 99 : pertrans->sortNullsFirst =
3654 99 : (bool *) palloc(numSortCols * sizeof(bool));
3655 :
3656 99 : i = 0;
3657 229 : foreach(lc, sortlist)
3658 : {
3659 130 : SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3660 130 : TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
3661 :
3662 : /* the parser should have made sure of this */
3663 130 : Assert(OidIsValid(sortcl->sortop));
3664 :
3665 130 : pertrans->sortColIdx[i] = tle->resno;
3666 130 : pertrans->sortOperators[i] = sortcl->sortop;
3667 130 : pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
3668 130 : pertrans->sortNullsFirst[i] = sortcl->nulls_first;
3669 130 : i++;
3670 : }
3671 99 : Assert(i == numSortCols);
3672 : }
3673 :
3674 2738 : if (aggref->aggdistinct)
3675 : {
3676 36 : Assert(numArguments > 0);
3677 :
3678 : /*
3679 : * We need the equal function for each DISTINCT comparison we will
3680 : * make.
3681 : */
3682 36 : pertrans->equalfns =
3683 36 : (FmgrInfo *) palloc(numDistinctCols * sizeof(FmgrInfo));
3684 :
3685 36 : i = 0;
3686 98 : foreach(lc, aggref->aggdistinct)
3687 : {
3688 62 : SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3689 :
3690 62 : fmgr_info(get_opcode(sortcl->eqop), &pertrans->equalfns[i]);
3691 62 : i++;
3692 : }
3693 36 : Assert(i == numDistinctCols);
3694 : }
3695 :
3696 2738 : pertrans->sortstates = (Tuplesortstate **)
3697 2738 : palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
3698 2738 : }
3699 :
3700 :
3701 : static Datum
3702 980 : GetAggInitVal(Datum textInitVal, Oid transtype)
3703 : {
3704 : Oid typinput,
3705 : typioparam;
3706 : char *strInitVal;
3707 : Datum initVal;
3708 :
3709 980 : getTypeInputInfo(transtype, &typinput, &typioparam);
3710 980 : strInitVal = TextDatumGetCString(textInitVal);
3711 980 : initVal = OidInputFunctionCall(typinput, strInitVal,
3712 : typioparam, -1);
3713 980 : pfree(strInitVal);
3714 980 : return initVal;
3715 : }
3716 :
3717 : /*
3718 : * find_compatible_peragg - search for a previously initialized per-Agg struct
3719 : *
3720 : * Searches the previously looked at aggregates to find one which is compatible
3721 : * with this one, with the same input parameters. If no compatible aggregate
3722 : * can be found, returns -1.
3723 : *
3724 : * As a side-effect, this also collects a list of existing per-Trans structs
3725 : * with matching inputs. If no identical Aggref is found, the list is passed
3726 : * later to find_compatible_perstate, to see if we can at least reuse the
3727 : * state value of another aggregate.
3728 : */
3729 : static int
3730 2768 : find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3731 : int lastaggno, List **same_input_transnos)
3732 : {
3733 : int aggno;
3734 : AggStatePerAgg peraggs;
3735 :
3736 2768 : *same_input_transnos = NIL;
3737 :
3738 : /* we mustn't reuse the aggref if it contains volatile function calls */
3739 2768 : if (contain_volatile_functions((Node *) newagg))
3740 1 : return -1;
3741 :
3742 2767 : peraggs = aggstate->peragg;
3743 :
3744 : /*
3745 : * Search through the list of already seen aggregates. If we find an
3746 : * existing aggregate with the same aggregate function and input
3747 : * parameters as an existing one, then we can re-use that one. While
3748 : * searching, we'll also collect a list of Aggrefs with the same input
3749 : * parameters. If no matching Aggref is found, the caller can potentially
3750 : * still re-use the transition state of one of them.
3751 : */
3752 3194 : for (aggno = 0; aggno <= lastaggno; aggno++)
3753 : {
3754 : AggStatePerAgg peragg;
3755 : Aggref *existingRef;
3756 :
3757 450 : peragg = &peraggs[aggno];
3758 450 : existingRef = peragg->aggref;
3759 :
3760 : /* all of the following must be the same or it's no match */
3761 853 : if (newagg->inputcollid != existingRef->inputcollid ||
3762 687 : newagg->aggtranstype != existingRef->aggtranstype ||
3763 521 : newagg->aggstar != existingRef->aggstar ||
3764 474 : newagg->aggvariadic != existingRef->aggvariadic ||
3765 467 : newagg->aggkind != existingRef->aggkind ||
3766 460 : !equal(newagg->aggdirectargs, existingRef->aggdirectargs) ||
3767 311 : !equal(newagg->args, existingRef->args) ||
3768 161 : !equal(newagg->aggorder, existingRef->aggorder) ||
3769 160 : !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3770 80 : !equal(newagg->aggfilter, existingRef->aggfilter))
3771 376 : continue;
3772 :
3773 : /* if it's the same aggregate function then report exact match */
3774 97 : if (newagg->aggfnoid == existingRef->aggfnoid &&
3775 46 : newagg->aggtype == existingRef->aggtype &&
3776 23 : newagg->aggcollid == existingRef->aggcollid)
3777 : {
3778 23 : list_free(*same_input_transnos);
3779 23 : *same_input_transnos = NIL;
3780 23 : return aggno;
3781 : }
3782 :
3783 : /*
3784 : * Not identical, but it had the same inputs. Return it to the caller,
3785 : * in case we can re-use its per-trans state.
3786 : */
3787 51 : *same_input_transnos = lappend_int(*same_input_transnos,
3788 : peragg->transno);
3789 : }
3790 :
3791 2744 : return -1;
3792 : }
3793 :
3794 : /*
3795 : * find_compatible_pertrans - search for a previously initialized per-Trans
3796 : * struct
3797 : *
3798 : * Searches the list of transnos for a per-Trans struct with the same
3799 : * transition state and initial condition. (The inputs have already been
3800 : * verified to match.)
3801 : */
3802 : static int
3803 2745 : find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
3804 : Oid aggtransfn, Oid aggtranstype,
3805 : Oid aggserialfn, Oid aggdeserialfn,
3806 : Datum initValue, bool initValueIsNull,
3807 : List *transnos)
3808 : {
3809 : ListCell *lc;
3810 :
3811 2787 : foreach(lc, transnos)
3812 : {
3813 49 : int transno = lfirst_int(lc);
3814 49 : AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3815 :
3816 : /*
3817 : * if the transfns or transition state types are not the same then the
3818 : * state can't be shared.
3819 : */
3820 57 : if (aggtransfn != pertrans->transfn_oid ||
3821 8 : aggtranstype != pertrans->aggtranstype)
3822 41 : continue;
3823 :
3824 : /*
3825 : * The serialization and deserialization functions must match, if
3826 : * present, as we're unable to share the trans state for aggregates
3827 : * which will serialize or deserialize into different formats.
3828 : * Remember that these will be InvalidOid if they're not required for
3829 : * this agg node.
3830 : */
3831 16 : if (aggserialfn != pertrans->serialfn_oid ||
3832 8 : aggdeserialfn != pertrans->deserialfn_oid)
3833 0 : continue;
3834 :
3835 : /* Check that the initial condition matches, too. */
3836 8 : if (initValueIsNull && pertrans->initValueIsNull)
3837 3 : return transno;
3838 :
3839 10 : if (!initValueIsNull && !pertrans->initValueIsNull &&
3840 10 : datumIsEqual(initValue, pertrans->initValue,
3841 10 : pertrans->transtypeByVal, pertrans->transtypeLen))
3842 : {
3843 4 : return transno;
3844 : }
3845 : }
3846 2738 : return -1;
3847 : }
3848 :
3849 : void
3850 2655 : ExecEndAgg(AggState *node)
3851 : {
3852 : PlanState *outerPlan;
3853 : int transno;
3854 2655 : int numGroupingSets = Max(node->maxsets, 1);
3855 : int setno;
3856 :
3857 : /* Make sure we have closed any open tuplesorts */
3858 :
3859 2655 : if (node->sort_in)
3860 16 : tuplesort_end(node->sort_in);
3861 2655 : if (node->sort_out)
3862 3 : tuplesort_end(node->sort_out);
3863 :
3864 5384 : for (transno = 0; transno < node->numtrans; transno++)
3865 : {
3866 2729 : AggStatePerTrans pertrans = &node->pertrans[transno];
3867 :
3868 5595 : for (setno = 0; setno < numGroupingSets; setno++)
3869 : {
3870 2866 : if (pertrans->sortstates[setno])
3871 0 : tuplesort_end(pertrans->sortstates[setno]);
3872 : }
3873 : }
3874 :
3875 : /* And ensure any agg shutdown callbacks have been called */
3876 5415 : for (setno = 0; setno < numGroupingSets; setno++)
3877 2760 : ReScanExprContext(node->aggcontexts[setno]);
3878 2655 : if (node->hashcontext)
3879 281 : ReScanExprContext(node->hashcontext);
3880 :
3881 : /*
3882 : * We don't actually free any ExprContexts here (see comment in
3883 : * ExecFreeExprContext), just unlinking the output one from the plan node
3884 : * suffices.
3885 : */
3886 2655 : ExecFreeExprContext(&node->ss.ps);
3887 :
3888 : /* clean up tuple table */
3889 2655 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
3890 :
3891 2655 : outerPlan = outerPlanState(node);
3892 2655 : ExecEndNode(outerPlan);
3893 2655 : }
3894 :
3895 : void
3896 785 : ExecReScanAgg(AggState *node)
3897 : {
3898 785 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
3899 785 : PlanState *outerPlan = outerPlanState(node);
3900 785 : Agg *aggnode = (Agg *) node->ss.ps.plan;
3901 : int transno;
3902 785 : int numGroupingSets = Max(node->maxsets, 1);
3903 : int setno;
3904 :
3905 785 : node->agg_done = false;
3906 :
3907 785 : if (node->aggstrategy == AGG_HASHED)
3908 : {
3909 : /*
3910 : * In the hashed case, if we haven't yet built the hash table then we
3911 : * can just return; nothing done yet, so nothing to undo. If subnode's
3912 : * chgParam is not NULL then it will be re-scanned by ExecProcNode,
3913 : * else no reason to re-scan it at all.
3914 : */
3915 197 : if (!node->table_filled)
3916 5 : return;
3917 :
3918 : /*
3919 : * If we do have the hash table, and the subplan does not have any
3920 : * parameter changes, and none of our own parameter changes affect
3921 : * input expressions of the aggregated functions, then we can just
3922 : * rescan the existing hash table; no need to build it again.
3923 : */
3924 273 : if (outerPlan->chgParam == NULL &&
3925 81 : !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
3926 : {
3927 77 : ResetTupleHashIterator(node->perhash[0].hashtable,
3928 : &node->perhash[0].hashiter);
3929 77 : select_current_set(node, 0, true);
3930 77 : return;
3931 : }
3932 : }
3933 :
3934 : /* Make sure we have closed any open tuplesorts */
3935 1296 : for (transno = 0; transno < node->numtrans; transno++)
3936 : {
3937 1192 : for (setno = 0; setno < numGroupingSets; setno++)
3938 : {
3939 599 : AggStatePerTrans pertrans = &node->pertrans[transno];
3940 :
3941 599 : if (pertrans->sortstates[setno])
3942 : {
3943 0 : tuplesort_end(pertrans->sortstates[setno]);
3944 0 : pertrans->sortstates[setno] = NULL;
3945 : }
3946 : }
3947 : }
3948 :
3949 : /*
3950 : * We don't need to ReScanExprContext the output tuple context here;
3951 : * ExecReScan already did it. But we do need to reset our per-grouping-set
3952 : * contexts, which may have transvalues stored in them. (We use rescan
3953 : * rather than just reset because transfns may have registered callbacks
3954 : * that need to be run now.) For the AGG_HASHED case, see below.
3955 : */
3956 :
3957 1412 : for (setno = 0; setno < numGroupingSets; setno++)
3958 : {
3959 709 : ReScanExprContext(node->aggcontexts[setno]);
3960 : }
3961 :
3962 : /* Release first tuple of group, if we have made a copy */
3963 703 : if (node->grp_firstTuple != NULL)
3964 : {
3965 0 : heap_freetuple(node->grp_firstTuple);
3966 0 : node->grp_firstTuple = NULL;
3967 : }
3968 703 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
3969 :
3970 : /* Forget current agg values */
3971 703 : MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
3972 703 : MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
3973 :
3974 : /*
3975 : * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
3976 : * the hashcontext. This used to be an issue, but now, resetting a context
3977 : * automatically deletes sub-contexts too.
3978 : */
3979 703 : if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
3980 : {
3981 118 : ReScanExprContext(node->hashcontext);
3982 : /* Rebuild an empty hash table */
3983 118 : build_hash_table(node);
3984 118 : node->table_filled = false;
3985 : /* iterator will be reset when the table is filled */
3986 : }
3987 :
3988 703 : if (node->aggstrategy != AGG_HASHED)
3989 : {
3990 : /*
3991 : * Reset the per-group state (in particular, mark transvalues null)
3992 : */
3993 588 : MemSet(node->pergroup, 0,
3994 : sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets);
3995 :
3996 : /* reset to phase 1 */
3997 588 : initialize_phase(node, 1);
3998 :
3999 588 : node->input_done = false;
4000 588 : node->projected_set = -1;
4001 : }
4002 :
4003 703 : if (outerPlan->chgParam == NULL)
4004 23 : ExecReScan(outerPlan);
4005 : }
4006 :
4007 :
4008 : /***********************************************************************
4009 : * API exposed to aggregate functions
4010 : ***********************************************************************/
4011 :
4012 :
4013 : /*
4014 : * AggCheckCallContext - test if a SQL function is being called as an aggregate
4015 : *
4016 : * The transition and/or final functions of an aggregate may want to verify
4017 : * that they are being called as aggregates, rather than as plain SQL
4018 : * functions. They should use this function to do so. The return value
4019 : * is nonzero if being called as an aggregate, or zero if not. (Specific
4020 : * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4021 : * values could conceivably appear in future.)
4022 : *
4023 : * If aggcontext isn't NULL, the function also stores at *aggcontext the
4024 : * identity of the memory context that aggregate transition values are being
4025 : * stored in. Note that the same aggregate call site (flinfo) may be called
4026 : * interleaved on different transition values in different contexts, so it's
4027 : * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4028 : * cache it in the transvalue itself (for internal-type transvalues).
4029 : */
4030 : int
4031 9116178 : AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
4032 : {
4033 9116178 : if (fcinfo->context && IsA(fcinfo->context, AggState))
4034 : {
4035 9095368 : if (aggcontext)
4036 : {
4037 38823 : AggState *aggstate = ((AggState *) fcinfo->context);
4038 38823 : ExprContext *cxt = aggstate->curaggcontext;
4039 :
4040 38823 : *aggcontext = cxt->ecxt_per_tuple_memory;
4041 : }
4042 9095368 : return AGG_CONTEXT_AGGREGATE;
4043 : }
4044 20810 : if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4045 : {
4046 20583 : if (aggcontext)
4047 70 : *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4048 20583 : return AGG_CONTEXT_WINDOW;
4049 : }
4050 :
4051 : /* this is just to prevent "uninitialized variable" warnings */
4052 227 : if (aggcontext)
4053 227 : *aggcontext = NULL;
4054 227 : return 0;
4055 : }
4056 :
4057 : /*
4058 : * AggGetAggref - allow an aggregate support function to get its Aggref
4059 : *
4060 : * If the function is being called as an aggregate support function,
4061 : * return the Aggref node for the aggregate call. Otherwise, return NULL.
4062 : *
4063 : * Note that if an aggregate is being used as a window function, this will
4064 : * return NULL. We could provide a similar function to return the relevant
4065 : * WindowFunc node in such cases, but it's not needed yet.
4066 : */
4067 : Aggref *
4068 27 : AggGetAggref(FunctionCallInfo fcinfo)
4069 : {
4070 27 : if (fcinfo->context && IsA(fcinfo->context, AggState))
4071 : {
4072 : AggStatePerTrans curpertrans;
4073 :
4074 27 : curpertrans = ((AggState *) fcinfo->context)->curpertrans;
4075 :
4076 27 : if (curpertrans)
4077 27 : return curpertrans->aggref;
4078 : }
4079 0 : return NULL;
4080 : }
4081 :
4082 : /*
4083 : * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4084 : *
4085 : * This is useful in agg final functions; the context returned is one that
4086 : * the final function can safely reset as desired. This isn't useful for
4087 : * transition functions, since the context returned MAY (we don't promise)
4088 : * be the same as the context those are called in.
4089 : *
4090 : * As above, this is currently not useful for aggs called as window functions.
4091 : */
4092 : MemoryContext
4093 1 : AggGetTempMemoryContext(FunctionCallInfo fcinfo)
4094 : {
4095 1 : if (fcinfo->context && IsA(fcinfo->context, AggState))
4096 : {
4097 1 : AggState *aggstate = (AggState *) fcinfo->context;
4098 :
4099 1 : return aggstate->tmpcontext->ecxt_per_tuple_memory;
4100 : }
4101 0 : return NULL;
4102 : }
4103 :
4104 : /*
4105 : * AggRegisterCallback - register a cleanup callback for an aggregate
4106 : *
4107 : * This is useful for aggs to register shutdown callbacks, which will ensure
4108 : * that non-memory resources are freed. The callback will occur just before
4109 : * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4110 : * either between groups or as a result of rescanning the query. The callback
4111 : * will NOT be called on error paths. The typical use-case is for freeing of
4112 : * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4113 : * created by the agg functions. (The callback will not be called until after
4114 : * the result of the finalfn is no longer needed, so it's safe for the finalfn
4115 : * to return data that will be freed by the callback.)
4116 : *
4117 : * As above, this is currently not useful for aggs called as window functions.
4118 : */
4119 : void
4120 88 : AggRegisterCallback(FunctionCallInfo fcinfo,
4121 : ExprContextCallbackFunction func,
4122 : Datum arg)
4123 : {
4124 88 : if (fcinfo->context && IsA(fcinfo->context, AggState))
4125 : {
4126 88 : AggState *aggstate = (AggState *) fcinfo->context;
4127 88 : ExprContext *cxt = aggstate->curaggcontext;
4128 :
4129 88 : RegisterExprContextCallback(cxt, func, arg);
4130 :
4131 88 : return;
4132 : }
4133 0 : elog(ERROR, "aggregate function cannot register a callback in this context");
4134 : }
4135 :
4136 :
4137 : /*
4138 : * aggregate_dummy - dummy execution routine for aggregate functions
4139 : *
4140 : * This function is listed as the implementation (prosrc field) of pg_proc
4141 : * entries for aggregate functions. Its only purpose is to throw an error
4142 : * if someone mistakenly executes such a function in the normal way.
4143 : *
4144 : * Perhaps someday we could assign real meaning to the prosrc field of
4145 : * an aggregate?
4146 : */
4147 : Datum
4148 0 : aggregate_dummy(PG_FUNCTION_ARGS)
4149 : {
4150 0 : elog(ERROR, "aggregate function %u called as normal function",
4151 : fcinfo->flinfo->fn_oid);
4152 : return (Datum) 0; /* keep compiler quiet */
4153 : }
|