OSDN Git Service

Revise collation derivation method and expression-tree representation.
[pg-rex/syncrep.git] / src / backend / executor / nodeWindowAgg.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  *        routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set.      Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications.  The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore.  The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  *        src/backend/executor/nodeWindowAgg.c
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35
36 #include "catalog/pg_aggregate.h"
37 #include "catalog/pg_proc.h"
38 #include "catalog/pg_type.h"
39 #include "executor/executor.h"
40 #include "executor/nodeWindowAgg.h"
41 #include "miscadmin.h"
42 #include "nodes/nodeFuncs.h"
43 #include "optimizer/clauses.h"
44 #include "parser/parse_agg.h"
45 #include "parser/parse_coerce.h"
46 #include "utils/acl.h"
47 #include "utils/builtins.h"
48 #include "utils/datum.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/syscache.h"
52 #include "windowapi.h"
53
/*
 * WindowObjectData
 *
 * All the window function APIs are called with this object, which is passed
 * to window functions as fcinfo->context (eval_windowfunction hands over
 * perfuncstate->winobj that way).  Plain aggregates used as window functions
 * are driven through winstate->agg_winobj in eval_windowaggregates.
 */
typedef struct WindowObjectData
{
        NodeTag         type;
        WindowAggState *winstate;       /* parent WindowAggState */
        List       *argstates;          /* ExprState trees for fn's arguments */
        void       *localmem;           /* WinGetPartitionLocalMemory's chunk */
        /*
         * markptr: eval_windowaggregates only moves the mark when
         * markptr >= 0, i.e. a negative value means no mark pointer was
         * created for this object.
         */
        int                     markptr;                /* tuplestore mark pointer for this fn */
        int                     readptr;                /* tuplestore read pointer for this fn */
        int64           markpos;                /* row that markptr is positioned on */
        int64           seekpos;                /* row that readptr is positioned on */
} WindowObjectData;
69
/*
 * WindowStatePerFuncData
 *
 * We have one WindowStatePerFunc struct for each window function and
 * window aggregate handled by this node.  Entries are addressed as
 * winstate->perfunc[wfuncno].
 */
typedef struct WindowStatePerFuncData
{
        /* Links to WindowFunc expr and state nodes this working state is for */
        WindowFuncExprState *wfuncstate;
        WindowFunc *wfunc;

        /*
         * Argument count.  eval_windowfunction passes it as the call's nargs;
         * advance_windowaggregate uses it as the number of inputs following
         * the transition value.
         */
        int                     numArguments;   /* number of arguments */

        FmgrInfo        flinfo;                 /* fmgr lookup data for window function */

        /*
         * We need the len and byval info for the result of each function in order
         * to know how to copy/delete values.
         */
        int16           resulttypeLen;
        bool            resulttypeByVal;

        bool            plain_agg;              /* is it just a plain aggregate function? */
        int                     aggno;                  /* if so, index of its PerAggData */

        WindowObject winobj;            /* object used in window function API */
} WindowStatePerFuncData;
96
/*
 * WindowStatePerAggData
 *
 * For plain aggregate window functions, we also have one of these.
 * Entries are addressed as winstate->peragg[i]; wfuncno links each one back
 * to its WindowStatePerFuncData.
 */
typedef struct WindowStatePerAggData
{
        /* Oids of the transition and final functions */
        Oid                     transfn_oid;
        Oid                     finalfn_oid;    /* may be InvalidOid */

        /*
         * fmgr lookup data for the transition and final functions --- only
         * valid when corresponding oid is not InvalidOid.  Note in particular
         * that fn_strict flags are kept here.
         */
        FmgrInfo        transfn;
        FmgrInfo        finalfn;

        /*
         * initial value from pg_aggregate entry
         */
        Datum           initValue;
        bool            initValueIsNull;

        /*
         * cached value for current frame boundaries.  eval_windowaggregates
         * saves the finalized result here (copied into aggcontext when the
         * result type is pass-by-reference) so that later rows sharing the
         * same frame can reuse it.
         */
        Datum           resultValue;
        bool            resultValueIsNull;

        /*
         * We need the len and byval info for the agg's input, result, and
         * transition data types in order to know how to copy/delete values.
         */
        int16           inputtypeLen,
                                resulttypeLen,
                                transtypeLen;
        bool            inputtypeByVal,
                                resulttypeByVal,
                                transtypeByVal;

        int                     wfuncno;                /* index of associated PerFuncData */

        /* Current transition value */
        Datum           transValue;             /* current transition value */
        bool            transValueIsNull;

        /*
         * noTransValue starts out equal to initValueIsNull (see
         * initialize_windowaggregate).  With a strict transition function,
         * advance_windowaggregate clears it when the first all-non-NULL input
         * row is adopted as the transition value.
         */
        bool            noTransValue;   /* true if transValue not set yet */
} WindowStatePerAggData;
145
/* Per-aggregate transition-state handling (parallels nodeAgg.c) */
static void initialize_windowaggregate(WindowAggState *winstate,
                                                   WindowStatePerFunc perfuncstate,
                                                   WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
                                                WindowStatePerFunc perfuncstate,
                                                WindowStatePerAgg peraggstate);
static void finalize_windowaggregate(WindowAggState *winstate,
                                                 WindowStatePerFunc perfuncstate,
                                                 WindowStatePerAgg peraggstate,
                                                 Datum *result, bool *isnull);

/* Evaluation of window aggregates and true window functions */
static void eval_windowaggregates(WindowAggState *winstate);
static void eval_windowfunction(WindowAggState *winstate,
                                        WindowStatePerFunc perfuncstate,
                                        Datum *result, bool *isnull);

/* Partition buffering */
static void begin_partition(WindowAggState *winstate);
static void spool_tuples(WindowAggState *winstate, int64 pos);
static void release_partition(WindowAggState *winstate);

/* Window frame tracking */
static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
                                TupleTableSlot *slot);
static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);

/* Aggregate setup helpers */
static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
                                  WindowFunc *wfunc,
                                  WindowStatePerAgg peraggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);

/* Row comparison and row fetching helpers */
static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
                  TupleTableSlot *slot2);
static bool window_gettupleslot(WindowObject winobj, int64 pos,
                                        TupleTableSlot *slot);
180
181
182 /*
183  * initialize_windowaggregate
184  * parallel to initialize_aggregates in nodeAgg.c
185  */
186 static void
187 initialize_windowaggregate(WindowAggState *winstate,
188                                                    WindowStatePerFunc perfuncstate,
189                                                    WindowStatePerAgg peraggstate)
190 {
191         MemoryContext oldContext;
192
193         if (peraggstate->initValueIsNull)
194                 peraggstate->transValue = peraggstate->initValue;
195         else
196         {
197                 oldContext = MemoryContextSwitchTo(winstate->aggcontext);
198                 peraggstate->transValue = datumCopy(peraggstate->initValue,
199                                                                                         peraggstate->transtypeByVal,
200                                                                                         peraggstate->transtypeLen);
201                 MemoryContextSwitchTo(oldContext);
202         }
203         peraggstate->transValueIsNull = peraggstate->initValueIsNull;
204         peraggstate->noTransValue = peraggstate->initValueIsNull;
205         peraggstate->resultValueIsNull = true;
206 }
207
208 /*
209  * advance_windowaggregate
210  * parallel to advance_aggregates in nodeAgg.c
211  */
212 static void
213 advance_windowaggregate(WindowAggState *winstate,
214                                                 WindowStatePerFunc perfuncstate,
215                                                 WindowStatePerAgg peraggstate)
216 {
217         WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
218         int                     numArguments = perfuncstate->numArguments;
219         FunctionCallInfoData fcinfodata;
220         FunctionCallInfo fcinfo = &fcinfodata;
221         Datum           newVal;
222         ListCell   *arg;
223         int                     i;
224         MemoryContext oldContext;
225         ExprContext *econtext = winstate->tmpcontext;
226
227         oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
228
229         /* We start from 1, since the 0th arg will be the transition value */
230         i = 1;
231         foreach(arg, wfuncstate->args)
232         {
233                 ExprState  *argstate = (ExprState *) lfirst(arg);
234
235                 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
236                                                                           &fcinfo->argnull[i], NULL);
237                 i++;
238         }
239
240         if (peraggstate->transfn.fn_strict)
241         {
242                 /*
243                  * For a strict transfn, nothing happens when there's a NULL input; we
244                  * just keep the prior transValue.
245                  */
246                 for (i = 1; i <= numArguments; i++)
247                 {
248                         if (fcinfo->argnull[i])
249                         {
250                                 MemoryContextSwitchTo(oldContext);
251                                 return;
252                         }
253                 }
254                 if (peraggstate->noTransValue)
255                 {
256                         /*
257                          * transValue has not been initialized. This is the first non-NULL
258                          * input value. We use it as the initial value for transValue. (We
259                          * already checked that the agg's input type is binary-compatible
260                          * with its transtype, so straight copy here is OK.)
261                          *
262                          * We must copy the datum into aggcontext if it is pass-by-ref. We
263                          * do not need to pfree the old transValue, since it's NULL.
264                          */
265                         MemoryContextSwitchTo(winstate->aggcontext);
266                         peraggstate->transValue = datumCopy(fcinfo->arg[1],
267                                                                                                 peraggstate->transtypeByVal,
268                                                                                                 peraggstate->transtypeLen);
269                         peraggstate->transValueIsNull = false;
270                         peraggstate->noTransValue = false;
271                         MemoryContextSwitchTo(oldContext);
272                         return;
273                 }
274                 if (peraggstate->transValueIsNull)
275                 {
276                         /*
277                          * Don't call a strict function with NULL inputs.  Note it is
278                          * possible to get here despite the above tests, if the transfn is
279                          * strict *and* returned a NULL on a prior cycle. If that happens
280                          * we will propagate the NULL all the way to the end.
281                          */
282                         MemoryContextSwitchTo(oldContext);
283                         return;
284                 }
285         }
286
287         /*
288          * OK to call the transition function
289          */
290         InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
291                                                          numArguments + 1,
292                                                          (void *) winstate, NULL);
293         fcinfo->arg[0] = peraggstate->transValue;
294         fcinfo->argnull[0] = peraggstate->transValueIsNull;
295         newVal = FunctionCallInvoke(fcinfo);
296
297         /*
298          * If pass-by-ref datatype, must copy the new value into aggcontext and
299          * pfree the prior transValue.  But if transfn returned a pointer to its
300          * first input, we don't need to do anything.
301          */
302         if (!peraggstate->transtypeByVal &&
303                 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
304         {
305                 if (!fcinfo->isnull)
306                 {
307                         MemoryContextSwitchTo(winstate->aggcontext);
308                         newVal = datumCopy(newVal,
309                                                            peraggstate->transtypeByVal,
310                                                            peraggstate->transtypeLen);
311                 }
312                 if (!peraggstate->transValueIsNull)
313                         pfree(DatumGetPointer(peraggstate->transValue));
314         }
315
316         MemoryContextSwitchTo(oldContext);
317         peraggstate->transValue = newVal;
318         peraggstate->transValueIsNull = fcinfo->isnull;
319 }
320
321 /*
322  * finalize_windowaggregate
323  * parallel to finalize_aggregate in nodeAgg.c
324  */
325 static void
326 finalize_windowaggregate(WindowAggState *winstate,
327                                                  WindowStatePerFunc perfuncstate,
328                                                  WindowStatePerAgg peraggstate,
329                                                  Datum *result, bool *isnull)
330 {
331         MemoryContext oldContext;
332
333         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
334
335         /*
336          * Apply the agg's finalfn if one is provided, else return transValue.
337          */
338         if (OidIsValid(peraggstate->finalfn_oid))
339         {
340                 FunctionCallInfoData fcinfo;
341
342                 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
343                                                                  (void *) winstate, NULL);
344                 fcinfo.arg[0] = peraggstate->transValue;
345                 fcinfo.argnull[0] = peraggstate->transValueIsNull;
346                 if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
347                 {
348                         /* don't call a strict function with NULL inputs */
349                         *result = (Datum) 0;
350                         *isnull = true;
351                 }
352                 else
353                 {
354                         *result = FunctionCallInvoke(&fcinfo);
355                         *isnull = fcinfo.isnull;
356                 }
357         }
358         else
359         {
360                 *result = peraggstate->transValue;
361                 *isnull = peraggstate->transValueIsNull;
362         }
363
364         /*
365          * If result is pass-by-ref, make sure it is in the right context.
366          */
367         if (!peraggstate->resulttypeByVal && !*isnull &&
368                 !MemoryContextContains(CurrentMemoryContext,
369                                                            DatumGetPointer(*result)))
370                 *result = datumCopy(*result,
371                                                         peraggstate->resulttypeByVal,
372                                                         peraggstate->resulttypeLen);
373         MemoryContextSwitchTo(oldContext);
374 }
375
/*
 * eval_windowaggregates
 * evaluate plain aggregates being used as window functions
 *
 * Much of this is duplicated from nodeAgg.c.  But NOTE that we expect to be
 * able to call aggregate final functions repeatedly after aggregating more
 * data onto the same transition value.  This is not a behavior required by
 * nodeAgg.c.
 *
 * For each aggregate, the value for the current row is stored into
 * ecxt_aggvalues[wfuncno] / ecxt_aggnulls[wfuncno] of the node's
 * ps_ExprContext, and also cached in peraggstate->resultValue so that
 * following rows with the same frame can reuse it.  Returns immediately if
 * the node has no aggregates.
 */
