OSDN Git Service

Fix possible failures when a tuplestore switches from in-memory to on-disk
[pg-rex/syncrep.git] / src / backend / executor / nodeWindowAgg.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  *        routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set.  Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications.  The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore.  The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  *        $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.4 2009/03/27 18:30:21 tgl Exp $
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35
36 #include "catalog/pg_aggregate.h"
37 #include "catalog/pg_proc.h"
38 #include "catalog/pg_type.h"
39 #include "executor/executor.h"
40 #include "executor/nodeWindowAgg.h"
41 #include "miscadmin.h"
42 #include "nodes/nodeFuncs.h"
43 #include "optimizer/clauses.h"
44 #include "parser/parse_agg.h"
45 #include "parser/parse_coerce.h"
46 #include "utils/acl.h"
47 #include "utils/builtins.h"
48 #include "utils/datum.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/syscache.h"
52 #include "windowapi.h"
53
54 /*
55  * All the window function APIs are called with this object, which is passed
56  * to window functions as fcinfo->context.
57  */
58 typedef struct WindowObjectData
59 {
60         NodeTag         type;
61         WindowAggState *winstate;       /* parent WindowAggState */
62         List       *argstates;          /* ExprState trees for fn's arguments */
63         void       *localmem;           /* WinGetPartitionLocalMemory's chunk */
64         int                     markptr;                /* tuplestore mark pointer for this fn */
65         int                     readptr;                /* tuplestore read pointer for this fn */
66         int64           markpos;                /* row that markptr is positioned on */
67         int64           seekpos;                /* row that readptr is positioned on */
68 } WindowObjectData;
69
70 /*
71  * We have one WindowStatePerFunc struct for each window function and
72  * window aggregate handled by this node.
73  */
74 typedef struct WindowStatePerFuncData
75 {
76         /* Links to WindowFunc expr and state nodes this working state is for */
77         WindowFuncExprState *wfuncstate;
78         WindowFunc         *wfunc;
79
80         int                     numArguments;   /* number of arguments */
81
82         FmgrInfo        flinfo;                 /* fmgr lookup data for window function */
83
84         /*
85          * We need the len and byval info for the result of each function
86          * in order to know how to copy/delete values.
87          */
88         int16           resulttypeLen;
89         bool            resulttypeByVal;
90
91         bool            plain_agg;              /* is it just a plain aggregate function? */
92         int                     aggno;                  /* if so, index of its PerAggData */
93
94         WindowObject    winobj;         /* object used in window function API */
95 } WindowStatePerFuncData;
96
97 /*
98  * For plain aggregate window functions, we also have one of these.
99  */
100 typedef struct WindowStatePerAggData
101 {
102         /* Oids of transfer functions */
103         Oid                     transfn_oid;
104         Oid                     finalfn_oid;    /* may be InvalidOid */
105
106         /*
107          * fmgr lookup data for transfer functions --- only valid when
108          * corresponding oid is not InvalidOid.  Note in particular that fn_strict
109          * flags are kept here.
110          */
111         FmgrInfo        transfn;
112         FmgrInfo        finalfn;
113
114         /*
115          * initial value from pg_aggregate entry
116          */
117         Datum           initValue;
118         bool            initValueIsNull;
119
120         /*
121          * cached value for current frame boundaries
122          */
123         Datum           resultValue;
124         bool            resultValueIsNull;
125
126         /*
127          * We need the len and byval info for the agg's input, result, and
128          * transition data types in order to know how to copy/delete values.
129          */
130         int16           inputtypeLen,
131                                 resulttypeLen,
132                                 transtypeLen;
133         bool            inputtypeByVal,
134                                 resulttypeByVal,
135                                 transtypeByVal;
136
137         int                     wfuncno;                /* index of associated PerFuncData */
138
139         /* Current transition value */
140         Datum           transValue;             /* current transition value */
141         bool            transValueIsNull;
142
143         bool            noTransValue;   /* true if transValue not set yet */
144 } WindowStatePerAggData;
145
146 static void initialize_windowaggregate(WindowAggState *winstate,
147                                                                            WindowStatePerFunc perfuncstate,
148                                                                            WindowStatePerAgg peraggstate);
149 static void advance_windowaggregate(WindowAggState *winstate,
150                                                                         WindowStatePerFunc perfuncstate,
151                                                                         WindowStatePerAgg peraggstate);
152 static void finalize_windowaggregate(WindowAggState *winstate,
153                                                                          WindowStatePerFunc perfuncstate,
154                                                                          WindowStatePerAgg peraggstate,
155                                                                          Datum *result, bool *isnull);
156
157 static void eval_windowaggregates(WindowAggState *winstate);
158 static void eval_windowfunction(WindowAggState *winstate,
159                                                                 WindowStatePerFunc perfuncstate,
160                                                                 Datum *result, bool *isnull);
161
162 static void begin_partition(WindowAggState *winstate);
163 static void spool_tuples(WindowAggState *winstate, int64 pos);
164 static void release_partition(WindowAggState *winstate);
165
166 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
167                                                         TupleTableSlot *slot);
168 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
169
170 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
171                                                                                                 WindowFunc *wfunc,
172                                                                                                 WindowStatePerAgg peraggstate);
173 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
174
175 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
176                                           TupleTableSlot *slot2);
177 static bool window_gettupleslot(WindowObject winobj, int64 pos,
178                                                                 TupleTableSlot *slot);
179
180
181 /*
182  * initialize_windowaggregate
183  * parallel to initialize_aggregate in nodeAgg.c
184  */
185 static void
186 initialize_windowaggregate(WindowAggState *winstate,
187                                                    WindowStatePerFunc perfuncstate,
188                                                    WindowStatePerAgg peraggstate)
189 {
190         MemoryContext           oldContext;
191
192         if (peraggstate->initValueIsNull)
193                 peraggstate->transValue = peraggstate->initValue;
194         else
195         {
196                 oldContext = MemoryContextSwitchTo(winstate->wincontext);
197                 peraggstate->transValue = datumCopy(peraggstate->initValue,
198                                                                                         peraggstate->transtypeByVal,
199                                                                                         peraggstate->transtypeLen);
200                 MemoryContextSwitchTo(oldContext);
201         }
202         peraggstate->transValueIsNull = peraggstate->initValueIsNull;
203         peraggstate->noTransValue = peraggstate->initValueIsNull;
204         peraggstate->resultValueIsNull = true;
205 }
206
207 /*
208  * advance_windowaggregate
209  * parallel to advance_aggregate in nodeAgg.c
210  */
211 static void
212 advance_windowaggregate(WindowAggState *winstate,
213                                                 WindowStatePerFunc perfuncstate,
214                                                 WindowStatePerAgg peraggstate)
215 {
216         WindowFuncExprState        *wfuncstate = perfuncstate->wfuncstate;
217         int                                             numArguments = perfuncstate->numArguments;
218         FunctionCallInfoData    fcinfodata;
219         FunctionCallInfo                fcinfo = &fcinfodata;
220         Datum                                   newVal;
221         ListCell                           *arg;
222         int                                             i;
223         MemoryContext                   oldContext;
224         ExprContext *econtext = winstate->tmpcontext;
225
226         oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
227
228         /* We start from 1, since the 0th arg will be the transition value */
229         i = 1;
230         foreach(arg, wfuncstate->args)
231         {
232                 ExprState          *argstate = (ExprState *) lfirst(arg);
233
234                 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
235                                                                           &fcinfo->argnull[i], NULL);
236                 i++;
237         }
238
239         if (peraggstate->transfn.fn_strict)
240         {
241                 /*
242                  * For a strict transfn, nothing happens when there's a NULL input; we
243                  * just keep the prior transValue.
244                  */
245                 for (i = 1; i <= numArguments; i++)
246                 {
247                         if (fcinfo->argnull[i])
248                         {
249                                 MemoryContextSwitchTo(oldContext);
250                                 return;
251                         }
252                 }
253                 if (peraggstate->noTransValue)
254                 {
255                         /*
256                          * transValue has not been initialized. This is the first non-NULL
257                          * input value. We use it as the initial value for transValue. (We
258                          * already checked that the agg's input type is binary-compatible
259                          * with its transtype, so straight copy here is OK.)
260                          *
261                          * We must copy the datum into wincontext if it is pass-by-ref. We
262                          * do not need to pfree the old transValue, since it's NULL.
263                          */
264                         MemoryContextSwitchTo(winstate->wincontext);
265                         peraggstate->transValue = datumCopy(fcinfo->arg[1],
266                                                                                          peraggstate->transtypeByVal,
267                                                                                          peraggstate->transtypeLen);
268                         peraggstate->transValueIsNull = false;
269                         peraggstate->noTransValue = false;
270                         MemoryContextSwitchTo(oldContext);
271                         return;
272                 }
273                 if (peraggstate->transValueIsNull)
274                 {
275                         /*
276                          * Don't call a strict function with NULL inputs.  Note it is
277                          * possible to get here despite the above tests, if the transfn is
278                          * strict *and* returned a NULL on a prior cycle. If that happens
279                          * we will propagate the NULL all the way to the end.
280                          */
281                         MemoryContextSwitchTo(oldContext);
282                         return;
283                 }
284         }
285
286         /*
287          * OK to call the transition function
288          */
289         InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
290                                                          numArguments + 1,
291                                                          (void *) winstate, NULL);
292         fcinfo->arg[0] = peraggstate->transValue;
293         fcinfo->argnull[0] = peraggstate->transValueIsNull;
294         newVal = FunctionCallInvoke(fcinfo);
295
296         /*
297          * If pass-by-ref datatype, must copy the new value into wincontext and
298          * pfree the prior transValue.  But if transfn returned a pointer to its
299          * first input, we don't need to do anything.
300          */
301         if (!peraggstate->transtypeByVal &&
302                 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
303         {
304                 if (!fcinfo->isnull)
305                 {
306                         MemoryContextSwitchTo(winstate->wincontext);
307                         newVal = datumCopy(newVal,
308                                                            peraggstate->transtypeByVal,
309                                                            peraggstate->transtypeLen);
310                 }
311                 if (!peraggstate->transValueIsNull)
312                         pfree(DatumGetPointer(peraggstate->transValue));
313         }
314
315         MemoryContextSwitchTo(oldContext);
316         peraggstate->transValue = newVal;
317         peraggstate->transValueIsNull = fcinfo->isnull;
318 }
319
320 /*
321  * finalize_windowaggregate
322  * parallel to finalize_aggregate in nodeAgg.c
323  */
324 static void
325 finalize_windowaggregate(WindowAggState *winstate,
326                                                  WindowStatePerFunc perfuncstate,
327                                                  WindowStatePerAgg peraggstate,
328                                                  Datum *result, bool *isnull)
329 {
330         MemoryContext                   oldContext;
331
332         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
333
334         /*
335          * Apply the agg's finalfn if one is provided, else return transValue.
336          */
337         if (OidIsValid(peraggstate->finalfn_oid))
338         {
339                 FunctionCallInfoData    fcinfo;
340
341                 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
342                                                                  (void *) winstate, NULL);
343                 fcinfo.arg[0] = peraggstate->transValue;
344                 fcinfo.argnull[0] = peraggstate->transValueIsNull;
345                 if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
346                 {
347                         /* don't call a strict function with NULL inputs */
348                         *result = (Datum) 0;
349                         *isnull = true;
350                 }
351                 else
352                 {
353                         *result = FunctionCallInvoke(&fcinfo);
354                         *isnull = fcinfo.isnull;
355                 }
356         }
357         else
358         {
359                 *result = peraggstate->transValue;
360                 *isnull = peraggstate->transValueIsNull;
361         }
362
363         /*
364          * If result is pass-by-ref, make sure it is in the right context.
365          */
366         if (!peraggstate->resulttypeByVal && !*isnull &&
367                 !MemoryContextContains(CurrentMemoryContext,
368                                                            DatumGetPointer(*result)))
369                 *result = datumCopy(*result,
370                                                         peraggstate->resulttypeByVal,
371                                                         peraggstate->resulttypeLen);
372         MemoryContextSwitchTo(oldContext);
373 }
374
375 /*
376  * eval_windowaggregates
377  * evaluate plain aggregates being used as window functions
378  *
379  * Much of this is duplicated from nodeAgg.c.  But NOTE that we expect to be
380  * able to call aggregate final functions repeatedly after aggregating more
381  * data onto the same transition value.  This is not a behavior required by
382  * nodeAgg.c.
383  */
384 static void
385 eval_windowaggregates(WindowAggState *winstate)
386 {
387         WindowStatePerAgg   peraggstate;
388         int                                     wfuncno, numaggs;
389         int                                     i;
390         MemoryContext           oldContext;
391         ExprContext                *econtext;
392         TupleTableSlot     *agg_row_slot;
393
394         numaggs = winstate->numaggs;
395         if (numaggs == 0)
396                 return;                                 /* nothing to do */
397
398         /* final output execution is in ps_ExprContext */
399         econtext = winstate->ss.ps.ps_ExprContext;
400
401         /*
402          * Currently, we support only a subset of the SQL-standard window framing
403          * rules.  In all the supported cases, the window frame always consists
404          * of a contiguous group of rows extending forward from the start of the
405          * partition, and rows only enter the frame, never exit it, as the
406          * current row advances forward.  This makes it possible to use an
407          * incremental strategy for evaluating aggregates: we run the transition
408          * function for each row added to the frame, and run the final function
409          * whenever we need the current aggregate value.  This is considerably
410          * more efficient than the naive approach of re-running the entire
411          * aggregate calculation for each current row.  It does assume that the
412          * final function doesn't damage the running transition value.  (Some
413          * C-coded aggregates do that for efficiency's sake --- but they are
414          * supposed to do so only when their fcinfo->context is an AggState, not
415          * a WindowAggState.)
416          *
417          * In many common cases, multiple rows share the same frame and hence
418          * the same aggregate value. (In particular, if there's no ORDER BY in
419          * a RANGE window, then all rows are peers and so they all have window
420          * frame equal to the whole partition.)  We optimize such cases by
421          * calculating the aggregate value once when we reach the first row of a
422          * peer group, and then returning the saved value for all subsequent rows.
423          *
424          * 'aggregatedupto' keeps track of the first row that has not yet been
425          * accumulated into the aggregate transition values.  Whenever we start a
426          * new peer group, we accumulate forward to the end of the peer group.
427          *
428          * TODO: In the future, we should implement the full SQL-standard set
429          * of framing rules.  We could implement the other cases by recalculating
430          * the aggregates whenever a row exits the frame.  That would be pretty
431          * slow, though.  For aggregates like SUM and COUNT we could implement a
432          * "negative transition function" that would be called for each row as it
433          * exits the frame.  We'd have to think about avoiding recalculation of
434          * volatile arguments of aggregate functions, too.
435          */
436
437         /*
438          * If we've already aggregated up through current row, reuse the
439          * saved result values.  NOTE: this test works for the currently
440          * supported framing rules, but will need fixing when more are added.
441          */
442         if (winstate->aggregatedupto > winstate->currentpos)
443         {
444                 for (i = 0; i < numaggs; i++)
445                 {
446                         peraggstate = &winstate->peragg[i];
447                         wfuncno = peraggstate->wfuncno;
448                         econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
449                         econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
450                 }
451                 return;
452         }
453
454         /* Initialize aggregates on first call for partition */
455         if (winstate->currentpos == 0)
456         {
457                 for (i = 0; i < numaggs; i++)
458                 {
459                         peraggstate = &winstate->peragg[i];
460                         wfuncno = peraggstate->wfuncno;
461                         initialize_windowaggregate(winstate,
462                                                                            &winstate->perfunc[wfuncno],
463                                                                            peraggstate);
464                 }
465         }
466
467         /*
468          * Advance until we reach a row not in frame (or end of partition).
469          *
470          * Note the loop invariant: agg_row_slot is either empty or holds the
471          * row at position aggregatedupto.  The agg_ptr read pointer must always
472          * point to the next row to read into agg_row_slot.
473          */
474         agg_row_slot = winstate->agg_row_slot;
475         for (;;)
476         {
477                 /* Fetch next row if we didn't already */
478                 if (TupIsNull(agg_row_slot))
479                 {
480                         spool_tuples(winstate, winstate->aggregatedupto);
481                         tuplestore_select_read_pointer(winstate->buffer,
482                                                                                    winstate->agg_ptr);
483                         if (!tuplestore_gettupleslot(winstate->buffer, true, true,
484                                                                                  agg_row_slot))
485                                 break;                  /* must be end of partition */
486                 }
487
488                 /* Exit loop (for now) if not in frame */
489                 if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
490                         break;
491
492                 /* Set tuple context for evaluation of aggregate arguments */
493                 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
494
495                 /* Accumulate row into the aggregates */
496                 for (i = 0; i < numaggs; i++)
497                 {
498                         peraggstate = &winstate->peragg[i];
499                         wfuncno = peraggstate->wfuncno;
500                         advance_windowaggregate(winstate,
501                                                                         &winstate->perfunc[wfuncno],
502                                                                         peraggstate);
503                 }
504
505                 /* Reset per-input-tuple context after each tuple */
506                 ResetExprContext(winstate->tmpcontext);
507
508                 /* And advance the aggregated-row state */
509                 winstate->aggregatedupto++;
510                 ExecClearTuple(agg_row_slot);
511         }
512
513         /*
514          * finalize aggregates and fill result/isnull fields.
515          */
516         for (i = 0; i < numaggs; i++)
517         {
518                 Datum      *result;
519                 bool       *isnull;
520
521                 peraggstate = &winstate->peragg[i];
522                 wfuncno = peraggstate->wfuncno;
523                 result = &econtext->ecxt_aggvalues[wfuncno];
524                 isnull = &econtext->ecxt_aggnulls[wfuncno];
525                 finalize_windowaggregate(winstate,
526                                                                  &winstate->perfunc[wfuncno],
527                                                                  peraggstate,
528                                                                  result, isnull);
529
530                 /*
531                  * save the result in case next row shares the same frame.
532                  *
533                  * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know
534                  * in advance that the next row can't possibly share the same frame.
535                  * Is it worth detecting that and skipping this code?
536                  */
537                 if (!peraggstate->resulttypeByVal)
538                 {
539                         /*
540                          * clear old resultValue in order not to leak memory.  (Note:
541                          * the new result can't possibly be the same datum as old
542                          * resultValue, because we never passed it to the trans function.)
543                          */
544                         if (!peraggstate->resultValueIsNull)
545                                 pfree(DatumGetPointer(peraggstate->resultValue));
546
547                         /*
548                          * If pass-by-ref, copy it into our global context.
549                          */
550                         if (!*isnull)
551                         {
552                                 oldContext = MemoryContextSwitchTo(winstate->wincontext);
553                                 peraggstate->resultValue =
554                                         datumCopy(*result,
555                                                           peraggstate->resulttypeByVal,
556                                                           peraggstate->resulttypeLen);
557                                 MemoryContextSwitchTo(oldContext);
558                         }
559                 }
560                 else
561                 {
562                         peraggstate->resultValue = *result;
563                 }
564                 peraggstate->resultValueIsNull = *isnull;
565         }
566 }
567
568 /*
569  * eval_windowfunction
570  *
571  * Arguments of window functions are not evaluated here, because a window
572  * function can need random access to arbitrary rows in the partition.
573  * The window function uses the special WinGetFuncArgInPartition and
574  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
575  * it wants.
576  */
577 static void
578 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
579                                         Datum *result, bool *isnull)
580 {
581         FunctionCallInfoData fcinfo;
582         MemoryContext           oldContext;
583
584         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
585
586         /*
587          * We don't pass any normal arguments to a window function, but we do
588          * pass it the number of arguments, in order to permit window function
589          * implementations to support varying numbers of arguments.  The real
590          * info goes through the WindowObject, which is passed via fcinfo->context.
591          */
592         InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
593                                                          perfuncstate->numArguments,
594                                                          (void *) perfuncstate->winobj, NULL);
595         /* Just in case, make all the regular argument slots be null */
596         memset(fcinfo.argnull, true, perfuncstate->numArguments);
597
598         *result = FunctionCallInvoke(&fcinfo);
599         *isnull = fcinfo.isnull;
600
601         /*
602          * Make sure pass-by-ref data is allocated in the appropriate context.
603          * (We need this in case the function returns a pointer into some
604          * short-lived tuple, as is entirely possible.)
605          */
606         if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
607                 !MemoryContextContains(CurrentMemoryContext,
608                                                            DatumGetPointer(*result)))
609                 *result = datumCopy(*result,
610                                                         perfuncstate->resulttypeByVal,
611                                                         perfuncstate->resulttypeLen);
612
613         MemoryContextSwitchTo(oldContext);
614 }
615
616 /*
617  * begin_partition
618  * Start buffering rows of the next partition.
619  */
620 static void
621 begin_partition(WindowAggState *winstate)
622 {
623         PlanState          *outerPlan = outerPlanState(winstate);
624         int                             numfuncs = winstate->numfuncs;
625         int                             i;
626
627         winstate->partition_spooled = false;
628         winstate->frametail_valid = false;
629         winstate->spooled_rows = 0;
630         winstate->currentpos = 0;
631         winstate->frametailpos = -1;
632         winstate->aggregatedupto = 0;
633         ExecClearTuple(winstate->agg_row_slot);
634
635         /*
636          * If this is the very first partition, we need to fetch the first
637          * input row to store in first_part_slot.
638          */
639         if (TupIsNull(winstate->first_part_slot))
640         {
641                 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
642
643                 if (!TupIsNull(outerslot))
644                          ExecCopySlot(winstate->first_part_slot, outerslot);
645                 else
646                 {
647                         /* outer plan is empty, so we have nothing to do */
648                         winstate->partition_spooled = true;
649                         winstate->more_partitions = false;
650                         return;
651                 }
652         }
653
654         /* Create new tuplestore for this partition */
655         winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
656
657         /*
658          * Set up read pointers for the tuplestore.  The current and agg pointers
659          * don't need BACKWARD capability, but the per-window-function read
660          * pointers do.
661          */
662         winstate->current_ptr = 0;      /* read pointer 0 is pre-allocated */
663
664         /* reset default REWIND capability bit for current ptr */
665         tuplestore_set_eflags(winstate->buffer, 0);
666
667         /* create a read pointer for aggregates, if needed */
668         if (winstate->numaggs > 0)
669                 winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
670
671         /* create mark and read pointers for each real window function */
672         for (i = 0; i < numfuncs; i++)
673         {
674                 WindowStatePerFunc      perfuncstate = &(winstate->perfunc[i]);
675
676                 if (!perfuncstate->plain_agg)
677                 {
678                         WindowObject    winobj = perfuncstate->winobj;
679
680                         winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
681                                                                                                                         0);
682                         winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
683                                                                                                                         EXEC_FLAG_BACKWARD);
684                         winobj->markpos = -1;
685                         winobj->seekpos = -1;
686                 }
687         }
688
689         /*
690          * Store the first tuple into the tuplestore (it's always available now;
691          * we either read it above, or saved it at the end of previous partition)
692          */
693         tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
694         winstate->spooled_rows++;
695 }
696
697 /*
698  * Read tuples from the outer node, up to position 'pos', and store them
699  * into the tuplestore. If pos is -1, reads the whole partition.
700  */
701 static void
702 spool_tuples(WindowAggState *winstate, int64 pos)
703 {
704         WindowAgg          *node = (WindowAgg *) winstate->ss.ps.plan;
705         PlanState          *outerPlan;
706         TupleTableSlot *outerslot;
707         MemoryContext oldcontext;
708
709         if (!winstate->buffer)
710                 return;                                 /* just a safety check */
711         if (winstate->partition_spooled)
712                 return;                                 /* whole partition done already */
713
714         /*
715          * If the tuplestore has spilled to disk, alternate reading and writing
716          * becomes quite expensive due to frequent buffer flushes.  It's cheaper
717          * to force the entire partition to get spooled in one go.
718          *
719          * XXX this is a horrid kluge --- it'd be better to fix the performance
720          * problem inside tuplestore.  FIXME
721          */
722         if (!tuplestore_in_memory(winstate->buffer))
723                 pos = -1;
724
725         outerPlan = outerPlanState(winstate);
726
727         /* Must be in query context to call outerplan or touch tuplestore */
728         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
729
730         while (winstate->spooled_rows <= pos || pos == -1)
731         {
732                 outerslot = ExecProcNode(outerPlan);
733                 if (TupIsNull(outerslot))
734                 {
735                         /* reached the end of the last partition */
736                         winstate->partition_spooled = true;
737                         winstate->more_partitions = false;
738                         break;
739                 }
740
741                 if (node->partNumCols > 0)
742                 {
743                         /* Check if this tuple still belongs to the current partition */
744                         if (!execTuplesMatch(winstate->first_part_slot,
745                                                                  outerslot,
746                                                                  node->partNumCols, node->partColIdx,
747                                                                  winstate->partEqfunctions,
748                                                                  winstate->tmpcontext->ecxt_per_tuple_memory))
749                         {
750                                 /*
751                                  * end of partition; copy the tuple for the next cycle.
752                                  */
753                                 ExecCopySlot(winstate->first_part_slot, outerslot);
754                                 winstate->partition_spooled = true;
755                                 winstate->more_partitions = true;
756                                 break;
757                         }
758                 }
759
760                 /* Still in partition, so save it into the tuplestore */
761                 tuplestore_puttupleslot(winstate->buffer, outerslot);
762                 winstate->spooled_rows++;
763         }
764
765         MemoryContextSwitchTo(oldcontext);
766 }
767
768 /*
769  * release_partition
770  * clear information kept within a partition, including
771  * tuplestore and aggregate results.
772  */
773 static void
774 release_partition(WindowAggState *winstate)
775 {
776         int                                     i;
777
778         for (i = 0; i < winstate->numfuncs; i++)
779         {
780                 WindowStatePerFunc              perfuncstate = &(winstate->perfunc[i]);
781
782                 /* Release any partition-local state of this window function */
783                 if (perfuncstate->winobj)
784                         perfuncstate->winobj->localmem = NULL;
785         }
786
787         /*
788          * Release all partition-local memory (in particular, any partition-local
789          * state that we might have trashed our pointers to in the above loop, and
790          * any aggregate temp data).  We don't rely on retail pfree because some
791          * aggregates might have allocated data we don't have direct pointers to.
792          */
793         MemoryContextResetAndDeleteChildren(winstate->wincontext);
794
795         if (winstate->buffer)
796                 tuplestore_end(winstate->buffer);
797         winstate->buffer = NULL;
798         winstate->partition_spooled = false;
799 }
800
801 /*
802  * row_is_in_frame
803  * Determine whether a row is in the current row's window frame according
804  * to our window framing rule
805  *
806  * The caller must have already determined that the row is in the partition
807  * and fetched it into a slot.  This function just encapsulates the framing
808  * rules.
809  */
810 static bool
811 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
812 {
813         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
814         int                     frameOptions = node->frameOptions;
815
816         Assert(pos >= 0);                       /* else caller error */
817
818         /* We only support frame start mode UNBOUNDED PRECEDING for now */
819         Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
820
821         /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
822         if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
823                 return true;
824
825         /* Else frame tail mode must be CURRENT ROW */
826         Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
827
828         /* if row is current row or a predecessor, it must be in frame */
829         if (pos <= winstate->currentpos)
830                 return true;
831
832         /* In ROWS mode, *only* such rows are in frame */
833         if (frameOptions & FRAMEOPTION_ROWS)
834                 return false;
835
836         /* Else must be RANGE mode */
837         Assert(frameOptions & FRAMEOPTION_RANGE);
838
839         /* In frame iff it's a peer of current row */
840         return are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot);
841 }
842
843 /*
844  * update_frametailpos
845  * make frametailpos valid for the current row
846  *
847  * Uses the winobj's read pointer for any required fetches; the winobj's
848  * mark must not be past the currently known frame tail.  Also uses the
849  * specified slot for any required fetches.
850  */
851 static void
852 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
853 {
854         WindowAggState *winstate = winobj->winstate;
855         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
856         int                     frameOptions = node->frameOptions;
857         int64           ftnext;
858
859         if (winstate->frametail_valid)
860                 return;                                 /* already known for current row */
861
862         /* We only support frame start mode UNBOUNDED PRECEDING for now */
863         Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
864
865         /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
866         if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
867         {
868                 spool_tuples(winstate, -1);
869                 winstate->frametailpos = winstate->spooled_rows - 1;
870                 winstate->frametail_valid = true;
871                 return;
872         }
873
874         /* Else frame tail mode must be CURRENT ROW */
875         Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
876
877         /* In ROWS mode, exactly the rows up to current are in frame */
878         if (frameOptions & FRAMEOPTION_ROWS)
879         {
880                 winstate->frametailpos = winstate->currentpos;
881                 winstate->frametail_valid = true;
882                 return;
883         }
884
885         /* Else must be RANGE mode */
886         Assert(frameOptions & FRAMEOPTION_RANGE);
887
888         /* If no ORDER BY, all rows are peers with each other */
889         if (node->ordNumCols == 0)
890         {
891                 spool_tuples(winstate, -1);
892                 winstate->frametailpos = winstate->spooled_rows - 1;
893                 winstate->frametail_valid = true;
894                 return;
895         }
896
897         /*
898          * Else we have to search for the first non-peer of the current row.
899          * We assume the current value of frametailpos is a lower bound on the
900          * possible frame tail location, ie, frame tail never goes backward, and
901          * that currentpos is also a lower bound, ie, current row is always in
902          * frame.
903          */
904         ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
905         for (;;)
906         {
907                 if (!window_gettupleslot(winobj, ftnext, slot))
908                         break;                          /* end of partition */
909                 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
910                         break;                          /* not peer of current row */
911                 ftnext++;
912         }
913         winstate->frametailpos = ftnext - 1;
914         winstate->frametail_valid = true;
915 }
916
917
918 /* -----------------
919  * ExecWindowAgg
920  *
921  *      ExecWindowAgg receives tuples from its outer subplan and
922  *      stores them into a tuplestore, then processes window functions.
923  *      This node doesn't reduce nor qualify any row so the number of
924  *      returned rows is exactly the same as its outer subplan's result
925  *      (ignoring the case of SRFs in the targetlist, that is).
926  * -----------------
927  */
928 TupleTableSlot *
929 ExecWindowAgg(WindowAggState *winstate)
930 {
931         TupleTableSlot *result;
932         ExprDoneCond    isDone;
933         ExprContext        *econtext;
934         int                             i;
935         int                             numfuncs;
936
937         if (winstate->all_done)
938                 return NULL;
939
940         /*
941          * Check to see if we're still projecting out tuples from a previous output
942          * tuple (because there is a function-returning-set in the projection
943          * expressions).  If so, try to project another one.
944          */
945         if (winstate->ss.ps.ps_TupFromTlist)
946         {
947                 TupleTableSlot *result;
948                 ExprDoneCond isDone;
949
950                 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
951                 if (isDone == ExprMultipleResult)
952                         return result;
953                 /* Done with that source tuple... */
954                 winstate->ss.ps.ps_TupFromTlist = false;
955         }
956
957 restart:
958         if (winstate->buffer == NULL)
959         {
960                 /* Initialize for first partition and set current row = 0 */
961                 begin_partition(winstate);
962                 /* If there are no input rows, we'll detect that and exit below */
963         }
964         else
965         {
966                 /* Advance current row within partition */
967                 winstate->currentpos++;
968                 /* This might mean that the frame tail moves, too */
969                 winstate->frametail_valid = false;
970         }
971
972         /*
973          * Spool all tuples up to and including the current row, if we haven't
974          * already
975          */
976         spool_tuples(winstate, winstate->currentpos);
977
978         /* Move to the next partition if we reached the end of this partition */
979         if (winstate->partition_spooled &&
980                 winstate->currentpos >= winstate->spooled_rows)
981         {
982                 release_partition(winstate);
983
984                 if (winstate->more_partitions)
985                 {
986                         begin_partition(winstate);
987                         Assert(winstate->spooled_rows > 0);
988                 }
989                 else
990                 {
991                         winstate->all_done = true;
992                         return NULL;
993                 }
994         }
995
996         /* final output execution is in ps_ExprContext */
997         econtext = winstate->ss.ps.ps_ExprContext;
998
999         /* Clear the per-output-tuple context for current row */
1000         ResetExprContext(econtext);
1001
1002         /*
1003          * Read the current row from the tuplestore, and save in ScanTupleSlot.
1004          * (We can't rely on the outerplan's output slot because we may have to
1005          * read beyond the current row.  Also, we have to actually copy the row
1006          * out of the tuplestore, since window function evaluation might cause
1007          * the tuplestore to dump its state to disk.)
1008          *
1009          * Current row must be in the tuplestore, since we spooled it above.
1010          */
1011         tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1012         if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1013                                                                  winstate->ss.ss_ScanTupleSlot))
1014                 elog(ERROR, "unexpected end of tuplestore");
1015
1016         /*
1017          * Evaluate true window functions
1018          */
1019         numfuncs = winstate->numfuncs;
1020         for (i = 0; i < numfuncs; i++)
1021         {
1022                 WindowStatePerFunc      perfuncstate = &(winstate->perfunc[i]);
1023
1024                 if (perfuncstate->plain_agg)
1025                         continue;
1026                 eval_windowfunction(winstate, perfuncstate,
1027                                                         &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1028                                                         &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1029         }
1030
1031         /*
1032          * Evaluate aggregates
1033          */
1034         if (winstate->numaggs > 0)
1035                 eval_windowaggregates(winstate);
1036
1037         /*
1038          * Truncate any no-longer-needed rows from the tuplestore.
1039          */
1040         tuplestore_trim(winstate->buffer);
1041
1042         /*
1043          * Form and return a projection tuple using the windowfunc results
1044          * and the current row.  Setting ecxt_outertuple arranges that any
1045          * Vars will be evaluated with respect to that row.
1046          */
1047         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1048         result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1049
1050         if (isDone == ExprEndResult)
1051         {
1052                 /* SRF in tlist returned no rows, so advance to next input tuple */
1053                 goto restart;
1054         }
1055
1056         winstate->ss.ps.ps_TupFromTlist =
1057                 (isDone == ExprMultipleResult);
1058         return result;
1059 }
1060
1061 /* -----------------
1062  * ExecInitWindowAgg
1063  *
1064  *      Creates the run-time information for the WindowAgg node produced by the
1065  *      planner and initializes its outer subtree
1066  * -----------------
1067  */
1068 WindowAggState *
1069 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1070 {
1071         WindowAggState *winstate;
1072         Plan       *outerPlan;
1073         ExprContext *econtext;
1074         ExprContext *tmpcontext;
1075         WindowStatePerFunc  perfunc;
1076         WindowStatePerAgg   peragg;
1077         int                     numfuncs,
1078                                 wfuncno,
1079                                 numaggs,
1080                                 aggno;
1081         ListCell   *l;
1082
1083         /* check for unsupported flags */
1084         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1085
1086         /*
1087          * create state structure
1088          */
1089         winstate = makeNode(WindowAggState);
1090         winstate->ss.ps.plan = (Plan *) node;
1091         winstate->ss.ps.state = estate;
1092
1093         /*
1094          * Create expression contexts.  We need two, one for per-input-tuple
1095          * processing and one for per-output-tuple processing.  We cheat a little
1096          * by using ExecAssignExprContext() to build both.
1097          */
1098         ExecAssignExprContext(estate, &winstate->ss.ps);
1099         tmpcontext = winstate->ss.ps.ps_ExprContext;
1100         winstate->tmpcontext = tmpcontext;
1101         ExecAssignExprContext(estate, &winstate->ss.ps);
1102
1103         /* Create long-lived context for storage of aggregate transvalues etc */
1104         winstate->wincontext =
1105                 AllocSetContextCreate(CurrentMemoryContext,
1106                                                           "WindowAggContext",
1107                                                           ALLOCSET_DEFAULT_MINSIZE,
1108                                                           ALLOCSET_DEFAULT_INITSIZE,
1109                                                           ALLOCSET_DEFAULT_MAXSIZE);
1110
1111 #define WINDOWAGG_NSLOTS 6
1112
1113         /*
1114          * tuple table initialization
1115          */
1116         ExecInitScanTupleSlot(estate, &winstate->ss);
1117         ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1118         winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1119         winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1120         winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1121         winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1122
1123         winstate->ss.ps.targetlist = (List *)
1124                 ExecInitExpr((Expr *) node->plan.targetlist,
1125                                          (PlanState *) winstate);
1126
1127         /*
1128          * WindowAgg nodes never have quals, since they can only occur at the
1129          * logical top level of a query (ie, after any WHERE or HAVING filters)
1130          */
1131         Assert(node->plan.qual == NIL);
1132         winstate->ss.ps.qual = NIL;
1133
1134         /*
1135          * initialize child nodes
1136          */
1137         outerPlan = outerPlan(node);
1138         outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1139
1140         /*
1141          * initialize source tuple type (which is also the tuple type that we'll
1142          * store in the tuplestore and use in all our working slots).
1143          */
1144         ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1145
1146         ExecSetSlotDescriptor(winstate->first_part_slot,
1147                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1148         ExecSetSlotDescriptor(winstate->agg_row_slot,
1149                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1150         ExecSetSlotDescriptor(winstate->temp_slot_1,
1151                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1152         ExecSetSlotDescriptor(winstate->temp_slot_2,
1153                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1154
1155         /*
1156          * Initialize result tuple type and projection info.
1157          */
1158         ExecAssignResultTypeFromTL(&winstate->ss.ps);
1159         ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1160
1161         winstate->ss.ps.ps_TupFromTlist = false;
1162
1163         /* Set up data for comparing tuples */
1164         if (node->partNumCols > 0)
1165                 winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1166                                                                                                                   node->partOperators);
1167         if (node->ordNumCols > 0)
1168                 winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1169                                                                                                                   node->ordOperators);
1170
1171         /*
1172          * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1173          */
1174         numfuncs = winstate->numfuncs;
1175         numaggs = winstate->numaggs;
1176         econtext = winstate->ss.ps.ps_ExprContext;
1177         econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1178         econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1179
1180         /*
1181          * allocate per-wfunc/per-agg state information.
1182          */
1183         perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1184         peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1185         winstate->perfunc = perfunc;
1186         winstate->peragg = peragg;
1187
1188         wfuncno = -1;
1189         aggno = -1;
1190         foreach(l, winstate->funcs)
1191         {
1192                 WindowFuncExprState        *wfuncstate = (WindowFuncExprState *) lfirst(l);
1193                 WindowFunc                         *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1194                 WindowStatePerFunc perfuncstate;
1195                 AclResult       aclresult;
1196                 int                     i;
1197
1198                 if (wfunc->winref != node->winref)                      /* planner screwed up? */
1199                         elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1200                                  wfunc->winref, node->winref);
1201
1202                 /* Look for a previous duplicate window function */
1203                 for (i = 0; i <= wfuncno; i++)
1204                 {
1205                         if (equal(wfunc, perfunc[i].wfunc) &&
1206                                 !contain_volatile_functions((Node *) wfunc))
1207                                 break;
1208                 }
1209                 if (i <= wfuncno)
1210                 {
1211                         /* Found a match to an existing entry, so just mark it */
1212                         wfuncstate->wfuncno = i;
1213                         continue;
1214                 }
1215
1216                 /* Nope, so assign a new PerAgg record */
1217                 perfuncstate = &perfunc[++wfuncno];
1218
1219                 /* Mark WindowFunc state node with assigned index in the result array */
1220                 wfuncstate->wfuncno = wfuncno;
1221
1222                 /* Check permission to call window function */
1223                 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1224                                                                          ACL_EXECUTE);
1225                 if (aclresult != ACLCHECK_OK)
1226                         aclcheck_error(aclresult, ACL_KIND_PROC,
1227                                                    get_func_name(wfunc->winfnoid));
1228
1229                 /* Fill in the perfuncstate data */
1230                 perfuncstate->wfuncstate = wfuncstate;
1231                 perfuncstate->wfunc = wfunc;
1232                 perfuncstate->numArguments = list_length(wfuncstate->args);
1233
1234                 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1235                                           tmpcontext->ecxt_per_query_memory);
1236                 perfuncstate->flinfo.fn_expr = (Node *) wfunc;
1237                 get_typlenbyval(wfunc->wintype,
1238                                                 &perfuncstate->resulttypeLen,
1239                                                 &perfuncstate->resulttypeByVal);
1240
1241                 /*
1242                  * If it's really just a plain aggregate function,
1243                  * we'll emulate the Agg environment for it.
1244                  */
1245                 perfuncstate->plain_agg = wfunc->winagg;
1246                 if (wfunc->winagg)
1247                 {
1248                         WindowStatePerAgg       peraggstate;
1249
1250                         perfuncstate->aggno = ++aggno;
1251                         peraggstate = &winstate->peragg[aggno];
1252                         initialize_peragg(winstate, wfunc, peraggstate);
1253                         peraggstate->wfuncno = wfuncno;
1254                 }
1255                 else
1256                 {
1257                         WindowObject winobj = makeNode(WindowObjectData);
1258
1259                         winobj->winstate = winstate;
1260                         winobj->argstates = wfuncstate->args;
1261                         winobj->localmem = NULL;
1262                         perfuncstate->winobj = winobj;
1263                 }
1264         }
1265
1266         /* Update numfuncs, numaggs to match number of unique functions found */
1267         winstate->numfuncs = wfuncno + 1;
1268         winstate->numaggs = aggno + 1;
1269
1270         winstate->partition_spooled = false;
1271         winstate->more_partitions = false;
1272
1273         return winstate;
1274 }
1275
1276 /* -----------------
1277  * ExecCountSlotsWindowAgg
1278  * -----------------
1279  */
1280 int
1281 ExecCountSlotsWindowAgg(WindowAgg *node)
1282 {
1283         return ExecCountSlotsNode(outerPlan(node)) +
1284                 ExecCountSlotsNode(innerPlan(node)) +
1285                 WINDOWAGG_NSLOTS;
1286 }
1287
1288 /* -----------------
1289  * ExecEndWindowAgg
1290  * -----------------
1291  */
1292 void
1293 ExecEndWindowAgg(WindowAggState *node)
1294 {
1295         PlanState  *outerPlan;
1296
1297         release_partition(node);
1298
1299         pfree(node->perfunc);
1300         pfree(node->peragg);
1301
1302         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1303         ExecClearTuple(node->first_part_slot);
1304         ExecClearTuple(node->agg_row_slot);
1305         ExecClearTuple(node->temp_slot_1);
1306         ExecClearTuple(node->temp_slot_2);
1307
1308         /*
1309          * Free both the expr contexts.
1310          */
1311         ExecFreeExprContext(&node->ss.ps);
1312         node->ss.ps.ps_ExprContext = node->tmpcontext;
1313         ExecFreeExprContext(&node->ss.ps);
1314
1315         MemoryContextDelete(node->wincontext);
1316
1317         outerPlan = outerPlanState(node);
1318         ExecEndNode(outerPlan);
1319 }
1320
1321 /* -----------------
1322  * ExecRescanWindowAgg
1323  * -----------------
1324  */
1325 void
1326 ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt)
1327 {
1328         ExprContext        *econtext = node->ss.ps.ps_ExprContext;
1329
1330         node->all_done = false;
1331
1332         node->ss.ps.ps_TupFromTlist = false;
1333
1334         /* release tuplestore et al */
1335         release_partition(node);
1336
1337         /* release all temp tuples, but especially first_part_slot */
1338         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1339         ExecClearTuple(node->first_part_slot);
1340         ExecClearTuple(node->agg_row_slot);
1341         ExecClearTuple(node->temp_slot_1);
1342         ExecClearTuple(node->temp_slot_2);
1343
1344         /* Forget current wfunc values */
1345         MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
1346         MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
1347
1348         /*
1349          * if chgParam of subnode is not null then plan will be re-scanned by
1350          * first ExecProcNode.
1351          */
1352         if (((PlanState *) node)->lefttree->chgParam == NULL)
1353                 ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
1354 }
1355
1356 /*
1357  * initialize_peragg
1358  *
1359  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
1360  */
1361 static WindowStatePerAggData *
1362 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
1363                                   WindowStatePerAgg peraggstate)
1364 {
1365         Oid                     inputTypes[FUNC_MAX_ARGS];
1366         int                     numArguments;
1367         HeapTuple       aggTuple;
1368         Form_pg_aggregate aggform;
1369         Oid                     aggtranstype;
1370         AclResult       aclresult;
1371         Oid                     transfn_oid,
1372                                 finalfn_oid;
1373         Expr       *transfnexpr,
1374                            *finalfnexpr;
1375         Datum           textInitVal;
1376         int                     i;
1377         ListCell   *lc;
1378
1379         numArguments = list_length(wfunc->args);
1380
1381         i = 0;
1382         foreach(lc, wfunc->args)
1383         {
1384                 inputTypes[i++] = exprType((Node *) lfirst(lc));
1385         }
1386
1387         aggTuple = SearchSysCache(AGGFNOID,
1388                                                           ObjectIdGetDatum(wfunc->winfnoid),
1389                                                           0, 0, 0);
1390         if (!HeapTupleIsValid(aggTuple))
1391                 elog(ERROR, "cache lookup failed for aggregate %u",
1392                          wfunc->winfnoid);
1393         aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
1394
1395         /*
1396          * ExecInitWindowAgg already checked permission to call aggregate function
1397          * ... but we still need to check the component functions
1398          */
1399
1400         peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
1401         peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
1402
1403         /* Check that aggregate owner has permission to call component fns */
1404         {
1405                 HeapTuple       procTuple;
1406                 Oid                     aggOwner;
1407
1408                 procTuple = SearchSysCache(PROCOID,
1409                                                                    ObjectIdGetDatum(wfunc->winfnoid),
1410                                                                    0, 0, 0);
1411                 if (!HeapTupleIsValid(procTuple))
1412                         elog(ERROR, "cache lookup failed for function %u",
1413                                  wfunc->winfnoid);
1414                 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
1415                 ReleaseSysCache(procTuple);
1416
1417                 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
1418                                                                          ACL_EXECUTE);
1419                 if (aclresult != ACLCHECK_OK)
1420                         aclcheck_error(aclresult, ACL_KIND_PROC,
1421                                                    get_func_name(transfn_oid));
1422                 if (OidIsValid(finalfn_oid))
1423                 {
1424                         aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
1425                                                                                  ACL_EXECUTE);
1426                         if (aclresult != ACLCHECK_OK)
1427                                 aclcheck_error(aclresult, ACL_KIND_PROC,
1428                                                            get_func_name(finalfn_oid));
1429                 }
1430         }
1431
1432         /* resolve actual type of transition state, if polymorphic */
1433         aggtranstype = aggform->aggtranstype;
1434         if (IsPolymorphicType(aggtranstype))
1435         {
1436                 /* have to fetch the agg's declared input types... */
1437                 Oid                *declaredArgTypes;
1438                 int                     agg_nargs;
1439
1440                 get_func_signature(wfunc->winfnoid,
1441                                                    &declaredArgTypes, &agg_nargs);
1442                 Assert(agg_nargs == numArguments);
1443                 aggtranstype = enforce_generic_type_consistency(inputTypes,
1444                                                                                                                 declaredArgTypes,
1445                                                                                                                 agg_nargs,
1446                                                                                                                 aggtranstype,
1447                                                                                                                 false);
1448                 pfree(declaredArgTypes);
1449         }
1450
1451         /* build expression trees using actual argument & result types */
1452         build_aggregate_fnexprs(inputTypes,
1453                                                         numArguments,
1454                                                         aggtranstype,
1455                                                         wfunc->wintype,
1456                                                         transfn_oid,
1457                                                         finalfn_oid,
1458                                                         &transfnexpr,
1459                                                         &finalfnexpr);
1460
1461         fmgr_info(transfn_oid, &peraggstate->transfn);
1462         peraggstate->transfn.fn_expr = (Node *) transfnexpr;
1463
1464         if (OidIsValid(finalfn_oid))
1465         {
1466                 fmgr_info(finalfn_oid, &peraggstate->finalfn);
1467                 peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
1468         }
1469
1470         get_typlenbyval(wfunc->wintype,
1471                                         &peraggstate->resulttypeLen,
1472                                         &peraggstate->resulttypeByVal);
1473         get_typlenbyval(aggtranstype,
1474                                         &peraggstate->transtypeLen,
1475                                         &peraggstate->transtypeByVal);
1476
1477         /*
1478          * initval is potentially null, so don't try to access it as a struct
1479          * field. Must do it the hard way with SysCacheGetAttr.
1480          */
1481         textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
1482                                                                   Anum_pg_aggregate_agginitval,
1483                                                                   &peraggstate->initValueIsNull);
1484
1485         if (peraggstate->initValueIsNull)
1486                 peraggstate->initValue = (Datum) 0;
1487         else
1488                 peraggstate->initValue = GetAggInitVal(textInitVal,
1489                                                                                            aggtranstype);
1490
1491         /*
1492          * If the transfn is strict and the initval is NULL, make sure input
1493          * type and transtype are the same (or at least binary-compatible), so
1494          * that it's OK to use the first input value as the initial
1495          * transValue.  This should have been checked at agg definition time,
1496          * but just in case...
1497          */
1498         if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
1499         {
1500                 if (numArguments < 1 ||
1501                         !IsBinaryCoercible(inputTypes[0], aggtranstype))
1502                         ereport(ERROR,
1503                                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
1504                                          errmsg("aggregate %u needs to have compatible input type and transition type",
1505                                                         wfunc->winfnoid)));
1506         }
1507
1508         ReleaseSysCache(aggTuple);
1509
1510         return peraggstate;
1511 }
1512
1513 static Datum
1514 GetAggInitVal(Datum textInitVal, Oid transtype)
1515 {
1516         Oid                     typinput,
1517                                 typioparam;
1518         char       *strInitVal;
1519         Datum           initVal;
1520
1521         getTypeInputInfo(transtype, &typinput, &typioparam);
1522         strInitVal = TextDatumGetCString(textInitVal);
1523         initVal = OidInputFunctionCall(typinput, strInitVal,
1524                                                                    typioparam, -1);
1525         pfree(strInitVal);
1526         return initVal;
1527 }
1528
1529 /*
1530  * are_peers
1531  * compare two rows to see if they are equal according to the ORDER BY clause
1532  *
1533  * NB: this does not consider the window frame mode.
1534  */
1535 static bool
1536 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
1537                   TupleTableSlot *slot2)
1538 {
1539         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1540
1541         /* If no ORDER BY, all rows are peers with each other */
1542         if (node->ordNumCols == 0)
1543                 return true;
1544
1545         return execTuplesMatch(slot1, slot2,
1546                                                    node->ordNumCols, node->ordColIdx,
1547                                                    winstate->ordEqfunctions,
1548                                                    winstate->tmpcontext->ecxt_per_tuple_memory);
1549 }
1550
1551 /*
1552  * window_gettupleslot
1553  *      Fetch the pos'th tuple of the current partition into the slot,
1554  *      using the winobj's read pointer
1555  *
1556  * Returns true if successful, false if no such row
1557  */
1558 static bool
1559 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
1560 {
1561         WindowAggState *winstate = winobj->winstate;
1562         MemoryContext oldcontext;
1563
1564         /* Don't allow passing -1 to spool_tuples here */
1565         if (pos < 0)
1566                 return false;
1567
1568         /* If necessary, fetch the tuple into the spool */
1569         spool_tuples(winstate, pos);
1570
1571         if (pos >= winstate->spooled_rows)
1572                 return false;
1573
1574         if (pos < winobj->markpos)
1575                 elog(ERROR, "cannot fetch row before WindowObject's mark position");
1576
1577         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1578
1579         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1580
1581         /*
1582          * There's no API to refetch the tuple at the current position. We
1583          * have to move one tuple forward, and then one backward.  (We don't
1584          * do it the other way because we might try to fetch the row before
1585          * our mark, which isn't allowed.)
1586          */
1587         if (winobj->seekpos == pos)
1588         {
1589                 tuplestore_advance(winstate->buffer, true);
1590                 winobj->seekpos++;
1591         }
1592
1593         while (winobj->seekpos > pos)
1594         {
1595                 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
1596                         elog(ERROR, "unexpected end of tuplestore");
1597                 winobj->seekpos--;
1598         }
1599
1600         while (winobj->seekpos < pos)
1601         {
1602                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
1603                         elog(ERROR, "unexpected end of tuplestore");
1604                 winobj->seekpos++;
1605         }
1606
1607         MemoryContextSwitchTo(oldcontext);
1608
1609         return true;
1610 }
1611
1612
1613 /***********************************************************************
1614  * API exposed to window functions
1615  ***********************************************************************/
1616
1617
1618 /*
1619  * WinGetPartitionLocalMemory
1620  *              Get working memory that lives till end of partition processing
1621  *
1622  * On first call within a given partition, this allocates and zeroes the
1623  * requested amount of space.  Subsequent calls just return the same chunk.
1624  *
1625  * Memory obtained this way is normally used to hold state that should be
1626  * automatically reset for each new partition.  If a window function wants
1627  * to hold state across the whole query, fcinfo->fn_extra can be used in the
1628  * usual way for that.
1629  */
1630 void *
1631 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
1632 {
1633         Assert(WindowObjectIsValid(winobj));
1634         if (winobj->localmem == NULL)
1635                 winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext,
1636                                                                                                   sz);
1637         return winobj->localmem;
1638 }
1639
1640 /*
1641  * WinGetCurrentPosition
1642  *              Return the current row's position (counting from 0) within the current
1643  *              partition.
1644  */
1645 int64
1646 WinGetCurrentPosition(WindowObject winobj)
1647 {
1648         Assert(WindowObjectIsValid(winobj));
1649         return winobj->winstate->currentpos;
1650 }
1651
1652 /*
1653  * WinGetPartitionRowCount
1654  *              Return total number of rows contained in the current partition.
1655  *
1656  * Note: this is a relatively expensive operation because it forces the
1657  * whole partition to be "spooled" into the tuplestore at once.  Once
1658  * executed, however, additional calls within the same partition are cheap.
1659  */
1660 int64
1661 WinGetPartitionRowCount(WindowObject winobj)
1662 {
1663         Assert(WindowObjectIsValid(winobj));
1664         spool_tuples(winobj->winstate, -1);
1665         return winobj->winstate->spooled_rows;
1666 }
1667
1668 /*
1669  * WinSetMarkPosition
1670  *              Set the "mark" position for the window object, which is the oldest row
1671  *              number (counting from 0) it is allowed to fetch during all subsequent
1672  *              operations within the current partition.
1673  *
1674  * Window functions do not have to call this, but are encouraged to move the
1675  * mark forward when possible to keep the tuplestore size down and prevent
1676  * having to spill rows to disk.
1677  */
1678 void
1679 WinSetMarkPosition(WindowObject winobj, int64 markpos)
1680 {
1681         WindowAggState *winstate;
1682
1683         Assert(WindowObjectIsValid(winobj));
1684         winstate = winobj->winstate;
1685
1686         if (markpos < winobj->markpos)
1687                 elog(ERROR, "cannot move WindowObject's mark position backward");
1688         tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
1689         while (markpos > winobj->markpos)
1690         {
1691                 tuplestore_advance(winstate->buffer, true);
1692                 winobj->markpos++;
1693         }
1694         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1695         while (markpos > winobj->seekpos)
1696         {
1697                 tuplestore_advance(winstate->buffer, true);
1698                 winobj->seekpos++;
1699         }
1700 }
1701
1702 /*
1703  * WinRowsArePeers
1704  *              Compare two rows (specified by absolute position in window) to see
1705  *              if they are equal according to the ORDER BY clause.
1706  *
1707  * NB: this does not consider the window frame mode.
1708  */
1709 bool
1710 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
1711 {
1712         WindowAggState *winstate;
1713         WindowAgg          *node;
1714         TupleTableSlot *slot1;
1715         TupleTableSlot *slot2;
1716         bool                    res;
1717
1718         Assert(WindowObjectIsValid(winobj));
1719         winstate = winobj->winstate;
1720         node = (WindowAgg *) winstate->ss.ps.plan;
1721
1722         /* If no ORDER BY, all rows are peers; don't bother to fetch them */
1723         if (node->ordNumCols == 0)
1724                 return true;
1725
1726         slot1 = winstate->temp_slot_1;
1727         slot2 = winstate->temp_slot_2;
1728
1729         if (!window_gettupleslot(winobj, pos1, slot1))
1730                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
1731                          pos1);
1732         if (!window_gettupleslot(winobj, pos2, slot2))
1733                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
1734                          pos2);
1735
1736         res = are_peers(winstate, slot1, slot2);
1737
1738         ExecClearTuple(slot1);
1739         ExecClearTuple(slot2);
1740
1741         return res;
1742 }
1743
1744 /*
1745  * WinGetFuncArgInPartition
1746  *              Evaluate a window function's argument expression on a specified
1747  *              row of the partition.  The row is identified in lseek(2) style,
1748  *              i.e. relative to the current, first, or last row.
1749  *
1750  * argno: argument number to evaluate (counted from 0)
1751  * relpos: signed rowcount offset from the seek position
1752  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
1753  * set_mark: If the row is found and set_mark is true, the mark is moved to
1754  *              the row as a side-effect.
1755  * isnull: output argument, receives isnull status of result
1756  * isout: output argument, set to indicate whether target row position
1757  *              is out of partition (can pass NULL if caller doesn't care about this)
1758  *
1759  * Specifying a nonexistent row is not an error, it just causes a null result
1760  * (plus setting *isout true, if isout isn't NULL).
1761  */
1762 Datum
1763 WinGetFuncArgInPartition(WindowObject winobj, int argno,
1764                                                  int relpos, int seektype, bool set_mark,
1765                                                  bool *isnull, bool *isout)
1766 {
1767         WindowAggState *winstate;
1768         ExprContext *econtext;
1769         TupleTableSlot *slot;
1770         bool            gottuple;
1771         int64           abs_pos;
1772
1773         Assert(WindowObjectIsValid(winobj));
1774         winstate = winobj->winstate;
1775         econtext = winstate->ss.ps.ps_ExprContext;
1776         slot = winstate->temp_slot_1;
1777
1778         switch (seektype)
1779         {
1780                 case WINDOW_SEEK_CURRENT:
1781                         abs_pos = winstate->currentpos + relpos;
1782                         break;
1783                 case WINDOW_SEEK_HEAD:
1784                         abs_pos = relpos;
1785                         break;
1786                 case WINDOW_SEEK_TAIL:
1787                         spool_tuples(winstate, -1);
1788                         abs_pos = winstate->spooled_rows - 1 + relpos;
1789                         break;
1790                 default:
1791                         elog(ERROR, "unrecognized window seek type: %d", seektype);
1792                         abs_pos = 0; /* keep compiler quiet */
1793                         break;
1794         }
1795
1796         gottuple = window_gettupleslot(winobj, abs_pos, slot);
1797
1798         if (!gottuple)
1799         {
1800                 if (isout)
1801                         *isout = true;
1802                 *isnull = true;
1803                 return (Datum) 0;
1804         }
1805         else
1806         {
1807                 if (isout)
1808                         *isout = false;
1809                 if (set_mark)
1810                         WinSetMarkPosition(winobj, abs_pos);
1811                 econtext->ecxt_outertuple = slot;
1812                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1813                                                         econtext, isnull, NULL);
1814         }
1815 }
1816
1817 /*
1818  * WinGetFuncArgInFrame
1819  *              Evaluate a window function's argument expression on a specified
1820  *              row of the window frame.  The row is identified in lseek(2) style,
1821  *              i.e. relative to the current, first, or last row.
1822  *
1823  * argno: argument number to evaluate (counted from 0)
1824  * relpos: signed rowcount offset from the seek position
1825  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
1826  * set_mark: If the row is found and set_mark is true, the mark is moved to
1827  *              the row as a side-effect.
1828  * isnull: output argument, receives isnull status of result
1829  * isout: output argument, set to indicate whether target row position
1830  *              is out of frame (can pass NULL if caller doesn't care about this)
1831  *
1832  * Specifying a nonexistent row is not an error, it just causes a null result
1833  * (plus setting *isout true, if isout isn't NULL).
1834  */
1835 Datum
1836 WinGetFuncArgInFrame(WindowObject winobj, int argno,
1837                                          int relpos, int seektype, bool set_mark,
1838                                          bool *isnull, bool *isout)
1839 {
1840         WindowAggState *winstate;
1841         ExprContext *econtext;
1842         TupleTableSlot *slot;
1843         bool            gottuple;
1844         int64           abs_pos;
1845
1846         Assert(WindowObjectIsValid(winobj));
1847         winstate = winobj->winstate;
1848         econtext = winstate->ss.ps.ps_ExprContext;
1849         slot = winstate->temp_slot_1;
1850
1851         switch (seektype)
1852         {
1853                 case WINDOW_SEEK_CURRENT:
1854                         abs_pos = winstate->currentpos + relpos;
1855                         break;
1856                 case WINDOW_SEEK_HEAD:
1857                         abs_pos = relpos;
1858                         break;
1859                 case WINDOW_SEEK_TAIL:
1860                         update_frametailpos(winobj, slot);
1861                         abs_pos = winstate->frametailpos + relpos;
1862                         break;
1863                 default:
1864                         elog(ERROR, "unrecognized window seek type: %d", seektype);
1865                         abs_pos = 0; /* keep compiler quiet */
1866                         break;
1867         }
1868
1869         gottuple = window_gettupleslot(winobj, abs_pos, slot);
1870         if (gottuple)
1871                 gottuple = row_is_in_frame(winstate, abs_pos, slot);
1872
1873         if (!gottuple)
1874         {
1875                 if (isout)
1876                         *isout = true;
1877                 *isnull = true;
1878                 return (Datum) 0;
1879         }
1880         else
1881         {
1882                 if (isout)
1883                         *isout = false;
1884                 if (set_mark)
1885                         WinSetMarkPosition(winobj, abs_pos);
1886                 econtext->ecxt_outertuple = slot;
1887                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1888                                                         econtext, isnull, NULL);
1889         }
1890 }
1891
1892 /*
1893  * WinGetFuncArgCurrent
1894  *              Evaluate a window function's argument expression on the current row.
1895  *
1896  * argno: argument number to evaluate (counted from 0)
1897  * isnull: output argument, receives isnull status of result
1898  *
1899  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
1900  * WinGetFuncArgInFrame targeting the current row, because it will succeed
1901  * even if the WindowObject's mark has been set beyond the current row.
1902  * This should generally be used for "ordinary" arguments of a window
1903  * function, such as the offset argument of lead() or lag().
1904  */
1905 Datum
1906 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
1907 {
1908         WindowAggState *winstate;
1909         ExprContext *econtext;
1910
1911         Assert(WindowObjectIsValid(winobj));
1912         winstate = winobj->winstate;
1913
1914         econtext = winstate->ss.ps.ps_ExprContext;
1915
1916         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1917         return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1918                                                 econtext, isnull, NULL);
1919 }