1 /*-------------------------------------------------------------------------
4 * routines to handle WindowAgg nodes.
6 * A WindowAgg node evaluates "window functions" across suitable partitions
7 * of the input tuple set. Any one WindowAgg works for just a single window
8 * specification, though it can evaluate multiple window functions sharing
9 * identical window specifications. The input tuples are required to be
10 * delivered in sorted order, with the PARTITION BY columns (if any) as
11 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 * as needed, if a query involves more than one window specification.)
15 * Since window functions can require access to any or all of the rows in
16 * the current partition, we accumulate rows of the partition into a
17 * tuplestore. The window functions are called using the WindowObject API
18 * so that they can access those rows as needed.
20 * We also support using plain aggregate functions as window functions.
21 * For these, the regular Agg-node environment is emulated for each partition.
22 * As required by the SQL spec, the output represents the value of the
23 * aggregate function over all rows in the current row's window frame.
26 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
27 * Portions Copyright (c) 1994, Regents of the University of California
30 * src/backend/executor/nodeWindowAgg.c
32 *-------------------------------------------------------------------------
36 #include "catalog/pg_aggregate.h"
37 #include "catalog/pg_proc.h"
38 #include "catalog/pg_type.h"
39 #include "executor/executor.h"
40 #include "executor/nodeWindowAgg.h"
41 #include "miscadmin.h"
42 #include "nodes/nodeFuncs.h"
43 #include "optimizer/clauses.h"
44 #include "parser/parse_agg.h"
45 #include "parser/parse_coerce.h"
46 #include "utils/acl.h"
47 #include "utils/builtins.h"
48 #include "utils/datum.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/syscache.h"
52 #include "windowapi.h"
55 * All the window function APIs are called with this object, which is passed
56 * to window functions as fcinfo->context.
58 typedef struct WindowObjectData
61 WindowAggState *winstate; /* parent WindowAggState */
/*
 * NOTE(review): the brace lines and the closing typedef name of this struct
 * are not visible in this listing.
 *
 * markptr and readptr are filled from tuplestore_alloc_read_pointer() in
 * begin_partition(), which also resets markpos and seekpos to -1 at the
 * start of every partition.
 */
62 List *argstates; /* ExprState trees for fn's arguments */
/* localmem is set back to NULL by release_partition() */
63 void *localmem; /* WinGetPartitionLocalMemory's chunk */
64 int markptr; /* tuplestore mark pointer for this fn */
65 int readptr; /* tuplestore read pointer for this fn */
66 int64 markpos; /* row that markptr is positioned on */
67 int64 seekpos; /* row that readptr is positioned on */
71 * We have one WindowStatePerFunc struct for each window function and
72 * window aggregate handled by this node.
/*
 * Working state for one window function (or one aggregate used as a window
 * function).  Code in this file reads wfuncstate->args, numArguments,
 * flinfo, resulttypeLen/resulttypeByVal, plain_agg and winobj from it.
 */
74 typedef struct WindowStatePerFuncData
76 /* Links to WindowFunc expr and state nodes this working state is for */
77 WindowFuncExprState *wfuncstate;
80 int numArguments; /* number of arguments */
82 FmgrInfo flinfo; /* fmgr lookup data for window function */
85 * We need the len and byval info for the result of each function in order
86 * to know how to copy/delete values.
/*
 * NOTE(review): the declarations of resulttypeLen and resulttypeByVal, which
 * eval_windowfunction() passes to datumCopy(), are not visible in this
 * listing; other members may be missing as well.
 */
/*
 * begin_partition() allocates per-function tuplestore mark/read pointers
 * only when plain_agg is false.
 */
91 bool plain_agg; /* is it just a plain aggregate function? */
92 int aggno; /* if so, index of its PerAggData */
94 WindowObject winobj; /* object used in window function API */
95 } WindowStatePerFuncData;
98 * For plain aggregate window functions, we also have one of these.
/*
 * Working state for one plain aggregate evaluated as a window function.
 *
 * NOTE(review): several members that the functions in this file use are not
 * visible in this listing: transfn, finalfn, initValue, resultValue, and the
 * transtype/resulttype Len and ByVal fields.
 */
100 typedef struct WindowStatePerAggData
102 /* Oids of transfer functions */
104 Oid finalfn_oid; /* may be InvalidOid */
107 * fmgr lookup data for transfer functions --- only valid when
108 * corresponding oid is not InvalidOid. Note in particular that fn_strict
109 * flags are kept here.
115 * initial value from pg_aggregate entry
118 bool initValueIsNull;
/*
 * eval_windowaggregates() stores each finalized result here and hands it
 * back for later rows that share the same frame.
 */
121 * cached value for current frame boundaries
124 bool resultValueIsNull;
127 * We need the len and byval info for the agg's input, result, and
128 * transition data types in order to know how to copy/delete values.
137 int wfuncno; /* index of associated PerFuncData */
139 /* Current transition value */
140 Datum transValue; /* current transition value */
141 bool transValueIsNull;
/*
 * initialize_windowaggregate() sets noTransValue when the initial value is
 * NULL; advance_windowaggregate() then copies the first argument into
 * transValue and clears the flag.
 */
143 bool noTransValue; /* true if transValue not set yet */
144 } WindowStatePerAggData;
/* Forward declarations of the file-local helpers. */

/* Transition-state handling for plain aggregates */
146 static void initialize_windowaggregate(WindowAggState *winstate,
147 WindowStatePerFunc perfuncstate,
148 WindowStatePerAgg peraggstate);
149 static void advance_windowaggregate(WindowAggState *winstate,
150 WindowStatePerFunc perfuncstate,
151 WindowStatePerAgg peraggstate);
152 static void finalize_windowaggregate(WindowAggState *winstate,
153 WindowStatePerFunc perfuncstate,
154 WindowStatePerAgg peraggstate,
155 Datum *result, bool *isnull);
/* Evaluation of aggregates and of true window functions */
157 static void eval_windowaggregates(WindowAggState *winstate);
158 static void eval_windowfunction(WindowAggState *winstate,
159 WindowStatePerFunc perfuncstate,
160 Datum *result, bool *isnull);
/* Partition buffering in the tuplestore */
162 static void begin_partition(WindowAggState *winstate);
163 static void spool_tuples(WindowAggState *winstate, int64 pos);
164 static void release_partition(WindowAggState *winstate);
/* Window frame membership and frame boundary tracking */
166 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
167 TupleTableSlot *slot);
168 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
169 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
/*
 * NOTE(review): one parameter line of initialize_peragg (between the two
 * lines below) is not visible in this listing.
 */
171 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
173 WindowStatePerAgg peraggstate);
174 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
/* Row comparison and positional fetch */
176 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
177 TupleTableSlot *slot2);
178 static bool window_gettupleslot(WindowObject winobj, int64 pos,
179 TupleTableSlot *slot);
183 * initialize_windowaggregate
184 * parallel to initialize_aggregates in nodeAgg.c
/*
 * Reset one aggregate's transition state to its initial value and mark its
 * cached result as NULL.
 *
 * winstate:     supplies aggcontext, where a non-NULL initial value is copied
 * perfuncstate: not referenced by the visible lines
 * peraggstate:  the aggregate state being reset
 *
 * NOTE(review): the return-type line, the braces and the `else` that
 * presumably separates the two assignments to transValue are not visible in
 * this listing.
 */
187 initialize_windowaggregate(WindowAggState *winstate,
188 WindowStatePerFunc perfuncstate,
189 WindowStatePerAgg peraggstate)
191 MemoryContext oldContext;
/* A NULL initial value is stored as-is, without copying */
193 if (peraggstate->initValueIsNull)
194 peraggstate->transValue = peraggstate->initValue;
/* Otherwise make a private copy of the initial value in aggcontext */
197 oldContext = MemoryContextSwitchTo(winstate->aggcontext);
198 peraggstate->transValue = datumCopy(peraggstate->initValue,
199 peraggstate->transtypeByVal,
200 peraggstate->transtypeLen);
201 MemoryContextSwitchTo(oldContext);
/*
 * noTransValue is set exactly when the initial value is NULL, and any
 * previously cached result is invalidated.
 */
203 peraggstate->transValueIsNull = peraggstate->initValueIsNull;
204 peraggstate->noTransValue = peraggstate->initValueIsNull;
205 peraggstate->resultValueIsNull = true;
209 * advance_windowaggregate
210 * parallel to advance_aggregates in nodeAgg.c
/*
 * Advance one aggregate by one input row: evaluate the aggregate's argument
 * expressions in winstate->tmpcontext, apply the strict-transition-function
 * shortcuts, call the transition function, and store the resulting
 * transition value in peraggstate.
 *
 * NOTE(review): the return-type line, the braces, the declarations of
 * newVal/arg/i, the initialization and increment of i around the argument
 * loop, and the `return` statements that presumably end the early-exit
 * paths are not visible in this listing.
 */
213 advance_windowaggregate(WindowAggState *winstate,
214 WindowStatePerFunc perfuncstate,
215 WindowStatePerAgg peraggstate)
217 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
218 int numArguments = perfuncstate->numArguments;
219 FunctionCallInfoData fcinfodata;
220 FunctionCallInfo fcinfo = &fcinfodata;
224 MemoryContext oldContext;
225 ExprContext *econtext = winstate->tmpcontext;
/*
 * Argument evaluation and the transition call run in tmpcontext's per-tuple
 * memory; eval_windowaggregates() resets that context after each row.
 */
