OSDN Git Service

Add some basic support for window frame clauses to the window-functions
[pg-rex/syncrep.git] / src / backend / executor / nodeWindowAgg.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  *        routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set.  Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications.  The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore.  The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  *        $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.2 2008/12/31 00:08:35 tgl Exp $
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35
36 #include "catalog/pg_aggregate.h"
37 #include "catalog/pg_proc.h"
38 #include "catalog/pg_type.h"
39 #include "executor/executor.h"
40 #include "executor/nodeWindowAgg.h"
41 #include "miscadmin.h"
42 #include "nodes/nodeFuncs.h"
43 #include "optimizer/clauses.h"
44 #include "parser/parse_agg.h"
45 #include "parser/parse_coerce.h"
46 #include "utils/acl.h"
47 #include "utils/builtins.h"
48 #include "utils/datum.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/syscache.h"
52 #include "windowapi.h"
53
54 /*
55  * All the window function APIs are called with this object, which is passed
56  * to window functions as fcinfo->context.
57  */
58 typedef struct WindowObjectData
59 {
60         NodeTag         type;
61         WindowAggState *winstate;       /* parent WindowAggState */
62         List       *argstates;          /* ExprState trees for fn's arguments */
63         void       *localmem;           /* WinGetPartitionLocalMemory's chunk */
64         int                     markptr;                /* tuplestore mark pointer for this fn */
65         int                     readptr;                /* tuplestore read pointer for this fn */
66         int64           markpos;                /* row that markptr is positioned on */
67         int64           seekpos;                /* row that readptr is positioned on */
68 } WindowObjectData;
69
70 /*
71  * We have one WindowStatePerFunc struct for each window function and
72  * window aggregate handled by this node.
73  */
74 typedef struct WindowStatePerFuncData
75 {
76         /* Links to WindowFunc expr and state nodes this working state is for */
77         WindowFuncExprState *wfuncstate;
78         WindowFunc         *wfunc;
79
80         int                     numArguments;   /* number of arguments */
81
82         FmgrInfo        flinfo;                 /* fmgr lookup data for window function */
83
84         /*
85          * We need the len and byval info for the result of each function
86          * in order to know how to copy/delete values.
87          */
88         int16           resulttypeLen;
89         bool            resulttypeByVal;
90
91         bool            plain_agg;              /* is it just a plain aggregate function? */
92         int                     aggno;                  /* if so, index of its PerAggData */
93
94         WindowObject    winobj;         /* object used in window function API */
95 } WindowStatePerFuncData;
96
97 /*
98  * For plain aggregate window functions, we also have one of these.
99  */
100 typedef struct WindowStatePerAggData
101 {
102         /* Oids of transfer functions */
103         Oid                     transfn_oid;
104         Oid                     finalfn_oid;    /* may be InvalidOid */
105
106         /*
107          * fmgr lookup data for transfer functions --- only valid when
108          * corresponding oid is not InvalidOid.  Note in particular that fn_strict
109          * flags are kept here.
110          */
111         FmgrInfo        transfn;
112         FmgrInfo        finalfn;
113
114         /*
115          * initial value from pg_aggregate entry
116          */
117         Datum           initValue;
118         bool            initValueIsNull;
119
120         /*
121          * cached value for current frame boundaries
122          */
123         Datum           resultValue;
124         bool            resultValueIsNull;
125
126         /*
127          * We need the len and byval info for the agg's input, result, and
128          * transition data types in order to know how to copy/delete values.
129          */
130         int16           inputtypeLen,
131                                 resulttypeLen,
132                                 transtypeLen;
133         bool            inputtypeByVal,
134                                 resulttypeByVal,
135                                 transtypeByVal;
136
137         int                     wfuncno;                /* index of associated PerFuncData */
138
139         /* Current transition value */
140         Datum           transValue;             /* current transition value */
141         bool            transValueIsNull;
142
143         bool            noTransValue;   /* true if transValue not set yet */
144 } WindowStatePerAggData;
145
146 static void initialize_windowaggregate(WindowAggState *winstate,
147                                                                            WindowStatePerFunc perfuncstate,
148                                                                            WindowStatePerAgg peraggstate);
149 static void advance_windowaggregate(WindowAggState *winstate,
150                                                                         WindowStatePerFunc perfuncstate,
151                                                                         WindowStatePerAgg peraggstate);
152 static void finalize_windowaggregate(WindowAggState *winstate,
153                                                                          WindowStatePerFunc perfuncstate,
154                                                                          WindowStatePerAgg peraggstate,
155                                                                          Datum *result, bool *isnull);
156
157 static void eval_windowaggregates(WindowAggState *winstate);
158 static void eval_windowfunction(WindowAggState *winstate,
159                                                                 WindowStatePerFunc perfuncstate,
160                                                                 Datum *result, bool *isnull);
161
162 static void begin_partition(WindowAggState *winstate);
163 static void spool_tuples(WindowAggState *winstate, int64 pos);
164 static void release_partition(WindowAggState *winstate);
165
166 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
167                                                         TupleTableSlot *slot);
168 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
169
170 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
171                                                                                                 WindowFunc *wfunc,
172                                                                                                 WindowStatePerAgg peraggstate);
173 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
174
175 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
176                                           TupleTableSlot *slot2);
177 static bool window_gettupleslot(WindowObject winobj, int64 pos,
178                                                                 TupleTableSlot *slot);
179
180
181 /*
182  * initialize_windowaggregate
183  * parallel to initialize_aggregate in nodeAgg.c
184  */
185 static void
186 initialize_windowaggregate(WindowAggState *winstate,
187                                                    WindowStatePerFunc perfuncstate,
188                                                    WindowStatePerAgg peraggstate)
189 {
190         MemoryContext           oldContext;
191
192         if (peraggstate->initValueIsNull)
193                 peraggstate->transValue = peraggstate->initValue;
194         else
195         {
196                 oldContext = MemoryContextSwitchTo(winstate->wincontext);
197                 peraggstate->transValue = datumCopy(peraggstate->initValue,
198                                                                                         peraggstate->transtypeByVal,
199                                                                                         peraggstate->transtypeLen);
200                 MemoryContextSwitchTo(oldContext);
201         }
202         peraggstate->transValueIsNull = peraggstate->initValueIsNull;
203         peraggstate->noTransValue = peraggstate->initValueIsNull;
204         peraggstate->resultValueIsNull = true;
205 }
206
207 /*
208  * advance_windowaggregate
209  * parallel to advance_aggregate in nodeAgg.c
210  */
211 static void
212 advance_windowaggregate(WindowAggState *winstate,
213                                                 WindowStatePerFunc perfuncstate,
214                                                 WindowStatePerAgg peraggstate)
215 {
216         WindowFuncExprState        *wfuncstate = perfuncstate->wfuncstate;
217         int                                             numArguments = perfuncstate->numArguments;
218         FunctionCallInfoData    fcinfodata;
219         FunctionCallInfo                fcinfo = &fcinfodata;
220         Datum                                   newVal;
221         ListCell                           *arg;
222         int                                             i;
223         MemoryContext                   oldContext;
224         ExprContext *econtext = winstate->tmpcontext;
225
226         oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
227
228         /* We start from 1, since the 0th arg will be the transition value */
229         i = 1;
230         foreach(arg, wfuncstate->args)
231         {
232                 ExprState          *argstate = (ExprState *) lfirst(arg);
233
234                 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
235                                                                           &fcinfo->argnull[i], NULL);
236                 i++;
237         }
238
239         if (peraggstate->transfn.fn_strict)
240         {
241                 /*
242                  * For a strict transfn, nothing happens when there's a NULL input; we
243                  * just keep the prior transValue.
244                  */
245                 for (i = 1; i <= numArguments; i++)
246                 {
247                         if (fcinfo->argnull[i])
248                         {
249                                 MemoryContextSwitchTo(oldContext);
250                                 return;
251                         }
252                 }
253                 if (peraggstate->noTransValue)
254                 {
255                         /*
256                          * transValue has not been initialized. This is the first non-NULL
257                          * input value. We use it as the initial value for transValue. (We
258                          * already checked that the agg's input type is binary-compatible
259                          * with its transtype, so straight copy here is OK.)
260                          *
261                          * We must copy the datum into wincontext if it is pass-by-ref. We
262                          * do not need to pfree the old transValue, since it's NULL.
263                          */
264                         MemoryContextSwitchTo(winstate->wincontext);
265                         peraggstate->transValue = datumCopy(fcinfo->arg[1],
266                                                                                          peraggstate->transtypeByVal,
267                                                                                          peraggstate->transtypeLen);
268                         peraggstate->transValueIsNull = false;
269                         peraggstate->noTransValue = false;
270                         MemoryContextSwitchTo(oldContext);
271                         return;
272                 }
273                 if (peraggstate->transValueIsNull)
274                 {
275                         /*
276                          * Don't call a strict function with NULL inputs.  Note it is
277                          * possible to get here despite the above tests, if the transfn is
278                          * strict *and* returned a NULL on a prior cycle. If that happens
279                          * we will propagate the NULL all the way to the end.
280                          */
281                         MemoryContextSwitchTo(oldContext);
282                         return;
283                 }
284         }
285
286         /*
287          * OK to call the transition function
288          */
289         InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
290                                                          numArguments + 1,
291                                                          (void *) winstate, NULL);
292         fcinfo->arg[0] = peraggstate->transValue;
293         fcinfo->argnull[0] = peraggstate->transValueIsNull;
294         newVal = FunctionCallInvoke(fcinfo);
295
296         /*
297          * If pass-by-ref datatype, must copy the new value into wincontext and
298          * pfree the prior transValue.  But if transfn returned a pointer to its
299          * first input, we don't need to do anything.
300          */
301         if (!peraggstate->transtypeByVal &&
302                 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
303         {
304                 if (!fcinfo->isnull)
305                 {
306                         MemoryContextSwitchTo(winstate->wincontext);
307                         newVal = datumCopy(newVal,
308                                                            peraggstate->transtypeByVal,
309                                                            peraggstate->transtypeLen);
310                 }
311                 if (!peraggstate->transValueIsNull)
312                         pfree(DatumGetPointer(peraggstate->transValue));
313         }
314
315         MemoryContextSwitchTo(oldContext);
316         peraggstate->transValue = newVal;
317         peraggstate->transValueIsNull = fcinfo->isnull;
318 }
319
320 /*
321  * finalize_windowaggregate
322  * parallel to finalize_aggregate in nodeAgg.c
323  */
324 static void
325 finalize_windowaggregate(WindowAggState *winstate,
326                                                  WindowStatePerFunc perfuncstate,
327                                                  WindowStatePerAgg peraggstate,
328                                                  Datum *result, bool *isnull)
329 {
330         MemoryContext                   oldContext;
331
332         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
333
334         /*
335          * Apply the agg's finalfn if one is provided, else return transValue.
336          */
337         if (OidIsValid(peraggstate->finalfn_oid))
338         {
339                 FunctionCallInfoData    fcinfo;
340
341                 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
342                                                                  (void *) winstate, NULL);
343                 fcinfo.arg[0] = peraggstate->transValue;
344                 fcinfo.argnull[0] = peraggstate->transValueIsNull;
345                 if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
346                 {
347                         /* don't call a strict function with NULL inputs */
348                         *result = (Datum) 0;
349                         *isnull = true;
350                 }
351                 else
352                 {
353                         *result = FunctionCallInvoke(&fcinfo);
354                         *isnull = fcinfo.isnull;
355                 }
356         }
357         else
358         {
359                 *result = peraggstate->transValue;
360                 *isnull = peraggstate->transValueIsNull;
361         }
362
363         /*
364          * If result is pass-by-ref, make sure it is in the right context.
365          */
366         if (!peraggstate->resulttypeByVal && !*isnull &&
367                 !MemoryContextContains(CurrentMemoryContext,
368                                                            DatumGetPointer(*result)))
369                 *result = datumCopy(*result,
370                                                         peraggstate->resulttypeByVal,
371                                                         peraggstate->resulttypeLen);
372         MemoryContextSwitchTo(oldContext);
373 }
374
375 /*
376  * eval_windowaggregates
377  * evaluate plain aggregates being used as window functions
378  *
379  * Much of this is duplicated from nodeAgg.c.  But NOTE that we expect to be
380  * able to call aggregate final functions repeatedly after aggregating more
381  * data onto the same transition value.  This is not a behavior required by
382  * nodeAgg.c.
383  */
384 static void
385 eval_windowaggregates(WindowAggState *winstate)
386 {
387         WindowStatePerAgg   peraggstate;
388         int                                     wfuncno, numaggs;
389         int                                     i;
390         MemoryContext           oldContext;
391         ExprContext                *econtext;
392         TupleTableSlot     *agg_row_slot;
393
394         numaggs = winstate->numaggs;
395         if (numaggs == 0)
396                 return;                                 /* nothing to do */
397
398         /* final output execution is in ps_ExprContext */
399         econtext = winstate->ss.ps.ps_ExprContext;
400
401         /*
402          * Currently, we support only a subset of the SQL-standard window framing
403          * rules.  In all the supported cases, the window frame always consists
404          * of a contiguous group of rows extending forward from the start of the
405          * partition, and rows only enter the frame, never exit it, as the
406          * current row advances forward.  This makes it possible to use an
407          * incremental strategy for evaluating aggregates: we run the transition
408          * function for each row added to the frame, and run the final function
409          * whenever we need the current aggregate value.  This is considerably
410          * more efficient than the naive approach of re-running the entire
411          * aggregate calculation for each current row.  It does assume that the
412          * final function doesn't damage the running transition value.  (Some
413          * C-coded aggregates do that for efficiency's sake --- but they are
414          * supposed to do so only when their fcinfo->context is an AggState, not
415          * a WindowAggState.)
416          *
417          * In many common cases, multiple rows share the same frame and hence
418          * the same aggregate value. (In particular, if there's no ORDER BY in
419          * a RANGE window, then all rows are peers and so they all have window
420          * frame equal to the whole partition.)  We optimize such cases by
421          * calculating the aggregate value once when we reach the first row of a
422          * peer group, and then returning the saved value for all subsequent rows.
423          *
424          * 'aggregatedupto' keeps track of the first row that has not yet been
425          * accumulated into the aggregate transition values.  Whenever we start a
426          * new peer group, we accumulate forward to the end of the peer group.
427          *
428          * TODO: In the future, we should implement the full SQL-standard set
429          * of framing rules.  We could implement the other cases by recalculating
430          * the aggregates whenever a row exits the frame.  That would be pretty
431          * slow, though.  For aggregates like SUM and COUNT we could implement a
432          * "negative transition function" that would be called for each row as it
433          * exits the frame.  We'd have to think about avoiding recalculation of
434          * volatile arguments of aggregate functions, too.
435          */
436
437         /*
438          * If we've already aggregated up through current row, reuse the
439          * saved result values.  NOTE: this test works for the currently
440          * supported framing rules, but will need fixing when more are added.
441          */
442         if (winstate->aggregatedupto > winstate->currentpos)
443         {
444                 for (i = 0; i < numaggs; i++)
445                 {
446                         peraggstate = &winstate->peragg[i];
447                         wfuncno = peraggstate->wfuncno;
448                         econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
449                         econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
450                 }
451                 return;
452         }
453
454         /* Initialize aggregates on first call for partition */
455         if (winstate->currentpos == 0)
456         {
457                 for (i = 0; i < numaggs; i++)
458                 {
459                         peraggstate = &winstate->peragg[i];
460                         wfuncno = peraggstate->wfuncno;
461                         initialize_windowaggregate(winstate,
462                                                                            &winstate->perfunc[wfuncno],
463                                                                            peraggstate);
464                 }
465         }
466
467         /*
468          * Advance until we reach a row not in frame (or end of partition).
469          *
470          * Note the loop invariant: agg_row_slot is either empty or holds the
471          * row at position aggregatedupto.  The agg_ptr read pointer must always
472          * point to the next row to read into agg_row_slot.
473          */
474         agg_row_slot = winstate->agg_row_slot;
475         for (;;)
476         {
477                 /* Fetch next row if we didn't already */
478                 if (TupIsNull(agg_row_slot))
479                 {
480                         spool_tuples(winstate, winstate->aggregatedupto);
481                         tuplestore_select_read_pointer(winstate->buffer,
482                                                                                    winstate->agg_ptr);
483                         if (!tuplestore_gettupleslot(winstate->buffer, true, agg_row_slot))
484                                 break;                  /* must be end of partition */
485                 }
486
487                 /* Exit loop (for now) if not in frame */
488                 if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
489                         break;
490
491                 /* Set tuple context for evaluation of aggregate arguments */
492                 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
493
494                 /* Accumulate row into the aggregates */
495                 for (i = 0; i < numaggs; i++)
496                 {
497                         peraggstate = &winstate->peragg[i];
498                         wfuncno = peraggstate->wfuncno;
499                         advance_windowaggregate(winstate,
500                                                                         &winstate->perfunc[wfuncno],
501                                                                         peraggstate);
502                 }
503
504                 /* Reset per-input-tuple context after each tuple */
505                 ResetExprContext(winstate->tmpcontext);
506
507                 /* And advance the aggregated-row state */
508                 winstate->aggregatedupto++;
509                 ExecClearTuple(agg_row_slot);
510         }
511
512         /*
513          * finalize aggregates and fill result/isnull fields.
514          */
515         for (i = 0; i < numaggs; i++)
516         {
517                 Datum      *result;
518                 bool       *isnull;
519
520                 peraggstate = &winstate->peragg[i];
521                 wfuncno = peraggstate->wfuncno;
522                 result = &econtext->ecxt_aggvalues[wfuncno];
523                 isnull = &econtext->ecxt_aggnulls[wfuncno];
524                 finalize_windowaggregate(winstate,
525                                                                  &winstate->perfunc[wfuncno],
526                                                                  peraggstate,
527                                                                  result, isnull);
528
529                 /*
530                  * save the result in case next row shares the same frame.
531                  *
532                  * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know
533                  * in advance that the next row can't possibly share the same frame.
534                  * Is it worth detecting that and skipping this code?
535                  */
536                 if (!peraggstate->resulttypeByVal)
537                 {
538                         /*
539                          * clear old resultValue in order not to leak memory.  (Note:
540                          * the new result can't possibly be the same datum as old
541                          * resultValue, because we never passed it to the trans function.)
542                          */
543                         if (!peraggstate->resultValueIsNull)
544                                 pfree(DatumGetPointer(peraggstate->resultValue));
545
546                         /*
547                          * If pass-by-ref, copy it into our global context.
548                          */
549                         if (!*isnull)
550                         {
551                                 oldContext = MemoryContextSwitchTo(winstate->wincontext);
552                                 peraggstate->resultValue =
553                                         datumCopy(*result,
554                                                           peraggstate->resulttypeByVal,
555                                                           peraggstate->resulttypeLen);
556                                 MemoryContextSwitchTo(oldContext);
557                         }
558                 }
559                 else
560                 {
561                         peraggstate->resultValue = *result;
562                 }
563                 peraggstate->resultValueIsNull = *isnull;
564         }
565 }
566
567 /*
568  * eval_windowfunction
569  *
570  * Arguments of window functions are not evaluated here, because a window
571  * function can need random access to arbitrary rows in the partition.
572  * The window function uses the special WinGetFuncArgInPartition and
573  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
574  * it wants.
575  */
576 static void
577 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
578                                         Datum *result, bool *isnull)
579 {
580         FunctionCallInfoData fcinfo;
581         MemoryContext           oldContext;
582
583         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
584
585         /*
586          * We don't pass any normal arguments to a window function, but we do
587          * pass it the number of arguments, in order to permit window function
588          * implementations to support varying numbers of arguments.  The real
589          * info goes through the WindowObject, which is passed via fcinfo->context.
590          */
591         InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
592                                                          perfuncstate->numArguments,
593                                                          (void *) perfuncstate->winobj, NULL);
594         /* Just in case, make all the regular argument slots be null */
595         memset(fcinfo.argnull, true, perfuncstate->numArguments);
596
597         *result = FunctionCallInvoke(&fcinfo);
598         *isnull = fcinfo.isnull;
599
600         /*
601          * Make sure pass-by-ref data is allocated in the appropriate context.
602          * (We need this in case the function returns a pointer into some
603          * short-lived tuple, as is entirely possible.)
604          */
605         if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
606                 !MemoryContextContains(CurrentMemoryContext,
607                                                            DatumGetPointer(*result)))
608                 *result = datumCopy(*result,
609                                                         perfuncstate->resulttypeByVal,
610                                                         perfuncstate->resulttypeLen);
611
612         MemoryContextSwitchTo(oldContext);
613 }
614
615 /*
616  * begin_partition
617  * Start buffering rows of the next partition.
618  */
619 static void
620 begin_partition(WindowAggState *winstate)
621 {
622         PlanState          *outerPlan = outerPlanState(winstate);
623         int                             numfuncs = winstate->numfuncs;
624         int                             i;
625
626         winstate->partition_spooled = false;
627         winstate->frametail_valid = false;
628         winstate->spooled_rows = 0;
629         winstate->currentpos = 0;
630         winstate->frametailpos = -1;
631         winstate->aggregatedupto = 0;
632         ExecClearTuple(winstate->agg_row_slot);
633
634         /*
635          * If this is the very first partition, we need to fetch the first
636          * input row to store in first_part_slot.
637          */
638         if (TupIsNull(winstate->first_part_slot))
639         {
640                 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
641
642                 if (!TupIsNull(outerslot))
643                          ExecCopySlot(winstate->first_part_slot, outerslot);
644                 else
645                 {
646                         /* outer plan is empty, so we have nothing to do */
647                         winstate->partition_spooled = true;
648                         winstate->more_partitions = false;
649                         return;
650                 }
651         }
652
653         /* Create new tuplestore for this partition */
654         winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
655
656         /*
657          * Set up read pointers for the tuplestore.  The current and agg pointers
658          * don't need BACKWARD capability, but the per-window-function read
659          * pointers do.
660          */
661         winstate->current_ptr = 0;      /* read pointer 0 is pre-allocated */
662
663         /* reset default REWIND capability bit for current ptr */
664         tuplestore_set_eflags(winstate->buffer, 0);
665
666         /* create a read pointer for aggregates, if needed */
667         if (winstate->numaggs > 0)
668                 winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
669
670         /* create mark and read pointers for each real window function */
671         for (i = 0; i < numfuncs; i++)
672         {
673                 WindowStatePerFunc      perfuncstate = &(winstate->perfunc[i]);
674
675                 if (!perfuncstate->plain_agg)
676                 {
677                         WindowObject    winobj = perfuncstate->winobj;
678
679                         winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
680                                                                                                                         0);
681                         winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
682                                                                                                                         EXEC_FLAG_BACKWARD);
683                         winobj->markpos = -1;
684                         winobj->seekpos = -1;
685                 }
686         }
687
688         /*
689          * Store the first tuple into the tuplestore (it's always available now;
690          * we either read it above, or saved it at the end of previous partition)
691          */
692         tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
693         winstate->spooled_rows++;
694 }
695
696 /*
697  * Read tuples from the outer node, up to position 'pos', and store them
698  * into the tuplestore. If pos is -1, reads the whole partition.
699  */
700 static void
701 spool_tuples(WindowAggState *winstate, int64 pos)
702 {
703         WindowAgg          *node = (WindowAgg *) winstate->ss.ps.plan;
704         PlanState          *outerPlan;
705         TupleTableSlot *outerslot;
706         MemoryContext oldcontext;
707
708         if (!winstate->buffer)
709                 return;                                 /* just a safety check */
710         if (winstate->partition_spooled)
711                 return;                                 /* whole partition done already */
712
713         /*
714          * If the tuplestore has spilled to disk, alternate reading and writing
715          * becomes quite expensive due to frequent buffer flushes.  It's cheaper
716          * to force the entire partition to get spooled in one go.
717          *
718          * XXX this is a horrid kluge --- it'd be better to fix the performance
719          * problem inside tuplestore.  FIXME
720          */
721         if (!tuplestore_in_memory(winstate->buffer))
722                 pos = -1;
723
724         outerPlan = outerPlanState(winstate);
725
726         /* Must be in query context to call outerplan or touch tuplestore */
727         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
728
729         while (winstate->spooled_rows <= pos || pos == -1)
730         {
731                 outerslot = ExecProcNode(outerPlan);
732                 if (TupIsNull(outerslot))
733                 {
734                         /* reached the end of the last partition */
735                         winstate->partition_spooled = true;
736                         winstate->more_partitions = false;
737                         break;
738                 }
739
740                 if (node->partNumCols > 0)
741                 {
742                         /* Check if this tuple still belongs to the current partition */
743                         if (!execTuplesMatch(winstate->first_part_slot,
744                                                                  outerslot,
745                                                                  node->partNumCols, node->partColIdx,
746                                                                  winstate->partEqfunctions,
747                                                                  winstate->tmpcontext->ecxt_per_tuple_memory))
748                         {
749                                 /*
750                                  * end of partition; copy the tuple for the next cycle.
751                                  */
752                                 ExecCopySlot(winstate->first_part_slot, outerslot);
753                                 winstate->partition_spooled = true;
754                                 winstate->more_partitions = true;
755                                 break;
756                         }
757                 }
758
759                 /* Still in partition, so save it into the tuplestore */
760                 tuplestore_puttupleslot(winstate->buffer, outerslot);
761                 winstate->spooled_rows++;
762         }
763
764         MemoryContextSwitchTo(oldcontext);
765 }
766
767 /*
768  * release_partition
769  * clear information kept within a partition, including
770  * tuplestore and aggregate results.
771  */
772 static void
773 release_partition(WindowAggState *winstate)
774 {
775         int                                     i;
776
777         for (i = 0; i < winstate->numfuncs; i++)
778         {
779                 WindowStatePerFunc              perfuncstate = &(winstate->perfunc[i]);
780
781                 /* Release any partition-local state of this window function */
782                 if (perfuncstate->winobj)
783                         perfuncstate->winobj->localmem = NULL;
784         }
785
786         /*
787          * Release all partition-local memory (in particular, any partition-local
788          * state that we might have trashed our pointers to in the above loop, and
789          * any aggregate temp data).  We don't rely on retail pfree because some
790          * aggregates might have allocated data we don't have direct pointers to.
791          */
792         MemoryContextResetAndDeleteChildren(winstate->wincontext);
793
794         if (winstate->buffer)
795                 tuplestore_end(winstate->buffer);
796         winstate->buffer = NULL;
797         winstate->partition_spooled = false;
798 }
799
800 /*
801  * row_is_in_frame
802  * Determine whether a row is in the current row's window frame according
803  * to our window framing rule
804  *
805  * The caller must have already determined that the row is in the partition
806  * and fetched it into a slot.  This function just encapsulates the framing
807  * rules.
808  */
809 static bool
810 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
811 {
812         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
813         int                     frameOptions = node->frameOptions;
814
815         Assert(pos >= 0);                       /* else caller error */
816
817         /* We only support frame start mode UNBOUNDED PRECEDING for now */
818         Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
819
820         /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
821         if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
822                 return true;
823
824         /* Else frame tail mode must be CURRENT ROW */
825         Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
826
827         /* if row is current row or a predecessor, it must be in frame */
828         if (pos <= winstate->currentpos)
829                 return true;
830
831         /* In ROWS mode, *only* such rows are in frame */
832         if (frameOptions & FRAMEOPTION_ROWS)
833                 return false;
834
835         /* Else must be RANGE mode */
836         Assert(frameOptions & FRAMEOPTION_RANGE);
837
838         /* In frame iff it's a peer of current row */
839         return are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot);
840 }
841
842 /*
843  * update_frametailpos
844  * make frametailpos valid for the current row
845  *
846  * Uses the winobj's read pointer for any required fetches; the winobj's
847  * mark must not be past the currently known frame tail.  Also uses the
848  * specified slot for any required fetches.
849  */
850 static void
851 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
852 {
853         WindowAggState *winstate = winobj->winstate;
854         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
855         int                     frameOptions = node->frameOptions;
856         int64           ftnext;
857
858         if (winstate->frametail_valid)
859                 return;                                 /* already known for current row */
860
861         /* We only support frame start mode UNBOUNDED PRECEDING for now */
862         Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
863
864         /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
865         if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
866         {
867                 spool_tuples(winstate, -1);
868                 winstate->frametailpos = winstate->spooled_rows - 1;
869                 winstate->frametail_valid = true;
870                 return;
871         }
872
873         /* Else frame tail mode must be CURRENT ROW */
874         Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
875
876         /* In ROWS mode, exactly the rows up to current are in frame */
877         if (frameOptions & FRAMEOPTION_ROWS)
878         {
879                 winstate->frametailpos = winstate->currentpos;
880                 winstate->frametail_valid = true;
881                 return;
882         }
883
884         /* Else must be RANGE mode */
885         Assert(frameOptions & FRAMEOPTION_RANGE);
886
887         /* If no ORDER BY, all rows are peers with each other */
888         if (node->ordNumCols == 0)
889         {
890                 spool_tuples(winstate, -1);
891                 winstate->frametailpos = winstate->spooled_rows - 1;
892                 winstate->frametail_valid = true;
893                 return;
894         }
895
896         /*
897          * Else we have to search for the first non-peer of the current row.
898          * We assume the current value of frametailpos is a lower bound on the
899          * possible frame tail location, ie, frame tail never goes backward, and
900          * that currentpos is also a lower bound, ie, current row is always in
901          * frame.
902          */
903         ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
904         for (;;)
905         {
906                 if (!window_gettupleslot(winobj, ftnext, slot))
907                         break;                          /* end of partition */
908                 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
909                         break;                          /* not peer of current row */
910                 ftnext++;
911         }
912         winstate->frametailpos = ftnext - 1;
913         winstate->frametail_valid = true;
914 }
915
916
917 /* -----------------
918  * ExecWindowAgg
919  *
920  *      ExecWindowAgg receives tuples from its outer subplan and
921  *      stores them into a tuplestore, then processes window functions.
922  *      This node doesn't reduce nor qualify any row so the number of
923  *      returned rows is exactly the same as its outer subplan's result
924  *      (ignoring the case of SRFs in the targetlist, that is).
925  * -----------------
926  */
927 TupleTableSlot *
928 ExecWindowAgg(WindowAggState *winstate)
929 {
930         TupleTableSlot *result;
931         ExprDoneCond    isDone;
932         ExprContext        *econtext;
933         int                             i;
934         int                             numfuncs;
935
936         if (winstate->all_done)
937                 return NULL;
938
939         /*
940          * Check to see if we're still projecting out tuples from a previous output
941          * tuple (because there is a function-returning-set in the projection
942          * expressions).  If so, try to project another one.
943          */
944         if (winstate->ss.ps.ps_TupFromTlist)
945         {
946                 TupleTableSlot *result;
947                 ExprDoneCond isDone;
948
949                 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
950                 if (isDone == ExprMultipleResult)
951                         return result;
952                 /* Done with that source tuple... */
953                 winstate->ss.ps.ps_TupFromTlist = false;
954         }
955
956 restart:
957         if (winstate->buffer == NULL)
958         {
959                 /* Initialize for first partition and set current row = 0 */
960                 begin_partition(winstate);
961                 /* If there are no input rows, we'll detect that and exit below */
962         }
963         else
964         {
965                 /* Advance current row within partition */
966                 winstate->currentpos++;
967                 /* This might mean that the frame tail moves, too */
968                 winstate->frametail_valid = false;
969         }
970
971         /*
972          * Spool all tuples up to and including the current row, if we haven't
973          * already
974          */
975         spool_tuples(winstate, winstate->currentpos);
976
977         /* Move to the next partition if we reached the end of this partition */
978         if (winstate->partition_spooled &&
979                 winstate->currentpos >= winstate->spooled_rows)
980         {
981                 release_partition(winstate);
982
983                 if (winstate->more_partitions)
984                 {
985                         begin_partition(winstate);
986                         Assert(winstate->spooled_rows > 0);
987                 }
988                 else
989                 {
990                         winstate->all_done = true;
991                         return NULL;
992                 }
993         }
994
995         /* final output execution is in ps_ExprContext */
996         econtext = winstate->ss.ps.ps_ExprContext;
997
998         /* Clear the per-output-tuple context for current row */
999         ResetExprContext(econtext);
1000
1001         /*
1002          * Read the current row from the tuplestore, and save in ScanTupleSlot.
1003          * (We can't rely on the outerplan's output slot because we may have to
1004          * read beyond the current row.)
1005          *
1006          * Current row must be in the tuplestore, since we spooled it above.
1007          */
1008         tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1009         if (!tuplestore_gettupleslot(winstate->buffer, true,
1010                                                                  winstate->ss.ss_ScanTupleSlot))
1011                 elog(ERROR, "unexpected end of tuplestore");
1012
1013         /*
1014          * Evaluate true window functions
1015          */
1016         numfuncs = winstate->numfuncs;
1017         for (i = 0; i < numfuncs; i++)
1018         {
1019                 WindowStatePerFunc      perfuncstate = &(winstate->perfunc[i]);
1020
1021                 if (perfuncstate->plain_agg)
1022                         continue;
1023                 eval_windowfunction(winstate, perfuncstate,
1024                                                         &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1025                                                         &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1026         }
1027
1028         /*
1029          * Evaluate aggregates
1030          */
1031         if (winstate->numaggs > 0)
1032                 eval_windowaggregates(winstate);
1033
1034         /*
1035          * Truncate any no-longer-needed rows from the tuplestore.
1036          */
1037         tuplestore_trim(winstate->buffer);
1038
1039         /*
1040          * Form and return a projection tuple using the windowfunc results
1041          * and the current row.  Setting ecxt_outertuple arranges that any
1042          * Vars will be evaluated with respect to that row.
1043          */
1044         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1045         result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1046
1047         if (isDone == ExprEndResult)
1048         {
1049                 /* SRF in tlist returned no rows, so advance to next input tuple */
1050                 goto restart;
1051         }
1052
1053         winstate->ss.ps.ps_TupFromTlist =
1054                 (isDone == ExprMultipleResult);
1055         return result;
1056 }
1057
1058 /* -----------------
1059  * ExecInitWindowAgg
1060  *
1061  *      Creates the run-time information for the WindowAgg node produced by the
1062  *      planner and initializes its outer subtree
1063  * -----------------
1064  */
1065 WindowAggState *
1066 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1067 {
1068         WindowAggState *winstate;
1069         Plan       *outerPlan;
1070         ExprContext *econtext;
1071         ExprContext *tmpcontext;
1072         WindowStatePerFunc  perfunc;
1073         WindowStatePerAgg   peragg;
1074         int                     numfuncs,
1075                                 wfuncno,
1076                                 numaggs,
1077                                 aggno;
1078         ListCell   *l;
1079
1080         /* check for unsupported flags */
1081         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1082
1083         /*
1084          * create state structure
1085          */
1086         winstate = makeNode(WindowAggState);
1087         winstate->ss.ps.plan = (Plan *) node;
1088         winstate->ss.ps.state = estate;
1089
1090         /*
1091          * Create expression contexts.  We need two, one for per-input-tuple
1092          * processing and one for per-output-tuple processing.  We cheat a little
1093          * by using ExecAssignExprContext() to build both.
1094          */
1095         ExecAssignExprContext(estate, &winstate->ss.ps);
1096         tmpcontext = winstate->ss.ps.ps_ExprContext;
1097         winstate->tmpcontext = tmpcontext;
1098         ExecAssignExprContext(estate, &winstate->ss.ps);
1099
1100         /* Create long-lived context for storage of aggregate transvalues etc */
1101         winstate->wincontext =
1102                 AllocSetContextCreate(CurrentMemoryContext,
1103                                                           "WindowAggContext",
1104                                                           ALLOCSET_DEFAULT_MINSIZE,
1105                                                           ALLOCSET_DEFAULT_INITSIZE,
1106                                                           ALLOCSET_DEFAULT_MAXSIZE);
1107
1108 #define WINDOWAGG_NSLOTS 6
1109
1110         /*
1111          * tuple table initialization
1112          */
1113         ExecInitScanTupleSlot(estate, &winstate->ss);
1114         ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1115         winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1116         winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1117         winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1118         winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1119
1120         winstate->ss.ps.targetlist = (List *)
1121                 ExecInitExpr((Expr *) node->plan.targetlist,
1122                                          (PlanState *) winstate);
1123
1124         /*
1125          * WindowAgg nodes never have quals, since they can only occur at the
1126          * logical top level of a query (ie, after any WHERE or HAVING filters)
1127          */
1128         Assert(node->plan.qual == NIL);
1129         winstate->ss.ps.qual = NIL;
1130
1131         /*
1132          * initialize child nodes
1133          */
1134         outerPlan = outerPlan(node);
1135         outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1136
1137         /*
1138          * initialize source tuple type (which is also the tuple type that we'll
1139          * store in the tuplestore and use in all our working slots).
1140          */
1141         ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1142
1143         ExecSetSlotDescriptor(winstate->first_part_slot,
1144                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1145         ExecSetSlotDescriptor(winstate->agg_row_slot,
1146                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1147         ExecSetSlotDescriptor(winstate->temp_slot_1,
1148                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1149         ExecSetSlotDescriptor(winstate->temp_slot_2,
1150                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1151
1152         /*
1153          * Initialize result tuple type and projection info.
1154          */
1155         ExecAssignResultTypeFromTL(&winstate->ss.ps);
1156         ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1157
1158         winstate->ss.ps.ps_TupFromTlist = false;
1159
1160         /* Set up data for comparing tuples */
1161         if (node->partNumCols > 0)
1162                 winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1163                                                                                                                   node->partOperators);
1164         if (node->ordNumCols > 0)
1165                 winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1166                                                                                                                   node->ordOperators);
1167
1168         /*
1169          * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1170          */
1171         numfuncs = winstate->numfuncs;
1172         numaggs = winstate->numaggs;
1173         econtext = winstate->ss.ps.ps_ExprContext;
1174         econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1175         econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1176
1177         /*
1178          * allocate per-wfunc/per-agg state information.
1179          */
1180         perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1181         peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1182         winstate->perfunc = perfunc;
1183         winstate->peragg = peragg;
1184
1185         wfuncno = -1;
1186         aggno = -1;
1187         foreach(l, winstate->funcs)
1188         {
1189                 WindowFuncExprState        *wfuncstate = (WindowFuncExprState *) lfirst(l);
1190                 WindowFunc                         *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1191                 WindowStatePerFunc perfuncstate;
1192                 AclResult       aclresult;
1193                 int                     i;
1194
1195                 if (wfunc->winref != node->winref)                      /* planner screwed up? */
1196                         elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1197                                  wfunc->winref, node->winref);
1198
1199                 /* Look for a previous duplicate window function */
1200                 for (i = 0; i <= wfuncno; i++)
1201                 {
1202                         if (equal(wfunc, perfunc[i].wfunc) &&
1203                                 !contain_volatile_functions((Node *) wfunc))
1204                                 break;
1205                 }
1206                 if (i <= wfuncno)
1207                 {
1208                         /* Found a match to an existing entry, so just mark it */
1209                         wfuncstate->wfuncno = i;
1210                         continue;
1211                 }
1212
1213                 /* Nope, so assign a new PerAgg record */
1214                 perfuncstate = &perfunc[++wfuncno];
1215
1216                 /* Mark WindowFunc state node with assigned index in the result array */
1217                 wfuncstate->wfuncno = wfuncno;
1218
1219                 /* Check permission to call window function */
1220                 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1221                                                                          ACL_EXECUTE);
1222                 if (aclresult != ACLCHECK_OK)
1223                         aclcheck_error(aclresult, ACL_KIND_PROC,
1224                                                    get_func_name(wfunc->winfnoid));
1225
1226                 /* Fill in the perfuncstate data */
1227                 perfuncstate->wfuncstate = wfuncstate;
1228                 perfuncstate->wfunc = wfunc;
1229                 perfuncstate->numArguments = list_length(wfuncstate->args);
1230
1231                 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1232                                           tmpcontext->ecxt_per_query_memory);
1233                 perfuncstate->flinfo.fn_expr = (Node *) wfunc;
1234                 get_typlenbyval(wfunc->wintype,
1235                                                 &perfuncstate->resulttypeLen,
1236                                                 &perfuncstate->resulttypeByVal);
1237
1238                 /*
1239                  * If it's really just a plain aggregate function,
1240                  * we'll emulate the Agg environment for it.
1241                  */
1242                 perfuncstate->plain_agg = wfunc->winagg;
1243                 if (wfunc->winagg)
1244                 {
1245                         WindowStatePerAgg       peraggstate;
1246
1247                         perfuncstate->aggno = ++aggno;
1248                         peraggstate = &winstate->peragg[aggno];
1249                         initialize_peragg(winstate, wfunc, peraggstate);
1250                         peraggstate->wfuncno = wfuncno;
1251                 }
1252                 else
1253                 {
1254                         WindowObject winobj = makeNode(WindowObjectData);
1255
1256                         winobj->winstate = winstate;
1257                         winobj->argstates = wfuncstate->args;
1258                         winobj->localmem = NULL;
1259                         perfuncstate->winobj = winobj;
1260                 }
1261         }
1262
1263         /* Update numfuncs, numaggs to match number of unique functions found */
1264         winstate->numfuncs = wfuncno + 1;
1265         winstate->numaggs = aggno + 1;
1266
1267         winstate->partition_spooled = false;
1268         winstate->more_partitions = false;
1269
1270         return winstate;
1271 }
1272
1273 /* -----------------
1274  * ExecCountSlotsWindowAgg
1275  * -----------------
1276  */
1277 int
1278 ExecCountSlotsWindowAgg(WindowAgg *node)
1279 {
1280         return ExecCountSlotsNode(outerPlan(node)) +
1281                 ExecCountSlotsNode(innerPlan(node)) +
1282                 WINDOWAGG_NSLOTS;
1283 }
1284
1285 /* -----------------
1286  * ExecEndWindowAgg
1287  * -----------------
1288  */
1289 void
1290 ExecEndWindowAgg(WindowAggState *node)
1291 {
1292         PlanState  *outerPlan;
1293
1294         release_partition(node);
1295
1296         pfree(node->perfunc);
1297         pfree(node->peragg);
1298
1299         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1300         ExecClearTuple(node->first_part_slot);
1301         ExecClearTuple(node->agg_row_slot);
1302         ExecClearTuple(node->temp_slot_1);
1303         ExecClearTuple(node->temp_slot_2);
1304
1305         /*
1306          * Free both the expr contexts.
1307          */
1308         ExecFreeExprContext(&node->ss.ps);
1309         node->ss.ps.ps_ExprContext = node->tmpcontext;
1310         ExecFreeExprContext(&node->ss.ps);
1311
1312         MemoryContextDelete(node->wincontext);
1313
1314         outerPlan = outerPlanState(node);
1315         ExecEndNode(outerPlan);
1316 }
1317
1318 /* -----------------
1319  * ExecRescanWindowAgg
1320  * -----------------
1321  */
1322 void
1323 ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt)
1324 {
1325         ExprContext        *econtext = node->ss.ps.ps_ExprContext;
1326
1327         node->all_done = false;
1328
1329         node->ss.ps.ps_TupFromTlist = false;
1330
1331         /* release tuplestore et al */
1332         release_partition(node);
1333
1334         /* release all temp tuples, but especially first_part_slot */
1335         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1336         ExecClearTuple(node->first_part_slot);
1337         ExecClearTuple(node->agg_row_slot);
1338         ExecClearTuple(node->temp_slot_1);
1339         ExecClearTuple(node->temp_slot_2);
1340
1341         /* Forget current wfunc values */
1342         MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
1343         MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
1344
1345         /*
1346          * if chgParam of subnode is not null then plan will be re-scanned by
1347          * first ExecProcNode.
1348          */
1349         if (((PlanState *) node)->lefttree->chgParam == NULL)
1350                 ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
1351 }
1352
1353 /*
1354  * initialize_peragg
1355  *
1356  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
1357  */
1358 static WindowStatePerAggData *
1359 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
1360                                   WindowStatePerAgg peraggstate)
1361 {
1362         Oid                     inputTypes[FUNC_MAX_ARGS];
1363         int                     numArguments;
1364         HeapTuple       aggTuple;
1365         Form_pg_aggregate aggform;
1366         Oid                     aggtranstype;
1367         AclResult       aclresult;
1368         Oid                     transfn_oid,
1369                                 finalfn_oid;
1370         Expr       *transfnexpr,
1371                            *finalfnexpr;
1372         Datum           textInitVal;
1373         int                     i;
1374         ListCell   *lc;
1375
1376         numArguments = list_length(wfunc->args);
1377
1378         i = 0;
1379         foreach(lc, wfunc->args)
1380         {
1381                 inputTypes[i++] = exprType((Node *) lfirst(lc));
1382         }
1383
1384         aggTuple = SearchSysCache(AGGFNOID,
1385                                                           ObjectIdGetDatum(wfunc->winfnoid),
1386                                                           0, 0, 0);
1387         if (!HeapTupleIsValid(aggTuple))
1388                 elog(ERROR, "cache lookup failed for aggregate %u",
1389                          wfunc->winfnoid);
1390         aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
1391
1392         /*
1393          * ExecInitWindowAgg already checked permission to call aggregate function
1394          * ... but we still need to check the component functions
1395          */
1396
1397         peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
1398         peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
1399
1400         /* Check that aggregate owner has permission to call component fns */
1401         {
1402                 HeapTuple       procTuple;
1403                 Oid                     aggOwner;
1404
1405                 procTuple = SearchSysCache(PROCOID,
1406                                                                    ObjectIdGetDatum(wfunc->winfnoid),
1407                                                                    0, 0, 0);
1408                 if (!HeapTupleIsValid(procTuple))
1409                         elog(ERROR, "cache lookup failed for function %u",
1410                                  wfunc->winfnoid);
1411                 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
1412                 ReleaseSysCache(procTuple);
1413
1414                 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
1415                                                                          ACL_EXECUTE);
1416                 if (aclresult != ACLCHECK_OK)
1417                         aclcheck_error(aclresult, ACL_KIND_PROC,
1418                                                    get_func_name(transfn_oid));
1419                 if (OidIsValid(finalfn_oid))
1420                 {
1421                         aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
1422                                                                                  ACL_EXECUTE);
1423                         if (aclresult != ACLCHECK_OK)
1424                                 aclcheck_error(aclresult, ACL_KIND_PROC,
1425                                                            get_func_name(finalfn_oid));
1426                 }
1427         }
1428
1429         /* resolve actual type of transition state, if polymorphic */
1430         aggtranstype = aggform->aggtranstype;
1431         if (IsPolymorphicType(aggtranstype))
1432         {
1433                 /* have to fetch the agg's declared input types... */
1434                 Oid                *declaredArgTypes;
1435                 int                     agg_nargs;
1436
1437                 get_func_signature(wfunc->winfnoid,
1438                                                    &declaredArgTypes, &agg_nargs);
1439                 Assert(agg_nargs == numArguments);
1440                 aggtranstype = enforce_generic_type_consistency(inputTypes,
1441                                                                                                                 declaredArgTypes,
1442                                                                                                                 agg_nargs,
1443                                                                                                                 aggtranstype,
1444                                                                                                                 false);
1445                 pfree(declaredArgTypes);
1446         }
1447
1448         /* build expression trees using actual argument & result types */
1449         build_aggregate_fnexprs(inputTypes,
1450                                                         numArguments,
1451                                                         aggtranstype,
1452                                                         wfunc->wintype,
1453                                                         transfn_oid,
1454                                                         finalfn_oid,
1455                                                         &transfnexpr,
1456                                                         &finalfnexpr);
1457
1458         fmgr_info(transfn_oid, &peraggstate->transfn);
1459         peraggstate->transfn.fn_expr = (Node *) transfnexpr;
1460
1461         if (OidIsValid(finalfn_oid))
1462         {
1463                 fmgr_info(finalfn_oid, &peraggstate->finalfn);
1464                 peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
1465         }
1466
1467         get_typlenbyval(wfunc->wintype,
1468                                         &peraggstate->resulttypeLen,
1469                                         &peraggstate->resulttypeByVal);
1470         get_typlenbyval(aggtranstype,
1471                                         &peraggstate->transtypeLen,
1472                                         &peraggstate->transtypeByVal);
1473
1474         /*
1475          * initval is potentially null, so don't try to access it as a struct
1476          * field. Must do it the hard way with SysCacheGetAttr.
1477          */
1478         textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
1479                                                                   Anum_pg_aggregate_agginitval,
1480                                                                   &peraggstate->initValueIsNull);
1481
1482         if (peraggstate->initValueIsNull)
1483                 peraggstate->initValue = (Datum) 0;
1484         else
1485                 peraggstate->initValue = GetAggInitVal(textInitVal,
1486                                                                                            aggtranstype);
1487
1488         /*
1489          * If the transfn is strict and the initval is NULL, make sure input
1490          * type and transtype are the same (or at least binary-compatible), so
1491          * that it's OK to use the first input value as the initial
1492          * transValue.  This should have been checked at agg definition time,
1493          * but just in case...
1494          */
1495         if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
1496         {
1497                 if (numArguments < 1 ||
1498                         !IsBinaryCoercible(inputTypes[0], aggtranstype))
1499                         ereport(ERROR,
1500                                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
1501                                          errmsg("aggregate %u needs to have compatible input type and transition type",
1502                                                         wfunc->winfnoid)));
1503         }
1504
1505         ReleaseSysCache(aggTuple);
1506
1507         return peraggstate;
1508 }
1509
1510 static Datum
1511 GetAggInitVal(Datum textInitVal, Oid transtype)
1512 {
1513         Oid                     typinput,
1514                                 typioparam;
1515         char       *strInitVal;
1516         Datum           initVal;
1517
1518         getTypeInputInfo(transtype, &typinput, &typioparam);
1519         strInitVal = TextDatumGetCString(textInitVal);
1520         initVal = OidInputFunctionCall(typinput, strInitVal,
1521                                                                    typioparam, -1);
1522         pfree(strInitVal);
1523         return initVal;
1524 }
1525
1526 /*
1527  * are_peers
1528  * compare two rows to see if they are equal according to the ORDER BY clause
1529  *
1530  * NB: this does not consider the window frame mode.
1531  */
1532 static bool
1533 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
1534                   TupleTableSlot *slot2)
1535 {
1536         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1537
1538         /* If no ORDER BY, all rows are peers with each other */
1539         if (node->ordNumCols == 0)
1540                 return true;
1541
1542         return execTuplesMatch(slot1, slot2,
1543                                                    node->ordNumCols, node->ordColIdx,
1544                                                    winstate->ordEqfunctions,
1545                                                    winstate->tmpcontext->ecxt_per_tuple_memory);
1546 }
1547
1548 /*
1549  * window_gettupleslot
1550  *      Fetch the pos'th tuple of the current partition into the slot,
1551  *      using the winobj's read pointer
1552  *
1553  * Returns true if successful, false if no such row
1554  */
1555 static bool
1556 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
1557 {
1558         WindowAggState *winstate = winobj->winstate;
1559         MemoryContext oldcontext;
1560
1561         /* Don't allow passing -1 to spool_tuples here */
1562         if (pos < 0)
1563                 return false;
1564
1565         /* If necessary, fetch the tuple into the spool */
1566         spool_tuples(winstate, pos);
1567
1568         if (pos >= winstate->spooled_rows)
1569                 return false;
1570
1571         if (pos < winobj->markpos)
1572                 elog(ERROR, "cannot fetch row before WindowObject's mark position");
1573
1574         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1575
1576         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1577
1578         /*
1579          * There's no API to refetch the tuple at the current position. We
1580          * have to move one tuple forward, and then one backward.  (We don't
1581          * do it the other way because we might try to fetch the row before
1582          * our mark, which isn't allowed.)
1583          */
1584         if (winobj->seekpos == pos)
1585         {
1586                 tuplestore_advance(winstate->buffer, true);
1587                 winobj->seekpos++;
1588         }
1589
1590         while (winobj->seekpos > pos)
1591         {
1592                 if (!tuplestore_gettupleslot(winstate->buffer, false, slot))
1593                         elog(ERROR, "unexpected end of tuplestore");
1594                 winobj->seekpos--;
1595         }
1596
1597         while (winobj->seekpos < pos)
1598         {
1599                 if (!tuplestore_gettupleslot(winstate->buffer, true, slot))
1600                         elog(ERROR, "unexpected end of tuplestore");
1601                 winobj->seekpos++;
1602         }
1603
1604         MemoryContextSwitchTo(oldcontext);
1605
1606         return true;
1607 }
1608
1609
1610 /***********************************************************************
1611  * API exposed to window functions
1612  ***********************************************************************/
1613
1614
1615 /*
1616  * WinGetPartitionLocalMemory
1617  *              Get working memory that lives till end of partition processing
1618  *
1619  * On first call within a given partition, this allocates and zeroes the
1620  * requested amount of space.  Subsequent calls just return the same chunk.
1621  *
1622  * Memory obtained this way is normally used to hold state that should be
1623  * automatically reset for each new partition.  If a window function wants
1624  * to hold state across the whole query, fcinfo->fn_extra can be used in the
1625  * usual way for that.
1626  */
1627 void *
1628 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
1629 {
1630         Assert(WindowObjectIsValid(winobj));
1631         if (winobj->localmem == NULL)
1632                 winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext,
1633                                                                                                   sz);
1634         return winobj->localmem;
1635 }
1636
1637 /*
1638  * WinGetCurrentPosition
1639  *              Return the current row's position (counting from 0) within the current
1640  *              partition.
1641  */
1642 int64
1643 WinGetCurrentPosition(WindowObject winobj)
1644 {
1645         Assert(WindowObjectIsValid(winobj));
1646         return winobj->winstate->currentpos;
1647 }
1648
1649 /*
1650  * WinGetPartitionRowCount
1651  *              Return total number of rows contained in the current partition.
1652  *
1653  * Note: this is a relatively expensive operation because it forces the
1654  * whole partition to be "spooled" into the tuplestore at once.  Once
1655  * executed, however, additional calls within the same partition are cheap.
1656  */
1657 int64
1658 WinGetPartitionRowCount(WindowObject winobj)
1659 {
1660         Assert(WindowObjectIsValid(winobj));
1661         spool_tuples(winobj->winstate, -1);
1662         return winobj->winstate->spooled_rows;
1663 }
1664
1665 /*
1666  * WinSetMarkPosition
1667  *              Set the "mark" position for the window object, which is the oldest row
1668  *              number (counting from 0) it is allowed to fetch during all subsequent
1669  *              operations within the current partition.
1670  *
1671  * Window functions do not have to call this, but are encouraged to move the
1672  * mark forward when possible to keep the tuplestore size down and prevent
1673  * having to spill rows to disk.
1674  */
1675 void
1676 WinSetMarkPosition(WindowObject winobj, int64 markpos)
1677 {
1678         WindowAggState *winstate;
1679
1680         Assert(WindowObjectIsValid(winobj));
1681         winstate = winobj->winstate;
1682
1683         if (markpos < winobj->markpos)
1684                 elog(ERROR, "cannot move WindowObject's mark position backward");
1685         tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
1686         while (markpos > winobj->markpos)
1687         {
1688                 tuplestore_advance(winstate->buffer, true);
1689                 winobj->markpos++;
1690         }
1691         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1692         while (markpos > winobj->seekpos)
1693         {
1694                 tuplestore_advance(winstate->buffer, true);
1695                 winobj->seekpos++;
1696         }
1697 }
1698
1699 /*
1700  * WinRowsArePeers
1701  *              Compare two rows (specified by absolute position in window) to see
1702  *              if they are equal according to the ORDER BY clause.
1703  *
1704  * NB: this does not consider the window frame mode.
1705  */
1706 bool
1707 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
1708 {
1709         WindowAggState *winstate;
1710         WindowAgg          *node;
1711         TupleTableSlot *slot1;
1712         TupleTableSlot *slot2;
1713         bool                    res;
1714
1715         Assert(WindowObjectIsValid(winobj));
1716         winstate = winobj->winstate;
1717         node = (WindowAgg *) winstate->ss.ps.plan;
1718
1719         /* If no ORDER BY, all rows are peers; don't bother to fetch them */
1720         if (node->ordNumCols == 0)
1721                 return true;
1722
1723         slot1 = winstate->temp_slot_1;
1724         slot2 = winstate->temp_slot_2;
1725
1726         if (!window_gettupleslot(winobj, pos1, slot1))
1727                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
1728                          pos1);
1729         if (!window_gettupleslot(winobj, pos2, slot2))
1730                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
1731                          pos2);
1732
1733         res = are_peers(winstate, slot1, slot2);
1734
1735         ExecClearTuple(slot1);
1736         ExecClearTuple(slot2);
1737
1738         return res;
1739 }
1740
1741 /*
1742  * WinGetFuncArgInPartition
1743  *              Evaluate a window function's argument expression on a specified
1744  *              row of the partition.  The row is identified in lseek(2) style,
1745  *              i.e. relative to the current, first, or last row.
1746  *
1747  * argno: argument number to evaluate (counted from 0)
1748  * relpos: signed rowcount offset from the seek position
1749  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
1750  * set_mark: If the row is found and set_mark is true, the mark is moved to
1751  *              the row as a side-effect.
1752  * isnull: output argument, receives isnull status of result
1753  * isout: output argument, set to indicate whether target row position
1754  *              is out of partition (can pass NULL if caller doesn't care about this)
1755  *
1756  * Specifying a nonexistent row is not an error, it just causes a null result
1757  * (plus setting *isout true, if isout isn't NULL).
1758  */
1759 Datum
1760 WinGetFuncArgInPartition(WindowObject winobj, int argno,
1761                                                  int relpos, int seektype, bool set_mark,
1762                                                  bool *isnull, bool *isout)
1763 {
1764         WindowAggState *winstate;
1765         ExprContext *econtext;
1766         TupleTableSlot *slot;
1767         bool            gottuple;
1768         int64           abs_pos;
1769
1770         Assert(WindowObjectIsValid(winobj));
1771         winstate = winobj->winstate;
1772         econtext = winstate->ss.ps.ps_ExprContext;
1773         slot = winstate->temp_slot_1;
1774
1775         switch (seektype)
1776         {
1777                 case WINDOW_SEEK_CURRENT:
1778                         abs_pos = winstate->currentpos + relpos;
1779                         break;
1780                 case WINDOW_SEEK_HEAD:
1781                         abs_pos = relpos;
1782                         break;
1783                 case WINDOW_SEEK_TAIL:
1784                         spool_tuples(winstate, -1);
1785                         abs_pos = winstate->spooled_rows - 1 + relpos;
1786                         break;
1787                 default:
1788                         elog(ERROR, "unrecognized window seek type: %d", seektype);
1789                         abs_pos = 0; /* keep compiler quiet */
1790                         break;
1791         }
1792
1793         gottuple = window_gettupleslot(winobj, abs_pos, slot);
1794
1795         if (!gottuple)
1796         {
1797                 if (isout)
1798                         *isout = true;
1799                 *isnull = true;
1800                 return (Datum) 0;
1801         }
1802         else
1803         {
1804                 if (isout)
1805                         *isout = false;
1806                 if (set_mark)
1807                         WinSetMarkPosition(winobj, abs_pos);
1808                 econtext->ecxt_outertuple = slot;
1809                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1810                                                         econtext, isnull, NULL);
1811         }
1812 }
1813
1814 /*
1815  * WinGetFuncArgInFrame
1816  *              Evaluate a window function's argument expression on a specified
1817  *              row of the window frame.  The row is identified in lseek(2) style,
1818  *              i.e. relative to the current, first, or last row.
1819  *
1820  * argno: argument number to evaluate (counted from 0)
1821  * relpos: signed rowcount offset from the seek position
1822  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
1823  * set_mark: If the row is found and set_mark is true, the mark is moved to
1824  *              the row as a side-effect.
1825  * isnull: output argument, receives isnull status of result
1826  * isout: output argument, set to indicate whether target row position
1827  *              is out of frame (can pass NULL if caller doesn't care about this)
1828  *
1829  * Specifying a nonexistent row is not an error, it just causes a null result
1830  * (plus setting *isout true, if isout isn't NULL).
1831  */
1832 Datum
1833 WinGetFuncArgInFrame(WindowObject winobj, int argno,
1834                                          int relpos, int seektype, bool set_mark,
1835                                          bool *isnull, bool *isout)
1836 {
1837         WindowAggState *winstate;
1838         ExprContext *econtext;
1839         TupleTableSlot *slot;
1840         bool            gottuple;
1841         int64           abs_pos;
1842
1843         Assert(WindowObjectIsValid(winobj));
1844         winstate = winobj->winstate;
1845         econtext = winstate->ss.ps.ps_ExprContext;
1846         slot = winstate->temp_slot_1;
1847
1848         switch (seektype)
1849         {
1850                 case WINDOW_SEEK_CURRENT:
1851                         abs_pos = winstate->currentpos + relpos;
1852                         break;
1853                 case WINDOW_SEEK_HEAD:
1854                         abs_pos = relpos;
1855                         break;
1856                 case WINDOW_SEEK_TAIL:
1857                         update_frametailpos(winobj, slot);
1858                         abs_pos = winstate->frametailpos + relpos;
1859                         break;
1860                 default:
1861                         elog(ERROR, "unrecognized window seek type: %d", seektype);
1862                         abs_pos = 0; /* keep compiler quiet */
1863                         break;
1864         }
1865
1866         gottuple = window_gettupleslot(winobj, abs_pos, slot);
1867         if (gottuple)
1868                 gottuple = row_is_in_frame(winstate, abs_pos, slot);
1869
1870         if (!gottuple)
1871         {
1872                 if (isout)
1873                         *isout = true;
1874                 *isnull = true;
1875                 return (Datum) 0;
1876         }
1877         else
1878         {
1879                 if (isout)
1880                         *isout = false;
1881                 if (set_mark)
1882                         WinSetMarkPosition(winobj, abs_pos);
1883                 econtext->ecxt_outertuple = slot;
1884                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1885                                                         econtext, isnull, NULL);
1886         }
1887 }
1888
1889 /*
1890  * WinGetFuncArgCurrent
1891  *              Evaluate a window function's argument expression on the current row.
1892  *
1893  * argno: argument number to evaluate (counted from 0)
1894  * isnull: output argument, receives isnull status of result
1895  *
1896  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
1897  * WinGetFuncArgInFrame targeting the current row, because it will succeed
1898  * even if the WindowObject's mark has been set beyond the current row.
1899  * This should generally be used for "ordinary" arguments of a window
1900  * function, such as the offset argument of lead() or lag().
1901  */
1902 Datum
1903 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
1904 {
1905         WindowAggState *winstate;
1906         ExprContext *econtext;
1907
1908         Assert(WindowObjectIsValid(winobj));
1909         winstate = winobj->winstate;
1910
1911         econtext = winstate->ss.ps.ps_ExprContext;
1912
1913         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1914         return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1915                                                 econtext, isnull, NULL);
1916 }