Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeWindowAgg.c
4 : * routines to handle WindowAgg nodes.
5 : *
6 : * A WindowAgg node evaluates "window functions" across suitable partitions
7 : * of the input tuple set. Any one WindowAgg works for just a single window
8 : * specification, though it can evaluate multiple window functions sharing
9 : * identical window specifications. The input tuples are required to be
10 : * delivered in sorted order, with the PARTITION BY columns (if any) as
11 : * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 : * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 : * as needed, if a query involves more than one window specification.)
14 : *
15 : * Since window functions can require access to any or all of the rows in
16 : * the current partition, we accumulate rows of the partition into a
17 : * tuplestore. The window functions are called using the WindowObject API
18 : * so that they can access those rows as needed.
19 : *
20 : * We also support using plain aggregate functions as window functions.
21 : * For these, the regular Agg-node environment is emulated for each partition.
22 : * As required by the SQL spec, the output represents the value of the
23 : * aggregate function over all rows in the current row's window frame.
24 : *
25 : *
26 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
27 : * Portions Copyright (c) 1994, Regents of the University of California
28 : *
29 : * IDENTIFICATION
30 : * src/backend/executor/nodeWindowAgg.c
31 : *
32 : *-------------------------------------------------------------------------
33 : */
34 : #include "postgres.h"
35 :
36 : #include "access/htup_details.h"
37 : #include "catalog/objectaccess.h"
38 : #include "catalog/pg_aggregate.h"
39 : #include "catalog/pg_proc.h"
40 : #include "executor/executor.h"
41 : #include "executor/nodeWindowAgg.h"
42 : #include "miscadmin.h"
43 : #include "nodes/nodeFuncs.h"
44 : #include "optimizer/clauses.h"
45 : #include "parser/parse_agg.h"
46 : #include "parser/parse_coerce.h"
47 : #include "utils/acl.h"
48 : #include "utils/builtins.h"
49 : #include "utils/datum.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/syscache.h"
53 : #include "windowapi.h"
54 :
55 : /*
56 : * All the window function APIs are called with this object, which is passed
57 : * to window functions as fcinfo->context. It links back to the owning WindowAggState and carries this function's own tuplestore pointers.
58 : */
59 : typedef struct WindowObjectData
60 : {
61 : NodeTag type;
62 : WindowAggState *winstate; /* parent WindowAggState */
63 : List *argstates; /* ExprState trees for fn's arguments */
64 : void *localmem; /* WinGetPartitionLocalMemory's chunk */
65 : int markptr; /* tuplestore mark pointer for this fn; eval_windowaggregates treats a value < 0 as "no mark pointer was created" */
66 : int readptr; /* tuplestore read pointer for this fn */
67 : int64 markpos; /* row that markptr is positioned on */
68 : int64 seekpos; /* row that readptr is positioned on */
69 : } WindowObjectData;
70 :
71 : /*
72 : * We have one WindowStatePerFunc struct for each window function and
73 : * window aggregate handled by this node. They are reached as winstate->perfunc[wfuncno].
74 : */
75 : typedef struct WindowStatePerFuncData
76 : {
77 : /* Links to WindowFunc expr and state nodes this working state is for */
78 : WindowFuncExprState *wfuncstate;
79 : WindowFunc *wfunc;
80 :
81 : int numArguments; /* number of arguments; aggregate transition calls pass numArguments + 1, slot 0 being the transition value */
82 :
83 : FmgrInfo flinfo; /* fmgr lookup data for window function */
84 :
85 : Oid winCollation; /* collation derived for window function; handed to InitFunctionCallInfoData for the transfn, invtransfn and finalfn calls */
86 :
87 : /*
88 : * We need the len and byval info for the result of each function in order
89 : * to know how to copy/delete values.
90 : */
91 : int16 resulttypeLen;
92 : bool resulttypeByVal;
93 :
94 : bool plain_agg; /* is it just a plain aggregate function? */
95 : int aggno; /* if so, index of its PerAggData */
96 :
97 : WindowObject winobj; /* object used in window function API */
98 : } WindowStatePerFuncData;
99 :
100 : /*
101 : * For plain aggregate window functions, we also have one of these. It holds the aggregate's support-function lookup data, its running transition value, and the result cached for the current frame.
102 : */
103 : typedef struct WindowStatePerAggData
104 : {
105 : /* Oids of transition functions */
106 : Oid transfn_oid;
107 : Oid invtransfn_oid; /* may be InvalidOid */
108 : Oid finalfn_oid; /* may be InvalidOid */
109 :
110 : /*
111 : * fmgr lookup data for transition functions --- only valid when
112 : * corresponding oid is not InvalidOid. Note in particular that fn_strict
113 : * flags are kept here.
114 : */
115 : FmgrInfo transfn;
116 : FmgrInfo invtransfn;
117 : FmgrInfo finalfn;
118 :
119 : int numFinalArgs; /* number of arguments to pass to finalfn; finalize_windowaggregate passes NULLs for every argument after the first */
120 :
121 : /*
122 : * initial value from pg_aggregate entry; initialize_windowaggregate resets transValue to this
123 : */
124 : Datum initValue;
125 : bool initValueIsNull;
126 :
127 : /*
128 : * cached value for current frame boundaries; cleared to NULL by initialize_windowaggregate and re-used by eval_windowaggregates while the frame is known to be unchanged
129 : */
130 : Datum resultValue;
131 : bool resultValueIsNull;
132 :
133 : /*
134 : * We need the len and byval info for the agg's input, result, and
135 : * transition data types in order to know how to copy/delete values.
136 : */
137 : int16 inputtypeLen,
138 : resulttypeLen,
139 : transtypeLen;
140 : bool inputtypeByVal,
141 : resulttypeByVal,
142 : transtypeByVal;
143 :
144 : int wfuncno; /* index of associated PerFuncData, i.e. subscript into winstate->perfunc[] */
145 :
146 : /* Context holding transition value and possibly other subsidiary data */
147 : MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
148 :
149 : /* Current transition value */
150 : Datum transValue; /* current transition value */
151 : bool transValueIsNull;
152 :
153 : int64 transValueCount; /* number of currently-aggregated rows: rows skipped by FILTER or by a strict function's NULL-input check are not counted; zeroed on (re)initialization */
154 :
155 : /* Data local to eval_windowaggregates() */
156 : bool restart; /* need to restart this agg in this cycle? */
157 : } WindowStatePerAggData;
158 : /* Forward declarations: per-aggregate transition-state helpers */
159 : static void initialize_windowaggregate(WindowAggState *winstate,
160 : WindowStatePerFunc perfuncstate,
161 : WindowStatePerAgg peraggstate);
162 : static void advance_windowaggregate(WindowAggState *winstate,
163 : WindowStatePerFunc perfuncstate,
164 : WindowStatePerAgg peraggstate);
165 : static bool advance_windowaggregate_base(WindowAggState *winstate,
166 : WindowStatePerFunc perfuncstate,
167 : WindowStatePerAgg peraggstate);
168 : static void finalize_windowaggregate(WindowAggState *winstate,
169 : WindowStatePerFunc perfuncstate,
170 : WindowStatePerAgg peraggstate,
171 : Datum *result, bool *isnull);
172 : /* Evaluation drivers */
173 : static void eval_windowaggregates(WindowAggState *winstate);
174 : static void eval_windowfunction(WindowAggState *winstate,
175 : WindowStatePerFunc perfuncstate,
176 : Datum *result, bool *isnull);
177 : /* Partition handling */
178 : static void begin_partition(WindowAggState *winstate);
179 : static void spool_tuples(WindowAggState *winstate, int64 pos);
180 : static void release_partition(WindowAggState *winstate);
181 : /* Window frame bookkeeping */
182 : static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
183 : TupleTableSlot *slot);
184 : static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
185 : static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
186 : /* Aggregate setup helpers */
187 : static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
188 : WindowFunc *wfunc,
189 : WindowStatePerAgg peraggstate);
190 : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
191 : /* Peer test and row fetch from the partition's tuplestore */
192 : static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
193 : TupleTableSlot *slot2);
194 : static bool window_gettupleslot(WindowObject winobj, int64 pos,
195 : TupleTableSlot *slot);
196 :
197 :
198 : /*
199 : * initialize_windowaggregate
200 : * parallel to initialize_aggregates in nodeAgg.c. Puts peraggstate back into its starting state: transValue = initValue (copied while aggcontext is current, unless NULL), transValueCount = 0, cached result = NULL. perfuncstate is not referenced.
201 : */
202 : static void
203 250 : initialize_windowaggregate(WindowAggState *winstate,
204 : WindowStatePerFunc perfuncstate,
205 : WindowStatePerAgg peraggstate)
206 : {
207 : MemoryContext oldContext;
208 :
209 : /*
210 : * If we're using a private aggcontext, we may reset it here. But if the
211 : * context is shared, we don't know which other aggregates may still need
212 : * it, so we must leave it to the caller to reset at an appropriate time.
213 : */
214 250 : if (peraggstate->aggcontext != winstate->aggcontext)
215 228 : MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
216 : /* A NULL initValue is assigned as-is; a non-NULL one is datumCopy'd with aggcontext as the current memory context */
217 250 : if (peraggstate->initValueIsNull)
218 139 : peraggstate->transValue = peraggstate->initValue;
219 : else
220 : {
221 111 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
222 222 : peraggstate->transValue = datumCopy(peraggstate->initValue,
223 111 : peraggstate->transtypeByVal,
224 111 : peraggstate->transtypeLen);
225 111 : MemoryContextSwitchTo(oldContext);
226 : }
227 250 : peraggstate->transValueIsNull = peraggstate->initValueIsNull;
228 250 : peraggstate->transValueCount = 0;
229 250 : peraggstate->resultValue = (Datum) 0; /* drop any cached result along with the transition state */
230 250 : peraggstate->resultValueIsNull = true;
231 250 : }
232 :
233 : /*
234 : * advance_windowaggregate
235 : * parallel to advance_aggregates in nodeAgg.c. Folds one input row into the aggregate's transValue; a row that is FILTERed out, or that has a NULL argument for a strict transfn, leaves the state untouched.
236 : */
237 : static void
238 23018 : advance_windowaggregate(WindowAggState *winstate,
239 : WindowStatePerFunc perfuncstate,
240 : WindowStatePerAgg peraggstate)
241 : {
242 23018 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
243 23018 : int numArguments = perfuncstate->numArguments;
244 : FunctionCallInfoData fcinfodata;
245 23018 : FunctionCallInfo fcinfo = &fcinfodata;
246 : Datum newVal;
247 : ListCell *arg;
248 : int i;
249 : MemoryContext oldContext;
250 23018 : ExprContext *econtext = winstate->tmpcontext;
251 23018 : ExprState *filter = wfuncstate->aggfilter;
252 : /* FILTER and argument expressions are evaluated with tmpcontext's per-tuple memory as the current context */
253 23018 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
254 :
255 : /* Skip anything FILTERed out (a NULL filter result counts as "filtered out") */
256 23018 : if (filter)
257 : {
258 : bool isnull;
259 46 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
260 :
261 46 : if (isnull || !DatumGetBool(res))
262 : {
263 21 : MemoryContextSwitchTo(oldContext);
264 21 : return;
265 : }
266 : }
267 :
268 : /* We start from 1, since the 0th arg will be the transition value. The slots are filled before InitFunctionCallInfoData runs below, which relies on that macro leaving arg[]/argnull[] alone (NOTE(review): confirm against fmgr.h). */
269 22997 : i = 1;
270 35941 : foreach(arg, wfuncstate->args)
271 : {
272 12944 : ExprState *argstate = (ExprState *) lfirst(arg);
273 :
274 12944 : fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
275 : &fcinfo->argnull[i]);
276 12944 : i++;
277 : }
278 :
279 22997 : if (peraggstate->transfn.fn_strict)
280 : {
281 : /*
282 : * For a strict transfn, nothing happens when there's a NULL input; we
283 : * just keep the prior transValue. Note transValueCount doesn't
284 : * change either.
285 : */
286 35134 : for (i = 1; i <= numArguments; i++)
287 : {
288 12559 : if (fcinfo->argnull[i])
289 : {
290 37 : MemoryContextSwitchTo(oldContext);
291 37 : return;
292 : }
293 : }
294 :
295 : /*
296 : * For strict transition functions with initial value NULL we use the
297 : * first non-NULL input as the initial state. (We already checked
298 : * that the agg's input type is binary-compatible with its transtype,
299 : * so straight copy here is OK.)
300 : *
301 : * We must copy the datum into aggcontext if it is pass-by-ref. We do
302 : * not need to pfree the old transValue, since it's NULL.
303 : */
304 22575 : if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
305 : {
306 75 : MemoryContextSwitchTo(peraggstate->aggcontext);
307 150 : peraggstate->transValue = datumCopy(fcinfo->arg[1],
308 75 : peraggstate->transtypeByVal,
309 75 : peraggstate->transtypeLen);
310 75 : peraggstate->transValueIsNull = false;
311 75 : peraggstate->transValueCount = 1;
312 75 : MemoryContextSwitchTo(oldContext);
313 75 : return;
314 : }
315 :
316 22500 : if (peraggstate->transValueIsNull)
317 : {
318 : /*
319 : * Don't call a strict function with NULL inputs. Note it is
320 : * possible to get here despite the above tests, if the transfn is
321 : * strict *and* returned a NULL on a prior cycle. If that happens
322 : * we will propagate the NULL all the way to the end. That can
323 : * only happen if there's no inverse transition function, though,
324 : * since we disallow transitions back to NULL when there is one.
325 : */
326 0 : MemoryContextSwitchTo(oldContext);
327 0 : Assert(!OidIsValid(peraggstate->invtransfn_oid));
328 0 : return;
329 : }
330 : }
331 :
332 : /*
333 : * OK to call the transition function. Set winstate->curaggcontext while
334 : * calling it, for possible use by AggCheckCallContext.
335 : */
336 22885 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
337 : numArguments + 1,
338 : perfuncstate->winCollation,
339 : (void *) winstate, NULL);
340 22885 : fcinfo->arg[0] = peraggstate->transValue;
341 22885 : fcinfo->argnull[0] = peraggstate->transValueIsNull;
342 22885 : winstate->curaggcontext = peraggstate->aggcontext;
343 22885 : newVal = FunctionCallInvoke(fcinfo);
344 22885 : winstate->curaggcontext = NULL;
345 :
346 : /*
347 : * Moving-aggregate transition functions must not return null, see
348 : * advance_windowaggregate_base().
349 : */
350 22885 : if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
351 0 : ereport(ERROR,
352 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
353 : errmsg("moving-aggregate transition function must not return null")));
354 :
355 : /*
356 : * We must track the number of rows included in transValue, since to
357 : * remove the last input, advance_windowaggregate_base() mustn't call the
358 : * inverse transition function, but simply reset transValue back to its
359 : * initial value.
360 : */
361 22885 : peraggstate->transValueCount++;
362 :
363 : /*
364 : * If pass-by-ref datatype, must copy the new value into aggcontext and
365 : * free the prior transValue. But if transfn returned a pointer to its
366 : * first input, we don't need to do anything. Also, if transfn returned a
367 : * pointer to a R/W expanded object that is already a child of the
368 : * aggcontext, assume we can adopt that value without copying it.
369 : */
370 43499 : if (!peraggstate->transtypeByVal &&
371 20614 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
372 : {
373 157 : if (!fcinfo->isnull)
374 : {
375 157 : MemoryContextSwitchTo(peraggstate->aggcontext);
376 159 : if (DatumIsReadWriteExpandedObject(newVal,
377 : false,
378 149 : peraggstate->transtypeLen) &&
379 2 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
380 : /* do nothing */ ;
381 : else
382 310 : newVal = datumCopy(newVal,
383 155 : peraggstate->transtypeByVal,
384 155 : peraggstate->transtypeLen);
385 : }
386 157 : if (!peraggstate->transValueIsNull) /* release the superseded transition value */
387 : {
388 147 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
389 : false,
390 : peraggstate->transtypeLen))
391 0 : DeleteExpandedObject(peraggstate->transValue);
392 : else
393 147 : pfree(DatumGetPointer(peraggstate->transValue));
394 : }
395 : }
396 :
397 22885 : MemoryContextSwitchTo(oldContext);
398 22885 : peraggstate->transValue = newVal;
399 22885 : peraggstate->transValueIsNull = fcinfo->isnull;
400 : }
401 :
402 : /*
403 : * advance_windowaggregate_base
404 : * Remove the oldest tuple from an aggregation.
405 : *
406 : * This is very much like advance_windowaggregate, except that we will call
407 : * the inverse transition function (which caller must have checked is
408 : * available).
409 : *
410 : * Returns true if we successfully removed the current row from this
411 : * aggregate, false if not (in the latter case, caller is responsible
412 : * for cleaning up by restarting the aggregation). A row that is FILTERed out, or has a NULL input for a strict invtransfn, is reported as removed (true) without touching the state.
413 : */
414 : static bool
415 559 : advance_windowaggregate_base(WindowAggState *winstate,
416 : WindowStatePerFunc perfuncstate,
417 : WindowStatePerAgg peraggstate)
418 : {
419 559 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
420 559 : int numArguments = perfuncstate->numArguments;
421 : FunctionCallInfoData fcinfodata;
422 559 : FunctionCallInfo fcinfo = &fcinfodata;
423 : Datum newVal;
424 : ListCell *arg;
425 : int i;
426 : MemoryContext oldContext;
427 559 : ExprContext *econtext = winstate->tmpcontext;
428 559 : ExprState *filter = wfuncstate->aggfilter;
429 :
430 559 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
431 :
432 : /* Skip anything FILTERed out: advance_windowaggregate skips such rows too, so there is nothing to remove and we report success */
433 559 : if (filter)
434 : {
435 : bool isnull;
436 17 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
437 :
438 17 : if (isnull || !DatumGetBool(res))
439 : {
440 8 : MemoryContextSwitchTo(oldContext);
441 8 : return true;
442 : }
443 : }
444 :
445 : /* We start from 1, since the 0th arg will be the transition value */
446 551 : i = 1;
447 1099 : foreach(arg, wfuncstate->args)
448 : {
449 548 : ExprState *argstate = (ExprState *) lfirst(arg);
450 :
451 548 : fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
452 : &fcinfo->argnull[i]);
453 548 : i++;
454 : }
455 :
456 551 : if (peraggstate->invtransfn.fn_strict)
457 : {
458 : /*
459 : * For a strict (inv)transfn, nothing happens when there's a NULL
460 : * input; we just keep the prior transValue. Note transValueCount
461 : * doesn't change either.
462 : */
463 596 : for (i = 1; i <= numArguments; i++)
464 : {
465 304 : if (fcinfo->argnull[i])
466 : {
467 15 : MemoryContextSwitchTo(oldContext);
468 15 : return true;
469 : }
470 : }
471 : }
472 :
473 : /* There should still be an added but not yet removed value */
474 536 : Assert(peraggstate->transValueCount > 0);
475 :
476 : /*
477 : * In moving-aggregate mode, the state must never be NULL, except possibly
478 : * before any rows have been aggregated (which is surely not the case at
479 : * this point). This restriction allows us to interpret a NULL result
480 : * from the inverse function as meaning "sorry, can't do an inverse
481 : * transition in this case". We already checked this in
482 : * advance_windowaggregate, but just for safety, check again.
483 : */
484 536 : if (peraggstate->transValueIsNull)
485 0 : elog(ERROR, "aggregate transition value is NULL before inverse transition");
486 :
487 : /*
488 : * We mustn't use the inverse transition function to remove the last
489 : * input. Doing so would yield a non-NULL state, whereas we should be in
490 : * the initial state afterwards which may very well be NULL. So instead,
491 : * we simply re-initialize the aggregate in this case.
492 : */
493 536 : if (peraggstate->transValueCount == 1)
494 : {
495 17 : MemoryContextSwitchTo(oldContext);
496 34 : initialize_windowaggregate(winstate,
497 34 : &winstate->perfunc[peraggstate->wfuncno],
498 : peraggstate); /* also resets transValueCount to 0 and clears the cached result */
499 17 : return true;
500 : }
501 :
502 : /*
503 : * OK to call the inverse transition function. Set
504 : * winstate->curaggcontext while calling it, for possible use by
505 : * AggCheckCallContext.
506 : */
507 519 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
508 : numArguments + 1,
509 : perfuncstate->winCollation,
510 : (void *) winstate, NULL);
511 519 : fcinfo->arg[0] = peraggstate->transValue;
512 519 : fcinfo->argnull[0] = peraggstate->transValueIsNull;
513 519 : winstate->curaggcontext = peraggstate->aggcontext;
514 519 : newVal = FunctionCallInvoke(fcinfo);
515 519 : winstate->curaggcontext = NULL;
516 :
517 : /*
518 : * If the function returns NULL, report failure, forcing a restart. transValue and transValueCount are left as they were.
519 : */
520 519 : if (fcinfo->isnull)
521 : {
522 40 : MemoryContextSwitchTo(oldContext);
523 40 : return false;
524 : }
525 :
526 : /* Update number of rows included in transValue */
527 479 : peraggstate->transValueCount--;
528 :
529 : /*
530 : * If pass-by-ref datatype, must copy the new value into aggcontext and
531 : * free the prior transValue. But if invtransfn returned a pointer to its
532 : * first input, we don't need to do anything. Also, if invtransfn
533 : * returned a pointer to a R/W expanded object that is already a child of
534 : * the aggcontext, assume we can adopt that value without copying it.
535 : *
536 : * Note: the checks for null values here will never fire, but it seems
537 : * best to have this stanza look just like advance_windowaggregate.
538 : */
539 667 : if (!peraggstate->transtypeByVal &&
540 188 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
541 : {
542 114 : if (!fcinfo->isnull)
543 : {
544 114 : MemoryContextSwitchTo(peraggstate->aggcontext);
545 114 : if (DatumIsReadWriteExpandedObject(newVal,
546 : false,
547 112 : peraggstate->transtypeLen) &&
548 0 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
549 : /* do nothing */ ;
550 : else
551 228 : newVal = datumCopy(newVal,
552 114 : peraggstate->transtypeByVal,
553 114 : peraggstate->transtypeLen);
554 : }
555 114 : if (!peraggstate->transValueIsNull) /* release the superseded transition value */
556 : {
557 114 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
558 : false,
559 : peraggstate->transtypeLen))
560 0 : DeleteExpandedObject(peraggstate->transValue);
561 : else
562 114 : pfree(DatumGetPointer(peraggstate->transValue));
563 : }
564 : }
565 :
566 479 : MemoryContextSwitchTo(oldContext);
567 479 : peraggstate->transValue = newVal;
568 479 : peraggstate->transValueIsNull = fcinfo->isnull;
569 :
570 479 : return true;
571 : }
572 :
573 : /*
574 : * finalize_windowaggregate
575 : * parallel to finalize_aggregate in nodeAgg.c. Stores the aggregate's current value in *result and *isnull, working in the output ExprContext's per-tuple memory; this function does not assign peraggstate->transValue.
576 : */
577 : static void
578 1059 : finalize_windowaggregate(WindowAggState *winstate,
579 : WindowStatePerFunc perfuncstate,
580 : WindowStatePerAgg peraggstate,
581 : Datum *result, bool *isnull)
582 : {
583 : MemoryContext oldContext;
584 :
585 1059 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
586 :
587 : /*
588 : * Apply the agg's finalfn if one is provided, else return transValue.
589 : */
590 1059 : if (OidIsValid(peraggstate->finalfn_oid))
591 : {
592 590 : int numFinalArgs = peraggstate->numFinalArgs;
593 : FunctionCallInfoData fcinfo;
594 : bool anynull;
595 : int i;
596 :
597 590 : InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
598 : numFinalArgs,
599 : perfuncstate->winCollation,
600 : (void *) winstate, NULL);
601 590 : fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
602 : peraggstate->transValueIsNull,
603 : peraggstate->transtypeLen);
604 590 : fcinfo.argnull[0] = peraggstate->transValueIsNull;
605 590 : anynull = peraggstate->transValueIsNull;
606 :
607 : /* Fill any remaining argument positions with nulls; note this makes anynull true whenever numFinalArgs > 1 */
608 590 : for (i = 1; i < numFinalArgs; i++)
609 : {
610 0 : fcinfo.arg[i] = (Datum) 0;
611 0 : fcinfo.argnull[i] = true;
612 0 : anynull = true;
613 : }
614 :
615 590 : if (fcinfo.flinfo->fn_strict && anynull)
616 : {
617 : /* don't call a strict function with NULL inputs */
618 0 : *result = (Datum) 0;
619 0 : *isnull = true;
620 : }
621 : else
622 : {
623 590 : winstate->curaggcontext = peraggstate->aggcontext;
624 590 : *result = FunctionCallInvoke(&fcinfo);
625 590 : winstate->curaggcontext = NULL;
626 590 : *isnull = fcinfo.isnull;
627 : }
628 : }
629 : else
630 : {
631 : /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
632 469 : *result = peraggstate->transValue;
633 469 : *isnull = peraggstate->transValueIsNull;
634 : }
635 :
636 : /*
637 : * If result is pass-by-ref, make sure it is in the right context: copy it into the current (per-tuple) context unless MemoryContextContains reports it is already there.
638 : */
639 1829 : if (!peraggstate->resulttypeByVal && !*isnull &&
640 770 : !MemoryContextContains(CurrentMemoryContext,
641 770 : DatumGetPointer(*result)))
642 1110 : *result = datumCopy(*result,
643 555 : peraggstate->resulttypeByVal,
644 555 : peraggstate->resulttypeLen);
645 1059 : MemoryContextSwitchTo(oldContext);
646 1059 : }
647 :
648 : /*
649 : * eval_windowaggregates
650 : * evaluate plain aggregates being used as window functions
651 : *
652 : * This differs from nodeAgg.c in two ways. First, if the window's frame
653 : * start position moves, we use the inverse transition function (if it exists)
654 : * to remove rows from the transition value. And second, we expect to be
655 : * able to call aggregate final functions repeatedly after aggregating more
656 : * data onto the same transition value. This is not a behavior required by
657 : * nodeAgg.c.
658 : */
659 : static void
660 20901 : eval_windowaggregates(WindowAggState *winstate)
661 : {
662 : WindowStatePerAgg peraggstate;
663 : int wfuncno,
664 : numaggs,
665 : numaggs_restart,
666 : i;
667 : int64 aggregatedupto_nonrestarted;
668 : MemoryContext oldContext;
669 : ExprContext *econtext;
670 : WindowObject agg_winobj;
671 : TupleTableSlot *agg_row_slot;
672 : TupleTableSlot *temp_slot;
673 :
674 20901 : numaggs = winstate->numaggs;
675 20901 : if (numaggs == 0)
676 0 : return; /* nothing to do */
677 :
678 : /* final output execution is in ps_ExprContext */
679 20901 : econtext = winstate->ss.ps.ps_ExprContext;
680 20901 : agg_winobj = winstate->agg_winobj;
681 20901 : agg_row_slot = winstate->agg_row_slot;
682 20901 : temp_slot = winstate->temp_slot_1;
683 :
684 : /*
685 : * Currently, we support only a subset of the SQL-standard window framing
686 : * rules.
687 : *
688 : * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
689 : * a contiguous group of rows extending forward from the start of the
690 : * partition, and rows only enter the frame, never exit it, as the current
691 : * row advances forward. This makes it possible to use an incremental
692 : * strategy for evaluating aggregates: we run the transition function for
693 : * each row added to the frame, and run the final function whenever we
694 : * need the current aggregate value. This is considerably more efficient
695 : * than the naive approach of re-running the entire aggregate calculation
696 : * for each current row. It does assume that the final function doesn't
697 : * damage the running transition value, but we have the same assumption in
698 : * nodeAgg.c too (when it rescans an existing hash table).
699 : *
700 : * If the frame start does sometimes move, we can still optimize as above
701 : * whenever successive rows share the same frame head, but if the frame
702 : * head moves beyond the previous head we try to remove those rows using
703 : * the aggregate's inverse transition function. This function restores
704 : * the aggregate's current state to what it would be if the removed row
705 : * had never been aggregated in the first place. Inverse transition
706 : * functions may optionally return NULL, indicating that the function was
707 : * unable to remove the tuple from aggregation. If this happens, or if
708 : * the aggregate doesn't have an inverse transition function at all, we
709 : * must perform the aggregation all over again for all tuples within the
710 : * new frame boundaries.
711 : *
712 : * In many common cases, multiple rows share the same frame and hence the
713 : * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
714 : * window, then all rows are peers and so they all have window frame equal
715 : * to the whole partition.) We optimize such cases by calculating the
716 : * aggregate value once when we reach the first row of a peer group, and
717 : * then returning the saved value for all subsequent rows.
718 : *
719 : * 'aggregatedupto' keeps track of the first row that has not yet been
720 : * accumulated into the aggregate transition values. Whenever we start a
721 : * new peer group, we accumulate forward to the end of the peer group.
722 : */
723 :
724 : /*
725 : * First, update the frame head position.
726 : *
727 : * The frame head should never move backwards, and the code below wouldn't
728 : * cope if it did, so for safety we complain if it does.
729 : */
730 20901 : update_frameheadpos(agg_winobj, temp_slot);
731 20901 : if (winstate->frameheadpos < winstate->aggregatedbase)
732 0 : elog(ERROR, "window frame head moved backward");
733 :
734 : /*
735 : * If the frame didn't change compared to the previous row, we can re-use
736 : * the result values that were previously saved at the bottom of this
737 : * function. Since we don't know the current frame's end yet, this is not
738 : * possible to check for fully. But if the frame end mode is UNBOUNDED
739 : * FOLLOWING or CURRENT ROW, and the current row lies within the previous
740 : * row's frame, then the two frames' ends must coincide. Note that on the
741 : * first row aggregatedbase == aggregatedupto, meaning this test must
742 : * fail, so we don't need to check the "there was no previous row" case
743 : * explicitly here.
744 : */
745 41471 : if (winstate->aggregatedbase == winstate->frameheadpos &&
746 20570 : (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
747 20547 : FRAMEOPTION_END_CURRENT_ROW)) &&
748 41094 : winstate->aggregatedbase <= winstate->currentpos &&
749 20547 : winstate->aggregatedupto > winstate->currentpos)
750 : {
751 40232 : for (i = 0; i < numaggs; i++)
752 : {
753 20117 : peraggstate = &winstate->peragg[i];
754 20117 : wfuncno = peraggstate->wfuncno;
755 20117 : econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
756 20117 : econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
757 : }
758 20115 : return;
759 : }
760 :
761 : /*----------
762 : * Initialize restart flags.
763 : *
764 : * We restart the aggregation:
765 : * - if we're processing the first row in the partition, or
766 : * - if the frame's head moved and we cannot use an inverse
767 : * transition function, or
768 : * - if the new frame doesn't overlap the old one
769 : *
770 : * Note that we don't strictly need to restart in the last case, but if
771 : * we're going to remove all rows from the aggregation anyway, a restart
772 : * surely is faster.
773 : *----------
774 : */
775 786 : numaggs_restart = 0;
776 1845 : for (i = 0; i < numaggs; i++)
777 : {
778 1059 : peraggstate = &winstate->peragg[i];
779 1935 : if (winstate->currentpos == 0 ||
780 1434 : (winstate->aggregatedbase != winstate->frameheadpos &&
781 1431 : !OidIsValid(peraggstate->invtransfn_oid)) ||
782 873 : winstate->aggregatedupto <= winstate->frameheadpos)
783 : {
784 193 : peraggstate->restart = true;
785 193 : numaggs_restart++;
786 : }
787 : else
788 866 : peraggstate->restart = false;
789 : }
790 :
791 : /*
792 : * If we have any possibly-moving aggregates, attempt to advance
793 : * aggregatedbase to match the frame's head by removing input rows that
794 : * fell off the top of the frame from the aggregations. This can fail,
795 : * i.e. advance_windowaggregate_base() can return false, in which case
796 : * we'll restart that aggregate below.
797 : */
798 2859 : while (numaggs_restart < numaggs &&
799 954 : winstate->aggregatedbase < winstate->frameheadpos)
800 : {
801 : /*
802 : * Fetch the next tuple of those being removed. This should never fail
803 : * as we should have been here before.
804 : */
805 333 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
806 : temp_slot))
807 0 : elog(ERROR, "could not re-fetch previously fetched frame row");
808 :
809 : /* Set tuple context for evaluation of aggregate arguments */
810 333 : winstate->tmpcontext->ecxt_outertuple = temp_slot;
811 :
812 : /*
813 : * Perform the inverse transition for each aggregate function in the
814 : * window, unless it has already been marked as needing a restart.
815 : */
816 894 : for (i = 0; i < numaggs; i++)
817 : {
818 : bool ok;
819 :
820 561 : peraggstate = &winstate->peragg[i];
821 561 : if (peraggstate->restart)
822 2 : continue;
823 :
824 559 : wfuncno = peraggstate->wfuncno;
825 1118 : ok = advance_windowaggregate_base(winstate,
826 1118 : &winstate->perfunc[wfuncno],
827 : peraggstate);
828 559 : if (!ok)
829 : {
830 : /* Inverse transition function has failed, must restart */
831 40 : peraggstate->restart = true;
832 40 : numaggs_restart++;
833 : }
834 : }
835 :
836 : /* Reset per-input-tuple context after each tuple */
837 333 : ResetExprContext(winstate->tmpcontext);
838 :
839 : /* And advance the aggregated-row state */
840 333 : winstate->aggregatedbase++;
841 333 : ExecClearTuple(temp_slot);
842 : }
843 :
844 : /*
845 : * If we successfully advanced the base rows of all the aggregates,
846 : * aggregatedbase now equals frameheadpos; but if we failed for any, we
847 : * must forcibly update aggregatedbase.
848 : */
849 786 : winstate->aggregatedbase = winstate->frameheadpos;
850 :
851 : /*
852 : * If we created a mark pointer for aggregates, keep it pushed up to frame
853 : * head, so that tuplestore can discard unnecessary rows.
854 : */
855 786 : if (agg_winobj->markptr >= 0)
856 406 : WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
857 :
858 : /*
859 : * Now restart the aggregates that require it.
860 : *
861 : * We assume that aggregates using the shared context always restart if
862 : * *any* aggregate restarts, and we may thus clean up the shared
863 : * aggcontext if that is the case. Private aggcontexts are reset by
864 : * initialize_windowaggregate() if their owning aggregate restarts. If we
865 : * aren't restarting an aggregate, we need to free any previously saved
866 : * result for it, else we'll leak memory.
867 : */
868 786 : if (numaggs_restart > 0)
869 203 : MemoryContextResetAndDeleteChildren(winstate->aggcontext);
870 1845 : for (i = 0; i < numaggs; i++)
871 : {
872 1059 : peraggstate = &winstate->peragg[i];
873 :
874 : /* Aggregates using the shared ctx must restart if *any* agg does */
875 1059 : Assert(peraggstate->aggcontext != winstate->aggcontext ||
876 : numaggs_restart == 0 ||
877 : peraggstate->restart);
878 :
879 1059 : if (peraggstate->restart)
880 : {
881 233 : wfuncno = peraggstate->wfuncno;
882 466 : initialize_windowaggregate(winstate,
883 466 : &winstate->perfunc[wfuncno],
884 : peraggstate);
885 : }
886 826 : else if (!peraggstate->resultValueIsNull)
887 : {
888 787 : if (!peraggstate->resulttypeByVal)
889 611 : pfree(DatumGetPointer(peraggstate->resultValue));
890 787 : peraggstate->resultValue = (Datum) 0;
891 787 : peraggstate->resultValueIsNull = true;
892 : }
893 : }
894 :
895 : /*
896 : * Non-restarted aggregates now contain the rows between aggregatedbase
897 : * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
898 : * contain no rows. If there are any restarted aggregates, we must thus
899 : * begin aggregating anew at frameheadpos, otherwise we may simply
900 : * continue at aggregatedupto. We must remember the old value of
901 : * aggregatedupto to know how long to skip advancing non-restarted
902 : * aggregates. If we modify aggregatedupto, we must also clear
903 : * agg_row_slot, per the loop invariant below.
904 : */
905 786 : aggregatedupto_nonrestarted = winstate->aggregatedupto;
906 989 : if (numaggs_restart > 0 &&
907 203 : winstate->aggregatedupto != winstate->frameheadpos)
908 : {
909 41 : winstate->aggregatedupto = winstate->frameheadpos;
910 41 : ExecClearTuple(agg_row_slot);
911 : }
912 :
913 : /*
914 : * Advance until we reach a row not in frame (or end of partition).
915 : *
916 : * Note the loop invariant: agg_row_slot is either empty or holds the row
917 : * at position aggregatedupto. We advance aggregatedupto after processing
918 : * a row.
919 : */
920 : for (;;)
921 : {
922 : /* Fetch next row if we didn't already */
923 23421 : if (TupIsNull(agg_row_slot))
924 : {
925 23067 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
926 : agg_row_slot))
927 429 : break; /* must be end of partition */
928 : }
929 :
930 : /* Exit loop (for now) if not in frame */
931 22992 : if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
932 357 : break;
933 :
934 : /* Set tuple context for evaluation of aggregate arguments */
935 22635 : winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
936 :
937 : /* Accumulate row into the aggregates */
938 49009 : for (i = 0; i < numaggs; i++)
939 : {
940 26374 : peraggstate = &winstate->peragg[i];
941 :
942 : /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
943 46111 : if (!peraggstate->restart &&
944 19737 : winstate->aggregatedupto < aggregatedupto_nonrestarted)
945 3356 : continue;
946 :
947 23018 : wfuncno = peraggstate->wfuncno;
948 46036 : advance_windowaggregate(winstate,
949 46036 : &winstate->perfunc[wfuncno],
950 : peraggstate);
951 : }
952 :
953 : /* Reset per-input-tuple context after each tuple */
954 22635 : ResetExprContext(winstate->tmpcontext);
955 :
956 : /* And advance the aggregated-row state */
957 22635 : winstate->aggregatedupto++;
958 22635 : ExecClearTuple(agg_row_slot);
959 22635 : }
960 :
961 : /* The frame's end is not supposed to move backwards, ever */
962 786 : Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
963 :
964 : /*
965 : * finalize aggregates and fill result/isnull fields.
966 : */
967 1845 : for (i = 0; i < numaggs; i++)
968 : {
969 : Datum *result;
970 : bool *isnull;
971 :
972 1059 : peraggstate = &winstate->peragg[i];
973 1059 : wfuncno = peraggstate->wfuncno;
974 1059 : result = &econtext->ecxt_aggvalues[wfuncno];
975 1059 : isnull = &econtext->ecxt_aggnulls[wfuncno];
976 2118 : finalize_windowaggregate(winstate,
977 2118 : &winstate->perfunc[wfuncno],
978 : peraggstate,
979 : result, isnull);
980 :
981 : /*
982 : * save the result in case next row shares the same frame.
983 : *
984 : * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
985 : * advance that the next row can't possibly share the same frame. Is
986 : * it worth detecting that and skipping this code?
987 : */
988 1059 : if (!peraggstate->resulttypeByVal && !*isnull)
989 : {
990 770 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
991 770 : peraggstate->resultValue =
992 1540 : datumCopy(*result,
993 770 : peraggstate->resulttypeByVal,
994 770 : peraggstate->resulttypeLen);
995 770 : MemoryContextSwitchTo(oldContext);
996 : }
997 : else
998 : {
999 289 : peraggstate->resultValue = *result;
1000 : }
1001 1059 : peraggstate->resultValueIsNull = *isnull;
1002 : }
1003 : }
1004 :
1005 : /*
1006 : * eval_windowfunction
1007 : *
1008 : * Arguments of window functions are not evaluated here, because a window
1009 : * function can need random access to arbitrary rows in the partition.
1010 : * The window function uses the special WinGetFuncArgInPartition and
1011 : * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1012 : * it wants.
1013 : */
1014 : static void
1015 60398 : eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1016 : Datum *result, bool *isnull)
1017 : {
1018 : FunctionCallInfoData fcinfo;
1019 : MemoryContext oldContext;
1020 :
1021 60398 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1022 :
1023 : /*
1024 : * We don't pass any normal arguments to a window function, but we do pass
1025 : * it the number of arguments, in order to permit window function
1026 : * implementations to support varying numbers of arguments. The real info
1027 : * goes through the WindowObject, which is passed via fcinfo->context.
1028 : */
1029 60398 : InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
1030 : perfuncstate->numArguments,
1031 : perfuncstate->winCollation,
1032 : (void *) perfuncstate->winobj, NULL);
1033 : /* Just in case, make all the regular argument slots be null */
1034 60398 : memset(fcinfo.argnull, true, perfuncstate->numArguments);
1035 : /* Window functions don't have a current aggregate context, either */
1036 60398 : winstate->curaggcontext = NULL;
1037 :
1038 60398 : *result = FunctionCallInvoke(&fcinfo);
1039 60396 : *isnull = fcinfo.isnull;
1040 :
1041 : /*
1042 : * Make sure pass-by-ref data is allocated in the appropriate context. (We
1043 : * need this in case the function returns a pointer into some short-lived
1044 : * tuple, as is entirely possible.)
1045 : */
1046 60513 : if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
1047 117 : !MemoryContextContains(CurrentMemoryContext,
1048 117 : DatumGetPointer(*result)))
1049 0 : *result = datumCopy(*result,
1050 0 : perfuncstate->resulttypeByVal,
1051 0 : perfuncstate->resulttypeLen);
1052 :
1053 60396 : MemoryContextSwitchTo(oldContext);
1054 60396 : }
1055 :
1056 : /*
1057 : * begin_partition
1058 : * Start buffering rows of the next partition.
1059 : */
1060 : static void
1061 250 : begin_partition(WindowAggState *winstate)
1062 : {
1063 250 : PlanState *outerPlan = outerPlanState(winstate);
1064 250 : int numfuncs = winstate->numfuncs;
1065 : int i;
1066 :
1067 250 : winstate->partition_spooled = false;
1068 250 : winstate->framehead_valid = false;
1069 250 : winstate->frametail_valid = false;
1070 250 : winstate->spooled_rows = 0;
1071 250 : winstate->currentpos = 0;
1072 250 : winstate->frameheadpos = 0;
1073 250 : winstate->frametailpos = -1;
1074 250 : ExecClearTuple(winstate->agg_row_slot);
1075 :
1076 : /*
1077 : * If this is the very first partition, we need to fetch the first input
1078 : * row to store in first_part_slot.
1079 : */
1080 250 : if (TupIsNull(winstate->first_part_slot))
1081 : {
1082 145 : TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1083 :
1084 145 : if (!TupIsNull(outerslot))
1085 142 : ExecCopySlot(winstate->first_part_slot, outerslot);
1086 : else
1087 : {
1088 : /* outer plan is empty, so we have nothing to do */
1089 3 : winstate->partition_spooled = true;
1090 3 : winstate->more_partitions = false;
1091 253 : return;
1092 : }
1093 : }
1094 :
1095 : /* Create new tuplestore for this partition */
1096 247 : winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1097 :
1098 : /*
1099 : * Set up read pointers for the tuplestore. The current pointer doesn't
1100 : * need BACKWARD capability, but the per-window-function read pointers do,
1101 : * and the aggregate pointer does if frame start is movable.
1102 : */
1103 247 : winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1104 :
1105 : /* reset default REWIND capability bit for current ptr */
1106 247 : tuplestore_set_eflags(winstate->buffer, 0);
1107 :
1108 : /* create read pointers for aggregates, if needed */
1109 247 : if (winstate->numaggs > 0)
1110 : {
1111 156 : WindowObject agg_winobj = winstate->agg_winobj;
1112 156 : int readptr_flags = 0;
1113 :
1114 : /* If the frame head is potentially movable ... */
1115 156 : if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
1116 : {
1117 : /* ... create a mark pointer to track the frame head */
1118 62 : agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1119 : /* and the read pointer will need BACKWARD capability */
1120 62 : readptr_flags |= EXEC_FLAG_BACKWARD;
1121 : }
1122 :
1123 156 : agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1124 : readptr_flags);
1125 156 : agg_winobj->markpos = -1;
1126 156 : agg_winobj->seekpos = -1;
1127 :
1128 : /* Also reset the row counters for aggregates */
1129 156 : winstate->aggregatedbase = 0;
1130 156 : winstate->aggregatedupto = 0;
1131 : }
1132 :
1133 : /* create mark and read pointers for each real window function */
1134 551 : for (i = 0; i < numfuncs; i++)
1135 : {
1136 304 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1137 :
1138 304 : if (!perfuncstate->plain_agg)
1139 : {
1140 121 : WindowObject winobj = perfuncstate->winobj;
1141 :
1142 121 : winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1143 : 0);
1144 121 : winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1145 : EXEC_FLAG_BACKWARD);
1146 121 : winobj->markpos = -1;
1147 121 : winobj->seekpos = -1;
1148 : }
1149 : }
1150 :
1151 : /*
1152 : * Store the first tuple into the tuplestore (it's always available now;
1153 : * we either read it above, or saved it at the end of previous partition)
1154 : */
1155 247 : tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1156 247 : winstate->spooled_rows++;
1157 : }
1158 :
1159 : /*
1160 : * Read tuples from the outer node, up to and including position 'pos', and
1161 : * store them into the tuplestore. If pos is -1, reads the whole partition.
1162 : */
1163 : static void
1164 125264 : spool_tuples(WindowAggState *winstate, int64 pos)
1165 : {
1166 125264 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1167 : PlanState *outerPlan;
1168 : TupleTableSlot *outerslot;
1169 : MemoryContext oldcontext;
1170 :
1171 125264 : if (!winstate->buffer)
1172 3 : return; /* just a safety check */
1173 125261 : if (winstate->partition_spooled)
1174 7141 : return; /* whole partition done already */
1175 :
1176 : /*
1177 : * If the tuplestore has spilled to disk, alternate reading and writing
1178 : * becomes quite expensive due to frequent buffer flushes. It's cheaper
1179 : * to force the entire partition to get spooled in one go.
1180 : *
1181 : * XXX this is a horrid kluge --- it'd be better to fix the performance
1182 : * problem inside tuplestore. FIXME
1183 : */
1184 118120 : if (!tuplestore_in_memory(winstate->buffer))
1185 0 : pos = -1;
1186 :
1187 118120 : outerPlan = outerPlanState(winstate);
1188 :
1189 : /* Must be in query context to call outerplan */
1190 118120 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1191 :
1192 297156 : while (winstate->spooled_rows <= pos || pos == -1)
1193 : {
1194 61162 : outerslot = ExecProcNode(outerPlan);
1195 61162 : if (TupIsNull(outerslot))
1196 : {
1197 : /* reached the end of the last partition */
1198 141 : winstate->partition_spooled = true;
1199 141 : winstate->more_partitions = false;
1200 141 : break;
1201 : }
1202 :
1203 61021 : if (node->partNumCols > 0)
1204 : {
1205 : /* Check if this tuple still belongs to the current partition */
1206 20382 : if (!execTuplesMatch(winstate->first_part_slot,
1207 : outerslot,
1208 : node->partNumCols, node->partColIdx,
1209 : winstate->partEqfunctions,
1210 20382 : winstate->tmpcontext->ecxt_per_tuple_memory))
1211 : {
1212 : /*
1213 : * end of partition; copy the tuple for the next cycle.
1214 : */
1215 105 : ExecCopySlot(winstate->first_part_slot, outerslot);
1216 105 : winstate->partition_spooled = true;
1217 105 : winstate->more_partitions = true;
1218 105 : break;
1219 : }
1220 : }
1221 :
1222 : /* Still in partition, so save it into the tuplestore */
1223 60916 : tuplestore_puttupleslot(winstate->buffer, outerslot);
1224 60916 : winstate->spooled_rows++;
1225 : }
1226 :
1227 118120 : MemoryContextSwitchTo(oldcontext);
1228 : }
1229 :
1230 : /*
1231 : * release_partition
1232 : * clear information kept within a partition, including
1233 : * tuplestore and aggregate results.
1234 : */
1235 : static void
1236 398 : release_partition(WindowAggState *winstate)
1237 : {
1238 : int i;
1239 :
1240 878 : for (i = 0; i < winstate->numfuncs; i++)
1241 : {
1242 480 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1243 :
1244 : /* Release any partition-local state of this window function */
1245 480 : if (perfuncstate->winobj)
1246 167 : perfuncstate->winobj->localmem = NULL;
1247 : }
1248 :
1249 : /*
1250 : * Release all partition-local memory (in particular, any partition-local
1251 : * state that we might have trashed our pointers to in the above loop, and
1252 : * any aggregate temp data). We don't rely on retail pfree because some
1253 : * aggregates might have allocated data we don't have direct pointers to.
1254 : */
1255 398 : MemoryContextResetAndDeleteChildren(winstate->partcontext);
1256 398 : MemoryContextResetAndDeleteChildren(winstate->aggcontext);
1257 711 : for (i = 0; i < winstate->numaggs; i++)
1258 : {
1259 313 : if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1260 269 : MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
1261 : }
1262 :
1263 398 : if (winstate->buffer)
1264 245 : tuplestore_end(winstate->buffer);
1265 398 : winstate->buffer = NULL;
1266 398 : winstate->partition_spooled = false;
1267 398 : }
1268 :
1269 : /*
1270 : * row_is_in_frame
1271 : * Determine whether a row is in the current row's window frame according
1272 : * to our window framing rule
1273 : *
1274 : * The caller must have already determined that the row is in the partition
1275 : * and fetched it into a slot. This function just encapsulates the framing
1276 : * rules.
1277 : */
1278 : static bool
1279 23178 : row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1280 : {
1281 23178 : int frameOptions = winstate->frameOptions;
1282 :
1283 23178 : Assert(pos >= 0); /* else caller error */
1284 :
1285 : /* First, check frame starting conditions */
1286 23178 : if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1287 : {
1288 2109 : if (frameOptions & FRAMEOPTION_ROWS)
1289 : {
1290 : /* rows before current row are out of frame */
1291 2059 : if (pos < winstate->currentpos)
1292 0 : return false;
1293 : }
1294 50 : else if (frameOptions & FRAMEOPTION_RANGE)
1295 : {
1296 : /* preceding row that is not peer is out of frame */
1297 58 : if (pos < winstate->currentpos &&
1298 8 : !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1299 0 : return false;
1300 : }
1301 : else
1302 0 : Assert(false);
1303 : }
1304 21069 : else if (frameOptions & FRAMEOPTION_START_VALUE)
1305 : {
1306 145 : if (frameOptions & FRAMEOPTION_ROWS)
1307 : {
1308 145 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1309 :
1310 : /* rows before current row + offset are out of frame */
1311 145 : if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1312 130 : offset = -offset;
1313 :
1314 145 : if (pos < winstate->currentpos + offset)
1315 0 : return false;
1316 : }
1317 0 : else if (frameOptions & FRAMEOPTION_RANGE)
1318 : {
1319 : /* parser should have rejected this */
1320 0 : elog(ERROR, "window frame with value offset is not implemented");
1321 : }
1322 : else
1323 0 : Assert(false);
1324 : }
1325 :
1326 : /* Okay so far, now check frame ending conditions */
1327 23178 : if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1328 : {
1329 20948 : if (frameOptions & FRAMEOPTION_ROWS)
1330 : {
1331 : /* rows after current row are out of frame */
1332 337 : if (pos > winstate->currentpos)
1333 149 : return false;
1334 : }
1335 20611 : else if (frameOptions & FRAMEOPTION_RANGE)
1336 : {
1337 : /* following row that is not peer is out of frame */
1338 40882 : if (pos > winstate->currentpos &&
1339 20271 : !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1340 162 : return false;
1341 : }
1342 : else
1343 0 : Assert(false);
1344 : }
1345 2230 : else if (frameOptions & FRAMEOPTION_END_VALUE)
1346 : {
1347 110 : if (frameOptions & FRAMEOPTION_ROWS)
1348 : {
1349 110 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1350 :
1351 : /* rows after current row + offset are out of frame */
1352 110 : if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1353 19 : offset = -offset;
1354 :
1355 110 : if (pos > winstate->currentpos + offset)
1356 46 : return false;
1357 : }
1358 0 : else if (frameOptions & FRAMEOPTION_RANGE)
1359 : {
1360 : /* parser should have rejected this */
1361 0 : elog(ERROR, "window frame with value offset is not implemented");
1362 : }
1363 : else
1364 0 : Assert(false);
1365 : }
1366 :
1367 : /* If we get here, it's in frame */
1368 22821 : return true;
1369 : }
1370 :
1371 : /*
1372 : * update_frameheadpos
1373 : * make frameheadpos valid for the current row
1374 : *
1375 : * Uses the winobj's read pointer for any required fetches; hence, if the
1376 : * frame mode is one that requires row comparisons, the winobj's mark must
1377 : * not be past the currently known frame head. Also uses the specified slot
1378 : * for any required fetches.
1379 : */
1380 : static void
1381 20991 : update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
1382 : {
1383 20991 : WindowAggState *winstate = winobj->winstate;
1384 20991 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1385 20991 : int frameOptions = winstate->frameOptions;
1386 :
1387 20991 : if (winstate->framehead_valid)
1388 40 : return; /* already known for current row */
1389 :
1390 20951 : if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1391 : {
1392 : /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1393 20523 : winstate->frameheadpos = 0;
1394 20523 : winstate->framehead_valid = true;
1395 : }
1396 428 : else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1397 : {
1398 345 : if (frameOptions & FRAMEOPTION_ROWS)
1399 : {
1400 : /* In ROWS mode, frame head is the same as current */
1401 315 : winstate->frameheadpos = winstate->currentpos;
1402 315 : winstate->framehead_valid = true;
1403 : }
1404 30 : else if (frameOptions & FRAMEOPTION_RANGE)
1405 : {
1406 : int64 fhprev;
1407 :
1408 : /* If no ORDER BY, all rows are peers with each other */
1409 30 : if (node->ordNumCols == 0)
1410 : {
1411 0 : winstate->frameheadpos = 0;
1412 0 : winstate->framehead_valid = true;
1413 0 : return;
1414 : }
1415 :
1416 : /*
1417 : * In RANGE START_CURRENT mode, frame head is the first row that
1418 : * is a peer of current row. We search backwards from current,
1419 : * which could be a bit inefficient if peer sets are large. Might
1420 : * be better to have a separate read pointer that moves forward
1421 : * tracking the frame head.
1422 : */
1423 30 : fhprev = winstate->currentpos - 1;
1424 : for (;;)
1425 : {
1426 : /* assume the frame head can't go backwards */
1427 54 : if (fhprev < winstate->frameheadpos)
1428 21 : break;
1429 33 : if (!window_gettupleslot(winobj, fhprev, slot))
1430 0 : break; /* start of partition */
1431 33 : if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1432 9 : break; /* not peer of current row */
1433 24 : fhprev--;
1434 24 : }
1435 30 : winstate->frameheadpos = fhprev + 1;
1436 30 : winstate->framehead_valid = true;
1437 : }
1438 : else
1439 0 : Assert(false);
1440 : }
1441 83 : else if (frameOptions & FRAMEOPTION_START_VALUE)
1442 : {
1443 83 : if (frameOptions & FRAMEOPTION_ROWS)
1444 : {
1445 : /* In ROWS mode, bound is physically n before/after current */
1446 83 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1447 :
1448 83 : if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1449 73 : offset = -offset;
1450 :
1451 83 : winstate->frameheadpos = winstate->currentpos + offset;
1452 : /* frame head can't go before first row */
1453 83 : if (winstate->frameheadpos < 0)
1454 16 : winstate->frameheadpos = 0;
1455 67 : else if (winstate->frameheadpos > winstate->currentpos)
1456 : {
1457 : /* make sure frameheadpos is not past end of partition */
1458 10 : spool_tuples(winstate, winstate->frameheadpos - 1);
1459 10 : if (winstate->frameheadpos > winstate->spooled_rows)
1460 0 : winstate->frameheadpos = winstate->spooled_rows;
1461 : }
1462 83 : winstate->framehead_valid = true;
1463 : }
1464 0 : else if (frameOptions & FRAMEOPTION_RANGE)
1465 : {
1466 : /* parser should have rejected this */
1467 0 : elog(ERROR, "window frame with value offset is not implemented");
1468 : }
1469 : else
1470 0 : Assert(false);
1471 : }
1472 : else
1473 0 : Assert(false);
1474 : }
1475 :
1476 : /*
1477 : * update_frametailpos
1478 : * make frametailpos valid for the current row
1479 : *
1480 : * Uses the winobj's read pointer for any required fetches; hence, if the
1481 : * frame mode is one that requires row comparisons, the winobj's mark must
1482 : * not be past the currently known frame tail. Also uses the specified slot
1483 : * for any required fetches.
1484 : */
1485 : static void
1486 130 : update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
1487 : {
1488 130 : WindowAggState *winstate = winobj->winstate;
1489 130 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1490 130 : int frameOptions = winstate->frameOptions;
1491 :
1492 130 : if (winstate->frametail_valid)
1493 0 : return; /* already known for current row */
1494 :
1495 130 : if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1496 : {
1497 : /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1498 30 : spool_tuples(winstate, -1);
1499 30 : winstate->frametailpos = winstate->spooled_rows - 1;
1500 30 : winstate->frametail_valid = true;
1501 : }
1502 100 : else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1503 : {
1504 100 : if (frameOptions & FRAMEOPTION_ROWS)
1505 : {
1506 : /* In ROWS mode, exactly the rows up to current are in frame */
1507 20 : winstate->frametailpos = winstate->currentpos;
1508 20 : winstate->frametail_valid = true;
1509 : }
1510 80 : else if (frameOptions & FRAMEOPTION_RANGE)
1511 : {
1512 : int64 ftnext;
1513 :
1514 : /* If no ORDER BY, all rows are peers with each other */
1515 80 : if (node->ordNumCols == 0)
1516 : {
1517 10 : spool_tuples(winstate, -1);
1518 10 : winstate->frametailpos = winstate->spooled_rows - 1;
1519 10 : winstate->frametail_valid = true;
1520 10 : return;
1521 : }
1522 :
1523 : /*
1524 : * Else we have to search for the first non-peer of the current
1525 : * row. We assume the current value of frametailpos is a lower
1526 : * bound on the possible frame tail location, ie, frame tail never
1527 : * goes backward, and that currentpos is also a lower bound, ie,
1528 : * frame end always >= current row.
1529 : */
1530 70 : ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1531 : for (;;)
1532 : {
1533 82 : if (!window_gettupleslot(winobj, ftnext, slot))
1534 13 : break; /* end of partition */
1535 69 : if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1536 57 : break; /* not peer of current row */
1537 12 : ftnext++;
1538 12 : }
1539 70 : winstate->frametailpos = ftnext - 1;
1540 70 : winstate->frametail_valid = true;
1541 : }
1542 : else
1543 0 : Assert(false);
1544 : }
1545 0 : else if (frameOptions & FRAMEOPTION_END_VALUE)
1546 : {
1547 0 : if (frameOptions & FRAMEOPTION_ROWS)
1548 : {
1549 : /* In ROWS mode, bound is physically n before/after current */
1550 0 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1551 :
1552 0 : if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1553 0 : offset = -offset;
1554 :
1555 0 : winstate->frametailpos = winstate->currentpos + offset;
1556 : /* smallest allowable value of frametailpos is -1 */
1557 0 : if (winstate->frametailpos < 0)
1558 0 : winstate->frametailpos = -1;
1559 0 : else if (winstate->frametailpos > winstate->currentpos)
1560 : {
1561 : /* make sure frametailpos is not past last row of partition */
1562 0 : spool_tuples(winstate, winstate->frametailpos);
1563 0 : if (winstate->frametailpos >= winstate->spooled_rows)
1564 0 : winstate->frametailpos = winstate->spooled_rows - 1;
1565 : }
1566 0 : winstate->frametail_valid = true;
1567 : }
1568 0 : else if (frameOptions & FRAMEOPTION_RANGE)
1569 : {
1570 : /* parser should have rejected this */
1571 0 : elog(ERROR, "window frame with value offset is not implemented");
1572 : }
1573 : else
1574 0 : Assert(false);
1575 : }
1576 : else
1577 0 : Assert(false);
1578 : }
1579 :
1580 :
1581 : /* -----------------
1582 : * ExecWindowAgg
1583 : *
1584 : * ExecWindowAgg receives tuples from its outer subplan and
1585 : * stores them into a tuplestore, then processes window functions.
1586 : * This node doesn't reduce nor qualify any row so the number of
1587 : * returned rows is exactly the same as its outer subplan's result.
1588 : * -----------------
1589 : */
1590 : static TupleTableSlot *
1591 41308 : ExecWindowAgg(PlanState *pstate)
1592 : {
1593 41308 : WindowAggState *winstate = castNode(WindowAggState, pstate);
1594 : ExprContext *econtext;
1595 : int i;
1596 : int numfuncs;
1597 :
1598 41308 : CHECK_FOR_INTERRUPTS();
1599 :
1600 41308 : if (winstate->all_done)
1601 0 : return NULL;
1602 :
1603 : /*
1604 : * Compute frame offset values, if any, during first call.
1605 : */
1606 41308 : if (winstate->all_first)
1607 : {
1608 145 : int frameOptions = winstate->frameOptions;
1609 145 : ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1610 : Datum value;
1611 : bool isnull;
1612 : int16 len;
1613 : bool byval;
1614 :
1615 145 : if (frameOptions & FRAMEOPTION_START_VALUE)
1616 : {
1617 11 : Assert(winstate->startOffset != NULL);
1618 11 : value = ExecEvalExprSwitchContext(winstate->startOffset,
1619 : econtext,
1620 : &isnull);
1621 11 : if (isnull)
1622 0 : ereport(ERROR,
1623 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1624 : errmsg("frame starting offset must not be null")));
1625 : /* copy value into query-lifespan context */
1626 11 : get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1627 : &len, &byval);
1628 11 : winstate->startOffsetValue = datumCopy(value, byval, len);
1629 11 : if (frameOptions & FRAMEOPTION_ROWS)
1630 : {
1631 : /* value is known to be int8 */
1632 11 : int64 offset = DatumGetInt64(value);
1633 :
1634 11 : if (offset < 0)
1635 0 : ereport(ERROR,
1636 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1637 : errmsg("frame starting offset must not be negative")));
1638 : }
1639 : }
1640 145 : if (frameOptions & FRAMEOPTION_END_VALUE)
1641 : {
1642 9 : Assert(winstate->endOffset != NULL);
1643 9 : value = ExecEvalExprSwitchContext(winstate->endOffset,
1644 : econtext,
1645 : &isnull);
1646 9 : if (isnull)
1647 0 : ereport(ERROR,
1648 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1649 : errmsg("frame ending offset must not be null")));
1650 : /* copy value into query-lifespan context */
1651 9 : get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1652 : &len, &byval);
1653 9 : winstate->endOffsetValue = datumCopy(value, byval, len);
1654 9 : if (frameOptions & FRAMEOPTION_ROWS)
1655 : {
1656 : /* value is known to be int8 */
1657 9 : int64 offset = DatumGetInt64(value);
1658 :
1659 9 : if (offset < 0)
1660 0 : ereport(ERROR,
1661 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1662 : errmsg("frame ending offset must not be negative")));
1663 : }
1664 : }
1665 145 : winstate->all_first = false;
1666 : }
1667 :
1668 41308 : if (winstate->buffer == NULL)
1669 : {
1670 : /* Initialize for first partition and set current row = 0 */
1671 145 : begin_partition(winstate);
1672 : /* If there are no input rows, we'll detect that and exit below */
1673 : }
1674 : else
1675 : {
1676 : /* Advance current row within partition */
1677 41163 : winstate->currentpos++;
1678 : /* This might mean that the frame moves, too */
1679 41163 : winstate->framehead_valid = false;
1680 41163 : winstate->frametail_valid = false;
1681 : }
1682 :
1683 : /*
1684 : * Spool all tuples up to and including the current row, if we haven't
1685 : * already
1686 : */
1687 41308 : spool_tuples(winstate, winstate->currentpos);
1688 :
1689 : /* Move to the next partition if we reached the end of this partition */
1690 45990 : if (winstate->partition_spooled &&
1691 4682 : winstate->currentpos >= winstate->spooled_rows)
1692 : {
1693 247 : release_partition(winstate);
1694 :
1695 247 : if (winstate->more_partitions)
1696 : {
1697 105 : begin_partition(winstate);
1698 105 : Assert(winstate->spooled_rows > 0);
1699 : }
1700 : else
1701 : {
1702 142 : winstate->all_done = true;
1703 142 : return NULL;
1704 : }
1705 : }
1706 :
1707 : /* final output execution is in ps_ExprContext */
1708 41166 : econtext = winstate->ss.ps.ps_ExprContext;
1709 :
1710 : /* Clear the per-output-tuple context for current row */
1711 41166 : ResetExprContext(econtext);
1712 :
1713 : /*
1714 : * Read the current row from the tuplestore, and save in ScanTupleSlot.
1715 : * (We can't rely on the outerplan's output slot because we may have to
1716 : * read beyond the current row. Also, we have to actually copy the row
1717 : * out of the tuplestore, since window function evaluation might cause the
1718 : * tuplestore to dump its state to disk.)
1719 : *
1720 : * Current row must be in the tuplestore, since we spooled it above.
1721 : */
1722 41166 : tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1723 41166 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1724 : winstate->ss.ss_ScanTupleSlot))
1725 0 : elog(ERROR, "unexpected end of tuplestore");
1726 :
1727 : /*
1728 : * Evaluate true window functions
1729 : */
1730 41166 : numfuncs = winstate->numfuncs;
1731 122738 : for (i = 0; i < numfuncs; i++)
1732 : {
1733 81574 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1734 :
1735 81574 : if (perfuncstate->plain_agg)
1736 21176 : continue;
1737 181194 : eval_windowfunction(winstate, perfuncstate,
1738 120796 : &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1739 60398 : &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1740 : }
1741 :
1742 : /*
1743 : * Evaluate aggregates
1744 : */
1745 41164 : if (winstate->numaggs > 0)
1746 20901 : eval_windowaggregates(winstate);
1747 :
1748 : /*
1749 : * Truncate any no-longer-needed rows from the tuplestore.
1750 : */
1751 41164 : tuplestore_trim(winstate->buffer);
1752 :
1753 : /*
1754 : * Form and return a projection tuple using the windowfunc results and the
1755 : * current row. Setting ecxt_outertuple arranges that any Vars will be
1756 : * evaluated with respect to that row.
1757 : */
1758 41164 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1759 :
1760 41164 : return ExecProject(winstate->ss.ps.ps_ProjInfo);
1761 : }
1762 :
1763 : /* -----------------
1764 : * ExecInitWindowAgg
1765 : *
1766 : * Creates the run-time information for the WindowAgg node produced by the
1767 : * planner and initializes its outer subtree
1768 : * -----------------
1769 : */
1770 : WindowAggState *
1771 140 : ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1772 : {
1773 : WindowAggState *winstate;
1774 : Plan *outerPlan;
1775 : ExprContext *econtext;
1776 : ExprContext *tmpcontext;
1777 : WindowStatePerFunc perfunc;
1778 : WindowStatePerAgg peragg;
1779 : int numfuncs,
1780 : wfuncno,
1781 : numaggs,
1782 : aggno;
1783 : ListCell *l;
1784 :
1785 : /* check for unsupported flags */
1786 140 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1787 :
1788 : /*
1789 : * create state structure
1790 : */
1791 140 : winstate = makeNode(WindowAggState);
1792 140 : winstate->ss.ps.plan = (Plan *) node;
1793 140 : winstate->ss.ps.state = estate;
1794 140 : winstate->ss.ps.ExecProcNode = ExecWindowAgg;
1795 :
1796 : /*
1797 : * Create expression contexts. We need two, one for per-input-tuple
1798 : * processing and one for per-output-tuple processing. We cheat a little
1799 : * by using ExecAssignExprContext() to build both.
1800 : */
1801 140 : ExecAssignExprContext(estate, &winstate->ss.ps);
1802 140 : tmpcontext = winstate->ss.ps.ps_ExprContext;
1803 140 : winstate->tmpcontext = tmpcontext;
1804 140 : ExecAssignExprContext(estate, &winstate->ss.ps);
1805 :
1806 : /* Create long-lived context for storage of partition-local memory etc */
1807 140 : winstate->partcontext =
1808 140 : AllocSetContextCreate(CurrentMemoryContext,
1809 : "WindowAgg Partition",
1810 : ALLOCSET_DEFAULT_SIZES);
1811 :
1812 : /*
1813 : * Create mid-lived context for aggregate trans values etc.
1814 : *
1815 : * Note that moving aggregates each use their own private context, not
1816 : * this one.
1817 : */
1818 140 : winstate->aggcontext =
1819 140 : AllocSetContextCreate(CurrentMemoryContext,
1820 : "WindowAgg Aggregates",
1821 : ALLOCSET_DEFAULT_SIZES);
1822 :
1823 : /*
1824 : * tuple table initialization
1825 : */
1826 140 : ExecInitScanTupleSlot(estate, &winstate->ss);
1827 140 : ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1828 140 : winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1829 140 : winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1830 140 : winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1831 140 : winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1832 :
1833 : /*
1834 : * WindowAgg nodes never have quals, since they can only occur at the
1835 : * logical top level of a query (ie, after any WHERE or HAVING filters)
1836 : */
1837 140 : Assert(node->plan.qual == NIL);
1838 140 : winstate->ss.ps.qual = NULL;
1839 :
1840 : /*
1841 : * initialize child nodes
1842 : */
1843 140 : outerPlan = outerPlan(node);
1844 140 : outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1845 :
1846 : /*
1847 : * initialize source tuple type (which is also the tuple type that we'll
1848 : * store in the tuplestore and use in all our working slots).
1849 : */
1850 140 : ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1851 :
1852 140 : ExecSetSlotDescriptor(winstate->first_part_slot,
1853 140 : winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1854 140 : ExecSetSlotDescriptor(winstate->agg_row_slot,
1855 140 : winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1856 140 : ExecSetSlotDescriptor(winstate->temp_slot_1,
1857 140 : winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1858 140 : ExecSetSlotDescriptor(winstate->temp_slot_2,
1859 140 : winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1860 :
1861 : /*
1862 : * Initialize result tuple type and projection info.
1863 : */
1864 140 : ExecAssignResultTypeFromTL(&winstate->ss.ps);
1865 140 : ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1866 :
1867 : /* Set up data for comparing tuples */
1868 140 : if (node->partNumCols > 0)
1869 45 : winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1870 : node->partOperators);
1871 140 : if (node->ordNumCols > 0)
1872 107 : winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1873 : node->ordOperators);
1874 :
1875 : /*
1876 : * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1877 : */
1878 140 : numfuncs = winstate->numfuncs;
1879 140 : numaggs = winstate->numaggs;
1880 140 : econtext = winstate->ss.ps.ps_ExprContext;
1881 140 : econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1882 140 : econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1883 :
1884 : /*
1885 : * allocate per-wfunc/per-agg state information.
1886 : */
1887 140 : perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1888 140 : peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1889 140 : winstate->perfunc = perfunc;
1890 140 : winstate->peragg = peragg;
1891 :
1892 140 : wfuncno = -1;
1893 140 : aggno = -1;
1894 306 : foreach(l, winstate->funcs)
1895 : {
1896 166 : WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1897 166 : WindowFunc *wfunc = wfuncstate->wfunc;
1898 : WindowStatePerFunc perfuncstate;
1899 : AclResult aclresult;
1900 : int i;
1901 :
1902 166 : if (wfunc->winref != node->winref) /* planner screwed up? */
1903 0 : elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1904 : wfunc->winref, node->winref);
1905 :
1906 : /* Look for a previous duplicate window function */
1907 201 : for (i = 0; i <= wfuncno; i++)
1908 : {
1909 37 : if (equal(wfunc, perfunc[i].wfunc) &&
1910 1 : !contain_volatile_functions((Node *) wfunc))
1911 1 : break;
1912 : }
1913 166 : if (i <= wfuncno)
1914 : {
1915 : /* Found a match to an existing entry, so just mark it */
1916 1 : wfuncstate->wfuncno = i;
1917 1 : continue;
1918 : }
1919 :
1920 : /* Nope, so assign a new PerAgg record */
1921 165 : perfuncstate = &perfunc[++wfuncno];
1922 :
1923 : /* Mark WindowFunc state node with assigned index in the result array */
1924 165 : wfuncstate->wfuncno = wfuncno;
1925 :
1926 : /* Check permission to call window function */
1927 165 : aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1928 : ACL_EXECUTE);
1929 165 : if (aclresult != ACLCHECK_OK)
1930 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
1931 0 : get_func_name(wfunc->winfnoid));
1932 165 : InvokeFunctionExecuteHook(wfunc->winfnoid);
1933 :
1934 : /* Fill in the perfuncstate data */
1935 165 : perfuncstate->wfuncstate = wfuncstate;
1936 165 : perfuncstate->wfunc = wfunc;
1937 165 : perfuncstate->numArguments = list_length(wfuncstate->args);
1938 :
1939 165 : fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1940 : econtext->ecxt_per_query_memory);
1941 165 : fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1942 :
1943 165 : perfuncstate->winCollation = wfunc->inputcollid;
1944 :
1945 165 : get_typlenbyval(wfunc->wintype,
1946 : &perfuncstate->resulttypeLen,
1947 : &perfuncstate->resulttypeByVal);
1948 :
1949 : /*
1950 : * If it's really just a plain aggregate function, we'll emulate the
1951 : * Agg environment for it.
1952 : */
1953 165 : perfuncstate->plain_agg = wfunc->winagg;
1954 165 : if (wfunc->winagg)
1955 : {
1956 : WindowStatePerAgg peraggstate;
1957 :
1958 115 : perfuncstate->aggno = ++aggno;
1959 115 : peraggstate = &winstate->peragg[aggno];
1960 115 : initialize_peragg(winstate, wfunc, peraggstate);
1961 115 : peraggstate->wfuncno = wfuncno;
1962 : }
1963 : else
1964 : {
1965 50 : WindowObject winobj = makeNode(WindowObjectData);
1966 :
1967 50 : winobj->winstate = winstate;
1968 50 : winobj->argstates = wfuncstate->args;
1969 50 : winobj->localmem = NULL;
1970 50 : perfuncstate->winobj = winobj;
1971 : }
1972 : }
1973 :
1974 : /* Update numfuncs, numaggs to match number of unique functions found */
1975 140 : winstate->numfuncs = wfuncno + 1;
1976 140 : winstate->numaggs = aggno + 1;
1977 :
1978 : /* Set up WindowObject for aggregates, if needed */
1979 140 : if (winstate->numaggs > 0)
1980 : {
1981 103 : WindowObject agg_winobj = makeNode(WindowObjectData);
1982 :
1983 103 : agg_winobj->winstate = winstate;
1984 103 : agg_winobj->argstates = NIL;
1985 103 : agg_winobj->localmem = NULL;
1986 : /* make sure markptr = -1 to invalidate. It may not get used */
1987 103 : agg_winobj->markptr = -1;
1988 103 : agg_winobj->readptr = -1;
1989 103 : winstate->agg_winobj = agg_winobj;
1990 : }
1991 :
1992 : /* copy frame options to state node for easy access */
1993 140 : winstate->frameOptions = node->frameOptions;
1994 :
1995 : /* initialize frame bound offset expressions */
1996 140 : winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
1997 : (PlanState *) winstate);
1998 140 : winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
1999 : (PlanState *) winstate);
2000 :
2001 140 : winstate->all_first = true;
2002 140 : winstate->partition_spooled = false;
2003 140 : winstate->more_partitions = false;
2004 :
2005 140 : return winstate;
2006 : }
2007 :
2008 : /* -----------------
2009 : * ExecEndWindowAgg
2010 : * -----------------
2011 : */
2012 : void
2013 138 : ExecEndWindowAgg(WindowAggState *node)
2014 : {
2015 : PlanState *outerPlan;
2016 : int i;
2017 :
2018 138 : release_partition(node);
2019 :
2020 138 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
2021 138 : ExecClearTuple(node->first_part_slot);
2022 138 : ExecClearTuple(node->agg_row_slot);
2023 138 : ExecClearTuple(node->temp_slot_1);
2024 138 : ExecClearTuple(node->temp_slot_2);
2025 :
2026 : /*
2027 : * Free both the expr contexts.
2028 : */
2029 138 : ExecFreeExprContext(&node->ss.ps);
2030 138 : node->ss.ps.ps_ExprContext = node->tmpcontext;
2031 138 : ExecFreeExprContext(&node->ss.ps);
2032 :
2033 253 : for (i = 0; i < node->numaggs; i++)
2034 : {
2035 115 : if (node->peragg[i].aggcontext != node->aggcontext)
2036 104 : MemoryContextDelete(node->peragg[i].aggcontext);
2037 : }
2038 138 : MemoryContextDelete(node->partcontext);
2039 138 : MemoryContextDelete(node->aggcontext);
2040 :
2041 138 : pfree(node->perfunc);
2042 138 : pfree(node->peragg);
2043 :
2044 138 : outerPlan = outerPlanState(node);
2045 138 : ExecEndNode(outerPlan);
2046 138 : }
2047 :
2048 : /* -----------------
2049 : * ExecReScanWindowAgg
2050 : * -----------------
2051 : */
2052 : void
2053 13 : ExecReScanWindowAgg(WindowAggState *node)
2054 : {
2055 13 : PlanState *outerPlan = outerPlanState(node);
2056 13 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
2057 :
2058 13 : node->all_done = false;
2059 13 : node->all_first = true;
2060 :
2061 : /* release tuplestore et al */
2062 13 : release_partition(node);
2063 :
2064 : /* release all temp tuples, but especially first_part_slot */
2065 13 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
2066 13 : ExecClearTuple(node->first_part_slot);
2067 13 : ExecClearTuple(node->agg_row_slot);
2068 13 : ExecClearTuple(node->temp_slot_1);
2069 13 : ExecClearTuple(node->temp_slot_2);
2070 :
2071 : /* Forget current wfunc values */
2072 13 : MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2073 13 : MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2074 :
2075 : /*
2076 : * if chgParam of subnode is not null then plan will be re-scanned by
2077 : * first ExecProcNode.
2078 : */
2079 13 : if (outerPlan->chgParam == NULL)
2080 1 : ExecReScan(outerPlan);
2081 13 : }
2082 :
2083 : /*
2084 : * initialize_peragg
2085 : *
2086 : * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2087 : */
2088 : static WindowStatePerAggData *
2089 115 : initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2090 : WindowStatePerAgg peraggstate)
2091 : {
2092 : Oid inputTypes[FUNC_MAX_ARGS];
2093 : int numArguments;
2094 : HeapTuple aggTuple;
2095 : Form_pg_aggregate aggform;
2096 : Oid aggtranstype;
2097 : AttrNumber initvalAttNo;
2098 : AclResult aclresult;
2099 : Oid transfn_oid,
2100 : invtransfn_oid,
2101 : finalfn_oid;
2102 : bool finalextra;
2103 : Expr *transfnexpr,
2104 : *invtransfnexpr,
2105 : *finalfnexpr;
2106 : Datum textInitVal;
2107 : int i;
2108 : ListCell *lc;
2109 :
2110 115 : numArguments = list_length(wfunc->args);
2111 :
2112 115 : i = 0;
2113 220 : foreach(lc, wfunc->args)
2114 : {
2115 105 : inputTypes[i++] = exprType((Node *) lfirst(lc));
2116 : }
2117 :
2118 115 : aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2119 115 : if (!HeapTupleIsValid(aggTuple))
2120 0 : elog(ERROR, "cache lookup failed for aggregate %u",
2121 : wfunc->winfnoid);
2122 115 : aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2123 :
2124 : /*
2125 : * Figure out whether we want to use the moving-aggregate implementation,
2126 : * and collect the right set of fields from the pg_attribute entry.
2127 : *
2128 : * If the frame head can't move, we don't need moving-aggregate code. Even
2129 : * if we'd like to use it, don't do so if the aggregate's arguments (and
2130 : * FILTER clause if any) contain any calls to volatile functions.
2131 : * Otherwise, the difference between restarting and not restarting the
2132 : * aggregation would be user-visible.
2133 : */
2134 221 : if (OidIsValid(aggform->aggminvtransfn) &&
2135 212 : !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) &&
2136 106 : !contain_volatile_functions((Node *) wfunc))
2137 : {
2138 104 : peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2139 104 : peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2140 104 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2141 104 : finalextra = aggform->aggmfinalextra;
2142 104 : aggtranstype = aggform->aggmtranstype;
2143 104 : initvalAttNo = Anum_pg_aggregate_aggminitval;
2144 : }
2145 : else
2146 : {
2147 11 : peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2148 11 : peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2149 11 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2150 11 : finalextra = aggform->aggfinalextra;
2151 11 : aggtranstype = aggform->aggtranstype;
2152 11 : initvalAttNo = Anum_pg_aggregate_agginitval;
2153 : }
2154 :
2155 : /*
2156 : * ExecInitWindowAgg already checked permission to call aggregate function
2157 : * ... but we still need to check the component functions
2158 : */
2159 :
2160 : /* Check that aggregate owner has permission to call component fns */
2161 : {
2162 : HeapTuple procTuple;
2163 : Oid aggOwner;
2164 :
2165 115 : procTuple = SearchSysCache1(PROCOID,
2166 : ObjectIdGetDatum(wfunc->winfnoid));
2167 115 : if (!HeapTupleIsValid(procTuple))
2168 0 : elog(ERROR, "cache lookup failed for function %u",
2169 : wfunc->winfnoid);
2170 115 : aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2171 115 : ReleaseSysCache(procTuple);
2172 :
2173 115 : aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2174 : ACL_EXECUTE);
2175 115 : if (aclresult != ACLCHECK_OK)
2176 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
2177 0 : get_func_name(transfn_oid));
2178 115 : InvokeFunctionExecuteHook(transfn_oid);
2179 :
2180 115 : if (OidIsValid(invtransfn_oid))
2181 : {
2182 104 : aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2183 : ACL_EXECUTE);
2184 104 : if (aclresult != ACLCHECK_OK)
2185 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
2186 0 : get_func_name(invtransfn_oid));
2187 104 : InvokeFunctionExecuteHook(invtransfn_oid);
2188 : }
2189 :
2190 115 : if (OidIsValid(finalfn_oid))
2191 : {
2192 79 : aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2193 : ACL_EXECUTE);
2194 79 : if (aclresult != ACLCHECK_OK)
2195 0 : aclcheck_error(aclresult, ACL_KIND_PROC,
2196 0 : get_func_name(finalfn_oid));
2197 79 : InvokeFunctionExecuteHook(finalfn_oid);
2198 : }
2199 : }
2200 :
2201 : /* Detect how many arguments to pass to the finalfn */
2202 115 : if (finalextra)
2203 0 : peraggstate->numFinalArgs = numArguments + 1;
2204 : else
2205 115 : peraggstate->numFinalArgs = 1;
2206 :
2207 : /* resolve actual type of transition state, if polymorphic */
2208 115 : aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2209 : aggtranstype,
2210 : inputTypes,
2211 : numArguments);
2212 :
2213 : /* build expression trees using actual argument & result types */
2214 115 : build_aggregate_transfn_expr(inputTypes,
2215 : numArguments,
2216 : 0, /* no ordered-set window functions yet */
2217 : false, /* no variadic window functions yet */
2218 : aggtranstype,
2219 : wfunc->inputcollid,
2220 : transfn_oid,
2221 : invtransfn_oid,
2222 : &transfnexpr,
2223 : &invtransfnexpr);
2224 :
2225 : /* set up infrastructure for calling the transfn(s) and finalfn */
2226 115 : fmgr_info(transfn_oid, &peraggstate->transfn);
2227 115 : fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2228 :
2229 115 : if (OidIsValid(invtransfn_oid))
2230 : {
2231 104 : fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2232 104 : fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2233 : }
2234 :
2235 115 : if (OidIsValid(finalfn_oid))
2236 : {
2237 79 : build_aggregate_finalfn_expr(inputTypes,
2238 : peraggstate->numFinalArgs,
2239 : aggtranstype,
2240 : wfunc->wintype,
2241 : wfunc->inputcollid,
2242 : finalfn_oid,
2243 : &finalfnexpr);
2244 79 : fmgr_info(finalfn_oid, &peraggstate->finalfn);
2245 79 : fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2246 : }
2247 :
2248 : /* get info about relevant datatypes */
2249 115 : get_typlenbyval(wfunc->wintype,
2250 : &peraggstate->resulttypeLen,
2251 : &peraggstate->resulttypeByVal);
2252 115 : get_typlenbyval(aggtranstype,
2253 : &peraggstate->transtypeLen,
2254 : &peraggstate->transtypeByVal);
2255 :
2256 : /*
2257 : * initval is potentially null, so don't try to access it as a struct
2258 : * field. Must do it the hard way with SysCacheGetAttr.
2259 : */
2260 115 : textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2261 : &peraggstate->initValueIsNull);
2262 :
2263 115 : if (peraggstate->initValueIsNull)
2264 66 : peraggstate->initValue = (Datum) 0;
2265 : else
2266 49 : peraggstate->initValue = GetAggInitVal(textInitVal,
2267 : aggtranstype);
2268 :
2269 : /*
2270 : * If the transfn is strict and the initval is NULL, make sure input type
2271 : * and transtype are the same (or at least binary-compatible), so that
2272 : * it's OK to use the first input value as the initial transValue. This
2273 : * should have been checked at agg definition time, but we must check
2274 : * again in case the transfn's strictness property has been changed.
2275 : */
2276 115 : if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2277 : {
2278 36 : if (numArguments < 1 ||
2279 18 : !IsBinaryCoercible(inputTypes[0], aggtranstype))
2280 0 : ereport(ERROR,
2281 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2282 : errmsg("aggregate %u needs to have compatible input type and transition type",
2283 : wfunc->winfnoid)));
2284 : }
2285 :
2286 : /*
2287 : * Insist that forward and inverse transition functions have the same
2288 : * strictness setting. Allowing them to differ would require handling
2289 : * more special cases in advance_windowaggregate and
2290 : * advance_windowaggregate_base, for no discernible benefit. This should
2291 : * have been checked at agg definition time, but we must check again in
2292 : * case either function's strictness property has been changed.
2293 : */
2294 219 : if (OidIsValid(invtransfn_oid) &&
2295 104 : peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2296 0 : ereport(ERROR,
2297 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2298 : errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2299 :
2300 : /*
2301 : * Moving aggregates use their own aggcontext.
2302 : *
2303 : * This is necessary because they might restart at different times, so we
2304 : * might never be able to reset the shared context otherwise. We can't
2305 : * make it the aggregates' responsibility to clean up after themselves,
2306 : * because strict aggregates must be restarted whenever we remove their
2307 : * last non-NULL input, which the aggregate won't be aware is happening.
2308 : * Also, just pfree()ing the transValue upon restarting wouldn't help,
2309 : * since we'd miss any indirectly referenced data. We could, in theory,
2310 : * make the memory allocation rules for moving aggregates different than
2311 : * they have historically been for plain aggregates, but that seems grotty
2312 : * and likely to lead to memory leaks.
2313 : */
2314 115 : if (OidIsValid(invtransfn_oid))
2315 104 : peraggstate->aggcontext =
2316 104 : AllocSetContextCreate(CurrentMemoryContext,
2317 : "WindowAgg Per Aggregate",
2318 : ALLOCSET_DEFAULT_SIZES);
2319 : else
2320 11 : peraggstate->aggcontext = winstate->aggcontext;
2321 :
2322 115 : ReleaseSysCache(aggTuple);
2323 :
2324 115 : return peraggstate;
2325 : }
2326 :
2327 : static Datum
2328 49 : GetAggInitVal(Datum textInitVal, Oid transtype)
2329 : {
2330 : Oid typinput,
2331 : typioparam;
2332 : char *strInitVal;
2333 : Datum initVal;
2334 :
2335 49 : getTypeInputInfo(transtype, &typinput, &typioparam);
2336 49 : strInitVal = TextDatumGetCString(textInitVal);
2337 49 : initVal = OidInputFunctionCall(typinput, strInitVal,
2338 : typioparam, -1);
2339 49 : pfree(strInitVal);
2340 49 : return initVal;
2341 : }
2342 :
2343 : /*
2344 : * are_peers
2345 : * compare two rows to see if they are equal according to the ORDER BY clause
2346 : *
2347 : * NB: this does not consider the window frame mode.
2348 : */
2349 : static bool
2350 20441 : are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
2351 : TupleTableSlot *slot2)
2352 : {
2353 20441 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2354 :
2355 : /* If no ORDER BY, all rows are peers with each other */
2356 20441 : if (node->ordNumCols == 0)
2357 110 : return true;
2358 :
2359 20331 : return execTuplesMatch(slot1, slot2,
2360 : node->ordNumCols, node->ordColIdx,
2361 : winstate->ordEqfunctions,
2362 20331 : winstate->tmpcontext->ecxt_per_tuple_memory);
2363 : }
2364 :
2365 : /*
2366 : * window_gettupleslot
2367 : * Fetch the pos'th tuple of the current partition into the slot,
2368 : * using the winobj's read pointer
2369 : *
2370 : * Returns true if successful, false if no such row
2371 : */
2372 : static bool
2373 83902 : window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
2374 : {
2375 83902 : WindowAggState *winstate = winobj->winstate;
2376 : MemoryContext oldcontext;
2377 :
2378 : /* often called repeatedly in a row */
2379 83902 : CHECK_FOR_INTERRUPTS();
2380 :
2381 : /* Don't allow passing -1 to spool_tuples here */
2382 83902 : if (pos < 0)
2383 20 : return false;
2384 :
2385 : /* If necessary, fetch the tuple into the spool */
2386 83882 : spool_tuples(winstate, pos);
2387 :
2388 83882 : if (pos >= winstate->spooled_rows)
2389 460 : return false;
2390 :
2391 83422 : if (pos < winobj->markpos)
2392 0 : elog(ERROR, "cannot fetch row before WindowObject's mark position");
2393 :
2394 83422 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2395 :
2396 83422 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2397 :
2398 : /*
2399 : * Advance or rewind until we are within one tuple of the one we want.
2400 : */
2401 83422 : if (winobj->seekpos < pos - 1)
2402 : {
2403 126 : if (!tuplestore_skiptuples(winstate->buffer,
2404 126 : pos - 1 - winobj->seekpos,
2405 : true))
2406 0 : elog(ERROR, "unexpected end of tuplestore");
2407 126 : winobj->seekpos = pos - 1;
2408 : }
2409 83296 : else if (winobj->seekpos > pos + 1)
2410 : {
2411 142 : if (!tuplestore_skiptuples(winstate->buffer,
2412 142 : winobj->seekpos - (pos + 1),
2413 : false))
2414 0 : elog(ERROR, "unexpected end of tuplestore");
2415 142 : winobj->seekpos = pos + 1;
2416 : }
2417 83154 : else if (winobj->seekpos == pos)
2418 : {
2419 : /*
2420 : * There's no API to refetch the tuple at the current position. We
2421 : * have to move one tuple forward, and then one backward. (We don't
2422 : * do it the other way because we might try to fetch the row before
2423 : * our mark, which isn't allowed.) XXX this case could stand to be
2424 : * optimized.
2425 : */
2426 411 : tuplestore_advance(winstate->buffer, true);
2427 411 : winobj->seekpos++;
2428 : }
2429 :
2430 : /*
2431 : * Now we should be on the tuple immediately before or after the one we
2432 : * want, so just fetch forwards or backwards as appropriate.
2433 : */
2434 83422 : if (winobj->seekpos > pos)
2435 : {
2436 623 : if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2437 0 : elog(ERROR, "unexpected end of tuplestore");
2438 623 : winobj->seekpos--;
2439 : }
2440 : else
2441 : {
2442 82799 : if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2443 0 : elog(ERROR, "unexpected end of tuplestore");
2444 82799 : winobj->seekpos++;
2445 : }
2446 :
2447 83422 : Assert(winobj->seekpos == pos);
2448 :
2449 83422 : MemoryContextSwitchTo(oldcontext);
2450 :
2451 83422 : return true;
2452 : }
2453 :
2454 :
2455 : /***********************************************************************
2456 : * API exposed to window functions
2457 : ***********************************************************************/
2458 :
2459 :
2460 : /*
2461 : * WinGetPartitionLocalMemory
2462 : * Get working memory that lives till end of partition processing
2463 : *
2464 : * On first call within a given partition, this allocates and zeroes the
2465 : * requested amount of space. Subsequent calls just return the same chunk.
2466 : *
2467 : * Memory obtained this way is normally used to hold state that should be
2468 : * automatically reset for each new partition. If a window function wants
2469 : * to hold state across the whole query, fcinfo->fn_extra can be used in the
2470 : * usual way for that.
2471 : */
2472 : void *
2473 175 : WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
2474 : {
2475 175 : Assert(WindowObjectIsValid(winobj));
2476 175 : if (winobj->localmem == NULL)
2477 30 : winobj->localmem =
2478 30 : MemoryContextAllocZero(winobj->winstate->partcontext, sz);
2479 175 : return winobj->localmem;
2480 : }
2481 :
2482 : /*
2483 : * WinGetCurrentPosition
2484 : * Return the current row's position (counting from 0) within the current
2485 : * partition.
2486 : */
2487 : int64
2488 157 : WinGetCurrentPosition(WindowObject winobj)
2489 : {
2490 157 : Assert(WindowObjectIsValid(winobj));
2491 157 : return winobj->winstate->currentpos;
2492 : }
2493 :
2494 : /*
2495 : * WinGetPartitionRowCount
2496 : * Return total number of rows contained in the current partition.
2497 : *
2498 : * Note: this is a relatively expensive operation because it forces the
2499 : * whole partition to be "spooled" into the tuplestore at once. Once
2500 : * executed, however, additional calls within the same partition are cheap.
2501 : */
2502 : int64
2503 24 : WinGetPartitionRowCount(WindowObject winobj)
2504 : {
2505 24 : Assert(WindowObjectIsValid(winobj));
2506 24 : spool_tuples(winobj->winstate, -1);
2507 24 : return winobj->winstate->spooled_rows;
2508 : }
2509 :
2510 : /*
2511 : * WinSetMarkPosition
2512 : * Set the "mark" position for the window object, which is the oldest row
2513 : * number (counting from 0) it is allowed to fetch during all subsequent
2514 : * operations within the current partition.
2515 : *
2516 : * Window functions do not have to call this, but are encouraged to move the
2517 : * mark forward when possible to keep the tuplestore size down and prevent
2518 : * having to spill rows to disk.
2519 : */
2520 : void
2521 60725 : WinSetMarkPosition(WindowObject winobj, int64 markpos)
2522 : {
2523 : WindowAggState *winstate;
2524 :
2525 60725 : Assert(WindowObjectIsValid(winobj));
2526 60725 : winstate = winobj->winstate;
2527 :
2528 60725 : if (markpos < winobj->markpos)
2529 0 : elog(ERROR, "cannot move WindowObject's mark position backward");
2530 60725 : tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2531 60725 : if (markpos > winobj->markpos)
2532 : {
2533 60641 : tuplestore_skiptuples(winstate->buffer,
2534 60641 : markpos - winobj->markpos,
2535 : true);
2536 60641 : winobj->markpos = markpos;
2537 : }
2538 60725 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2539 60725 : if (markpos > winobj->seekpos)
2540 : {
2541 452 : tuplestore_skiptuples(winstate->buffer,
2542 452 : markpos - winobj->seekpos,
2543 : true);
2544 452 : winobj->seekpos = markpos;
2545 : }
2546 60725 : }
2547 :
2548 : /*
2549 : * WinRowsArePeers
2550 : * Compare two rows (specified by absolute position in window) to see
2551 : * if they are equal according to the ORDER BY clause.
2552 : *
2553 : * NB: this does not consider the window frame mode.
2554 : */
2555 : bool
2556 60 : WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2557 : {
2558 : WindowAggState *winstate;
2559 : WindowAgg *node;
2560 : TupleTableSlot *slot1;
2561 : TupleTableSlot *slot2;
2562 : bool res;
2563 :
2564 60 : Assert(WindowObjectIsValid(winobj));
2565 60 : winstate = winobj->winstate;
2566 60 : node = (WindowAgg *) winstate->ss.ps.plan;
2567 :
2568 : /* If no ORDER BY, all rows are peers; don't bother to fetch them */
2569 60 : if (node->ordNumCols == 0)
2570 0 : return true;
2571 :
2572 60 : slot1 = winstate->temp_slot_1;
2573 60 : slot2 = winstate->temp_slot_2;
2574 :
2575 60 : if (!window_gettupleslot(winobj, pos1, slot1))
2576 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2577 : pos1);
2578 60 : if (!window_gettupleslot(winobj, pos2, slot2))
2579 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2580 : pos2);
2581 :
2582 60 : res = are_peers(winstate, slot1, slot2);
2583 :
2584 60 : ExecClearTuple(slot1);
2585 60 : ExecClearTuple(slot2);
2586 :
2587 60 : return res;
2588 : }
2589 :
2590 : /*
2591 : * WinGetFuncArgInPartition
2592 : * Evaluate a window function's argument expression on a specified
2593 : * row of the partition. The row is identified in lseek(2) style,
2594 : * i.e. relative to the current, first, or last row.
2595 : *
2596 : * argno: argument number to evaluate (counted from 0)
2597 : * relpos: signed rowcount offset from the seek position
2598 : * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2599 : * set_mark: If the row is found and set_mark is true, the mark is moved to
2600 : * the row as a side-effect.
2601 : * isnull: output argument, receives isnull status of result
2602 : * isout: output argument, set to indicate whether target row position
2603 : * is out of partition (can pass NULL if caller doesn't care about this)
2604 : *
2605 : * Specifying a nonexistent row is not an error, it just causes a null result
2606 : * (plus setting *isout true, if isout isn't NULL).
2607 : */
2608 : Datum
2609 60077 : WinGetFuncArgInPartition(WindowObject winobj, int argno,
2610 : int relpos, int seektype, bool set_mark,
2611 : bool *isnull, bool *isout)
2612 : {
2613 : WindowAggState *winstate;
2614 : ExprContext *econtext;
2615 : TupleTableSlot *slot;
2616 : bool gottuple;
2617 : int64 abs_pos;
2618 :
2619 60077 : Assert(WindowObjectIsValid(winobj));
2620 60077 : winstate = winobj->winstate;
2621 60077 : econtext = winstate->ss.ps.ps_ExprContext;
2622 60077 : slot = winstate->temp_slot_1;
2623 :
2624 60077 : switch (seektype)
2625 : {
2626 : case WINDOW_SEEK_CURRENT:
2627 60077 : abs_pos = winstate->currentpos + relpos;
2628 60077 : break;
2629 : case WINDOW_SEEK_HEAD:
2630 0 : abs_pos = relpos;
2631 0 : break;
2632 : case WINDOW_SEEK_TAIL:
2633 0 : spool_tuples(winstate, -1);
2634 0 : abs_pos = winstate->spooled_rows - 1 + relpos;
2635 0 : break;
2636 : default:
2637 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
2638 : abs_pos = 0; /* keep compiler quiet */
2639 : break;
2640 : }
2641 :
2642 60077 : gottuple = window_gettupleslot(winobj, abs_pos, slot);
2643 :
2644 60077 : if (!gottuple)
2645 : {
2646 34 : if (isout)
2647 34 : *isout = true;
2648 34 : *isnull = true;
2649 34 : return (Datum) 0;
2650 : }
2651 : else
2652 : {
2653 60043 : if (isout)
2654 60043 : *isout = false;
2655 60043 : if (set_mark)
2656 : {
2657 60023 : int frameOptions = winstate->frameOptions;
2658 60023 : int64 mark_pos = abs_pos;
2659 :
2660 : /*
2661 : * In RANGE mode with a moving frame head, we must not let the
2662 : * mark advance past frameheadpos, since that row has to be
2663 : * fetchable during future update_frameheadpos calls.
2664 : *
2665 : * XXX it is very ugly to pollute window functions' marks with
2666 : * this consideration; it could for instance mask a logic bug that
2667 : * lets a window function fetch rows before what it had claimed
2668 : * was its mark. Perhaps use a separate mark for frame head
2669 : * probes?
2670 : */
2671 120046 : if ((frameOptions & FRAMEOPTION_RANGE) &&
2672 60023 : !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2673 : {
2674 0 : update_frameheadpos(winobj, winstate->temp_slot_2);
2675 0 : if (mark_pos > winstate->frameheadpos)
2676 0 : mark_pos = winstate->frameheadpos;
2677 : }
2678 60023 : WinSetMarkPosition(winobj, mark_pos);
2679 : }
2680 60043 : econtext->ecxt_outertuple = slot;
2681 60043 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2682 : econtext, isnull);
2683 : }
2684 : }
2685 :
2686 : /*
2687 : * WinGetFuncArgInFrame
2688 : * Evaluate a window function's argument expression on a specified
2689 : * row of the window frame. The row is identified in lseek(2) style,
2690 : * i.e. relative to the current, first, or last row.
2691 : *
2692 : * argno: argument number to evaluate (counted from 0)
2693 : * relpos: signed rowcount offset from the seek position
2694 : * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2695 : * set_mark: If the row is found and set_mark is true, the mark is moved to
2696 : * the row as a side-effect.
2697 : * isnull: output argument, receives isnull status of result
2698 : * isout: output argument, set to indicate whether target row position
2699 : * is out of frame (can pass NULL if caller doesn't care about this)
2700 : *
2701 : * Specifying a nonexistent row is not an error, it just causes a null result
2702 : * (plus setting *isout true, if isout isn't NULL).
2703 : */
2704 : Datum
2705 190 : WinGetFuncArgInFrame(WindowObject winobj, int argno,
2706 : int relpos, int seektype, bool set_mark,
2707 : bool *isnull, bool *isout)
2708 : {
2709 : WindowAggState *winstate;
2710 : ExprContext *econtext;
2711 : TupleTableSlot *slot;
2712 : bool gottuple;
2713 : int64 abs_pos;
2714 :
2715 190 : Assert(WindowObjectIsValid(winobj));
2716 190 : winstate = winobj->winstate;
2717 190 : econtext = winstate->ss.ps.ps_ExprContext;
2718 190 : slot = winstate->temp_slot_1;
2719 :
2720 190 : switch (seektype)
2721 : {
2722 : case WINDOW_SEEK_CURRENT:
2723 0 : abs_pos = winstate->currentpos + relpos;
2724 0 : break;
2725 : case WINDOW_SEEK_HEAD:
2726 60 : update_frameheadpos(winobj, slot);
2727 60 : abs_pos = winstate->frameheadpos + relpos;
2728 60 : break;
2729 : case WINDOW_SEEK_TAIL:
2730 130 : update_frametailpos(winobj, slot);
2731 130 : abs_pos = winstate->frametailpos + relpos;
2732 130 : break;
2733 : default:
2734 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
2735 : abs_pos = 0; /* keep compiler quiet */
2736 : break;
2737 : }
2738 :
2739 190 : gottuple = window_gettupleslot(winobj, abs_pos, slot);
2740 190 : if (gottuple)
2741 186 : gottuple = row_is_in_frame(winstate, abs_pos, slot);
2742 :
2743 190 : if (!gottuple)
2744 : {
2745 4 : if (isout)
2746 0 : *isout = true;
2747 4 : *isnull = true;
2748 4 : return (Datum) 0;
2749 : }
2750 : else
2751 : {
2752 186 : if (isout)
2753 0 : *isout = false;
2754 186 : if (set_mark)
2755 : {
2756 179 : int frameOptions = winstate->frameOptions;
2757 179 : int64 mark_pos = abs_pos;
2758 :
2759 : /*
2760 : * In RANGE mode with a moving frame head, we must not let the
2761 : * mark advance past frameheadpos, since that row has to be
2762 : * fetchable during future update_frameheadpos calls.
2763 : *
2764 : * XXX it is very ugly to pollute window functions' marks with
2765 : * this consideration; it could for instance mask a logic bug that
2766 : * lets a window function fetch rows before what it had claimed
2767 : * was its mark. Perhaps use a separate mark for frame head
2768 : * probes?
2769 : */
2770 338 : if ((frameOptions & FRAMEOPTION_RANGE) &&
2771 159 : !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2772 : {
2773 30 : update_frameheadpos(winobj, winstate->temp_slot_2);
2774 30 : if (mark_pos > winstate->frameheadpos)
2775 20 : mark_pos = winstate->frameheadpos;
2776 : }
2777 179 : WinSetMarkPosition(winobj, mark_pos);
2778 : }
2779 186 : econtext->ecxt_outertuple = slot;
2780 186 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2781 : econtext, isnull);
2782 : }
2783 : }
2784 :
2785 : /*
2786 : * WinGetFuncArgCurrent
2787 : * Evaluate a window function's argument expression on the current row.
2788 : *
2789 : * argno: argument number to evaluate (counted from 0)
2790 : * isnull: output argument, receives isnull status of result
2791 : *
2792 : * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2793 : * WinGetFuncArgInFrame targeting the current row, because it will succeed
2794 : * even if the WindowObject's mark has been set beyond the current row.
2795 : * This should generally be used for "ordinary" arguments of a window
2796 : * function, such as the offset argument of lead() or lag().
2797 : */
2798 : Datum
2799 103 : WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2800 : {
2801 : WindowAggState *winstate;
2802 : ExprContext *econtext;
2803 :
2804 103 : Assert(WindowObjectIsValid(winobj));
2805 103 : winstate = winobj->winstate;
2806 :
2807 103 : econtext = winstate->ss.ps.ps_ExprContext;
2808 :
2809 103 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2810 103 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2811 : econtext, isnull);
2812 : }
|