227 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
229 /* We start from 1, since the 0th arg will be the transition value */
231 foreach(arg, wfuncstate->args)
233 ExprState *argstate = (ExprState *) lfirst(arg);
235 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
236 &fcinfo->argnull[i], NULL);
240 if (peraggstate->transfn.fn_strict)
243 * For a strict transfn, nothing happens when there's a NULL input; we
244 * just keep the prior transValue.
246 for (i = 1; i <= numArguments; i++)
248 if (fcinfo->argnull[i])
250 MemoryContextSwitchTo(oldContext);
254 if (peraggstate->noTransValue)
257 * transValue has not been initialized. This is the first non-NULL
258 * input value. We use it as the initial value for transValue. (We
259 * already checked that the agg's input type is binary-compatible
260 * with its transtype, so straight copy here is OK.)
262 * We must copy the datum into aggcontext if it is pass-by-ref. We
263 * do not need to pfree the old transValue, since it's NULL.
265 MemoryContextSwitchTo(winstate->aggcontext);
266 peraggstate->transValue = datumCopy(fcinfo->arg[1],
267 peraggstate->transtypeByVal,
268 peraggstate->transtypeLen);
269 peraggstate->transValueIsNull = false;
270 peraggstate->noTransValue = false;
271 MemoryContextSwitchTo(oldContext);
274 if (peraggstate->transValueIsNull)
277 * Don't call a strict function with NULL inputs. Note it is
278 * possible to get here despite the above tests, if the transfn is
279 * strict *and* returned a NULL on a prior cycle. If that happens
280 * we will propagate the NULL all the way to the end.
282 MemoryContextSwitchTo(oldContext);
288 * OK to call the transition function
/*
 * NOTE(review): the argument-count line of this call is not visible.  The
 * parent WindowAggState is passed as the call context.
 */
290 InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
292 (void *) winstate, NULL);
293 fcinfo->arg[0] = peraggstate->transValue;
294 fcinfo->argnull[0] = peraggstate->transValueIsNull;
295 newVal = FunctionCallInvoke(fcinfo);
298 * If pass-by-ref datatype, must copy the new value into aggcontext and
299 * pfree the prior transValue. But if transfn returned a pointer to its
300 * first input, we don't need to do anything.
302 if (!peraggstate->transtypeByVal &&
303 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
307 MemoryContextSwitchTo(winstate->aggcontext);
308 newVal = datumCopy(newVal,
309 peraggstate->transtypeByVal,
310 peraggstate->transtypeLen);
312 if (!peraggstate->transValueIsNull)
313 pfree(DatumGetPointer(peraggstate->transValue));
/* Back to the caller's context, then publish the new transition value */
316 MemoryContextSwitchTo(oldContext);
317 peraggstate->transValue = newVal;
318 peraggstate->transValueIsNull = fcinfo->isnull;
322 * finalize_windowaggregate
323 * parallel to finalize_aggregate in nodeAgg.c
/*
 * Compute the current result of one aggregate from its transition value.
 *
 * The work is done in the per-tuple memory of the node's ps_ExprContext.
 * The value is returned through *result and its null flag through *isnull.
 *
 * NOTE(review): the return-type line, the braces, the body of the
 * strict-and-NULL branch and the `else` lines are not visible in this
 * listing.
 */
326 finalize_windowaggregate(WindowAggState *winstate,
327 WindowStatePerFunc perfuncstate,
328 WindowStatePerAgg peraggstate,
329 Datum *result, bool *isnull)
331 MemoryContext oldContext;
333 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
336 * Apply the agg's finalfn if one is provided, else return transValue.
338 if (OidIsValid(peraggstate->finalfn_oid))
340 FunctionCallInfoData fcinfo;
/* The final function receives the transition value as its only argument */
342 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
343 (void *) winstate, NULL);
344 fcinfo.arg[0] = peraggstate->transValue;
345 fcinfo.argnull[0] = peraggstate->transValueIsNull;
/*
 * NOTE(review): the statements executed when this test is true are not
 * visible; presumably they report a NULL result instead of making the call
 * below.
 */
346 if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
348 /* don't call a strict function with NULL inputs */
354 *result = FunctionCallInvoke(&fcinfo);
355 *isnull = fcinfo.isnull;
/* No final function: the transition value itself is the result */
360 *result = peraggstate->transValue;
361 *isnull = peraggstate->transValueIsNull;
365 * If result is pass-by-ref, make sure it is in the right context.
/*
 * The copy is skipped when the datum already lives in the current memory
 * context, i.e. the per-tuple context switched to above.
 */
367 if (!peraggstate->resulttypeByVal && !*isnull &&
368 !MemoryContextContains(CurrentMemoryContext,
369 DatumGetPointer(*result)))
370 *result = datumCopy(*result,
371 peraggstate->resulttypeByVal,
372 peraggstate->resulttypeLen);
373 MemoryContextSwitchTo(oldContext);
377 * eval_windowaggregates
378 * evaluate plain aggregates being used as window functions
380 * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be
381 * able to call aggregate final functions repeatedly after aggregating more
382 * data onto the same transition value. This is not a behavior required by
/*
 * Compute the value of every plain aggregate for the current row and store
 * the results in ps_ExprContext's ecxt_aggvalues/ecxt_aggnulls arrays,
 * indexed by each aggregate's wfuncno.
 *
 * Visible steps: update the frame head; reinitialize all aggregates when
 * currentpos is 0 or the frame head moved; reuse the cached results when
 * the current row is already covered by [aggregatedbase, aggregatedupto);
 * otherwise accumulate rows starting at aggregatedupto while they are in
 * frame, then finalize and cache each result.
 *
 * NOTE(review): the return-type line, the braces, the declarations of
 * i/numaggs/wfuncno/result/isnull, the header of the accumulation loop,
 * several `break`/`return` lines and some trailing call arguments are not
 * visible in this listing.
 */
386 eval_windowaggregates(WindowAggState *winstate)
388 WindowStatePerAgg peraggstate;
392 MemoryContext oldContext;
393 ExprContext *econtext;
394 WindowObject agg_winobj;
395 TupleTableSlot *agg_row_slot;
/*
 * NOTE(review): the condition guarding the early return below is not
 * visible; presumably it tests numaggs for zero.
 */
397 numaggs = winstate->numaggs;
399 return; /* nothing to do */
401 /* final output execution is in ps_ExprContext */
402 econtext = winstate->ss.ps.ps_ExprContext;
403 agg_winobj = winstate->agg_winobj;
404 agg_row_slot = winstate->agg_row_slot;
407 * Currently, we support only a subset of the SQL-standard window framing
410 * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
411 * a contiguous group of rows extending forward from the start of the
412 * partition, and rows only enter the frame, never exit it, as the current
413 * row advances forward. This makes it possible to use an incremental
414 * strategy for evaluating aggregates: we run the transition function for
415 * each row added to the frame, and run the final function whenever we
416 * need the current aggregate value. This is considerably more efficient
417 * than the naive approach of re-running the entire aggregate calculation
418 * for each current row. It does assume that the final function doesn't
419 * damage the running transition value, but we have the same assumption in
420 * nodeAgg.c too (when it rescans an existing hash table).
422 * For other frame start rules, we discard the aggregate state and re-run
423 * the aggregates whenever the frame head row moves. We can still
424 * optimize as above whenever successive rows share the same frame head.
426 * In many common cases, multiple rows share the same frame and hence the
427 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
428 * window, then all rows are peers and so they all have window frame equal
429 * to the whole partition.) We optimize such cases by calculating the
430 * aggregate value once when we reach the first row of a peer group, and
431 * then returning the saved value for all subsequent rows.
433 * 'aggregatedupto' keeps track of the first row that has not yet been
434 * accumulated into the aggregate transition values. Whenever we start a
435 * new peer group, we accumulate forward to the end of the peer group.
437 * TODO: Rerunning aggregates from the frame start can be pretty slow. For
438 * some aggregates like SUM and COUNT we could avoid that by implementing
439 * a "negative transition function" that would be called for each row as
440 * it exits the frame. We'd have to think about avoiding recalculation of
441 * volatile arguments of aggregate functions, too.
445 * First, update the frame head position.
447 update_frameheadpos(agg_winobj, winstate->temp_slot_1);
450 * Initialize aggregates on first call for partition, or if the frame head
451 * position moved since last time.
453 if (winstate->currentpos == 0 ||
454 winstate->frameheadpos != winstate->aggregatedbase)
457 * Discard transient aggregate values
/*
 * Resetting aggcontext releases the transition values and cached results
 * that this file copies into it.
 */
459 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
461 for (i = 0; i < numaggs; i++)
463 peraggstate = &winstate->peragg[i];
464 wfuncno = peraggstate->wfuncno;
465 initialize_windowaggregate(winstate,
466 &winstate->perfunc[wfuncno],
471 * If we created a mark pointer for aggregates, keep it pushed up to
472 * frame head, so that tuplestore can discard unnecessary rows.
474 if (agg_winobj->markptr >= 0)
475 WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
478 * Initialize for loop below
/* Restart accumulation at the (new) frame head with an empty row slot */
480 ExecClearTuple(agg_row_slot);
481 winstate->aggregatedbase = winstate->frameheadpos;
482 winstate->aggregatedupto = winstate->frameheadpos;
486 * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
487 * except when the frame head moves. In END_CURRENT_ROW mode, we only
488 * have to recalculate when the frame head moves or currentpos has
489 * advanced past the place we'd aggregated up to. Check for these cases
490 * and if so, reuse the saved result values.
492 if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
493 FRAMEOPTION_END_CURRENT_ROW)) &&
494 winstate->aggregatedbase <= winstate->currentpos &&
495 winstate->aggregatedupto > winstate->currentpos)
497 for (i = 0; i < numaggs; i++)
499 peraggstate = &winstate->peragg[i];
500 wfuncno = peraggstate->wfuncno;
501 econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
502 econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
/*
 * NOTE(review): a `return` presumably follows the reuse loop above on a
 * line not visible here; without it the cached values would be recomputed
 * below.
 */