static void
eval_windowaggregates(WindowAggState *winstate)
{
        WindowStatePerAgg peraggstate;
        int                     wfuncno,
                                numaggs;
        int                     i;
        MemoryContext oldContext;
        ExprContext *econtext;
        WindowObject agg_winobj;
        TupleTableSlot *agg_row_slot;

        numaggs = winstate->numaggs;
        if (numaggs == 0)
                return;                                 /* nothing to do */

        /* final output execution is in ps_ExprContext */
        econtext = winstate->ss.ps.ps_ExprContext;
        agg_winobj = winstate->agg_winobj;
        agg_row_slot = winstate->agg_row_slot;

        /*
         * Currently, we support only a subset of the SQL-standard window framing
         * rules.
         *
         * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
         * a contiguous group of rows extending forward from the start of the
         * partition, and rows only enter the frame, never exit it, as the current
         * row advances forward.  This makes it possible to use an incremental
         * strategy for evaluating aggregates: we run the transition function for
         * each row added to the frame, and run the final function whenever we
         * need the current aggregate value.  This is considerably more efficient
         * than the naive approach of re-running the entire aggregate calculation
         * for each current row.  It does assume that the final function doesn't
         * damage the running transition value, but we have the same assumption in
         * nodeAgg.c too (when it rescans an existing hash table).
         *
         * For other frame start rules, we discard the aggregate state and re-run
         * the aggregates whenever the frame head row moves.  We can still
         * optimize as above whenever successive rows share the same frame head.
         *
         * In many common cases, multiple rows share the same frame and hence the
         * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
         * window, then all rows are peers and so they all have window frame equal
         * to the whole partition.)  We optimize such cases by calculating the
         * aggregate value once when we reach the first row of a peer group, and
         * then returning the saved value for all subsequent rows.
         *
         * 'aggregatedupto' keeps track of the first row that has not yet been
         * accumulated into the aggregate transition values.  Whenever we start a
         * new peer group, we accumulate forward to the end of the peer group.
         *
         * TODO: Rerunning aggregates from the frame start can be pretty slow. For
         * some aggregates like SUM and COUNT we could avoid that by implementing
         * a "negative transition function" that would be called for each row as
         * it exits the frame.  We'd have to think about avoiding recalculation of
         * volatile arguments of aggregate functions, too.
         */

        /*
         * First, update the frame head position.
         */
        update_frameheadpos(agg_winobj, winstate->temp_slot_1);

        /*
         * Initialize aggregates on first call for partition, or if the frame head
         * position moved since last time.
         */
        if (winstate->currentpos == 0 ||
                winstate->frameheadpos != winstate->aggregatedbase)
        {
                /*
                 * Discard transient aggregate values.  This releases every
                 * transition value and cached result held in aggcontext;
                 * initialize_windowaggregate below re-seeds transValue and marks
                 * resultValue as absent, so no stale pointer is used afterwards.
                 */
                MemoryContextResetAndDeleteChildren(winstate->aggcontext);

                for (i = 0; i < numaggs; i++)
                {
                        peraggstate = &winstate->peragg[i];
                        wfuncno = peraggstate->wfuncno;
                        initialize_windowaggregate(winstate,
                                                                           &winstate->perfunc[wfuncno],
                                                                           peraggstate);
                }

                /*
                 * If we created a mark pointer for aggregates, keep it pushed up to
                 * frame head, so that tuplestore can discard unnecessary rows.
                 */
                if (agg_winobj->markptr >= 0)
                        WinSetMarkPosition(agg_winobj, winstate->frameheadpos);

                /*
                 * Initialize for loop below
                 */
                ExecClearTuple(agg_row_slot);
                winstate->aggregatedbase = winstate->frameheadpos;
                winstate->aggregatedupto = winstate->frameheadpos;
        }

        /*
         * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
         * except when the frame head moves.  In END_CURRENT_ROW mode, we only
         * have to recalculate when the frame head moves or currentpos has
         * advanced past the place we'd aggregated up to.  Check for these cases
         * and if so, reuse the saved result values.
         */
        if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
                                                                   FRAMEOPTION_END_CURRENT_ROW)) &&
                winstate->aggregatedbase <= winstate->currentpos &&
                winstate->aggregatedupto > winstate->currentpos)
        {
                for (i = 0; i < numaggs; i++)
                {
                        peraggstate = &winstate->peragg[i];
                        wfuncno = peraggstate->wfuncno;
                        econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
                        econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
                }
                return;
        }

        /*
         * Advance until we reach a row not in frame (or end of partition).
         *
         * Note the loop invariant: agg_row_slot is either empty or holds the row
         * at position aggregatedupto.  We advance aggregatedupto after processing
         * a row.
         */
        for (;;)
        {
                /* Fetch next row if we didn't already */
                if (TupIsNull(agg_row_slot))
                {
                        if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
                                                                         agg_row_slot))
                                break;                  /* must be end of partition */
                }

                /*
                 * Exit loop (for now) if not in frame.  The fetched row stays in
                 * agg_row_slot, so the next call can resume with it without
                 * fetching again.
                 */
                if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
                        break;

                /* Set tuple context for evaluation of aggregate arguments */
                winstate->tmpcontext->ecxt_outertuple = agg_row_slot;

                /* Accumulate row into the aggregates */
                for (i = 0; i < numaggs; i++)
                {
                        peraggstate = &winstate->peragg[i];
                        wfuncno = peraggstate->wfuncno;
                        advance_windowaggregate(winstate,
                                                                        &winstate->perfunc[wfuncno],
                                                                        peraggstate);
                }

                /* Reset per-input-tuple context after each tuple */
                ResetExprContext(winstate->tmpcontext);

                /* And advance the aggregated-row state */
                winstate->aggregatedupto++;
                ExecClearTuple(agg_row_slot);
        }

        /*
         * finalize aggregates and fill result/isnull fields.
         */
        for (i = 0; i < numaggs; i++)
        {
                Datum      *result;
                bool       *isnull;

                /* finalize directly into the output context's per-function slots */
                peraggstate = &winstate->peragg[i];
                wfuncno = peraggstate->wfuncno;
                result = &econtext->ecxt_aggvalues[wfuncno];
                isnull = &econtext->ecxt_aggnulls[wfuncno];
                finalize_windowaggregate(winstate,
                                                                 &winstate->perfunc[wfuncno],
                                                                 peraggstate,
                                                                 result, isnull);

                /*
                 * save the result in case next row shares the same frame.
                 *
                 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
                 * advance that the next row can't possibly share the same frame. Is
                 * it worth detecting that and skipping this code?
                 */
                if (!peraggstate->resulttypeByVal)
                {
                        /*
                         * clear old resultValue in order not to leak memory.  (Note: the
                         * new result can't possibly be the same datum as old resultValue,
                         * because we never passed it to the trans function.)
                         */
                        if (!peraggstate->resultValueIsNull)
                                pfree(DatumGetPointer(peraggstate->resultValue));

                        /*
                         * If pass-by-ref, copy it into our aggregate context.  When the
                         * new result is NULL, resultValue is left as it was; only
                         * resultValueIsNull (set below) says whether it may be used.
                         */
                        if (!*isnull)
                        {
                                oldContext = MemoryContextSwitchTo(winstate->aggcontext);
                                peraggstate->resultValue =
                                        datumCopy(*result,
                                                          peraggstate->resulttypeByVal,
                                                          peraggstate->resulttypeLen);
                                MemoryContextSwitchTo(oldContext);
                        }
                }
                else
                {
                        /* pass-by-value: the Datum itself can be cached as-is */
                        peraggstate->resultValue = *result;
                }
                peraggstate->resultValueIsNull = *isnull;
        }
}
603
604 /*
605  * eval_windowfunction
606  *
607  * Arguments of window functions are not evaluated here, because a window
608  * function can need random access to arbitrary rows in the partition.
609  * The window function uses the special WinGetFuncArgInPartition and
610  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
611  * it wants.
612  */
613 static void
614 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
615                                         Datum *result, bool *isnull)
616 {
617         FunctionCallInfoData fcinfo;
618         MemoryContext oldContext;
619
620         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
621
622         /*
623          * We don't pass any normal arguments to a window function, but we do pass
624          * it the number of arguments, in order to permit window function
625          * implementations to support varying numbers of arguments.  The real info
626          * goes through the WindowObject, which is passed via fcinfo->context.
627          */
628         InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
629                                                          perfuncstate->numArguments,
630                                                          (void *) perfuncstate->winobj, NULL);
631         /* Just in case, make all the regular argument slots be null */
632         memset(fcinfo.argnull, true, perfuncstate->numArguments);
633
634         *result = FunctionCallInvoke(&fcinfo);
635         *isnull = fcinfo.isnull;
636
637         /*
638          * Make sure pass-by-ref data is allocated in the appropriate context. (We
639          * need this in case the function returns a pointer into some short-lived
640          * tuple, as is entirely possible.)
641          */
642         if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
643                 !MemoryContextContains(CurrentMemoryContext,
644                                                            DatumGetPointer(*result)))
645                 *result = datumCopy(*result,
646                                                         perfuncstate->resulttypeByVal,
647                                                         perfuncstate->resulttypeLen);
648
649         MemoryContextSwitchTo(oldContext);
650 }
651
652 /*
653  * begin_partition
654  * Start buffering rows of the next partition.
655  */
656 static void
657 begin_partition(WindowAggState *winstate)
658 {
659         PlanState  *outerPlan = outerPlanState(winstate);
660         int                     numfuncs = winstate->numfuncs;
661         int                     i;
662
663         winstate->partition_spooled = false;
664         winstate->framehead_valid = false;
665         winstate->frametail_valid = false;
666         winstate->spooled_rows = 0;
667         winstate->currentpos = 0;
668         winstate->frameheadpos = 0;
669         winstate->frametailpos = -1;
670         ExecClearTuple(winstate->agg_row_slot);
671
672         /*
673          * If this is the very first partition, we need to fetch the first input
674          * row to store in first_part_slot.
675          */
676         if (TupIsNull(winstate->first_part_slot))
677         {
678                 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
679
680                 if (!TupIsNull(outerslot))
681                         ExecCopySlot(winstate->first_part_slot, outerslot);
682                 else
683                 {
684                         /* outer plan is empty, so we have nothing to do */
685                         winstate->partition_spooled = true;
686                         winstate->more_partitions = false;
687                         return;
688                 }
689         }
690
691         /* Create new tuplestore for this partition */
692         winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
693
694         /*
695          * Set up read pointers for the tuplestore.  The current pointer doesn't
696          * need BACKWARD capability, but the per-window-function read pointers do,
697          * and the aggregate pointer does if frame start is movable.
698          */
699         winstate->current_ptr = 0;      /* read pointer 0 is pre-allocated */
700
701         /* reset default REWIND capability bit for current ptr */
702         tuplestore_set_eflags(winstate->buffer, 0);
703
704         /* create read pointers for aggregates, if needed */
705         if (winstate->numaggs > 0)
706         {
707                 WindowObject agg_winobj = winstate->agg_winobj;
708                 int                     readptr_flags = 0;
709
710                 /* If the frame head is potentially movable ... */
711                 if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
712                 {
713                         /* ... create a mark pointer to track the frame head */
714                         agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
715                         /* and the read pointer will need BACKWARD capability */
716                         readptr_flags |= EXEC_FLAG_BACKWARD;
717                 }
718
719                 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
720                                                                                                                         readptr_flags);
721                 agg_winobj->markpos = -1;
722                 agg_winobj->seekpos = -1;
723
724                 /* Also reset the row counters for aggregates */
725                 winstate->aggregatedbase = 0;
726                 winstate->aggregatedupto = 0;
727         }
728
729         /* create mark and read pointers for each real window function */
730         for (i = 0; i < numfuncs; i++)
731         {
732                 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
733
734                 if (!perfuncstate->plain_agg)
735                 {
736                         WindowObject winobj = perfuncstate->winobj;
737
738                         winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
739                                                                                                                         0);
740                         winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
741                                                                                                                  EXEC_FLAG_BACKWARD);
742                         winobj->markpos = -1;
743                         winobj->seekpos = -1;
744                 }
745         }
746
747         /*
748          * Store the first tuple into the tuplestore (it's always available now;
749          * we either read it above, or saved it at the end of previous partition)
750          */
751         tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
752         winstate->spooled_rows++;
753 }
754
755 /*
756  * Read tuples from the outer node, up to and including position 'pos', and
757  * store them into the tuplestore. If pos is -1, reads the whole partition.
758  */
759 static void
760 spool_tuples(WindowAggState *winstate, int64 pos)
761 {
762         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
763         PlanState  *outerPlan;
764         TupleTableSlot *outerslot;
765         MemoryContext oldcontext;
766
767         if (!winstate->buffer)
768                 return;                                 /* just a safety check */
769         if (winstate->partition_spooled)
770                 return;                                 /* whole partition done already */
771
772         /*
773          * If the tuplestore has spilled to disk, alternate reading and writing
774          * becomes quite expensive due to frequent buffer flushes.      It's cheaper
775          * to force the entire partition to get spooled in one go.
776          *
777          * XXX this is a horrid kluge --- it'd be better to fix the performance
778          * problem inside tuplestore.  FIXME
779          */
780         if (!tuplestore_in_memory(winstate->buffer))
781                 pos = -1;
782
783         outerPlan = outerPlanState(winstate);
784
785         /* Must be in query context to call outerplan */
786         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
787
788         while (winstate->spooled_rows <= pos || pos == -1)
789         {
790                 outerslot = ExecProcNode(outerPlan);
791                 if (TupIsNull(outerslot))
792                 {
793                         /* reached the end of the last partition */
794                         winstate->partition_spooled = true;
795                         winstate->more_partitions = false;
796                         break;
797                 }
798
799                 if (node->partNumCols > 0)
800                 {
801                         /* Check if this tuple still belongs to the current partition */
802                         if (!execTuplesMatch(winstate->first_part_slot,
803                                                                  outerslot,
804                                                                  node->partNumCols, node->partColIdx,
805                                                                  winstate->partEqfunctions,
806                                                                  winstate->tmpcontext->ecxt_per_tuple_memory))
807                         {
808                                 /*
809                                  * end of partition; copy the tuple for the next cycle.
810                                  */
811                                 ExecCopySlot(winstate->first_part_slot, outerslot);
812                                 winstate->partition_spooled = true;
813                                 winstate->more_partitions = true;
814                                 break;
815                         }
816                 }
817
818                 /* Still in partition, so save it into the tuplestore */
819                 tuplestore_puttupleslot(winstate->buffer, outerslot);
820                 winstate->spooled_rows++;
821         }
822
823         MemoryContextSwitchTo(oldcontext);
824 }
825
826 /*
827  * release_partition
828  * clear information kept within a partition, including
829  * tuplestore and aggregate results.
830  */
831 static void
832 release_partition(WindowAggState *winstate)
833 {
834         int                     i;
835
836         for (i = 0; i < winstate->numfuncs; i++)
837         {
838                 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
839
840                 /* Release any partition-local state of this window function */
841                 if (perfuncstate->winobj)
842                         perfuncstate->winobj->localmem = NULL;
843         }
844
845         /*
846          * Release all partition-local memory (in particular, any partition-local
847          * state that we might have trashed our pointers to in the above loop, and
848          * any aggregate temp data).  We don't rely on retail pfree because some
849          * aggregates might have allocated data we don't have direct pointers to.
850          */
851         MemoryContextResetAndDeleteChildren(winstate->partcontext);
852         MemoryContextResetAndDeleteChildren(winstate->aggcontext);
853
854         if (winstate->buffer)
855                 tuplestore_end(winstate->buffer);
856         winstate->buffer = NULL;
857         winstate->partition_spooled = false;
858 }
859
860 /*
861  * row_is_in_frame
862  * Determine whether a row is in the current row's window frame according
863  * to our window framing rule
864  *
865  * The caller must have already determined that the row is in the partition
866  * and fetched it into a slot.  This function just encapsulates the framing
867  * rules.
868  */
869 static bool
870 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
871 {
872         int                     frameOptions = winstate->frameOptions;
873
874         Assert(pos >= 0);                       /* else caller error */
875
876         /* First, check frame starting conditions */
877         if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
878         {
879                 if (frameOptions & FRAMEOPTION_ROWS)
880                 {
881                         /* rows before current row are out of frame */
882                         if (pos < winstate->currentpos)
883                                 return false;
884                 }
885                 else if (frameOptions & FRAMEOPTION_RANGE)
886                 {
887                         /* preceding row that is not peer is out of frame */
888                         if (pos < winstate->currentpos &&
889                                 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
890                                 return false;
891                 }
892                 else
893                         Assert(false);
894         }
895         else if (frameOptions & FRAMEOPTION_START_VALUE)
896         {
897                 if (frameOptions & FRAMEOPTION_ROWS)
898                 {
899                         int64           offset = DatumGetInt64(winstate->startOffsetValue);
900
901                         /* rows before current row + offset are out of frame */
902                         if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
903                                 offset = -offset;
904
905                         if (pos < winstate->currentpos + offset)
906                                 return false;
907                 }
908                 else if (frameOptions & FRAMEOPTION_RANGE)
909                 {
910                         /* parser should have rejected this */
911                         elog(ERROR, "window frame with value offset is not implemented");
912                 }
913                 else
914                         Assert(false);
915         }
916
917         /* Okay so far, now check frame ending conditions */
918         if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
919         {
920                 if (frameOptions & FRAMEOPTION_ROWS)
921                 {
922                         /* rows after current row are out of frame */
923                         if (pos > winstate->currentpos)
924                                 return false;
925                 }
926                 else if (frameOptions & FRAMEOPTION_RANGE)
927                 {
928                         /* following row that is not peer is out of frame */
929                         if (pos > winstate->currentpos &&
930                                 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
931                                 return false;
932                 }
933                 else
934                         Assert(false);
935         }
936         else if (frameOptions & FRAMEOPTION_END_VALUE)
937         {
938                 if (frameOptions & FRAMEOPTION_ROWS)
939                 {
940                         int64           offset = DatumGetInt64(winstate->endOffsetValue);
941
942                         /* rows after current row + offset are out of frame */
943                         if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
944                                 offset = -offset;
945
946                         if (pos > winstate->currentpos + offset)
947                                 return false;
948                 }
949                 else if (frameOptions & FRAMEOPTION_RANGE)
950                 {
951                         /* parser should have rejected this */
952                         elog(ERROR, "window frame with value offset is not implemented");
953                 }
954                 else
955                         Assert(false);
956         }
957
958         /* If we get here, it's in frame */
959         return true;
960 }
961
962 /*
963  * update_frameheadpos
964  * make frameheadpos valid for the current row
965  *
966  * Uses the winobj's read pointer for any required fetches; hence, if the
967  * frame mode is one that requires row comparisons, the winobj's mark must
968  * not be past the currently known frame head.  Also uses the specified slot
969  * for any required fetches.
970  */
971 static void
972 update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
973 {
974         WindowAggState *winstate = winobj->winstate;
975         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
976         int                     frameOptions = winstate->frameOptions;
977
978         if (winstate->framehead_valid)
979                 return;                                 /* already known for current row */
980
981         if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
982         {
983                 /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
984                 winstate->frameheadpos = 0;
985                 winstate->framehead_valid = true;
986         }
987         else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
988         {
989                 if (frameOptions & FRAMEOPTION_ROWS)
990                 {
991                         /* In ROWS mode, frame head is the same as current */
992                         winstate->frameheadpos = winstate->currentpos;
993                         winstate->framehead_valid = true;
994                 }
995                 else if (frameOptions & FRAMEOPTION_RANGE)
996                 {
997                         int64           fhprev;
998
999                         /* If no ORDER BY, all rows are peers with each other */
1000                         if (node->ordNumCols == 0)
1001                         {
1002                                 winstate->frameheadpos = 0;
1003                                 winstate->framehead_valid = true;
1004                                 return;
1005                         }
1006
1007                         /*
1008                          * In RANGE START_CURRENT mode, frame head is the first row that
1009                          * is a peer of current row.  We search backwards from current,
1010                          * which could be a bit inefficient if peer sets are large. Might
1011                          * be better to have a separate read pointer that moves forward
1012                          * tracking the frame head.
1013                          */
1014                         fhprev = winstate->currentpos - 1;
1015                         for (;;)
1016                         {
1017                                 /* assume the frame head can't go backwards */
1018                                 if (fhprev < winstate->frameheadpos)
1019                                         break;
1020                                 if (!window_gettupleslot(winobj, fhprev, slot))
1021                                         break;          /* start of partition */
1022                                 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1023                                         break;          /* not peer of current row */
1024                                 fhprev--;
1025                         }
1026                         winstate->frameheadpos = fhprev + 1;
1027                         winstate->framehead_valid = true;
1028                 }
1029                 else
1030                         Assert(false);
1031         }
1032         else if (frameOptions & FRAMEOPTION_START_VALUE)
1033         {
1034                 if (frameOptions & FRAMEOPTION_ROWS)
1035                 {
1036                         /* In ROWS mode, bound is physically n before/after current */
1037                         int64           offset = DatumGetInt64(winstate->startOffsetValue);
1038
1039                         if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1040                                 offset = -offset;
1041
1042                         winstate->frameheadpos = winstate->currentpos + offset;
1043                         /* frame head can't go before first row */
1044                         if (winstate->frameheadpos < 0)
1045                                 winstate->frameheadpos = 0;
1046                         else if (winstate->frameheadpos > winstate->currentpos)
1047                         {
1048                                 /* make sure frameheadpos is not past end of partition */
1049                                 spool_tuples(winstate, winstate->frameheadpos - 1);
1050                                 if (winstate->frameheadpos > winstate->spooled_rows)
1051                                         winstate->frameheadpos = winstate->spooled_rows;
1052                         }
1053                         winstate->framehead_valid = true;
1054                 }
1055                 else if (frameOptions & FRAMEOPTION_RANGE)
1056                 {
1057                         /* parser should have rejected this */
1058                         elog(ERROR, "window frame with value offset is not implemented");
1059                 }
1060                 else
1061                         Assert(false);
1062         }
1063         else
1064                 Assert(false);
1065 }
1066
1067 /*
1068  * update_frametailpos
1069  * make frametailpos valid for the current row
1070  *
1071  * Uses the winobj's read pointer for any required fetches; hence, if the
1072  * frame mode is one that requires row comparisons, the winobj's mark must
1073  * not be past the currently known frame tail.  Also uses the specified slot
1074  * for any required fetches.
1075  */
1076 static void
1077 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
1078 {
1079         WindowAggState *winstate = winobj->winstate;
1080         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1081         int                     frameOptions = winstate->frameOptions;
1082
1083         if (winstate->frametail_valid)
1084                 return;                                 /* already known for current row */
1085
1086         if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1087         {
1088                 /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1089                 spool_tuples(winstate, -1);
1090                 winstate->frametailpos = winstate->spooled_rows - 1;
1091                 winstate->frametail_valid = true;
1092         }
1093         else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1094         {
1095                 if (frameOptions & FRAMEOPTION_ROWS)
1096                 {
1097                         /* In ROWS mode, exactly the rows up to current are in frame */
1098                         winstate->frametailpos = winstate->currentpos;
1099                         winstate->frametail_valid = true;
1100                 }
1101                 else if (frameOptions & FRAMEOPTION_RANGE)
1102                 {
1103                         int64           ftnext;
1104
1105                         /* If no ORDER BY, all rows are peers with each other */
1106                         if (node->ordNumCols == 0)
1107                         {
1108                                 spool_tuples(winstate, -1);
1109                                 winstate->frametailpos = winstate->spooled_rows - 1;
1110                                 winstate->frametail_valid = true;
1111                                 return;
1112                         }
1113
1114                         /*
1115                          * Else we have to search for the first non-peer of the current
1116                          * row.  We assume the current value of frametailpos is a lower
1117                          * bound on the possible frame tail location, ie, frame tail never
1118                          * goes backward, and that currentpos is also a lower bound, ie,
1119                          * frame end always >= current row.
1120                          */
1121                         ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1122                         for (;;)
1123                         {
1124                                 if (!window_gettupleslot(winobj, ftnext, slot))
1125                                         break;          /* end of partition */
1126                                 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1127                                         break;          /* not peer of current row */
1128                                 ftnext++;
1129                         }
1130                         winstate->frametailpos = ftnext - 1;
1131                         winstate->frametail_valid = true;
1132                 }
1133                 else
1134                         Assert(false);
1135         }
1136         else if (frameOptions & FRAMEOPTION_END_VALUE)
1137         {
1138                 if (frameOptions & FRAMEOPTION_ROWS)
1139                 {
1140                         /* In ROWS mode, bound is physically n before/after current */
1141                         int64           offset = DatumGetInt64(winstate->endOffsetValue);
1142
1143                         if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1144                                 offset = -offset;
1145
1146                         winstate->frametailpos = winstate->currentpos + offset;
1147                         /* smallest allowable value of frametailpos is -1 */
1148                         if (winstate->frametailpos < 0)
1149                                 winstate->frametailpos = -1;
1150                         else if (winstate->frametailpos > winstate->currentpos)
1151                         {
1152                                 /* make sure frametailpos is not past last row of partition */
1153                                 spool_tuples(winstate, winstate->frametailpos);
1154                                 if (winstate->frametailpos >= winstate->spooled_rows)
1155                                         winstate->frametailpos = winstate->spooled_rows - 1;
1156                         }
1157                         winstate->frametail_valid = true;
1158                 }
1159                 else if (frameOptions & FRAMEOPTION_RANGE)
1160                 {
1161                         /* parser should have rejected this */
1162                         elog(ERROR, "window frame with value offset is not implemented");
1163                 }
1164                 else
1165                         Assert(false);
1166         }
1167         else
1168                 Assert(false);
1169 }
1170
1171
1172 /* -----------------
1173  * ExecWindowAgg
1174  *
1175  *      ExecWindowAgg receives tuples from its outer subplan and
1176  *      stores them into a tuplestore, then processes window functions.
1177  *      This node doesn't reduce nor qualify any row so the number of
1178  *      returned rows is exactly the same as its outer subplan's result
1179  *      (ignoring the case of SRFs in the targetlist, that is).
1180  * -----------------
1181  */
1182 TupleTableSlot *
1183 ExecWindowAgg(WindowAggState *winstate)
1184 {
1185         TupleTableSlot *result;
1186         ExprDoneCond isDone;
1187         ExprContext *econtext;
1188         int                     i;
1189         int                     numfuncs;
1190
1191         if (winstate->all_done)
1192                 return NULL;
1193
1194         /*
1195          * Check to see if we're still projecting out tuples from a previous
1196          * output tuple (because there is a function-returning-set in the
1197          * projection expressions).  If so, try to project another one.
1198          */
1199         if (winstate->ss.ps.ps_TupFromTlist)
1200         {
1201                 TupleTableSlot *result;
1202                 ExprDoneCond isDone;
1203
1204                 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1205                 if (isDone == ExprMultipleResult)
1206                         return result;
1207                 /* Done with that source tuple... */
1208                 winstate->ss.ps.ps_TupFromTlist = false;
1209         }
1210
1211         /*
1212          * Compute frame offset values, if any, during first call.
1213          */
1214         if (winstate->all_first)
1215         {
1216                 int                     frameOptions = winstate->frameOptions;
1217                 ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1218                 Datum           value;
1219                 bool            isnull;
1220                 int16           len;
1221                 bool            byval;
1222
1223                 if (frameOptions & FRAMEOPTION_START_VALUE)
1224                 {
1225                         Assert(winstate->startOffset != NULL);
1226                         value = ExecEvalExprSwitchContext(winstate->startOffset,
1227                                                                                           econtext,
1228                                                                                           &isnull,
1229                                                                                           NULL);
1230                         if (isnull)
1231                                 ereport(ERROR,
1232                                                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1233                                                  errmsg("frame starting offset must not be null")));
1234                         /* copy value into query-lifespan context */
1235                         get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1236                                                         &len, &byval);
1237                         winstate->startOffsetValue = datumCopy(value, byval, len);
1238                         if (frameOptions & FRAMEOPTION_ROWS)
1239                         {
1240                                 /* value is known to be int8 */
1241                                 int64           offset = DatumGetInt64(value);
1242
1243                                 if (offset < 0)
1244                                         ereport(ERROR,
1245                                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1246                                           errmsg("frame starting offset must not be negative")));
1247                         }
1248                 }
1249                 if (frameOptions & FRAMEOPTION_END_VALUE)
1250                 {
1251                         Assert(winstate->endOffset != NULL);
1252                         value = ExecEvalExprSwitchContext(winstate->endOffset,
1253                                                                                           econtext,
1254                                                                                           &isnull,
1255                                                                                           NULL);
1256                         if (isnull)
1257                                 ereport(ERROR,
1258                                                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1259                                                  errmsg("frame ending offset must not be null")));
1260                         /* copy value into query-lifespan context */
1261                         get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1262                                                         &len, &byval);
1263                         winstate->endOffsetValue = datumCopy(value, byval, len);
1264                         if (frameOptions & FRAMEOPTION_ROWS)
1265                         {
1266                                 /* value is known to be int8 */
1267                                 int64           offset = DatumGetInt64(value);
1268
1269                                 if (offset < 0)
1270                                         ereport(ERROR,
1271                                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1272                                                 errmsg("frame ending offset must not be negative")));
1273                         }
1274                 }
1275                 winstate->all_first = false;
1276         }
1277
1278 restart:
1279         if (winstate->buffer == NULL)
1280         {
1281                 /* Initialize for first partition and set current row = 0 */
1282                 begin_partition(winstate);
1283                 /* If there are no input rows, we'll detect that and exit below */
1284         }
1285         else
1286         {
1287                 /* Advance current row within partition */
1288                 winstate->currentpos++;
1289                 /* This might mean that the frame moves, too */
1290                 winstate->framehead_valid = false;
1291                 winstate->frametail_valid = false;
1292         }
1293
1294         /*
1295          * Spool all tuples up to and including the current row, if we haven't
1296          * already
1297          */
1298         spool_tuples(winstate, winstate->currentpos);
1299
1300         /* Move to the next partition if we reached the end of this partition */
1301         if (winstate->partition_spooled &&
1302                 winstate->currentpos >= winstate->spooled_rows)
1303         {
1304                 release_partition(winstate);
1305
1306                 if (winstate->more_partitions)
1307                 {
1308                         begin_partition(winstate);
1309                         Assert(winstate->spooled_rows > 0);
1310                 }
1311                 else
1312                 {
1313                         winstate->all_done = true;
1314                         return NULL;
1315                 }
1316         }
1317
1318         /* final output execution is in ps_ExprContext */
1319         econtext = winstate->ss.ps.ps_ExprContext;
1320
1321         /* Clear the per-output-tuple context for current row */
1322         ResetExprContext(econtext);
1323
1324         /*
1325          * Read the current row from the tuplestore, and save in ScanTupleSlot.
1326          * (We can't rely on the outerplan's output slot because we may have to
1327          * read beyond the current row.  Also, we have to actually copy the row
1328          * out of the tuplestore, since window function evaluation might cause the
1329          * tuplestore to dump its state to disk.)
1330          *
1331          * Current row must be in the tuplestore, since we spooled it above.
1332          */
1333         tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1334         if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1335                                                                  winstate->ss.ss_ScanTupleSlot))
1336                 elog(ERROR, "unexpected end of tuplestore");
1337
1338         /*
1339          * Evaluate true window functions
1340          */
1341         numfuncs = winstate->numfuncs;
1342         for (i = 0; i < numfuncs; i++)
1343         {
1344                 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1345
1346                 if (perfuncstate->plain_agg)
1347                         continue;
1348                 eval_windowfunction(winstate, perfuncstate,
1349                           &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1350                           &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1351         }
1352
1353         /*
1354          * Evaluate aggregates
1355          */
1356         if (winstate->numaggs > 0)
1357                 eval_windowaggregates(winstate);
1358
1359         /*
1360          * Truncate any no-longer-needed rows from the tuplestore.
1361          */
1362         tuplestore_trim(winstate->buffer);
1363
1364         /*
1365          * Form and return a projection tuple using the windowfunc results and the
1366          * current row.  Setting ecxt_outertuple arranges that any Vars will be
1367          * evaluated with respect to that row.
1368          */
1369         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1370         result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1371
1372         if (isDone == ExprEndResult)
1373         {
1374                 /* SRF in tlist returned no rows, so advance to next input tuple */
1375                 goto restart;
1376         }
1377
1378         winstate->ss.ps.ps_TupFromTlist =
1379                 (isDone == ExprMultipleResult);
1380         return result;
1381 }
1382
1383 /* -----------------
1384  * ExecInitWindowAgg
1385  *
1386  *      Creates the run-time information for the WindowAgg node produced by the
1387  *      planner and initializes its outer subtree
1388  * -----------------
1389  */
1390 WindowAggState *
1391 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1392 {
1393         WindowAggState *winstate;
1394         Plan       *outerPlan;
1395         ExprContext *econtext;
1396         ExprContext *tmpcontext;
1397         WindowStatePerFunc perfunc;
1398         WindowStatePerAgg peragg;
1399         int                     numfuncs,
1400                                 wfuncno,
1401                                 numaggs,
1402                                 aggno;
1403         ListCell   *l;
1404
1405         /* check for unsupported flags */
1406         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1407
1408         /*
1409          * create state structure
1410          */
1411         winstate = makeNode(WindowAggState);
1412         winstate->ss.ps.plan = (Plan *) node;
1413         winstate->ss.ps.state = estate;
1414
1415         /*
1416          * Create expression contexts.  We need two, one for per-input-tuple
1417          * processing and one for per-output-tuple processing.  We cheat a little
1418          * by using ExecAssignExprContext() to build both.
1419          */
1420         ExecAssignExprContext(estate, &winstate->ss.ps);
1421         tmpcontext = winstate->ss.ps.ps_ExprContext;
1422         winstate->tmpcontext = tmpcontext;
1423         ExecAssignExprContext(estate, &winstate->ss.ps);
1424
1425         /* Create long-lived context for storage of partition-local memory etc */
1426         winstate->partcontext =
1427                 AllocSetContextCreate(CurrentMemoryContext,
1428                                                           "WindowAgg_Partition",
1429                                                           ALLOCSET_DEFAULT_MINSIZE,
1430                                                           ALLOCSET_DEFAULT_INITSIZE,
1431                                                           ALLOCSET_DEFAULT_MAXSIZE);
1432
1433         /* Create mid-lived context for aggregate trans values etc */
1434         winstate->aggcontext =
1435                 AllocSetContextCreate(CurrentMemoryContext,
1436                                                           "WindowAgg_Aggregates",
1437                                                           ALLOCSET_DEFAULT_MINSIZE,
1438                                                           ALLOCSET_DEFAULT_INITSIZE,
1439                                                           ALLOCSET_DEFAULT_MAXSIZE);
1440
1441         /*
1442          * tuple table initialization
1443          */
1444         ExecInitScanTupleSlot(estate, &winstate->ss);
1445         ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1446         winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1447         winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1448         winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1449         winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1450
1451         winstate->ss.ps.targetlist = (List *)
1452                 ExecInitExpr((Expr *) node->plan.targetlist,
1453                                          (PlanState *) winstate);
1454
1455         /*
1456          * WindowAgg nodes never have quals, since they can only occur at the
1457          * logical top level of a query (ie, after any WHERE or HAVING filters)
1458          */
1459         Assert(node->plan.qual == NIL);
1460         winstate->ss.ps.qual = NIL;
1461
1462         /*
1463          * initialize child nodes
1464          */
1465         outerPlan = outerPlan(node);
1466         outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1467
1468         /*
1469          * initialize source tuple type (which is also the tuple type that we'll
1470          * store in the tuplestore and use in all our working slots).
1471          */
1472         ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1473
1474         ExecSetSlotDescriptor(winstate->first_part_slot,
1475                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1476         ExecSetSlotDescriptor(winstate->agg_row_slot,
1477                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1478         ExecSetSlotDescriptor(winstate->temp_slot_1,
1479                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1480         ExecSetSlotDescriptor(winstate->temp_slot_2,
1481                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1482
1483         /*
1484          * Initialize result tuple type and projection info.
1485          */
1486         ExecAssignResultTypeFromTL(&winstate->ss.ps);
1487         ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1488
1489         winstate->ss.ps.ps_TupFromTlist = false;
1490
1491         /* Set up data for comparing tuples */
1492         if (node->partNumCols > 0)
1493                 winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1494                                                                                                                 node->partOperators);
1495         if (node->ordNumCols > 0)
1496                 winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1497                                                                                                                   node->ordOperators);
1498
1499         /*
1500          * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1501          */
1502         numfuncs = winstate->numfuncs;
1503         numaggs = winstate->numaggs;
1504         econtext = winstate->ss.ps.ps_ExprContext;
1505         econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1506         econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1507
1508         /*
1509          * allocate per-wfunc/per-agg state information.
1510          */
1511         perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1512         peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1513         winstate->perfunc = perfunc;
1514         winstate->peragg = peragg;
1515
1516         wfuncno = -1;
1517         aggno = -1;
1518         foreach(l, winstate->funcs)
1519         {
1520                 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1521                 WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1522                 WindowStatePerFunc perfuncstate;
1523                 AclResult       aclresult;
1524                 int                     i;
1525
1526                 if (wfunc->winref != node->winref)              /* planner screwed up? */
1527                         elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1528                                  wfunc->winref, node->winref);
1529
1530                 /* Look for a previous duplicate window function */
1531                 for (i = 0; i <= wfuncno; i++)
1532                 {
1533                         if (equal(wfunc, perfunc[i].wfunc) &&
1534                                 !contain_volatile_functions((Node *) wfunc))
1535                                 break;
1536                 }
1537                 if (i <= wfuncno)
1538                 {
1539                         /* Found a match to an existing entry, so just mark it */
1540                         wfuncstate->wfuncno = i;
1541                         continue;
1542                 }
1543
1544                 /* Nope, so assign a new PerAgg record */
1545                 perfuncstate = &perfunc[++wfuncno];
1546
1547                 /* Mark WindowFunc state node with assigned index in the result array */
1548                 wfuncstate->wfuncno = wfuncno;
1549
1550                 /* Check permission to call window function */
1551                 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1552                                                                          ACL_EXECUTE);
1553                 if (aclresult != ACLCHECK_OK)
1554                         aclcheck_error(aclresult, ACL_KIND_PROC,
1555                                                    get_func_name(wfunc->winfnoid));
1556
1557                 /* Fill in the perfuncstate data */
1558                 perfuncstate->wfuncstate = wfuncstate;
1559                 perfuncstate->wfunc = wfunc;
1560                 perfuncstate->numArguments = list_length(wfuncstate->args);
1561
1562                 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1563                                           econtext->ecxt_per_query_memory);
1564                 fmgr_info_set_collation(wfunc->inputcollid, &perfuncstate->flinfo);
1565                 fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1566
1567                 get_typlenbyval(wfunc->wintype,
1568                                                 &perfuncstate->resulttypeLen,
1569                                                 &perfuncstate->resulttypeByVal);
1570
1571                 /*
1572                  * If it's really just a plain aggregate function, we'll emulate the
1573                  * Agg environment for it.
1574                  */
1575                 perfuncstate->plain_agg = wfunc->winagg;
1576                 if (wfunc->winagg)
1577                 {
1578                         WindowStatePerAgg peraggstate;
1579
1580                         perfuncstate->aggno = ++aggno;
1581                         peraggstate = &winstate->peragg[aggno];
1582                         initialize_peragg(winstate, wfunc, peraggstate);
1583                         peraggstate->wfuncno = wfuncno;
1584                 }
1585                 else
1586                 {
1587                         WindowObject winobj = makeNode(WindowObjectData);
1588
1589                         winobj->winstate = winstate;
1590                         winobj->argstates = wfuncstate->args;
1591                         winobj->localmem = NULL;
1592                         perfuncstate->winobj = winobj;
1593                 }
1594         }
1595
1596         /* Update numfuncs, numaggs to match number of unique functions found */
1597         winstate->numfuncs = wfuncno + 1;
1598         winstate->numaggs = aggno + 1;
1599
1600         /* Set up WindowObject for aggregates, if needed */
1601         if (winstate->numaggs > 0)
1602         {
1603                 WindowObject agg_winobj = makeNode(WindowObjectData);
1604
1605                 agg_winobj->winstate = winstate;
1606                 agg_winobj->argstates = NIL;
1607                 agg_winobj->localmem = NULL;
1608                 /* make sure markptr = -1 to invalidate. It may not get used */
1609                 agg_winobj->markptr = -1;
1610                 agg_winobj->readptr = -1;
1611                 winstate->agg_winobj = agg_winobj;
1612         }
1613
1614         /* copy frame options to state node for easy access */
1615         winstate->frameOptions = node->frameOptions;
1616
1617         /* initialize frame bound offset expressions */
1618         winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
1619                                                                                  (PlanState *) winstate);
1620         winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
1621                                                                            (PlanState *) winstate);
1622
1623         winstate->all_first = true;
1624         winstate->partition_spooled = false;
1625         winstate->more_partitions = false;
1626
1627         return winstate;
1628 }
1629
1630 /* -----------------
1631  * ExecEndWindowAgg
1632  * -----------------
1633  */
1634 void
1635 ExecEndWindowAgg(WindowAggState *node)
1636 {
1637         PlanState  *outerPlan;
1638
1639         release_partition(node);
1640
1641         pfree(node->perfunc);
1642         pfree(node->peragg);
1643
1644         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1645         ExecClearTuple(node->first_part_slot);
1646         ExecClearTuple(node->agg_row_slot);
1647         ExecClearTuple(node->temp_slot_1);
1648         ExecClearTuple(node->temp_slot_2);
1649
1650         /*
1651          * Free both the expr contexts.
1652          */
1653         ExecFreeExprContext(&node->ss.ps);
1654         node->ss.ps.ps_ExprContext = node->tmpcontext;
1655         ExecFreeExprContext(&node->ss.ps);
1656
1657         MemoryContextDelete(node->partcontext);
1658         MemoryContextDelete(node->aggcontext);
1659
1660         outerPlan = outerPlanState(node);
1661         ExecEndNode(outerPlan);
1662 }
1663
1664 /* -----------------
1665  * ExecRescanWindowAgg
1666  * -----------------
1667  */
1668 void
1669 ExecReScanWindowAgg(WindowAggState *node)
1670 {
1671         ExprContext *econtext = node->ss.ps.ps_ExprContext;
1672
1673         node->all_done = false;
1674
1675         node->ss.ps.ps_TupFromTlist = false;
1676         node->all_first = true;
1677
1678         /* release tuplestore et al */
1679         release_partition(node);
1680
1681         /* release all temp tuples, but especially first_part_slot */
1682         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1683         ExecClearTuple(node->first_part_slot);
1684         ExecClearTuple(node->agg_row_slot);
1685         ExecClearTuple(node->temp_slot_1);
1686         ExecClearTuple(node->temp_slot_2);
1687
1688         /* Forget current wfunc values */
1689         MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
1690         MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
1691
1692         /*
1693          * if chgParam of subnode is not null then plan will be re-scanned by
1694          * first ExecProcNode.
1695          */
1696         if (node->ss.ps.lefttree->chgParam == NULL)
1697                 ExecReScan(node->ss.ps.lefttree);
1698 }
1699
1700 /*
1701  * initialize_peragg
1702  *
1703  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
1704  */
1705 static WindowStatePerAggData *
1706 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
1707                                   WindowStatePerAgg peraggstate)
1708 {
1709         Oid                     inputTypes[FUNC_MAX_ARGS];
1710         int                     numArguments;
1711         HeapTuple       aggTuple;
1712         Form_pg_aggregate aggform;
1713         Oid                     aggtranstype;
1714         AclResult       aclresult;
1715         Oid                     transfn_oid,
1716                                 finalfn_oid;
1717         Expr       *transfnexpr,
1718                            *finalfnexpr;
1719         Datum           textInitVal;
1720         int                     i;
1721         ListCell   *lc;
1722
1723         numArguments = list_length(wfunc->args);
1724
1725         i = 0;
1726         foreach(lc, wfunc->args)
1727         {
1728                 inputTypes[i++] = exprType((Node *) lfirst(lc));
1729         }
1730
1731         aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
1732         if (!HeapTupleIsValid(aggTuple))
1733                 elog(ERROR, "cache lookup failed for aggregate %u",
1734                          wfunc->winfnoid);
1735         aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
1736
1737         /*
1738          * ExecInitWindowAgg already checked permission to call aggregate function
1739          * ... but we still need to check the component functions
1740          */
1741
1742         peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
1743         peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
1744
1745         /* Check that aggregate owner has permission to call component fns */
1746         {
1747                 HeapTuple       procTuple;
1748                 Oid                     aggOwner;
1749
1750                 procTuple = SearchSysCache1(PROCOID,
1751                                                                         ObjectIdGetDatum(wfunc->winfnoid));
1752                 if (!HeapTupleIsValid(procTuple))
1753                         elog(ERROR, "cache lookup failed for function %u",
1754                                  wfunc->winfnoid);
1755                 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
1756                 ReleaseSysCache(procTuple);
1757
1758                 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
1759                                                                          ACL_EXECUTE);
1760                 if (aclresult != ACLCHECK_OK)
1761                         aclcheck_error(aclresult, ACL_KIND_PROC,
1762                                                    get_func_name(transfn_oid));
1763                 if (OidIsValid(finalfn_oid))
1764                 {
1765                         aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
1766                                                                                  ACL_EXECUTE);
1767                         if (aclresult != ACLCHECK_OK)
1768                                 aclcheck_error(aclresult, ACL_KIND_PROC,
1769                                                            get_func_name(finalfn_oid));
1770                 }
1771         }
1772
1773         /* resolve actual type of transition state, if polymorphic */
1774         aggtranstype = aggform->aggtranstype;
1775         if (IsPolymorphicType(aggtranstype))
1776         {
1777                 /* have to fetch the agg's declared input types... */
1778                 Oid                *declaredArgTypes;
1779                 int                     agg_nargs;
1780
1781                 get_func_signature(wfunc->winfnoid,
1782                                                    &declaredArgTypes, &agg_nargs);
1783                 Assert(agg_nargs == numArguments);
1784                 aggtranstype = enforce_generic_type_consistency(inputTypes,
1785                                                                                                                 declaredArgTypes,
1786                                                                                                                 agg_nargs,
1787                                                                                                                 aggtranstype,
1788                                                                                                                 false);
1789                 pfree(declaredArgTypes);
1790         }
1791
1792         /* build expression trees using actual argument & result types */
1793         build_aggregate_fnexprs(inputTypes,
1794                                                         numArguments,
1795                                                         aggtranstype,
1796                                                         wfunc->wintype,
1797                                                         wfunc->inputcollid,
1798                                                         transfn_oid,
1799                                                         finalfn_oid,
1800                                                         &transfnexpr,
1801                                                         &finalfnexpr);
1802
1803         fmgr_info(transfn_oid, &peraggstate->transfn);
1804         fmgr_info_set_collation(wfunc->inputcollid, &peraggstate->transfn);
1805         fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
1806
1807         if (OidIsValid(finalfn_oid))
1808         {
1809                 fmgr_info(finalfn_oid, &peraggstate->finalfn);
1810                 fmgr_info_set_collation(wfunc->inputcollid, &peraggstate->finalfn);
1811                 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
1812         }
1813
1814         get_typlenbyval(wfunc->wintype,
1815                                         &peraggstate->resulttypeLen,
1816                                         &peraggstate->resulttypeByVal);
1817         get_typlenbyval(aggtranstype,
1818                                         &peraggstate->transtypeLen,
1819                                         &peraggstate->transtypeByVal);
1820
1821         /*
1822          * initval is potentially null, so don't try to access it as a struct
1823          * field. Must do it the hard way with SysCacheGetAttr.
1824          */
1825         textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
1826                                                                   Anum_pg_aggregate_agginitval,
1827                                                                   &peraggstate->initValueIsNull);
1828
1829         if (peraggstate->initValueIsNull)
1830                 peraggstate->initValue = (Datum) 0;
1831         else
1832                 peraggstate->initValue = GetAggInitVal(textInitVal,
1833                                                                                            aggtranstype);
1834
1835         /*
1836          * If the transfn is strict and the initval is NULL, make sure input type
1837          * and transtype are the same (or at least binary-compatible), so that
1838          * it's OK to use the first input value as the initial transValue.  This
1839          * should have been checked at agg definition time, but just in case...
1840          */
1841         if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
1842         {
1843                 if (numArguments < 1 ||
1844                         !IsBinaryCoercible(inputTypes[0], aggtranstype))
1845                         ereport(ERROR,
1846                                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
1847                                          errmsg("aggregate %u needs to have compatible input type and transition type",
1848                                                         wfunc->winfnoid)));
1849         }
1850
1851         ReleaseSysCache(aggTuple);
1852
1853         return peraggstate;
1854 }
1855
1856 static Datum
1857 GetAggInitVal(Datum textInitVal, Oid transtype)
1858 {
1859         Oid                     typinput,
1860                                 typioparam;
1861         char       *strInitVal;
1862         Datum           initVal;
1863
1864         getTypeInputInfo(transtype, &typinput, &typioparam);
1865         strInitVal = TextDatumGetCString(textInitVal);
1866         initVal = OidInputFunctionCall(typinput, strInitVal,
1867                                                                    typioparam, -1);
1868         pfree(strInitVal);
1869         return initVal;
1870 }
1871
1872 /*
1873  * are_peers
1874  * compare two rows to see if they are equal according to the ORDER BY clause
1875  *
1876  * NB: this does not consider the window frame mode.
1877  */
1878 static bool
1879 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
1880                   TupleTableSlot *slot2)
1881 {
1882         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1883
1884         /* If no ORDER BY, all rows are peers with each other */
1885         if (node->ordNumCols == 0)
1886                 return true;
1887
1888         return execTuplesMatch(slot1, slot2,
1889                                                    node->ordNumCols, node->ordColIdx,
1890                                                    winstate->ordEqfunctions,
1891                                                    winstate->tmpcontext->ecxt_per_tuple_memory);
1892 }
1893
1894 /*
1895  * window_gettupleslot
1896  *      Fetch the pos'th tuple of the current partition into the slot,
1897  *      using the winobj's read pointer
1898  *
1899  * Returns true if successful, false if no such row
1900  */
1901 static bool
1902 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
1903 {
1904         WindowAggState *winstate = winobj->winstate;
1905         MemoryContext oldcontext;
1906
1907         /* Don't allow passing -1 to spool_tuples here */
1908         if (pos < 0)
1909                 return false;
1910
1911         /* If necessary, fetch the tuple into the spool */
1912         spool_tuples(winstate, pos);
1913
1914         if (pos >= winstate->spooled_rows)
1915                 return false;
1916
1917         if (pos < winobj->markpos)
1918                 elog(ERROR, "cannot fetch row before WindowObject's mark position");
1919
1920         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1921
1922         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1923
1924         /*
1925          * There's no API to refetch the tuple at the current position. We have to
1926          * move one tuple forward, and then one backward.  (We don't do it the
1927          * other way because we might try to fetch the row before our mark, which
1928          * isn't allowed.)  XXX this case could stand to be optimized.
1929          */
1930         if (winobj->seekpos == pos)
1931         {
1932                 tuplestore_advance(winstate->buffer, true);
1933                 winobj->seekpos++;
1934         }
1935
1936         while (winobj->seekpos > pos)
1937         {
1938                 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
1939                         elog(ERROR, "unexpected end of tuplestore");
1940                 winobj->seekpos--;
1941         }
1942
1943         while (winobj->seekpos < pos)
1944         {
1945                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
1946                         elog(ERROR, "unexpected end of tuplestore");
1947                 winobj->seekpos++;
1948         }
1949
1950         MemoryContextSwitchTo(oldcontext);
1951
1952         return true;
1953 }
1954
1955
1956 /***********************************************************************
1957  * API exposed to window functions
1958  ***********************************************************************/
1959
1960
1961 /*
1962  * WinGetPartitionLocalMemory
1963  *              Get working memory that lives till end of partition processing
1964  *
1965  * On first call within a given partition, this allocates and zeroes the
1966  * requested amount of space.  Subsequent calls just return the same chunk.
1967  *
1968  * Memory obtained this way is normally used to hold state that should be
1969  * automatically reset for each new partition.  If a window function wants
1970  * to hold state across the whole query, fcinfo->fn_extra can be used in the
1971  * usual way for that.
1972  */
1973 void *
1974 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
1975 {
1976         Assert(WindowObjectIsValid(winobj));
1977         if (winobj->localmem == NULL)
1978                 winobj->localmem =
1979                         MemoryContextAllocZero(winobj->winstate->partcontext, sz);
1980         return winobj->localmem;
1981 }
1982
1983 /*
1984  * WinGetCurrentPosition
1985  *              Return the current row's position (counting from 0) within the current
1986  *              partition.
1987  */
1988 int64
1989 WinGetCurrentPosition(WindowObject winobj)
1990 {
1991         Assert(WindowObjectIsValid(winobj));
1992         return winobj->winstate->currentpos;
1993 }
1994
1995 /*
1996  * WinGetPartitionRowCount
1997  *              Return total number of rows contained in the current partition.
1998  *
1999  * Note: this is a relatively expensive operation because it forces the
2000  * whole partition to be "spooled" into the tuplestore at once.  Once
2001  * executed, however, additional calls within the same partition are cheap.
2002  */
2003 int64
2004 WinGetPartitionRowCount(WindowObject winobj)
2005 {
2006         Assert(WindowObjectIsValid(winobj));
2007         spool_tuples(winobj->winstate, -1);
2008         return winobj->winstate->spooled_rows;
2009 }
2010
2011 /*
2012  * WinSetMarkPosition
2013  *              Set the "mark" position for the window object, which is the oldest row
2014  *              number (counting from 0) it is allowed to fetch during all subsequent
2015  *              operations within the current partition.
2016  *
2017  * Window functions do not have to call this, but are encouraged to move the
2018  * mark forward when possible to keep the tuplestore size down and prevent
2019  * having to spill rows to disk.
2020  */
2021 void
2022 WinSetMarkPosition(WindowObject winobj, int64 markpos)
2023 {
2024         WindowAggState *winstate;
2025
2026         Assert(WindowObjectIsValid(winobj));
2027         winstate = winobj->winstate;
2028
2029         if (markpos < winobj->markpos)
2030                 elog(ERROR, "cannot move WindowObject's mark position backward");
2031         tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2032         while (markpos > winobj->markpos)
2033         {
2034                 tuplestore_advance(winstate->buffer, true);
2035                 winobj->markpos++;
2036         }
2037         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2038         while (markpos > winobj->seekpos)
2039         {
2040                 tuplestore_advance(winstate->buffer, true);
2041                 winobj->seekpos++;
2042         }
2043 }
2044
2045 /*
2046  * WinRowsArePeers
2047  *              Compare two rows (specified by absolute position in window) to see
2048  *              if they are equal according to the ORDER BY clause.
2049  *
2050  * NB: this does not consider the window frame mode.
2051  */
2052 bool
2053 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2054 {
2055         WindowAggState *winstate;
2056         WindowAgg  *node;
2057         TupleTableSlot *slot1;
2058         TupleTableSlot *slot2;
2059         bool            res;
2060
2061         Assert(WindowObjectIsValid(winobj));
2062         winstate = winobj->winstate;
2063         node = (WindowAgg *) winstate->ss.ps.plan;
2064
2065         /* If no ORDER BY, all rows are peers; don't bother to fetch them */
2066         if (node->ordNumCols == 0)
2067                 return true;
2068
2069         slot1 = winstate->temp_slot_1;
2070         slot2 = winstate->temp_slot_2;
2071
2072         if (!window_gettupleslot(winobj, pos1, slot1))
2073                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2074                          pos1);
2075         if (!window_gettupleslot(winobj, pos2, slot2))
2076                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2077                          pos2);
2078
2079         res = are_peers(winstate, slot1, slot2);
2080
2081         ExecClearTuple(slot1);
2082         ExecClearTuple(slot2);
2083
2084         return res;
2085 }
2086
2087 /*
2088  * WinGetFuncArgInPartition
2089  *              Evaluate a window function's argument expression on a specified
2090  *              row of the partition.  The row is identified in lseek(2) style,
2091  *              i.e. relative to the current, first, or last row.
2092  *
2093  * argno: argument number to evaluate (counted from 0)
2094  * relpos: signed rowcount offset from the seek position
2095  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2096  * set_mark: If the row is found and set_mark is true, the mark is moved to
2097  *              the row as a side-effect.
2098  * isnull: output argument, receives isnull status of result
2099  * isout: output argument, set to indicate whether target row position
2100  *              is out of partition (can pass NULL if caller doesn't care about this)
2101  *
2102  * Specifying a nonexistent row is not an error, it just causes a null result
2103  * (plus setting *isout true, if isout isn't NULL).
2104  */
2105 Datum
2106 WinGetFuncArgInPartition(WindowObject winobj, int argno,
2107                                                  int relpos, int seektype, bool set_mark,
2108                                                  bool *isnull, bool *isout)
2109 {
2110         WindowAggState *winstate;
2111         ExprContext *econtext;
2112         TupleTableSlot *slot;
2113         bool            gottuple;
2114         int64           abs_pos;
2115
2116         Assert(WindowObjectIsValid(winobj));
2117         winstate = winobj->winstate;
2118         econtext = winstate->ss.ps.ps_ExprContext;
2119         slot = winstate->temp_slot_1;
2120
2121         switch (seektype)
2122         {
2123                 case WINDOW_SEEK_CURRENT:
2124                         abs_pos = winstate->currentpos + relpos;
2125                         break;
2126                 case WINDOW_SEEK_HEAD:
2127                         abs_pos = relpos;
2128                         break;
2129                 case WINDOW_SEEK_TAIL:
2130                         spool_tuples(winstate, -1);
2131                         abs_pos = winstate->spooled_rows - 1 + relpos;
2132                         break;
2133                 default:
2134                         elog(ERROR, "unrecognized window seek type: %d", seektype);
2135                         abs_pos = 0;            /* keep compiler quiet */
2136                         break;
2137         }
2138
2139         gottuple = window_gettupleslot(winobj, abs_pos, slot);
2140
2141         if (!gottuple)
2142         {
2143                 if (isout)
2144                         *isout = true;
2145                 *isnull = true;
2146                 return (Datum) 0;
2147         }
2148         else
2149         {
2150                 if (isout)
2151                         *isout = false;
2152                 if (set_mark)
2153                 {
2154                         int                     frameOptions = winstate->frameOptions;
2155                         int64           mark_pos = abs_pos;
2156
2157                         /*
2158                          * In RANGE mode with a moving frame head, we must not let the
2159                          * mark advance past frameheadpos, since that row has to be
2160                          * fetchable during future update_frameheadpos calls.
2161                          *
2162                          * XXX it is very ugly to pollute window functions' marks with
2163                          * this consideration; it could for instance mask a logic bug that
2164                          * lets a window function fetch rows before what it had claimed
2165                          * was its mark.  Perhaps use a separate mark for frame head
2166                          * probes?
2167                          */
2168                         if ((frameOptions & FRAMEOPTION_RANGE) &&
2169                                 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2170                         {
2171                                 update_frameheadpos(winobj, winstate->temp_slot_2);
2172                                 if (mark_pos > winstate->frameheadpos)
2173                                         mark_pos = winstate->frameheadpos;
2174                         }
2175                         WinSetMarkPosition(winobj, mark_pos);
2176                 }
2177                 econtext->ecxt_outertuple = slot;
2178                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2179                                                         econtext, isnull, NULL);
2180         }
2181 }
2182
2183 /*
2184  * WinGetFuncArgInFrame
2185  *              Evaluate a window function's argument expression on a specified
2186  *              row of the window frame.  The row is identified in lseek(2) style,
2187  *              i.e. relative to the current, first, or last row.
2188  *
2189  * argno: argument number to evaluate (counted from 0)
2190  * relpos: signed rowcount offset from the seek position
2191  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2192  * set_mark: If the row is found and set_mark is true, the mark is moved to
2193  *              the row as a side-effect.
2194  * isnull: output argument, receives isnull status of result
2195  * isout: output argument, set to indicate whether target row position
2196  *              is out of frame (can pass NULL if caller doesn't care about this)
2197  *
2198  * Specifying a nonexistent row is not an error, it just causes a null result
2199  * (plus setting *isout true, if isout isn't NULL).
2200  */
2201 Datum
2202 WinGetFuncArgInFrame(WindowObject winobj, int argno,
2203                                          int relpos, int seektype, bool set_mark,
2204                                          bool *isnull, bool *isout)
2205 {
2206         WindowAggState *winstate;
2207         ExprContext *econtext;
2208         TupleTableSlot *slot;
2209         bool            gottuple;
2210         int64           abs_pos;
2211
2212         Assert(WindowObjectIsValid(winobj));
2213         winstate = winobj->winstate;
2214         econtext = winstate->ss.ps.ps_ExprContext;
2215         slot = winstate->temp_slot_1;
2216
2217         switch (seektype)
2218         {
2219                 case WINDOW_SEEK_CURRENT:
2220                         abs_pos = winstate->currentpos + relpos;
2221                         break;
2222                 case WINDOW_SEEK_HEAD:
2223                         update_frameheadpos(winobj, slot);
2224                         abs_pos = winstate->frameheadpos + relpos;
2225                         break;
2226                 case WINDOW_SEEK_TAIL:
2227                         update_frametailpos(winobj, slot);
2228                         abs_pos = winstate->frametailpos + relpos;
2229                         break;
2230                 default:
2231                         elog(ERROR, "unrecognized window seek type: %d", seektype);
2232                         abs_pos = 0;            /* keep compiler quiet */
2233                         break;
2234         }
2235
2236         gottuple = window_gettupleslot(winobj, abs_pos, slot);
2237         if (gottuple)
2238                 gottuple = row_is_in_frame(winstate, abs_pos, slot);
2239
2240         if (!gottuple)
2241         {
2242                 if (isout)
2243                         *isout = true;
2244                 *isnull = true;
2245                 return (Datum) 0;
2246         }
2247         else
2248         {
2249                 if (isout)
2250                         *isout = false;
2251                 if (set_mark)
2252                 {
2253                         int                     frameOptions = winstate->frameOptions;
2254                         int64           mark_pos = abs_pos;
2255
2256                         /*
2257                          * In RANGE mode with a moving frame head, we must not let the
2258                          * mark advance past frameheadpos, since that row has to be
2259                          * fetchable during future update_frameheadpos calls.
2260                          *
2261                          * XXX it is very ugly to pollute window functions' marks with
2262                          * this consideration; it could for instance mask a logic bug that
2263                          * lets a window function fetch rows before what it had claimed
2264                          * was its mark.  Perhaps use a separate mark for frame head
2265                          * probes?
2266                          */
2267                         if ((frameOptions & FRAMEOPTION_RANGE) &&
2268                                 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2269                         {
2270                                 update_frameheadpos(winobj, winstate->temp_slot_2);
2271                                 if (mark_pos > winstate->frameheadpos)
2272                                         mark_pos = winstate->frameheadpos;
2273                         }
2274                         WinSetMarkPosition(winobj, mark_pos);
2275                 }
2276                 econtext->ecxt_outertuple = slot;
2277                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2278                                                         econtext, isnull, NULL);
2279         }
2280 }
2281
2282 /*
2283  * WinGetFuncArgCurrent
2284  *              Evaluate a window function's argument expression on the current row.
2285  *
2286  * argno: argument number to evaluate (counted from 0)
2287  * isnull: output argument, receives isnull status of result
2288  *
2289  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2290  * WinGetFuncArgInFrame targeting the current row, because it will succeed
2291  * even if the WindowObject's mark has been set beyond the current row.
2292  * This should generally be used for "ordinary" arguments of a window
2293  * function, such as the offset argument of lead() or lag().
2294  */
2295 Datum
2296 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2297 {
2298         WindowAggState *winstate;
2299         ExprContext *econtext;
2300
2301         Assert(WindowObjectIsValid(winobj));
2302         winstate = winobj->winstate;
2303
2304         econtext = winstate->ss.ps.ps_ExprContext;
2305
2306         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2307         return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2308                                                 econtext, isnull, NULL);
2309 }