508 * Advance until we reach a row not in frame (or end of partition).
510 * Note the loop invariant: agg_row_slot is either empty or holds the row
511 * at position aggregatedupto. We advance aggregatedupto after processing
516 /* Fetch next row if we didn't already */
517 if (TupIsNull(agg_row_slot))
519 if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
521 break; /* must be end of partition */
524 /* Exit loop (for now) if not in frame */
525 if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
528 /* Set tuple context for evaluation of aggregate arguments */
529 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
531 /* Accumulate row into the aggregates */
532 for (i = 0; i < numaggs; i++)
534 peraggstate = &winstate->peragg[i];
535 wfuncno = peraggstate->wfuncno;
536 advance_windowaggregate(winstate,
537 &winstate->perfunc[wfuncno],
541 /* Reset per-input-tuple context after each tuple */
542 ResetExprContext(winstate->tmpcontext);
544 /* And advance the aggregated-row state */
545 winstate->aggregatedupto++;
546 ExecClearTuple(agg_row_slot);
550 * finalize aggregates and fill result/isnull fields.
552 for (i = 0; i < numaggs; i++)
557 peraggstate = &winstate->peragg[i];
558 wfuncno = peraggstate->wfuncno;
/* finalize_windowaggregate() writes straight into the output arrays */
559 result = &econtext->ecxt_aggvalues[wfuncno];
560 isnull = &econtext->ecxt_aggnulls[wfuncno];
561 finalize_windowaggregate(winstate,
562 &winstate->perfunc[wfuncno],
567 * save the result in case next row shares the same frame.
569 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
570 * advance that the next row can't possibly share the same frame. Is
571 * it worth detecting that and skipping this code?
573 if (!peraggstate->resulttypeByVal)
576 * clear old resultValue in order not to leak memory. (Note: the
577 * new result can't possibly be the same datum as old resultValue,
578 * because we never passed it to the trans function.)
580 if (!peraggstate->resultValueIsNull)
581 pfree(DatumGetPointer(peraggstate->resultValue));
584 * If pass-by-ref, copy it into our aggregate context.
/*
 * NOTE(review): the test guarding this copy and the first argument line of
 * the datumCopy() call are not visible in this listing.
 */
588 oldContext = MemoryContextSwitchTo(winstate->aggcontext);
589 peraggstate->resultValue =
591 peraggstate->resulttypeByVal,
592 peraggstate->resulttypeLen);
593 MemoryContextSwitchTo(oldContext);
/* Plain assignment of the result, with no copy */
598 peraggstate->resultValue = *result;
600 peraggstate->resultValueIsNull = *isnull;
605 * eval_windowfunction
607 * Arguments of window functions are not evaluated here, because a window
608 * function can need random access to arbitrary rows in the partition.
609 * The window function uses the special WinGetFuncArgInPartition and
610 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
/*
 * Call one true window function for the current row.
 *
 * The call is made in the per-tuple memory of the node's ps_ExprContext,
 * with the function's WindowObject as the call context.  The value is
 * returned through *result and its null flag through *isnull.
 *
 * NOTE(review): the return-type line and the braces are not visible in this
 * listing.
 */
614 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
615 Datum *result, bool *isnull)
617 FunctionCallInfoData fcinfo;
618 MemoryContext oldContext;
620 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
623 * We don't pass any normal arguments to a window function, but we do pass
624 * it the number of arguments, in order to permit window function
625 * implementations to support varying numbers of arguments. The real info
626 * goes through the WindowObject, which is passed via fcinfo->context.
628 InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
629 perfuncstate->numArguments,
630 (void *) perfuncstate->winobj, NULL);
631 /* Just in case, make all the regular argument slots be null */
/*
 * NOTE(review): memset writes numArguments bytes, so this relies on each
 * argnull element being one byte wide -- confirm against the definition of
 * bool used here.
 */
632 memset(fcinfo.argnull, true, perfuncstate->numArguments);
634 *result = FunctionCallInvoke(&fcinfo);
635 *isnull = fcinfo.isnull;
638 * Make sure pass-by-ref data is allocated in the appropriate context. (We
639 * need this in case the function returns a pointer into some short-lived
640 * tuple, as is entirely possible.)
/* The copy is skipped when the datum already lives in the current context */
642 if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
643 !MemoryContextContains(CurrentMemoryContext,
644 DatumGetPointer(*result)))
645 *result = datumCopy(*result,
646 perfuncstate->resulttypeByVal,
647 perfuncstate->resulttypeLen);
649 MemoryContextSwitchTo(oldContext);
654 * Start buffering rows of the next partition.
/*
 * Prepare the node for a new partition: reset the per-partition counters
 * and validity flags, fetch the very first input row if none is held yet,
 * create a fresh tuplestore with the read pointers the aggregates and
 * window functions need, and store the partition's first row in it.
 *
 * NOTE(review): the return-type line, the braces, the declaration of i, an
 * `else` with an early `return`, and the trailing argument lines of the
 * tuplestore_alloc_read_pointer() calls are not visible in this listing.
 */
657 begin_partition(WindowAggState *winstate)
659 PlanState *outerPlan = outerPlanState(winstate);
660 int numfuncs = winstate->numfuncs;
/* frametailpos starts at -1, the value used for "no row in frame yet" */
663 winstate->partition_spooled = false;
664 winstate->framehead_valid = false;
665 winstate->frametail_valid = false;
666 winstate->spooled_rows = 0;
667 winstate->currentpos = 0;
668 winstate->frameheadpos = 0;
669 winstate->frametailpos = -1;
670 ExecClearTuple(winstate->agg_row_slot);
673 * If this is the very first partition, we need to fetch the first input
674 * row to store in first_part_slot.
676 if (TupIsNull(winstate->first_part_slot))
678 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
680 if (!TupIsNull(outerslot))
681 ExecCopySlot(winstate->first_part_slot, outerslot);
/*
 * NOTE(review): the lines below presumably form the `else` branch and end
 * with a `return` that is not visible, so that no tuplestore is created for
 * empty input.
 */
684 /* outer plan is empty, so we have nothing to do */
685 winstate->partition_spooled = true;
686 winstate->more_partitions = false;
691 /* Create new tuplestore for this partition */
692 winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
695 * Set up read pointers for the tuplestore. The current pointer doesn't
696 * need BACKWARD capability, but the per-window-function read pointers do,
697 * and the aggregate pointer does if frame start is movable.
699 winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
701 /* reset default REWIND capability bit for current ptr */
702 tuplestore_set_eflags(winstate->buffer, 0);
704 /* create read pointers for aggregates, if needed */
705 if (winstate->numaggs > 0)
707 WindowObject agg_winobj = winstate->agg_winobj;
708 int readptr_flags = 0;
710 /* If the frame head is potentially movable ... */
711 if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
713 /* ... create a mark pointer to track the frame head */
714 agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
715 /* and the read pointer will need BACKWARD capability */
716 readptr_flags |= EXEC_FLAG_BACKWARD;
/* NOTE(review): the flags argument of this call is on a line not visible */
719 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
721 agg_winobj->markpos = -1;
722 agg_winobj->seekpos = -1;
724 /* Also reset the row counters for aggregates */
725 winstate->aggregatedbase = 0;
726 winstate->aggregatedupto = 0;
729 /* create mark and read pointers for each real window function */
730 for (i = 0; i < numfuncs; i++)
732 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
/* Plain aggregates share agg_winobj, so only true window functions get pointers */
734 if (!perfuncstate->plain_agg)
736 WindowObject winobj = perfuncstate->winobj;
738 winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
740 winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
742 winobj->markpos = -1;
743 winobj->seekpos = -1;
748 * Store the first tuple into the tuplestore (it's always available now;
749 * we either read it above, or saved it at the end of previous partition)
751 tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
752 winstate->spooled_rows++;
756 * Read tuples from the outer node, up to and including position 'pos', and
757 * store them into the tuplestore. If pos is -1, reads the whole partition.
/*
 * Pull rows from the outer plan into the partition's tuplestore until
 * spooled_rows exceeds 'pos', or until the partition ends; pos == -1 asks
 * for the whole partition.
 *
 * When the input is exhausted, partition_spooled is set and more_partitions
 * is cleared.  When a row of a different partition is seen, it is saved in
 * first_part_slot and both flags are set.
 *
 * NOTE(review): the return-type line, the braces, the statement under the
 * tuplestore_in_memory() test, one argument line of execTuplesMatch() and
 * the `break` lines that presumably leave the loop are not visible in this
 * listing.
 */
760 spool_tuples(WindowAggState *winstate, int64 pos)
762 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
763 PlanState *outerPlan;
764 TupleTableSlot *outerslot;
765 MemoryContext oldcontext;
767 if (!winstate->buffer)
768 return; /* just a safety check */
769 if (winstate->partition_spooled)
770 return; /* whole partition done already */
773 * If the tuplestore has spilled to disk, alternate reading and writing
774 * becomes quite expensive due to frequent buffer flushes. It's cheaper
775 * to force the entire partition to get spooled in one go.
777 * XXX this is a horrid kluge --- it'd be better to fix the performance
778 * problem inside tuplestore. FIXME
/*
 * NOTE(review): the statement controlled by this test is not visible;
 * presumably it sets pos to -1 to force a full spool.
 */
780 if (!tuplestore_in_memory(winstate->buffer))
783 outerPlan = outerPlanState(winstate);
785 /* Must be in query context to call outerplan */
786 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
/* Keep reading while rows up to 'pos' are still missing, or without limit for -1 */
788 while (winstate->spooled_rows <= pos || pos == -1)
790 outerslot = ExecProcNode(outerPlan);
791 if (TupIsNull(outerslot))
793 /* reached the end of the last partition */
794 winstate->partition_spooled = true;
795 winstate->more_partitions = false;
/* With no PARTITION BY columns the partition check is skipped entirely */
799 if (node->partNumCols > 0)
801 /* Check if this tuple still belongs to the current partition */
802 if (!execTuplesMatch(winstate->first_part_slot,
804 node->partNumCols, node->partColIdx,
805 winstate->partEqfunctions,
806 winstate->tmpcontext->ecxt_per_tuple_memory))
809 * end of partition; copy the tuple for the next cycle.
811 ExecCopySlot(winstate->first_part_slot, outerslot);
812 winstate->partition_spooled = true;
813 winstate->more_partitions = true;
818 /* Still in partition, so save it into the tuplestore */
819 tuplestore_puttupleslot(winstate->buffer, outerslot);
820 winstate->spooled_rows++;
823 MemoryContextSwitchTo(oldcontext);
828 * clear information kept within a partition, including
829 * tuplestore and aggregate results.
/*
 * Drop everything that belongs to the partition just finished: clear each
 * function's localmem pointer, reset partcontext and aggcontext, end the
 * tuplestore if one exists, and clear the buffer pointer and the
 * partition_spooled flag.
 *
 * NOTE(review): the return-type line, the braces and the declaration of i
 * are not visible in this listing.
 */
832 release_partition(WindowAggState *winstate)
836 for (i = 0; i < winstate->numfuncs; i++)
838 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
840 /* Release any partition-local state of this window function */
/* Only the pointer is cleared here; the context reset below frees the memory */
841 if (perfuncstate->winobj)
842 perfuncstate->winobj->localmem = NULL;
846 * Release all partition-local memory (in particular, any partition-local
847 * state that we might have trashed our pointers to in the above loop, and
848 * any aggregate temp data). We don't rely on retail pfree because some
849 * aggregates might have allocated data we don't have direct pointers to.
851 MemoryContextResetAndDeleteChildren(winstate->partcontext);
852 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
/* buffer may be NULL here, hence the test before tuplestore_end() */
854 if (winstate->buffer)
855 tuplestore_end(winstate->buffer);
856 winstate->buffer = NULL;
857 winstate->partition_spooled = false;
862 * Determine whether a row is in the current row's window frame according
863 * to our window framing rule
865 * The caller must have already determined that the row is in the partition
866 * and fetched it into a slot. This function just encapsulates the framing
/*
 * Decide whether the row at position 'pos', already fetched into 'slot',
 * belongs to the window frame of the current row (winstate->currentpos).
 * The frame start condition is tested first, then the frame end condition.
 *
 * NOTE(review): the return-type line, the braces and every `return` line
 * are not visible in this listing.  Going by the visible comments, each
 * failed bound test reports "not in frame" and falling through both tests
 * reports "in frame".  The lines under the two *_VALUE_PRECEDING tests are
 * also missing; presumably they negate offset.
 */
870 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
872 int frameOptions = winstate->frameOptions;
874 Assert(pos >= 0); /* else caller error */
876 /* First, check frame starting conditions */
877 if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
879 if (frameOptions & FRAMEOPTION_ROWS)
881 /* rows before current row are out of frame */
882 if (pos < winstate->currentpos)
885 else if (frameOptions & FRAMEOPTION_RANGE)
887 /* preceding row that is not peer is out of frame */
/* NOTE(review): ss_ScanTupleSlot presumably holds the current row -- confirm */
888 if (pos < winstate->currentpos &&
889 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
895 else if (frameOptions & FRAMEOPTION_START_VALUE)
897 if (frameOptions & FRAMEOPTION_ROWS)
899 int64 offset = DatumGetInt64(winstate->startOffsetValue);
901 /* rows before current row + offset are out of frame */
902 if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
905 if (pos < winstate->currentpos + offset)
908 else if (frameOptions & FRAMEOPTION_RANGE)
910 /* parser should have rejected this */
911 elog(ERROR, "window frame with value offset is not implemented");
917 /* Okay so far, now check frame ending conditions */
918 if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
920 if (frameOptions & FRAMEOPTION_ROWS)
922 /* rows after current row are out of frame */
923 if (pos > winstate->currentpos)
926 else if (frameOptions & FRAMEOPTION_RANGE)
928 /* following row that is not peer is out of frame */
929 if (pos > winstate->currentpos &&
930 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
936 else if (frameOptions & FRAMEOPTION_END_VALUE)
938 if (frameOptions & FRAMEOPTION_ROWS)
940 int64 offset = DatumGetInt64(winstate->endOffsetValue);
942 /* rows after current row + offset are out of frame */
943 if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
946 if (pos > winstate->currentpos + offset)
949 else if (frameOptions & FRAMEOPTION_RANGE)
951 /* parser should have rejected this */
952 elog(ERROR, "window frame with value offset is not implemented");
958 /* If we get here, it's in frame */
963 * update_frameheadpos
964 * make frameheadpos valid for the current row
966 * Uses the winobj's read pointer for any required fetches; hence, if the
967 * frame mode is one that requires row comparisons, the winobj's mark must
968 * not be past the currently known frame head. Also uses the specified slot
969 * for any required fetches.
/*
 * Compute winstate->frameheadpos for the current row and set
 * framehead_valid.  Does nothing if the position is already valid.
 *
 * winobj: supplies the parent WindowAggState and is passed to
 *         window_gettupleslot() for row fetches
 * slot:   scratch slot used for those fetches
 *
 * NOTE(review): the return-type line, the braces, the declaration of
 * fhprev, the header and decrement of the backward search loop, the
 * statement under the START_VALUE_PRECEDING test (presumably negating
 * offset) and any trailing `else` branches are not visible in this listing.
 */
972 update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
974 WindowAggState *winstate = winobj->winstate;
975 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
976 int frameOptions = winstate->frameOptions;
978 if (winstate->framehead_valid)
979 return; /* already known for current row */
981 if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
983 /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
984 winstate->frameheadpos = 0;
985 winstate->framehead_valid = true;
987 else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
989 if (frameOptions & FRAMEOPTION_ROWS)
991 /* In ROWS mode, frame head is the same as current */
992 winstate->frameheadpos = winstate->currentpos;
993 winstate->framehead_valid = true;
995 else if (frameOptions & FRAMEOPTION_RANGE)
999 /* If no ORDER BY, all rows are peers with each other */
1000 if (node->ordNumCols == 0)
1002 winstate->frameheadpos = 0;
1003 winstate->framehead_valid = true;
1008 * In RANGE START_CURRENT mode, frame head is the first row that
1009 * is a peer of current row. We search backwards from current,
1010 * which could be a bit inefficient if peer sets are large. Might
1011 * be better to have a separate read pointer that moves forward
1012 * tracking the frame head.
/*
 * fhprev walks backwards from the row before the current one; the search
 * ends at the previous frame head, the start of the partition, or the first
 * non-peer row, and the frame head is the row after fhprev.
 */
1014 fhprev = winstate->currentpos - 1;
1017 /* assume the frame head can't go backwards */
1018 if (fhprev < winstate->frameheadpos)
1020 if (!window_gettupleslot(winobj, fhprev, slot))
1021 break; /* start of partition */
1022 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1023 break; /* not peer of current row */
1026 winstate->frameheadpos = fhprev + 1;
1027 winstate->framehead_valid = true;
1032 else if (frameOptions & FRAMEOPTION_START_VALUE)
1034 if (frameOptions & FRAMEOPTION_ROWS)
1036 /* In ROWS mode, bound is physically n before/after current */
1037 int64 offset = DatumGetInt64(winstate->startOffsetValue);
1039 if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1042 winstate->frameheadpos = winstate->currentpos + offset;
1043 /* frame head can't go before first row */
1044 if (winstate->frameheadpos < 0)
1045 winstate->frameheadpos = 0;
1046 else if (winstate->frameheadpos > winstate->currentpos)
1048 /* make sure frameheadpos is not past end of partition */
/*
 * Spooling up to frameheadpos - 1 is enough to learn whether the head lies
 * past the end; the head is then clamped to spooled_rows, i.e. one past the
 * last spooled row.
 */
1049 spool_tuples(winstate, winstate->frameheadpos - 1);
1050 if (winstate->frameheadpos > winstate->spooled_rows)
1051 winstate->frameheadpos = winstate->spooled_rows;
1053 winstate->framehead_valid = true;
1055 else if (frameOptions & FRAMEOPTION_RANGE)
1057 /* parser should have rejected this */
1058 elog(ERROR, "window frame with value offset is not implemented");
1068 * update_frametailpos
1069 * make frametailpos valid for the current row
1071 * Uses the winobj's read pointer for any required fetches; hence, if the
1072 * frame mode is one that requires row comparisons, the winobj's mark must
1073 * not be past the currently known frame tail. Also uses the specified slot
1074 * for any required fetches.
/*
 * Compute winstate->frametailpos for the current row and set
 * frametail_valid.  Does nothing if the position is already valid.
 *
 * winobj: supplies the parent WindowAggState and is passed to
 *         window_gettupleslot() for row fetches
 * slot:   scratch slot used for those fetches
 *
 * NOTE(review): the return-type line, the braces, the declaration of
 * ftnext, the header and increment of the forward search loop, the
 * statement under the END_VALUE_PRECEDING test (presumably negating offset)
 * and any trailing `else` branches are not visible in this listing.
 */
1077 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
1079 WindowAggState *winstate = winobj->winstate;
1080 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1081 int frameOptions = winstate->frameOptions;
1083 if (winstate->frametail_valid)
1084 return; /* already known for current row */
1086 if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1088 /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
/* The whole partition must be spooled to learn the last row's position */
1089 spool_tuples(winstate, -1);
1090 winstate->frametailpos = winstate->spooled_rows - 1;
1091 winstate->frametail_valid = true;
1093 else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1095 if (frameOptions & FRAMEOPTION_ROWS)
1097 /* In ROWS mode, exactly the rows up to current are in frame */
1098 winstate->frametailpos = winstate->currentpos;
1099 winstate->frametail_valid = true;
1101 else if (frameOptions & FRAMEOPTION_RANGE)
1105 /* If no ORDER BY, all rows are peers with each other */
1106 if (node->ordNumCols == 0)
1108 spool_tuples(winstate, -1);
1109 winstate->frametailpos = winstate->spooled_rows - 1;
1110 winstate->frametail_valid = true;
1115 * Else we have to search for the first non-peer of the current
1116 * row. We assume the current value of frametailpos is a lower
1117 * bound on the possible frame tail location, ie, frame tail never
1118 * goes backward, and that currentpos is also a lower bound, ie,
1119 * frame end always >= current row.
/*
 * ftnext walks forward from the row after the larger of the old tail and
 * the current row; the tail is the row before the first non-peer, or before
 * the end of the partition.
 */
1121 ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1124 if (!window_gettupleslot(winobj, ftnext, slot))
1125 break; /* end of partition */
1126 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1127 break; /* not peer of current row */
1130 winstate->frametailpos = ftnext - 1;
1131 winstate->frametail_valid = true;
1136 else if (frameOptions & FRAMEOPTION_END_VALUE)
1138 if (frameOptions & FRAMEOPTION_ROWS)
1140 /* In ROWS mode, bound is physically n before/after current */
1141 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1143 if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1146 winstate->frametailpos = winstate->currentpos + offset;
1147 /* smallest allowable value of frametailpos is -1 */
1148 if (winstate->frametailpos < 0)
1149 winstate->frametailpos = -1;
1150 else if (winstate->frametailpos > winstate->currentpos)
1152 /* make sure frametailpos is not past last row of partition */
/* Clamp to the last spooled row after reading up to the requested tail */
1153 spool_tuples(winstate, winstate->frametailpos);
1154 if (winstate->frametailpos >= winstate->spooled_rows)
1155 winstate->frametailpos = winstate->spooled_rows - 1;
1157 winstate->frametail_valid = true;
1159 else if (frameOptions & FRAMEOPTION_RANGE)
1161 /* parser should have rejected this */
1162 elog(ERROR, "window frame with value offset is not implemented");
1172 /* -----------------
1175 * ExecWindowAgg receives tuples from its outer subplan and
1176 * stores them into a tuplestore, then processes window functions.
1177 * This node doesn't reduce nor qualify any row so the number of
1178 * returned rows is exactly the same as its outer subplan's result
1179 * (ignoring the case of SRFs in the targetlist, that is).
/*
 * Produce the next output row of a WindowAgg node.
 *
 * Steps, in the order they appear below:
 *	1. If ps_TupFromTlist is set, call ExecProject() again for the
 *	   previous source row.
 *	2. While all_first is set, evaluate the startOffset / endOffset
 *	   expressions, store copies in startOffsetValue / endOffsetValue, and
 *	   report an error for a null offset or (in ROWS mode) a negative one.
 *	3. Start the first partition when no tuplestore exists yet; advance
 *	   currentpos and invalidate the cached frame head/tail.
 *	4. Spool input up to currentpos.  When the partition is fully spooled
 *	   and currentpos has reached spooled_rows, release it and either begin
 *	   the next partition or set all_done.
 *	5. Reset ps_ExprContext, load the current row into ss_ScanTupleSlot,
 *	   evaluate window functions and aggregates into ecxt_aggvalues /
 *	   ecxt_aggnulls, trim the tuplestore, and project the result.
 */
1183 ExecWindowAgg(WindowAggState *winstate)
1185 TupleTableSlot *result;
1186 ExprDoneCond isDone;
1187 ExprContext *econtext;
/* all_done is set below once the last partition has been exhausted */
1191 if (winstate->all_done)
1195 * Check to see if we're still projecting out tuples from a previous
1196 * output tuple (because there is a function-returning-set in the
1197 * projection expressions). If so, try to project another one.
1199 if (winstate->ss.ps.ps_TupFromTlist)
1201 TupleTableSlot *result;
1202 ExprDoneCond isDone;
1204 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1205 if (isDone == ExprMultipleResult)
1207 /* Done with that source tuple... */
1208 winstate->ss.ps.ps_TupFromTlist = false;
1212 * Compute frame offset values, if any, during first call.
1214 if (winstate->all_first)
1216 int frameOptions = winstate->frameOptions;
1217 ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
/* frame start offset: evaluate once, reject NULL, keep a copy */
1223 if (frameOptions & FRAMEOPTION_START_VALUE)
1225 Assert(winstate->startOffset != NULL);
1226 value = ExecEvalExprSwitchContext(winstate->startOffset,
1232 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1233 errmsg("frame starting offset must not be null")));
1234 /* copy value into query-lifespan context */
1235 get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1237 winstate->startOffsetValue = datumCopy(value, byval, len);
1238 if (frameOptions & FRAMEOPTION_ROWS)
1240 /* value is known to be int8 */
1241 int64 offset = DatumGetInt64(value);
1245 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1246 errmsg("frame starting offset must not be negative")));
/* frame end offset: same treatment as the start offset */
1249 if (frameOptions & FRAMEOPTION_END_VALUE)
1251 Assert(winstate->endOffset != NULL);
1252 value = ExecEvalExprSwitchContext(winstate->endOffset,
1258 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1259 errmsg("frame ending offset must not be null")));
1260 /* copy value into query-lifespan context */
1261 get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1263 winstate->endOffsetValue = datumCopy(value, byval, len);
1264 if (frameOptions & FRAMEOPTION_ROWS)
1266 /* value is known to be int8 */
1267 int64 offset = DatumGetInt64(value);
1271 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1272 errmsg("frame ending offset must not be negative")));
/* offsets are computed; skip this section on later calls */
1275 winstate->all_first = false;
/* a NULL buffer means no partition has been started yet */
1279 if (winstate->buffer == NULL)
1281 /* Initialize for first partition and set current row = 0 */
1282 begin_partition(winstate);
1283 /* If there are no input rows, we'll detect that and exit below */
1287 /* Advance current row within partition */
1288 winstate->currentpos++;
1289 /* This might mean that the frame moves, too */
1290 winstate->framehead_valid = false;
1291 winstate->frametail_valid = false;
1295 * Spool all tuples up to and including the current row, if we haven't
1298 spool_tuples(winstate, winstate->currentpos);
1300 /* Move to the next partition if we reached the end of this partition */
1301 if (winstate->partition_spooled &&
1302 winstate->currentpos >= winstate->spooled_rows)
1304 release_partition(winstate);
1306 if (winstate->more_partitions)
1308 begin_partition(winstate);
1309 Assert(winstate->spooled_rows > 0);
1313 winstate->all_done = true;
1318 /* final output execution is in ps_ExprContext */
1319 econtext = winstate->ss.ps.ps_ExprContext;
1321 /* Clear the per-output-tuple context for current row */
1322 ResetExprContext(econtext);
1325 * Read the current row from the tuplestore, and save in ScanTupleSlot.
1326 * (We can't rely on the outerplan's output slot because we may have to
1327 * read beyond the current row. Also, we have to actually copy the row
1328 * out of the tuplestore, since window function evaluation might cause the
1329 * tuplestore to dump its state to disk.)
1331 * Current row must be in the tuplestore, since we spooled it above.
1333 tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1334 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1335 winstate->ss.ss_ScanTupleSlot))
1336 elog(ERROR, "unexpected end of tuplestore");
1339 * Evaluate true window functions
/*
 * Each result lands in ecxt_aggvalues/ecxt_aggnulls at the function's
 * wfuncno.  NOTE(review): entries flagged plain_agg are presumably skipped
 * in this loop and handled by eval_windowaggregates() below -- confirm.
 */
1341 numfuncs = winstate->numfuncs;
1342 for (i = 0; i < numfuncs; i++)
1344 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1346 if (perfuncstate->plain_agg)
1348 eval_windowfunction(winstate, perfuncstate,
1349 &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1350 &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1354 * Evaluate aggregates
1356 if (winstate->numaggs > 0)
1357 eval_windowaggregates(winstate);
1360 * Truncate any no-longer-needed rows from the tuplestore.
1362 tuplestore_trim(winstate->buffer);
1365 * Form and return a projection tuple using the windowfunc results and the
1366 * current row. Setting ecxt_outertuple arranges that any Vars will be
1367 * evaluated with respect to that row.
1369 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1370 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1372 if (isDone == ExprEndResult)
1374 /* SRF in tlist returned no rows, so advance to next input tuple */
/* remember whether this source row still has projected rows pending */
1378 winstate->ss.ps.ps_TupFromTlist =
1379 (isDone == ExprMultipleResult);
1383 /* -----------------
1386 * Creates the run-time information for the WindowAgg node produced by the
1387 * planner and initializes its outer subtree
/*
 * Build the WindowAggState for a WindowAgg plan node.
 *
 * node:	plan node being initialized.
 * estate:	executor state the new node and its slots are attached to.
 * eflags:	passed on to ExecInitNode() for the outer plan; asserted not to
 *			contain EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK.
 *
 * Sets up two expression contexts, the partition and aggregate memory
 * contexts, the working tuple slots, the result type and projection, the
 * equality functions for the PARTITION BY / ORDER BY columns, one perfunc
 * entry per distinct window function (with a peragg entry for plain
 * aggregates), and the frame offset expressions.
 */
1391 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1393 WindowAggState *winstate;
1395 ExprContext *econtext;
1396 ExprContext *tmpcontext;
1397 WindowStatePerFunc perfunc;
1398 WindowStatePerAgg peragg;
1405 /* check for unsupported flags */
1406 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1409 * create state structure
1411 winstate = makeNode(WindowAggState);
1412 winstate->ss.ps.plan = (Plan *) node;
1413 winstate->ss.ps.state = estate;
1416 * Create expression contexts. We need two, one for per-input-tuple
1417 * processing and one for per-output-tuple processing. We cheat a little
1418 * by using ExecAssignExprContext() to build both.
/*
 * The context built by the first call is saved as tmpcontext; the second
 * call then installs a fresh one in ps_ExprContext.
 */
1420 ExecAssignExprContext(estate, &winstate->ss.ps);
1421 tmpcontext = winstate->ss.ps.ps_ExprContext;
1422 winstate->tmpcontext = tmpcontext;
1423 ExecAssignExprContext(estate, &winstate->ss.ps);
1425 /* Create long-lived context for storage of partition-local memory etc */
1426 winstate->partcontext =
1427 AllocSetContextCreate(CurrentMemoryContext,
1428 "WindowAgg_Partition",
1429 ALLOCSET_DEFAULT_MINSIZE,
1430 ALLOCSET_DEFAULT_INITSIZE,
1431 ALLOCSET_DEFAULT_MAXSIZE);
1433 /* Create mid-lived context for aggregate trans values etc */
1434 winstate->aggcontext =
1435 AllocSetContextCreate(CurrentMemoryContext,
1436 "WindowAgg_Aggregates",
1437 ALLOCSET_DEFAULT_MINSIZE,
1438 ALLOCSET_DEFAULT_INITSIZE,
1439 ALLOCSET_DEFAULT_MAXSIZE);
1442 * tuple table initialization
1444 ExecInitScanTupleSlot(estate, &winstate->ss);
1445 ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1446 winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1447 winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1448 winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1449 winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1451 winstate->ss.ps.targetlist = (List *)
1452 ExecInitExpr((Expr *) node->plan.targetlist,
1453 (PlanState *) winstate);
1456 * WindowAgg nodes never have quals, since they can only occur at the
1457 * logical top level of a query (ie, after any WHERE or HAVING filters)
1459 Assert(node->plan.qual == NIL);
1460 winstate->ss.ps.qual = NIL;
1463 * initialize child nodes
1465 outerPlan = outerPlan(node);
1466 outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1469 * initialize source tuple type (which is also the tuple type that we'll
1470 * store in the tuplestore and use in all our working slots).
1472 ExecAssignScanTypeFromOuterPlan(&winstate->ss);
/* every working slot shares the scan slot's tuple descriptor */
1474 ExecSetSlotDescriptor(winstate->first_part_slot,
1475 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1476 ExecSetSlotDescriptor(winstate->agg_row_slot,
1477 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1478 ExecSetSlotDescriptor(winstate->temp_slot_1,
1479 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1480 ExecSetSlotDescriptor(winstate->temp_slot_2,
1481 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1484 * Initialize result tuple type and projection info.
1486 ExecAssignResultTypeFromTL(&winstate->ss.ps);
1487 ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1489 winstate->ss.ps.ps_TupFromTlist = false;
1491 /* Set up data for comparing tuples */
1492 if (node->partNumCols > 0)
1493 winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1494 node->partOperators);
1495 if (node->ordNumCols > 0)
1496 winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1497 node->ordOperators);
1500 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
/*
 * NOTE(review): numfuncs/numaggs and winstate->funcs are read here but not
 * assigned in this function before this point; presumably they are filled
 * in while the targetlist is initialized above -- confirm.
 */
1502 numfuncs = winstate->numfuncs;
1503 numaggs = winstate->numaggs;
1504 econtext = winstate->ss.ps.ps_ExprContext;
1505 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1506 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1509 * allocate per-wfunc/per-agg state information.
1511 perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1512 peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1513 winstate->perfunc = perfunc;
1514 winstate->peragg = peragg;
1518 foreach(l, winstate->funcs)
1520 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1521 WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1522 WindowStatePerFunc perfuncstate;
1523 AclResult aclresult;
1526 if (wfunc->winref != node->winref) /* planner screwed up? */
1527 elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1528 wfunc->winref, node->winref);
1530 /* Look for a previous duplicate window function */
/* only entries 0..wfuncno are assigned; volatile calls never share one */
1531 for (i = 0; i <= wfuncno; i++)
1533 if (equal(wfunc, perfunc[i].wfunc) &&
1534 !contain_volatile_functions((Node *) wfunc))
1539 /* Found a match to an existing entry, so just mark it */
1540 wfuncstate->wfuncno = i;
1544 /* Nope, so assign a new per-function (perfunc) record */
1545 perfuncstate = &perfunc[++wfuncno];
1547 /* Mark WindowFunc state node with assigned index in the result array */
1548 wfuncstate->wfuncno = wfuncno;
1550 /* Check permission to call window function */
1551 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1553 if (aclresult != ACLCHECK_OK)
1554 aclcheck_error(aclresult, ACL_KIND_PROC,
1555 get_func_name(wfunc->winfnoid));
1557 /* Fill in the perfuncstate data */
1558 perfuncstate->wfuncstate = wfuncstate;
1559 perfuncstate->wfunc = wfunc;
1560 perfuncstate->numArguments = list_length(wfuncstate->args);
/* function lookup data is allocated in the per-query memory context */
1562 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1563 econtext->ecxt_per_query_memory);
1564 fmgr_info_set_collation(wfunc->inputcollid, &perfuncstate->flinfo);
1565 fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1567 get_typlenbyval(wfunc->wintype,
1568 &perfuncstate->resulttypeLen,
1569 &perfuncstate->resulttypeByVal);
1572 * If it's really just a plain aggregate function, we'll emulate the
1573 * Agg environment for it.
1575 perfuncstate->plain_agg = wfunc->winagg;
/*
 * NOTE(review): the two setups below look like alternatives -- peragg state
 * (linked back through wfuncno) for a plain aggregate, a WindowObject of
 * its own for a true window function -- confirm.
 */
1578 WindowStatePerAgg peraggstate;
1580 perfuncstate->aggno = ++aggno;
1581 peraggstate = &winstate->peragg[aggno];
1582 initialize_peragg(winstate, wfunc, peraggstate);
1583 peraggstate->wfuncno = wfuncno;
1587 WindowObject winobj = makeNode(WindowObjectData);
1589 winobj->winstate = winstate;
1590 winobj->argstates = wfuncstate->args;
1591 winobj->localmem = NULL;
1592 perfuncstate->winobj = winobj;
1596 /* Update numfuncs, numaggs to match number of unique functions found */
1597 winstate->numfuncs = wfuncno + 1;
1598 winstate->numaggs = aggno + 1;
1600 /* Set up WindowObject for aggregates, if needed */
1601 if (winstate->numaggs > 0)
1603 WindowObject agg_winobj = makeNode(WindowObjectData);
1605 agg_winobj->winstate = winstate;
1606 agg_winobj->argstates = NIL;
1607 agg_winobj->localmem = NULL;
1608 /* make sure markptr = -1 to invalidate. It may not get used */
1609 agg_winobj->markptr = -1;
1610 agg_winobj->readptr = -1;
1611 winstate->agg_winobj = agg_winobj;
1614 /* copy frame options to state node for easy access */
1615 winstate->frameOptions = node->frameOptions;
1617 /* initialize frame bound offset expressions */
1618 winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
1619 (PlanState *) winstate);
1620 winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
1621 (PlanState *) winstate);
/* all_first makes ExecWindowAgg evaluate the frame offsets on its first call */
1623 winstate->all_first = true;
1624 winstate->partition_spooled = false;
1625 winstate->more_partitions = false;
1630 /* -----------------
/*
 * Shut down a WindowAgg node: release the current partition, free the
 * perfunc and peragg arrays, clear the working slots, free both expression
 * contexts, delete the partition and aggregate memory contexts, and end the
 * outer subplan.
 */
1635 ExecEndWindowAgg(WindowAggState *node)
1637 PlanState *outerPlan;
1639 release_partition(node);
1641 pfree(node->perfunc);
1642 pfree(node->peragg);
1644 ExecClearTuple(node->ss.ss_ScanTupleSlot);
1645 ExecClearTuple(node->first_part_slot);
1646 ExecClearTuple(node->agg_row_slot);
1647 ExecClearTuple(node->temp_slot_1);
1648 ExecClearTuple(node->temp_slot_2);
1651 * Free both the expr contexts.
/*
 * ExecFreeExprContext() is handed only the PlanState, so tmpcontext is
 * put into ps_ExprContext before the second call to free it as well.
 */
1653 ExecFreeExprContext(&node->ss.ps);
1654 node->ss.ps.ps_ExprContext = node->tmpcontext;
1655 ExecFreeExprContext(&node->ss.ps);
1657 MemoryContextDelete(node->partcontext);
1658 MemoryContextDelete(node->aggcontext);
1660 outerPlan = outerPlanState(node);
1661 ExecEndNode(outerPlan);
1664 /* -----------------
1665 * ExecReScanWindowAgg
/*
 * Reset a WindowAgg node so it can be scanned again from the start: clear
 * the all_done and ps_TupFromTlist flags, set all_first again, release the
 * current partition, empty the working slots, zero the stored window
 * function results, and rescan the outer subplan when its chgParam is NULL.
 */
1669 ExecReScanWindowAgg(WindowAggState *node)
1671 ExprContext *econtext = node->ss.ps.ps_ExprContext;
1673 node->all_done = false;
1675 node->ss.ps.ps_TupFromTlist = false;
/* all_first makes ExecWindowAgg recompute the frame offsets on its next call */
1676 node->all_first = true;
1678 /* release tuplestore et al */
1679 release_partition(node);
1681 /* release all temp tuples, but especially first_part_slot */
1682 ExecClearTuple(node->ss.ss_ScanTupleSlot);
1683 ExecClearTuple(node->first_part_slot);
1684 ExecClearTuple(node->agg_row_slot);
1685 ExecClearTuple(node->temp_slot_1);
1686 ExecClearTuple(node->temp_slot_2);
1688 /* Forget current wfunc values */
1689 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
1690 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
1693 * if chgParam of subnode is not null then plan will be re-scanned by
1694 * first ExecProcNode.
1696 if (node->ss.ps.lefttree->chgParam == NULL)
1697 ExecReScan(node->ss.ps.lefttree);
1703 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
/*
 * Fill in peraggstate for a plain aggregate that is used as a window
 * function.
 *
 * winstate:	owning WindowAgg state.
 * wfunc:		the WindowFunc node; winfnoid identifies the aggregate.
 * peraggstate:	entry to fill in.
 *
 * Looks up the pg_aggregate row, checks that the aggregate's owner may call
 * the transition and final functions, resolves a polymorphic transition
 * type, prepares the fmgr lookup data for both functions, records type
 * length/by-value information, and loads the initial value.  A strict
 * transition function with a NULL initial value additionally requires the
 * first input type to be binary-coercible to the transition type.
 *
 * NOTE(review): the declared result is a WindowStatePerAggData pointer;
 * presumably peraggstate itself is returned -- confirm.
 */
1705 static WindowStatePerAggData *
1706 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
1707 WindowStatePerAgg peraggstate)
1709 Oid inputTypes[FUNC_MAX_ARGS];
1712 Form_pg_aggregate aggform;
1714 AclResult aclresult;
1723 numArguments = list_length(wfunc->args);
/* collect the actual type of every argument expression */
1726 foreach(lc, wfunc->args)
1728 inputTypes[i++] = exprType((Node *) lfirst(lc));
1731 aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
1732 if (!HeapTupleIsValid(aggTuple))
1733 elog(ERROR, "cache lookup failed for aggregate %u",
1735 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
1738 * ExecInitWindowAgg already checked permission to call aggregate function
1739 * ... but we still need to check the component functions
1742 peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
1743 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
1745 /* Check that aggregate owner has permission to call component fns */
1747 HeapTuple procTuple;
/* the owner is the proowner column of the aggregate's pg_proc row */
1750 procTuple = SearchSysCache1(PROCOID,
1751 ObjectIdGetDatum(wfunc->winfnoid));
1752 if (!HeapTupleIsValid(procTuple))
1753 elog(ERROR, "cache lookup failed for function %u",
1755 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
1756 ReleaseSysCache(procTuple);
1758 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
1760 if (aclresult != ACLCHECK_OK)
1761 aclcheck_error(aclresult, ACL_KIND_PROC,
1762 get_func_name(transfn_oid));
/* the final function is checked only when finalfn_oid is valid */
1763 if (OidIsValid(finalfn_oid))
1765 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
1767 if (aclresult != ACLCHECK_OK)
1768 aclcheck_error(aclresult, ACL_KIND_PROC,
1769 get_func_name(finalfn_oid));
1773 /* resolve actual type of transition state, if polymorphic */
1774 aggtranstype = aggform->aggtranstype;
1775 if (IsPolymorphicType(aggtranstype))
1777 /* have to fetch the agg's declared input types... */
1778 Oid *declaredArgTypes;
1781 get_func_signature(wfunc->winfnoid,
1782 &declaredArgTypes, &agg_nargs);
1783 Assert(agg_nargs == numArguments);
1784 aggtranstype = enforce_generic_type_consistency(inputTypes,
1789 pfree(declaredArgTypes);
1792 /* build expression trees using actual argument & result types */
1793 build_aggregate_fnexprs(inputTypes,
/* attach collation and expression tree to each function's lookup data */
1803 fmgr_info(transfn_oid, &peraggstate->transfn);
1804 fmgr_info_set_collation(wfunc->inputcollid, &peraggstate->transfn);
1805 fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
1807 if (OidIsValid(finalfn_oid))
1809 fmgr_info(finalfn_oid, &peraggstate->finalfn);
1810 fmgr_info_set_collation(wfunc->inputcollid, &peraggstate->finalfn);
1811 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
/* storage properties of the result type and of the transition type */
1814 get_typlenbyval(wfunc->wintype,
1815 &peraggstate->resulttypeLen,
1816 &peraggstate->resulttypeByVal);
1817 get_typlenbyval(aggtranstype,
1818 &peraggstate->transtypeLen,
1819 &peraggstate->transtypeByVal);
1822 * initval is potentially null, so don't try to access it as a struct
1823 * field. Must do it the hard way with SysCacheGetAttr.
1825 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
1826 Anum_pg_aggregate_agginitval,
1827 &peraggstate->initValueIsNull);
1829 if (peraggstate->initValueIsNull)
1830 peraggstate->initValue = (Datum) 0;
1832 peraggstate->initValue = GetAggInitVal(textInitVal,
1836 * If the transfn is strict and the initval is NULL, make sure input type
1837 * and transtype are the same (or at least binary-compatible), so that
1838 * it's OK to use the first input value as the initial transValue. This
1839 * should have been checked at agg definition time, but just in case...
1841 if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
1843 if (numArguments < 1 ||
1844 !IsBinaryCoercible(inputTypes[0], aggtranstype))
1846 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
1847 errmsg("aggregate %u needs to have compatible input type and transition type",
/* done with the pg_aggregate row */
1851 ReleaseSysCache(aggTuple);
/*
 * Convert an aggregate's textual initial value into a value of the
 * transition type: look up the type's input function, turn the text datum
 * into a C string, and run the input function on that string.
 *
 * textInitVal:	initial value as a text datum.
 * transtype:	OID of the transition data type.
 */
1857 GetAggInitVal(Datum textInitVal, Oid transtype)
1864 getTypeInputInfo(transtype, &typinput, &typioparam);
1865 strInitVal = TextDatumGetCString(textInitVal);
1866 initVal = OidInputFunctionCall(typinput, strInitVal,
1874 * compare two rows to see if they are equal according to the ORDER BY clause
1876 * NB: this does not consider the window frame mode.
/*
 * Compare the rows in slot1 and slot2 on the node's ORDER BY columns
 * (ordColIdx) using ordEqfunctions.  The comparison runs in tmpcontext's
 * per-tuple memory.  With no ORDER BY columns no comparison is made.
 */
1879 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
1880 TupleTableSlot *slot2)
1882 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1884 /* If no ORDER BY, all rows are peers with each other */
1885 if (node->ordNumCols == 0)
1888 return execTuplesMatch(slot1, slot2,
1889 node->ordNumCols, node->ordColIdx,
1890 winstate->ordEqfunctions,
1891 winstate->tmpcontext->ecxt_per_tuple_memory);
1895 * window_gettupleslot
1896 * Fetch the pos'th tuple of the current partition into the slot,
1897 * using the winobj's read pointer
1899 * Returns true if successful, false if no such row
/*
 * Load row number pos of the current partition into slot through winobj's
 * tuplestore read pointer.
 *
 * The row is spooled first if needed.  A pos at or beyond spooled_rows
 * means the row does not exist; a pos before winobj->markpos raises an
 * error.  The read pointer is then stepped backward or forward until
 * winobj->seekpos reaches pos.  The tuplestore calls run with the per-query
 * memory context current, and the previous context is restored afterwards.
 */
1902 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
1904 WindowAggState *winstate = winobj->winstate;
1905 MemoryContext oldcontext;
1907 /* Don't allow passing -1 to spool_tuples here */
1911 /* If necessary, fetch the tuple into the spool */
1912 spool_tuples(winstate, pos);
/* still not spooled after the attempt above: no such row */
1914 if (pos >= winstate->spooled_rows)
1917 if (pos < winobj->markpos)
1918 elog(ERROR, "cannot fetch row before WindowObject's mark position");
1920 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1922 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1925 * There's no API to refetch the tuple at the current position. We have to
1926 * move one tuple forward, and then one backward. (We don't do it the
1927 * other way because we might try to fetch the row before our mark, which
1928 * isn't allowed.) XXX this case could stand to be optimized.
1930 if (winobj->seekpos == pos)
1932 tuplestore_advance(winstate->buffer, true);
/* read backward (forward = false) while positioned after pos */
1936 while (winobj->seekpos > pos)
1938 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
1939 elog(ERROR, "unexpected end of tuplestore");
/* read forward (forward = true) while positioned before pos */
1943 while (winobj->seekpos < pos)
1945 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
1946 elog(ERROR, "unexpected end of tuplestore");
1950 MemoryContextSwitchTo(oldcontext);
1956 /***********************************************************************
1957 * API exposed to window functions
1958 ***********************************************************************/
1962 * WinGetPartitionLocalMemory
1963 * Get working memory that lives till end of partition processing
1965 * On first call within a given partition, this allocates and zeroes the
1966 * requested amount of space. Subsequent calls just return the same chunk.
1968 * Memory obtained this way is normally used to hold state that should be
1969 * automatically reset for each new partition. If a window function wants
1970 * to hold state across the whole query, fcinfo->fn_extra can be used in the
1971 * usual way for that.
/*
 * Return winobj->localmem.  While that pointer is still NULL, sz bytes of
 * zeroed memory are allocated in the WindowAgg's partcontext first.  When
 * localmem is already set, sz is not consulted.
 */
1974 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
1976 Assert(WindowObjectIsValid(winobj));
1977 if (winobj->localmem == NULL)
1979 MemoryContextAllocZero(winobj->winstate->partcontext, sz);
1980 return winobj->localmem;
1984 * WinGetCurrentPosition
1985 * Return the current row's position (counting from 0) within the current
/* Return winstate->currentpos of the WindowAgg that owns winobj. */
1989 WinGetCurrentPosition(WindowObject winobj)
1991 Assert(WindowObjectIsValid(winobj));
1992 return winobj->winstate->currentpos;
1996 * WinGetPartitionRowCount
1997 * Return total number of rows contained in the current partition.
1999 * Note: this is a relatively expensive operation because it forces the
2000 * whole partition to be "spooled" into the tuplestore at once. Once
2001 * executed, however, additional calls within the same partition are cheap.
/*
 * Spool the remainder of the current partition (pos = -1) and return the
 * resulting spooled_rows count.
 */
2004 WinGetPartitionRowCount(WindowObject winobj)
2006 Assert(WindowObjectIsValid(winobj));
2007 spool_tuples(winobj->winstate, -1);
2008 return winobj->winstate->spooled_rows;
2012 * WinSetMarkPosition
2013 * Set the "mark" position for the window object, which is the oldest row
2014 * number (counting from 0) it is allowed to fetch during all subsequent
2015 * operations within the current partition.
2017 * Window functions do not have to call this, but are encouraged to move the
2018 * mark forward when possible to keep the tuplestore size down and prevent
2019 * having to spill rows to disk.
/*
 * Move winobj's mark to row markpos.
 *
 * A markpos before the current mark raises an error.  Otherwise the mark
 * read pointer is advanced until winobj->markpos reaches markpos, and the
 * regular read pointer is advanced as well while winobj->seekpos is still
 * before markpos.
 */
2022 WinSetMarkPosition(WindowObject winobj, int64 markpos)
2024 WindowAggState *winstate;
2026 Assert(WindowObjectIsValid(winobj));
2027 winstate = winobj->winstate;
2029 if (markpos < winobj->markpos)
2030 elog(ERROR, "cannot move WindowObject's mark position backward");
/* step the mark pointer forward to the requested row */
2031 tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2032 while (markpos > winobj->markpos)
2034 tuplestore_advance(winstate->buffer, true);
/* keep the read pointer from lagging behind the new mark */
2037 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2038 while (markpos > winobj->seekpos)
2040 tuplestore_advance(winstate->buffer, true);
2047 * Compare two rows (specified by absolute position in window) to see
2048 * if they are equal according to the ORDER BY clause.
2050 * NB: this does not consider the window frame mode.
/*
 * Fetch rows pos1 and pos2 of the current partition into temp_slot_1 and
 * temp_slot_2, compare them with are_peers(), and clear both slots again.
 * A position that cannot be fetched raises an error.  With no ORDER BY
 * columns nothing is fetched.
 */
2053 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2055 WindowAggState *winstate;
2057 TupleTableSlot *slot1;
2058 TupleTableSlot *slot2;
2061 Assert(WindowObjectIsValid(winobj));
2062 winstate = winobj->winstate;
2063 node = (WindowAgg *) winstate->ss.ps.plan;
2065 /* If no ORDER BY, all rows are peers; don't bother to fetch them */
2066 if (node->ordNumCols == 0)
2069 slot1 = winstate->temp_slot_1;
2070 slot2 = winstate->temp_slot_2;
2072 if (!window_gettupleslot(winobj, pos1, slot1))
2073 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2075 if (!window_gettupleslot(winobj, pos2, slot2))
2076 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2079 res = are_peers(winstate, slot1, slot2);
/* release the fetched rows before returning */
2081 ExecClearTuple(slot1);
2082 ExecClearTuple(slot2);
2088 * WinGetFuncArgInPartition
2089 * Evaluate a window function's argument expression on a specified
2090 * row of the partition. The row is identified in lseek(2) style,
2091 * i.e. relative to the current, first, or last row.
2093 * argno: argument number to evaluate (counted from 0)
2094 * relpos: signed rowcount offset from the seek position
2095 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2096 * set_mark: If the row is found and set_mark is true, the mark is moved to
2097 * the row as a side-effect.
2098 * isnull: output argument, receives isnull status of result
2099 * isout: output argument, set to indicate whether target row position
2100 * is out of partition (can pass NULL if caller doesn't care about this)
2102 * Specifying a nonexistent row is not an error, it just causes a null result
2103 * (plus setting *isout true, if isout isn't NULL).
/*
 * Evaluate argument number argno of the window function on a row of the
 * current partition chosen by seektype and relpos.
 *
 * The absolute row number is computed from seektype, the row is fetched
 * into temp_slot_1, and the argument expression is evaluated with that slot
 * as the outer tuple in ps_ExprContext.  An unknown seektype raises an
 * error.  The mark is moved through WinSetMarkPosition(), but never past
 * frameheadpos in RANGE mode when the frame start is not UNBOUNDED
 * PRECEDING.
 */
2106 WinGetFuncArgInPartition(WindowObject winobj, int argno,
2107 int relpos, int seektype, bool set_mark,
2108 bool *isnull, bool *isout)
2110 WindowAggState *winstate;
2111 ExprContext *econtext;
2112 TupleTableSlot *slot;
2116 Assert(WindowObjectIsValid(winobj));
2117 winstate = winobj->winstate;
2118 econtext = winstate->ss.ps.ps_ExprContext;
2119 slot = winstate->temp_slot_1;
2123 case WINDOW_SEEK_CURRENT:
2124 abs_pos = winstate->currentpos + relpos;
2126 case WINDOW_SEEK_HEAD:
2129 case WINDOW_SEEK_TAIL:
/* the last row is only known once the whole partition is spooled */
2130 spool_tuples(winstate, -1);
2131 abs_pos = winstate->spooled_rows - 1 + relpos;
2134 elog(ERROR, "unrecognized window seek type: %d", seektype);
2135 abs_pos = 0; /* keep compiler quiet */
2139 gottuple = window_gettupleslot(winobj, abs_pos, slot);
2154 int frameOptions = winstate->frameOptions;
2155 int64 mark_pos = abs_pos;
2158 * In RANGE mode with a moving frame head, we must not let the
2159 * mark advance past frameheadpos, since that row has to be
2160 * fetchable during future update_frameheadpos calls.
2162 * XXX it is very ugly to pollute window functions' marks with
2163 * this consideration; it could for instance mask a logic bug that
2164 * lets a window function fetch rows before what it had claimed
2165 * was its mark. Perhaps use a separate mark for frame head
2168 if ((frameOptions & FRAMEOPTION_RANGE) &&
2169 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
/* temp_slot_2 is used here because temp_slot_1 holds the fetched row */
2171 update_frameheadpos(winobj, winstate->temp_slot_2);
2172 if (mark_pos > winstate->frameheadpos)
2173 mark_pos = winstate->frameheadpos;
2175 WinSetMarkPosition(winobj, mark_pos);
/* evaluate the argument against the fetched row */
2177 econtext->ecxt_outertuple = slot;
2178 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2179 econtext, isnull, NULL);
2184 * WinGetFuncArgInFrame
2185 * Evaluate a window function's argument expression on a specified
2186 * row of the window frame. The row is identified in lseek(2) style,
2187 * i.e. relative to the current, first, or last row.
2189 * argno: argument number to evaluate (counted from 0)
2190 * relpos: signed rowcount offset from the seek position
2191 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2192 * set_mark: If the row is found and set_mark is true, the mark is moved to
2193 * the row as a side-effect.
2194 * isnull: output argument, receives isnull status of result
2195 * isout: output argument, set to indicate whether target row position
2196 * is out of frame (can pass NULL if caller doesn't care about this)
2198 * Specifying a nonexistent row is not an error, it just causes a null result
2199 * (plus setting *isout true, if isout isn't NULL).
/*
 * Evaluate argument number argno of the window function on a row of the
 * current window frame chosen by seektype and relpos.
 *
 * HEAD and TAIL seeks first bring frameheadpos / frametailpos up to date
 * and count from there; CURRENT counts from currentpos.  The row is fetched
 * into temp_slot_1, tested with row_is_in_frame(), and the argument
 * expression is evaluated with that slot as the outer tuple in
 * ps_ExprContext.  An unknown seektype raises an error.  The mark is moved
 * through WinSetMarkPosition(), but never past frameheadpos in RANGE mode
 * when the frame start is not UNBOUNDED PRECEDING.
 */
2202 WinGetFuncArgInFrame(WindowObject winobj, int argno,
2203 int relpos, int seektype, bool set_mark,
2204 bool *isnull, bool *isout)
2206 WindowAggState *winstate;
2207 ExprContext *econtext;
2208 TupleTableSlot *slot;
2212 Assert(WindowObjectIsValid(winobj));
2213 winstate = winobj->winstate;
2214 econtext = winstate->ss.ps.ps_ExprContext;
2215 slot = winstate->temp_slot_1;
2219 case WINDOW_SEEK_CURRENT:
2220 abs_pos = winstate->currentpos + relpos;
2222 case WINDOW_SEEK_HEAD:
2223 update_frameheadpos(winobj, slot);
2224 abs_pos = winstate->frameheadpos + relpos;
2226 case WINDOW_SEEK_TAIL:
2227 update_frametailpos(winobj, slot);
2228 abs_pos = winstate->frametailpos + relpos;
2231 elog(ERROR, "unrecognized window seek type: %d", seektype);
2232 abs_pos = 0; /* keep compiler quiet */
/*
 * The row has to exist in the partition and lie inside the frame.
 * NOTE(review): confirm the frame test runs only after a successful fetch.
 */
2236 gottuple = window_gettupleslot(winobj, abs_pos, slot);
2238 gottuple = row_is_in_frame(winstate, abs_pos, slot);
2253 int frameOptions = winstate->frameOptions;
2254 int64 mark_pos = abs_pos;
2257 * In RANGE mode with a moving frame head, we must not let the
2258 * mark advance past frameheadpos, since that row has to be
2259 * fetchable during future update_frameheadpos calls.
2261 * XXX it is very ugly to pollute window functions' marks with
2262 * this consideration; it could for instance mask a logic bug that
2263 * lets a window function fetch rows before what it had claimed
2264 * was its mark. Perhaps use a separate mark for frame head
2267 if ((frameOptions & FRAMEOPTION_RANGE) &&
2268 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
/* temp_slot_2 is used here because temp_slot_1 holds the fetched row */
2270 update_frameheadpos(winobj, winstate->temp_slot_2);
2271 if (mark_pos > winstate->frameheadpos)
2272 mark_pos = winstate->frameheadpos;
2274 WinSetMarkPosition(winobj, mark_pos);
/* evaluate the argument against the fetched row */
2276 econtext->ecxt_outertuple = slot;
2277 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2278 econtext, isnull, NULL);
2283 * WinGetFuncArgCurrent
2284 * Evaluate a window function's argument expression on the current row.
2286 * argno: argument number to evaluate (counted from 0)
2287 * isnull: output argument, receives isnull status of result
2289 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2290 * WinGetFuncArgInFrame targeting the current row, because it will succeed
2291 * even if the WindowObject's mark has been set beyond the current row.
2292 * This should generally be used for "ordinary" arguments of a window
2293 * function, such as the offset argument of lead() or lag().
/*
 * Evaluate argument number argno of the window function on the current
 * row: ss_ScanTupleSlot is installed as the outer tuple of ps_ExprContext
 * and the argument's expression state is evaluated there.  *isnull receives
 * the null flag of the result.  No tuplestore fetch is involved.
 */
2296 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2298 WindowAggState *winstate;
2299 ExprContext *econtext;
2301 Assert(WindowObjectIsValid(winobj));
2302 winstate = winobj->winstate;
2304 econtext = winstate->ss.ps.ps_ExprContext;
2306 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2307 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2308 econtext, isnull, NULL);