OSDN Git Service

Remove no-longer-needed ExecCountSlots infrastructure.
[pg-rex/syncrep.git] / src / backend / executor / nodeWindowAgg.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  *        routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set.      Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications.  The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore.  The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  *        $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.7 2009/09/27 21:10:53 tgl Exp $
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35
36 #include "catalog/pg_aggregate.h"
37 #include "catalog/pg_proc.h"
38 #include "catalog/pg_type.h"
39 #include "executor/executor.h"
40 #include "executor/nodeWindowAgg.h"
41 #include "miscadmin.h"
42 #include "nodes/nodeFuncs.h"
43 #include "optimizer/clauses.h"
44 #include "parser/parse_agg.h"
45 #include "parser/parse_coerce.h"
46 #include "utils/acl.h"
47 #include "utils/builtins.h"
48 #include "utils/datum.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/syscache.h"
52 #include "windowapi.h"
53
54 /*
55  * All the window function APIs are called with this object, which is passed
56  * to window functions as fcinfo->context.
57  */
58 typedef struct WindowObjectData
59 {
60         NodeTag         type;
61         WindowAggState *winstate;       /* parent WindowAggState */
62         List       *argstates;          /* ExprState trees for fn's arguments */
63         void       *localmem;           /* WinGetPartitionLocalMemory's chunk */
64         int                     markptr;                /* tuplestore mark pointer for this fn */
65         int                     readptr;                /* tuplestore read pointer for this fn */
66         int64           markpos;                /* row that markptr is positioned on */
67         int64           seekpos;                /* row that readptr is positioned on */
68 } WindowObjectData;
69
70 /*
71  * We have one WindowStatePerFunc struct for each window function and
72  * window aggregate handled by this node.
73  */
74 typedef struct WindowStatePerFuncData
75 {
76         /* Links to WindowFunc expr and state nodes this working state is for */
77         WindowFuncExprState *wfuncstate;
78         WindowFunc *wfunc;
79
80         int                     numArguments;   /* number of arguments */
81
82         FmgrInfo        flinfo;                 /* fmgr lookup data for window function */
83
84         /*
85          * We need the len and byval info for the result of each function in order
86          * to know how to copy/delete values.
87          */
88         int16           resulttypeLen;
89         bool            resulttypeByVal;
90
91         bool            plain_agg;              /* is it just a plain aggregate function? */
92         int                     aggno;                  /* if so, index of its PerAggData */
93
94         WindowObject winobj;            /* object used in window function API */
95 } WindowStatePerFuncData;
96
97 /*
98  * For plain aggregate window functions, we also have one of these.
99  */
100 typedef struct WindowStatePerAggData
101 {
102         /* Oids of transfer functions */
103         Oid                     transfn_oid;
104         Oid                     finalfn_oid;    /* may be InvalidOid */
105
106         /*
107          * fmgr lookup data for transfer functions --- only valid when
108          * corresponding oid is not InvalidOid.  Note in particular that fn_strict
109          * flags are kept here.
110          */
111         FmgrInfo        transfn;
112         FmgrInfo        finalfn;
113
114         /*
115          * initial value from pg_aggregate entry
116          */
117         Datum           initValue;
118         bool            initValueIsNull;
119
120         /*
121          * cached value for current frame boundaries
122          */
123         Datum           resultValue;
124         bool            resultValueIsNull;
125
126         /*
127          * We need the len and byval info for the agg's input, result, and
128          * transition data types in order to know how to copy/delete values.
129          */
130         int16           inputtypeLen,
131                                 resulttypeLen,
132                                 transtypeLen;
133         bool            inputtypeByVal,
134                                 resulttypeByVal,
135                                 transtypeByVal;
136
137         int                     wfuncno;                /* index of associated PerFuncData */
138
139         /* Current transition value */
140         Datum           transValue;             /* current transition value */
141         bool            transValueIsNull;
142
143         bool            noTransValue;   /* true if transValue not set yet */
144 } WindowStatePerAggData;
145
146 static void initialize_windowaggregate(WindowAggState *winstate,
147                                                    WindowStatePerFunc perfuncstate,
148                                                    WindowStatePerAgg peraggstate);
149 static void advance_windowaggregate(WindowAggState *winstate,
150                                                 WindowStatePerFunc perfuncstate,
151                                                 WindowStatePerAgg peraggstate);
152 static void finalize_windowaggregate(WindowAggState *winstate,
153                                                  WindowStatePerFunc perfuncstate,
154                                                  WindowStatePerAgg peraggstate,
155                                                  Datum *result, bool *isnull);
156
157 static void eval_windowaggregates(WindowAggState *winstate);
158 static void eval_windowfunction(WindowAggState *winstate,
159                                         WindowStatePerFunc perfuncstate,
160                                         Datum *result, bool *isnull);
161
162 static void begin_partition(WindowAggState *winstate);
163 static void spool_tuples(WindowAggState *winstate, int64 pos);
164 static void release_partition(WindowAggState *winstate);
165
166 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
167                                 TupleTableSlot *slot);
168 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
169
170 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
171                                   WindowFunc *wfunc,
172                                   WindowStatePerAgg peraggstate);
173 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
174
175 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
176                   TupleTableSlot *slot2);
177 static bool window_gettupleslot(WindowObject winobj, int64 pos,
178                                         TupleTableSlot *slot);
179
180
181 /*
182  * initialize_windowaggregate
183  * parallel to initialize_aggregate in nodeAgg.c
184  */
185 static void
186 initialize_windowaggregate(WindowAggState *winstate,
187                                                    WindowStatePerFunc perfuncstate,
188                                                    WindowStatePerAgg peraggstate)
189 {
190         MemoryContext oldContext;
191
192         if (peraggstate->initValueIsNull)
193                 peraggstate->transValue = peraggstate->initValue;
194         else
195         {
196                 oldContext = MemoryContextSwitchTo(winstate->wincontext);
197                 peraggstate->transValue = datumCopy(peraggstate->initValue,
198                                                                                         peraggstate->transtypeByVal,
199                                                                                         peraggstate->transtypeLen);
200                 MemoryContextSwitchTo(oldContext);
201         }
202         peraggstate->transValueIsNull = peraggstate->initValueIsNull;
203         peraggstate->noTransValue = peraggstate->initValueIsNull;
204         peraggstate->resultValueIsNull = true;
205 }
206
207 /*
208  * advance_windowaggregate
209  * parallel to advance_aggregate in nodeAgg.c
210  */
211 static void
212 advance_windowaggregate(WindowAggState *winstate,
213                                                 WindowStatePerFunc perfuncstate,
214                                                 WindowStatePerAgg peraggstate)
215 {
216         WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
217         int                     numArguments = perfuncstate->numArguments;
218         FunctionCallInfoData fcinfodata;
219         FunctionCallInfo fcinfo = &fcinfodata;
220         Datum           newVal;
221         ListCell   *arg;
222         int                     i;
223         MemoryContext oldContext;
224         ExprContext *econtext = winstate->tmpcontext;
225
226         oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
227
228         /* We start from 1, since the 0th arg will be the transition value */
229         i = 1;
230         foreach(arg, wfuncstate->args)
231         {
232                 ExprState  *argstate = (ExprState *) lfirst(arg);
233
234                 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
235                                                                           &fcinfo->argnull[i], NULL);
236                 i++;
237         }
238
239         if (peraggstate->transfn.fn_strict)
240         {
241                 /*
242                  * For a strict transfn, nothing happens when there's a NULL input; we
243                  * just keep the prior transValue.
244                  */
245                 for (i = 1; i <= numArguments; i++)
246                 {
247                         if (fcinfo->argnull[i])
248                         {
249                                 MemoryContextSwitchTo(oldContext);
250                                 return;
251                         }
252                 }
253                 if (peraggstate->noTransValue)
254                 {
255                         /*
256                          * transValue has not been initialized. This is the first non-NULL
257                          * input value. We use it as the initial value for transValue. (We
258                          * already checked that the agg's input type is binary-compatible
259                          * with its transtype, so straight copy here is OK.)
260                          *
261                          * We must copy the datum into wincontext if it is pass-by-ref. We
262                          * do not need to pfree the old transValue, since it's NULL.
263                          */
264                         MemoryContextSwitchTo(winstate->wincontext);
265                         peraggstate->transValue = datumCopy(fcinfo->arg[1],
266                                                                                                 peraggstate->transtypeByVal,
267                                                                                                 peraggstate->transtypeLen);
268                         peraggstate->transValueIsNull = false;
269                         peraggstate->noTransValue = false;
270                         MemoryContextSwitchTo(oldContext);
271                         return;
272                 }
273                 if (peraggstate->transValueIsNull)
274                 {
275                         /*
276                          * Don't call a strict function with NULL inputs.  Note it is
277                          * possible to get here despite the above tests, if the transfn is
278                          * strict *and* returned a NULL on a prior cycle. If that happens
279                          * we will propagate the NULL all the way to the end.
280                          */
281                         MemoryContextSwitchTo(oldContext);
282                         return;
283                 }
284         }
285
286         /*
287          * OK to call the transition function
288          */
289         InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
290                                                          numArguments + 1,
291                                                          (void *) winstate, NULL);
292         fcinfo->arg[0] = peraggstate->transValue;
293         fcinfo->argnull[0] = peraggstate->transValueIsNull;
294         newVal = FunctionCallInvoke(fcinfo);
295
296         /*
297          * If pass-by-ref datatype, must copy the new value into wincontext and
298          * pfree the prior transValue.  But if transfn returned a pointer to its
299          * first input, we don't need to do anything.
300          */
301         if (!peraggstate->transtypeByVal &&
302                 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
303         {
304                 if (!fcinfo->isnull)
305                 {
306                         MemoryContextSwitchTo(winstate->wincontext);
307                         newVal = datumCopy(newVal,
308                                                            peraggstate->transtypeByVal,
309                                                            peraggstate->transtypeLen);
310                 }
311                 if (!peraggstate->transValueIsNull)
312                         pfree(DatumGetPointer(peraggstate->transValue));
313         }
314
315         MemoryContextSwitchTo(oldContext);
316         peraggstate->transValue = newVal;
317         peraggstate->transValueIsNull = fcinfo->isnull;
318 }
319
320 /*
321  * finalize_windowaggregate
322  * parallel to finalize_aggregate in nodeAgg.c
323  */
324 static void
325 finalize_windowaggregate(WindowAggState *winstate,
326                                                  WindowStatePerFunc perfuncstate,
327                                                  WindowStatePerAgg peraggstate,
328                                                  Datum *result, bool *isnull)
329 {
330         MemoryContext oldContext;
331
332         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
333
334         /*
335          * Apply the agg's finalfn if one is provided, else return transValue.
336          */
337         if (OidIsValid(peraggstate->finalfn_oid))
338         {
339                 FunctionCallInfoData fcinfo;
340
341                 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
342                                                                  (void *) winstate, NULL);
343                 fcinfo.arg[0] = peraggstate->transValue;
344                 fcinfo.argnull[0] = peraggstate->transValueIsNull;
345                 if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
346                 {
347                         /* don't call a strict function with NULL inputs */
348                         *result = (Datum) 0;
349                         *isnull = true;
350                 }
351                 else
352                 {
353                         *result = FunctionCallInvoke(&fcinfo);
354                         *isnull = fcinfo.isnull;
355                 }
356         }
357         else
358         {
359                 *result = peraggstate->transValue;
360                 *isnull = peraggstate->transValueIsNull;
361         }
362
363         /*
364          * If result is pass-by-ref, make sure it is in the right context.
365          */
366         if (!peraggstate->resulttypeByVal && !*isnull &&
367                 !MemoryContextContains(CurrentMemoryContext,
368                                                            DatumGetPointer(*result)))
369                 *result = datumCopy(*result,
370                                                         peraggstate->resulttypeByVal,
371                                                         peraggstate->resulttypeLen);
372         MemoryContextSwitchTo(oldContext);
373 }
374
375 /*
376  * eval_windowaggregates
377  * evaluate plain aggregates being used as window functions
378  *
379  * Much of this is duplicated from nodeAgg.c.  But NOTE that we expect to be
380  * able to call aggregate final functions repeatedly after aggregating more
381  * data onto the same transition value.  This is not a behavior required by
382  * nodeAgg.c.
383  */
384 static void
385 eval_windowaggregates(WindowAggState *winstate)
386 {
387         WindowStatePerAgg peraggstate;
388         int                     wfuncno,
389                                 numaggs;
390         int                     i;
391         MemoryContext oldContext;
392         ExprContext *econtext;
393         TupleTableSlot *agg_row_slot;
394
395         numaggs = winstate->numaggs;
396         if (numaggs == 0)
397                 return;                                 /* nothing to do */
398
399         /* final output execution is in ps_ExprContext */
400         econtext = winstate->ss.ps.ps_ExprContext;
401
402         /*
403          * Currently, we support only a subset of the SQL-standard window framing
404          * rules.  In all the supported cases, the window frame always consists of
405          * a contiguous group of rows extending forward from the start of the
406          * partition, and rows only enter the frame, never exit it, as the current
407          * row advances forward.  This makes it possible to use an incremental
408          * strategy for evaluating aggregates: we run the transition function for
409          * each row added to the frame, and run the final function whenever we
410          * need the current aggregate value.  This is considerably more efficient
411          * than the naive approach of re-running the entire aggregate calculation
412          * for each current row.  It does assume that the final function doesn't
413          * damage the running transition value, but we have the same assumption
414          * in nodeAgg.c too (when it rescans an existing hash table).
415          *
416          * In many common cases, multiple rows share the same frame and hence the
417          * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
418          * window, then all rows are peers and so they all have window frame equal
419          * to the whole partition.)  We optimize such cases by calculating the
420          * aggregate value once when we reach the first row of a peer group, and
421          * then returning the saved value for all subsequent rows.
422          *
423          * 'aggregatedupto' keeps track of the first row that has not yet been
424          * accumulated into the aggregate transition values.  Whenever we start a
425          * new peer group, we accumulate forward to the end of the peer group.
426          *
427          * TODO: In the future, we should implement the full SQL-standard set of
428          * framing rules.  We could implement the other cases by recalculating the
429          * aggregates whenever a row exits the frame.  That would be pretty slow,
430          * though.      For aggregates like SUM and COUNT we could implement a
431          * "negative transition function" that would be called for each row as it
432          * exits the frame.  We'd have to think about avoiding recalculation of
433          * volatile arguments of aggregate functions, too.
434          */
435
436         /*
437          * If we've already aggregated up through current row, reuse the saved
438          * result values.  NOTE: this test works for the currently supported
439          * framing rules, but will need fixing when more are added.
440          */
441         if (winstate->aggregatedupto > winstate->currentpos)
442         {
443                 for (i = 0; i < numaggs; i++)
444                 {
445                         peraggstate = &winstate->peragg[i];
446                         wfuncno = peraggstate->wfuncno;
447                         econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
448                         econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
449                 }
450                 return;
451         }
452
453         /* Initialize aggregates on first call for partition */
454         if (winstate->currentpos == 0)
455         {
456                 for (i = 0; i < numaggs; i++)
457                 {
458                         peraggstate = &winstate->peragg[i];
459                         wfuncno = peraggstate->wfuncno;
460                         initialize_windowaggregate(winstate,
461                                                                            &winstate->perfunc[wfuncno],
462                                                                            peraggstate);
463                 }
464         }
465
466         /*
467          * Advance until we reach a row not in frame (or end of partition).
468          *
469          * Note the loop invariant: agg_row_slot is either empty or holds the row
470          * at position aggregatedupto.  The agg_ptr read pointer must always point
471          * to the next row to read into agg_row_slot.
472          */
473         agg_row_slot = winstate->agg_row_slot;
474         for (;;)
475         {
476                 /* Fetch next row if we didn't already */
477                 if (TupIsNull(agg_row_slot))
478                 {
479                         spool_tuples(winstate, winstate->aggregatedupto);
480                         tuplestore_select_read_pointer(winstate->buffer,
481                                                                                    winstate->agg_ptr);
482                         if (!tuplestore_gettupleslot(winstate->buffer, true, true,
483                                                                                  agg_row_slot))
484                                 break;                  /* must be end of partition */
485                 }
486
487                 /* Exit loop (for now) if not in frame */
488                 if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
489                         break;
490
491                 /* Set tuple context for evaluation of aggregate arguments */
492                 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
493
494                 /* Accumulate row into the aggregates */
495                 for (i = 0; i < numaggs; i++)
496                 {
497                         peraggstate = &winstate->peragg[i];
498                         wfuncno = peraggstate->wfuncno;
499                         advance_windowaggregate(winstate,
500                                                                         &winstate->perfunc[wfuncno],
501                                                                         peraggstate);
502                 }
503
504                 /* Reset per-input-tuple context after each tuple */
505                 ResetExprContext(winstate->tmpcontext);
506
507                 /* And advance the aggregated-row state */
508                 winstate->aggregatedupto++;
509                 ExecClearTuple(agg_row_slot);
510         }
511
512         /*
513          * finalize aggregates and fill result/isnull fields.
514          */
515         for (i = 0; i < numaggs; i++)
516         {
517                 Datum      *result;
518                 bool       *isnull;
519
520                 peraggstate = &winstate->peragg[i];
521                 wfuncno = peraggstate->wfuncno;
522                 result = &econtext->ecxt_aggvalues[wfuncno];
523                 isnull = &econtext->ecxt_aggnulls[wfuncno];
524                 finalize_windowaggregate(winstate,
525                                                                  &winstate->perfunc[wfuncno],
526                                                                  peraggstate,
527                                                                  result, isnull);
528
529                 /*
530                  * save the result in case next row shares the same frame.
531                  *
532                  * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
533                  * advance that the next row can't possibly share the same frame. Is
534                  * it worth detecting that and skipping this code?
535                  */
536                 if (!peraggstate->resulttypeByVal)
537                 {
538                         /*
539                          * clear old resultValue in order not to leak memory.  (Note: the
540                          * new result can't possibly be the same datum as old resultValue,
541                          * because we never passed it to the trans function.)
542                          */
543                         if (!peraggstate->resultValueIsNull)
544                                 pfree(DatumGetPointer(peraggstate->resultValue));
545
546                         /*
547                          * If pass-by-ref, copy it into our global context.
548                          */
549                         if (!*isnull)
550                         {
551                                 oldContext = MemoryContextSwitchTo(winstate->wincontext);
552                                 peraggstate->resultValue =
553                                         datumCopy(*result,
554                                                           peraggstate->resulttypeByVal,
555                                                           peraggstate->resulttypeLen);
556                                 MemoryContextSwitchTo(oldContext);
557                         }
558                 }
559                 else
560                 {
561                         peraggstate->resultValue = *result;
562                 }
563                 peraggstate->resultValueIsNull = *isnull;
564         }
565 }
566
567 /*
568  * eval_windowfunction
569  *
570  * Arguments of window functions are not evaluated here, because a window
571  * function can need random access to arbitrary rows in the partition.
572  * The window function uses the special WinGetFuncArgInPartition and
573  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
574  * it wants.
575  */
576 static void
577 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
578                                         Datum *result, bool *isnull)
579 {
580         FunctionCallInfoData fcinfo;
581         MemoryContext oldContext;
582
583         oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
584
585         /*
586          * We don't pass any normal arguments to a window function, but we do pass
587          * it the number of arguments, in order to permit window function
588          * implementations to support varying numbers of arguments.  The real info
589          * goes through the WindowObject, which is passed via fcinfo->context.
590          */
591         InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
592                                                          perfuncstate->numArguments,
593                                                          (void *) perfuncstate->winobj, NULL);
594         /* Just in case, make all the regular argument slots be null */
595         memset(fcinfo.argnull, true, perfuncstate->numArguments);
596
597         *result = FunctionCallInvoke(&fcinfo);
598         *isnull = fcinfo.isnull;
599
600         /*
601          * Make sure pass-by-ref data is allocated in the appropriate context. (We
602          * need this in case the function returns a pointer into some short-lived
603          * tuple, as is entirely possible.)
604          */
605         if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
606                 !MemoryContextContains(CurrentMemoryContext,
607                                                            DatumGetPointer(*result)))
608                 *result = datumCopy(*result,
609                                                         perfuncstate->resulttypeByVal,
610                                                         perfuncstate->resulttypeLen);
611
612         MemoryContextSwitchTo(oldContext);
613 }
614
615 /*
616  * begin_partition
617  * Start buffering rows of the next partition.
618  */
619 static void
620 begin_partition(WindowAggState *winstate)
621 {
622         PlanState  *outerPlan = outerPlanState(winstate);
623         int                     numfuncs = winstate->numfuncs;
624         int                     i;
625
626         winstate->partition_spooled = false;
627         winstate->frametail_valid = false;
628         winstate->spooled_rows = 0;
629         winstate->currentpos = 0;
630         winstate->frametailpos = -1;
631         winstate->aggregatedupto = 0;
632         ExecClearTuple(winstate->agg_row_slot);
633
634         /*
635          * If this is the very first partition, we need to fetch the first input
636          * row to store in first_part_slot.
637          */
638         if (TupIsNull(winstate->first_part_slot))
639         {
640                 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
641
642                 if (!TupIsNull(outerslot))
643                         ExecCopySlot(winstate->first_part_slot, outerslot);
644                 else
645                 {
646                         /* outer plan is empty, so we have nothing to do */
647                         winstate->partition_spooled = true;
648                         winstate->more_partitions = false;
649                         return;
650                 }
651         }
652
653         /* Create new tuplestore for this partition */
654         winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
655
656         /*
657          * Set up read pointers for the tuplestore.  The current and agg pointers
658          * don't need BACKWARD capability, but the per-window-function read
659          * pointers do.
660          */
661         winstate->current_ptr = 0;      /* read pointer 0 is pre-allocated */
662
663         /* reset default REWIND capability bit for current ptr */
664         tuplestore_set_eflags(winstate->buffer, 0);
665
666         /* create a read pointer for aggregates, if needed */
667         if (winstate->numaggs > 0)
668                 winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
669
670         /* create mark and read pointers for each real window function */
671         for (i = 0; i < numfuncs; i++)
672         {
673                 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
674
675                 if (!perfuncstate->plain_agg)
676                 {
677                         WindowObject winobj = perfuncstate->winobj;
678
679                         winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
680                                                                                                                         0);
681                         winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
682                                                                                                                  EXEC_FLAG_BACKWARD);
683                         winobj->markpos = -1;
684                         winobj->seekpos = -1;
685                 }
686         }
687
688         /*
689          * Store the first tuple into the tuplestore (it's always available now;
690          * we either read it above, or saved it at the end of previous partition)
691          */
692         tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
693         winstate->spooled_rows++;
694 }
695
696 /*
697  * Read tuples from the outer node, up to position 'pos', and store them
698  * into the tuplestore. If pos is -1, reads the whole partition.
699  */
700 static void
701 spool_tuples(WindowAggState *winstate, int64 pos)
702 {
703         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
704         PlanState  *outerPlan;
705         TupleTableSlot *outerslot;
706         MemoryContext oldcontext;
707
708         if (!winstate->buffer)
709                 return;                                 /* just a safety check */
710         if (winstate->partition_spooled)
711                 return;                                 /* whole partition done already */
712
713         /*
714          * If the tuplestore has spilled to disk, alternate reading and writing
715          * becomes quite expensive due to frequent buffer flushes.      It's cheaper
716          * to force the entire partition to get spooled in one go.
717          *
718          * XXX this is a horrid kluge --- it'd be better to fix the performance
719          * problem inside tuplestore.  FIXME
720          */
721         if (!tuplestore_in_memory(winstate->buffer))
722                 pos = -1;
723
724         outerPlan = outerPlanState(winstate);
725
726         /* Must be in query context to call outerplan or touch tuplestore */
727         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
728
729         while (winstate->spooled_rows <= pos || pos == -1)
730         {
731                 outerslot = ExecProcNode(outerPlan);
732                 if (TupIsNull(outerslot))
733                 {
734                         /* reached the end of the last partition */
735                         winstate->partition_spooled = true;
736                         winstate->more_partitions = false;
737                         break;
738                 }
739
740                 if (node->partNumCols > 0)
741                 {
742                         /* Check if this tuple still belongs to the current partition */
743                         if (!execTuplesMatch(winstate->first_part_slot,
744                                                                  outerslot,
745                                                                  node->partNumCols, node->partColIdx,
746                                                                  winstate->partEqfunctions,
747                                                                  winstate->tmpcontext->ecxt_per_tuple_memory))
748                         {
749                                 /*
750                                  * end of partition; copy the tuple for the next cycle.
751                                  */
752                                 ExecCopySlot(winstate->first_part_slot, outerslot);
753                                 winstate->partition_spooled = true;
754                                 winstate->more_partitions = true;
755                                 break;
756                         }
757                 }
758
759                 /* Still in partition, so save it into the tuplestore */
760                 tuplestore_puttupleslot(winstate->buffer, outerslot);
761                 winstate->spooled_rows++;
762         }
763
764         MemoryContextSwitchTo(oldcontext);
765 }
766
767 /*
768  * release_partition
769  * clear information kept within a partition, including
770  * tuplestore and aggregate results.
771  */
772 static void
773 release_partition(WindowAggState *winstate)
774 {
775         int                     i;
776
777         for (i = 0; i < winstate->numfuncs; i++)
778         {
779                 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
780
781                 /* Release any partition-local state of this window function */
782                 if (perfuncstate->winobj)
783                         perfuncstate->winobj->localmem = NULL;
784         }
785
786         /*
787          * Release all partition-local memory (in particular, any partition-local
788          * state that we might have trashed our pointers to in the above loop, and
789          * any aggregate temp data).  We don't rely on retail pfree because some
790          * aggregates might have allocated data we don't have direct pointers to.
791          */
792         MemoryContextResetAndDeleteChildren(winstate->wincontext);
793
794         if (winstate->buffer)
795                 tuplestore_end(winstate->buffer);
796         winstate->buffer = NULL;
797         winstate->partition_spooled = false;
798 }
799
800 /*
801  * row_is_in_frame
802  * Determine whether a row is in the current row's window frame according
803  * to our window framing rule
804  *
805  * The caller must have already determined that the row is in the partition
806  * and fetched it into a slot.  This function just encapsulates the framing
807  * rules.
808  */
809 static bool
810 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
811 {
812         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
813         int                     frameOptions = node->frameOptions;
814
815         Assert(pos >= 0);                       /* else caller error */
816
817         /* We only support frame start mode UNBOUNDED PRECEDING for now */
818         Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
819
820         /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
821         if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
822                 return true;
823
824         /* Else frame tail mode must be CURRENT ROW */
825         Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
826
827         /* if row is current row or a predecessor, it must be in frame */
828         if (pos <= winstate->currentpos)
829                 return true;
830
831         /* In ROWS mode, *only* such rows are in frame */
832         if (frameOptions & FRAMEOPTION_ROWS)
833                 return false;
834
835         /* Else must be RANGE mode */
836         Assert(frameOptions & FRAMEOPTION_RANGE);
837
838         /* In frame iff it's a peer of current row */
839         return are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot);
840 }
841
842 /*
843  * update_frametailpos
844  * make frametailpos valid for the current row
845  *
846  * Uses the winobj's read pointer for any required fetches; the winobj's
847  * mark must not be past the currently known frame tail.  Also uses the
848  * specified slot for any required fetches.
849  */
850 static void
851 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
852 {
853         WindowAggState *winstate = winobj->winstate;
854         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
855         int                     frameOptions = node->frameOptions;
856         int64           ftnext;
857
858         if (winstate->frametail_valid)
859                 return;                                 /* already known for current row */
860
861         /* We only support frame start mode UNBOUNDED PRECEDING for now */
862         Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
863
864         /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
865         if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
866         {
867                 spool_tuples(winstate, -1);
868                 winstate->frametailpos = winstate->spooled_rows - 1;
869                 winstate->frametail_valid = true;
870                 return;
871         }
872
873         /* Else frame tail mode must be CURRENT ROW */
874         Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
875
876         /* In ROWS mode, exactly the rows up to current are in frame */
877         if (frameOptions & FRAMEOPTION_ROWS)
878         {
879                 winstate->frametailpos = winstate->currentpos;
880                 winstate->frametail_valid = true;
881                 return;
882         }
883
884         /* Else must be RANGE mode */
885         Assert(frameOptions & FRAMEOPTION_RANGE);
886
887         /* If no ORDER BY, all rows are peers with each other */
888         if (node->ordNumCols == 0)
889         {
890                 spool_tuples(winstate, -1);
891                 winstate->frametailpos = winstate->spooled_rows - 1;
892                 winstate->frametail_valid = true;
893                 return;
894         }
895
896         /*
897          * Else we have to search for the first non-peer of the current row. We
898          * assume the current value of frametailpos is a lower bound on the
899          * possible frame tail location, ie, frame tail never goes backward, and
900          * that currentpos is also a lower bound, ie, current row is always in
901          * frame.
902          */
903         ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
904         for (;;)
905         {
906                 if (!window_gettupleslot(winobj, ftnext, slot))
907                         break;                          /* end of partition */
908                 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
909                         break;                          /* not peer of current row */
910                 ftnext++;
911         }
912         winstate->frametailpos = ftnext - 1;
913         winstate->frametail_valid = true;
914 }
915
916
917 /* -----------------
918  * ExecWindowAgg
919  *
920  *      ExecWindowAgg receives tuples from its outer subplan and
921  *      stores them into a tuplestore, then processes window functions.
922  *      This node doesn't reduce nor qualify any row so the number of
923  *      returned rows is exactly the same as its outer subplan's result
924  *      (ignoring the case of SRFs in the targetlist, that is).
925  * -----------------
926  */
927 TupleTableSlot *
928 ExecWindowAgg(WindowAggState *winstate)
929 {
930         TupleTableSlot *result;
931         ExprDoneCond isDone;
932         ExprContext *econtext;
933         int                     i;
934         int                     numfuncs;
935
936         if (winstate->all_done)
937                 return NULL;
938
939         /*
940          * Check to see if we're still projecting out tuples from a previous
941          * output tuple (because there is a function-returning-set in the
942          * projection expressions).  If so, try to project another one.
943          */
944         if (winstate->ss.ps.ps_TupFromTlist)
945         {
946                 TupleTableSlot *result;
947                 ExprDoneCond isDone;
948
949                 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
950                 if (isDone == ExprMultipleResult)
951                         return result;
952                 /* Done with that source tuple... */
953                 winstate->ss.ps.ps_TupFromTlist = false;
954         }
955
956 restart:
957         if (winstate->buffer == NULL)
958         {
959                 /* Initialize for first partition and set current row = 0 */
960                 begin_partition(winstate);
961                 /* If there are no input rows, we'll detect that and exit below */
962         }
963         else
964         {
965                 /* Advance current row within partition */
966                 winstate->currentpos++;
967                 /* This might mean that the frame tail moves, too */
968                 winstate->frametail_valid = false;
969         }
970
971         /*
972          * Spool all tuples up to and including the current row, if we haven't
973          * already
974          */
975         spool_tuples(winstate, winstate->currentpos);
976
977         /* Move to the next partition if we reached the end of this partition */
978         if (winstate->partition_spooled &&
979                 winstate->currentpos >= winstate->spooled_rows)
980         {
981                 release_partition(winstate);
982
983                 if (winstate->more_partitions)
984                 {
985                         begin_partition(winstate);
986                         Assert(winstate->spooled_rows > 0);
987                 }
988                 else
989                 {
990                         winstate->all_done = true;
991                         return NULL;
992                 }
993         }
994
995         /* final output execution is in ps_ExprContext */
996         econtext = winstate->ss.ps.ps_ExprContext;
997
998         /* Clear the per-output-tuple context for current row */
999         ResetExprContext(econtext);
1000
1001         /*
1002          * Read the current row from the tuplestore, and save in ScanTupleSlot.
1003          * (We can't rely on the outerplan's output slot because we may have to
1004          * read beyond the current row.  Also, we have to actually copy the row
1005          * out of the tuplestore, since window function evaluation might cause the
1006          * tuplestore to dump its state to disk.)
1007          *
1008          * Current row must be in the tuplestore, since we spooled it above.
1009          */
1010         tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1011         if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1012                                                                  winstate->ss.ss_ScanTupleSlot))
1013                 elog(ERROR, "unexpected end of tuplestore");
1014
1015         /*
1016          * Evaluate true window functions
1017          */
1018         numfuncs = winstate->numfuncs;
1019         for (i = 0; i < numfuncs; i++)
1020         {
1021                 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1022
1023                 if (perfuncstate->plain_agg)
1024                         continue;
1025                 eval_windowfunction(winstate, perfuncstate,
1026                           &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1027                           &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1028         }
1029
1030         /*
1031          * Evaluate aggregates
1032          */
1033         if (winstate->numaggs > 0)
1034                 eval_windowaggregates(winstate);
1035
1036         /*
1037          * Truncate any no-longer-needed rows from the tuplestore.
1038          */
1039         tuplestore_trim(winstate->buffer);
1040
1041         /*
1042          * Form and return a projection tuple using the windowfunc results and the
1043          * current row.  Setting ecxt_outertuple arranges that any Vars will be
1044          * evaluated with respect to that row.
1045          */
1046         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1047         result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1048
1049         if (isDone == ExprEndResult)
1050         {
1051                 /* SRF in tlist returned no rows, so advance to next input tuple */
1052                 goto restart;
1053         }
1054
1055         winstate->ss.ps.ps_TupFromTlist =
1056                 (isDone == ExprMultipleResult);
1057         return result;
1058 }
1059
1060 /* -----------------
1061  * ExecInitWindowAgg
1062  *
1063  *      Creates the run-time information for the WindowAgg node produced by the
1064  *      planner and initializes its outer subtree
1065  * -----------------
1066  */
1067 WindowAggState *
1068 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1069 {
1070         WindowAggState *winstate;
1071         Plan       *outerPlan;
1072         ExprContext *econtext;
1073         ExprContext *tmpcontext;
1074         WindowStatePerFunc perfunc;
1075         WindowStatePerAgg peragg;
1076         int                     numfuncs,
1077                                 wfuncno,
1078                                 numaggs,
1079                                 aggno;
1080         ListCell   *l;
1081
1082         /* check for unsupported flags */
1083         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1084
1085         /*
1086          * create state structure
1087          */
1088         winstate = makeNode(WindowAggState);
1089         winstate->ss.ps.plan = (Plan *) node;
1090         winstate->ss.ps.state = estate;
1091
1092         /*
1093          * Create expression contexts.  We need two, one for per-input-tuple
1094          * processing and one for per-output-tuple processing.  We cheat a little
1095          * by using ExecAssignExprContext() to build both.
1096          */
1097         ExecAssignExprContext(estate, &winstate->ss.ps);
1098         tmpcontext = winstate->ss.ps.ps_ExprContext;
1099         winstate->tmpcontext = tmpcontext;
1100         ExecAssignExprContext(estate, &winstate->ss.ps);
1101
1102         /* Create long-lived context for storage of aggregate transvalues etc */
1103         winstate->wincontext =
1104                 AllocSetContextCreate(CurrentMemoryContext,
1105                                                           "WindowAggContext",
1106                                                           ALLOCSET_DEFAULT_MINSIZE,
1107                                                           ALLOCSET_DEFAULT_INITSIZE,
1108                                                           ALLOCSET_DEFAULT_MAXSIZE);
1109
1110         /*
1111          * tuple table initialization
1112          */
1113         ExecInitScanTupleSlot(estate, &winstate->ss);
1114         ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1115         winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1116         winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1117         winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1118         winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1119
1120         winstate->ss.ps.targetlist = (List *)
1121                 ExecInitExpr((Expr *) node->plan.targetlist,
1122                                          (PlanState *) winstate);
1123
1124         /*
1125          * WindowAgg nodes never have quals, since they can only occur at the
1126          * logical top level of a query (ie, after any WHERE or HAVING filters)
1127          */
1128         Assert(node->plan.qual == NIL);
1129         winstate->ss.ps.qual = NIL;
1130
1131         /*
1132          * initialize child nodes
1133          */
1134         outerPlan = outerPlan(node);
1135         outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1136
1137         /*
1138          * initialize source tuple type (which is also the tuple type that we'll
1139          * store in the tuplestore and use in all our working slots).
1140          */
1141         ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1142
1143         ExecSetSlotDescriptor(winstate->first_part_slot,
1144                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1145         ExecSetSlotDescriptor(winstate->agg_row_slot,
1146                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1147         ExecSetSlotDescriptor(winstate->temp_slot_1,
1148                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1149         ExecSetSlotDescriptor(winstate->temp_slot_2,
1150                                                   winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1151
1152         /*
1153          * Initialize result tuple type and projection info.
1154          */
1155         ExecAssignResultTypeFromTL(&winstate->ss.ps);
1156         ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1157
1158         winstate->ss.ps.ps_TupFromTlist = false;
1159
1160         /* Set up data for comparing tuples */
1161         if (node->partNumCols > 0)
1162                 winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1163                                                                                                                 node->partOperators);
1164         if (node->ordNumCols > 0)
1165                 winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1166                                                                                                                   node->ordOperators);
1167
1168         /*
1169          * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1170          */
1171         numfuncs = winstate->numfuncs;
1172         numaggs = winstate->numaggs;
1173         econtext = winstate->ss.ps.ps_ExprContext;
1174         econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1175         econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1176
1177         /*
1178          * allocate per-wfunc/per-agg state information.
1179          */
1180         perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1181         peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1182         winstate->perfunc = perfunc;
1183         winstate->peragg = peragg;
1184
1185         wfuncno = -1;
1186         aggno = -1;
1187         foreach(l, winstate->funcs)
1188         {
1189                 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1190                 WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1191                 WindowStatePerFunc perfuncstate;
1192                 AclResult       aclresult;
1193                 int                     i;
1194
1195                 if (wfunc->winref != node->winref)              /* planner screwed up? */
1196                         elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1197                                  wfunc->winref, node->winref);
1198
1199                 /* Look for a previous duplicate window function */
1200                 for (i = 0; i <= wfuncno; i++)
1201                 {
1202                         if (equal(wfunc, perfunc[i].wfunc) &&
1203                                 !contain_volatile_functions((Node *) wfunc))
1204                                 break;
1205                 }
1206                 if (i <= wfuncno)
1207                 {
1208                         /* Found a match to an existing entry, so just mark it */
1209                         wfuncstate->wfuncno = i;
1210                         continue;
1211                 }
1212
1213                 /* Nope, so assign a new PerAgg record */
1214                 perfuncstate = &perfunc[++wfuncno];
1215
1216                 /* Mark WindowFunc state node with assigned index in the result array */
1217                 wfuncstate->wfuncno = wfuncno;
1218
1219                 /* Check permission to call window function */
1220                 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1221                                                                          ACL_EXECUTE);
1222                 if (aclresult != ACLCHECK_OK)
1223                         aclcheck_error(aclresult, ACL_KIND_PROC,
1224                                                    get_func_name(wfunc->winfnoid));
1225
1226                 /* Fill in the perfuncstate data */
1227                 perfuncstate->wfuncstate = wfuncstate;
1228                 perfuncstate->wfunc = wfunc;
1229                 perfuncstate->numArguments = list_length(wfuncstate->args);
1230
1231                 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1232                                           tmpcontext->ecxt_per_query_memory);
1233                 perfuncstate->flinfo.fn_expr = (Node *) wfunc;
1234                 get_typlenbyval(wfunc->wintype,
1235                                                 &perfuncstate->resulttypeLen,
1236                                                 &perfuncstate->resulttypeByVal);
1237
1238                 /*
1239                  * If it's really just a plain aggregate function, we'll emulate the
1240                  * Agg environment for it.
1241                  */
1242                 perfuncstate->plain_agg = wfunc->winagg;
1243                 if (wfunc->winagg)
1244                 {
1245                         WindowStatePerAgg peraggstate;
1246
1247                         perfuncstate->aggno = ++aggno;
1248                         peraggstate = &winstate->peragg[aggno];
1249                         initialize_peragg(winstate, wfunc, peraggstate);
1250                         peraggstate->wfuncno = wfuncno;
1251                 }
1252                 else
1253                 {
1254                         WindowObject winobj = makeNode(WindowObjectData);
1255
1256                         winobj->winstate = winstate;
1257                         winobj->argstates = wfuncstate->args;
1258                         winobj->localmem = NULL;
1259                         perfuncstate->winobj = winobj;
1260                 }
1261         }
1262
1263         /* Update numfuncs, numaggs to match number of unique functions found */
1264         winstate->numfuncs = wfuncno + 1;
1265         winstate->numaggs = aggno + 1;
1266
1267         winstate->partition_spooled = false;
1268         winstate->more_partitions = false;
1269
1270         return winstate;
1271 }
1272
1273 /* -----------------
1274  * ExecEndWindowAgg
1275  * -----------------
1276  */
1277 void
1278 ExecEndWindowAgg(WindowAggState *node)
1279 {
1280         PlanState  *outerPlan;
1281
1282         release_partition(node);
1283
1284         pfree(node->perfunc);
1285         pfree(node->peragg);
1286
1287         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1288         ExecClearTuple(node->first_part_slot);
1289         ExecClearTuple(node->agg_row_slot);
1290         ExecClearTuple(node->temp_slot_1);
1291         ExecClearTuple(node->temp_slot_2);
1292
1293         /*
1294          * Free both the expr contexts.
1295          */
1296         ExecFreeExprContext(&node->ss.ps);
1297         node->ss.ps.ps_ExprContext = node->tmpcontext;
1298         ExecFreeExprContext(&node->ss.ps);
1299
1300         MemoryContextDelete(node->wincontext);
1301
1302         outerPlan = outerPlanState(node);
1303         ExecEndNode(outerPlan);
1304 }
1305
1306 /* -----------------
1307  * ExecRescanWindowAgg
1308  * -----------------
1309  */
1310 void
1311 ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt)
1312 {
1313         ExprContext *econtext = node->ss.ps.ps_ExprContext;
1314
1315         node->all_done = false;
1316
1317         node->ss.ps.ps_TupFromTlist = false;
1318
1319         /* release tuplestore et al */
1320         release_partition(node);
1321
1322         /* release all temp tuples, but especially first_part_slot */
1323         ExecClearTuple(node->ss.ss_ScanTupleSlot);
1324         ExecClearTuple(node->first_part_slot);
1325         ExecClearTuple(node->agg_row_slot);
1326         ExecClearTuple(node->temp_slot_1);
1327         ExecClearTuple(node->temp_slot_2);
1328
1329         /* Forget current wfunc values */
1330         MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
1331         MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
1332
1333         /*
1334          * if chgParam of subnode is not null then plan will be re-scanned by
1335          * first ExecProcNode.
1336          */
1337         if (((PlanState *) node)->lefttree->chgParam == NULL)
1338                 ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
1339 }
1340
1341 /*
1342  * initialize_peragg
1343  *
1344  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
1345  */
1346 static WindowStatePerAggData *
1347 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
1348                                   WindowStatePerAgg peraggstate)
1349 {
1350         Oid                     inputTypes[FUNC_MAX_ARGS];
1351         int                     numArguments;
1352         HeapTuple       aggTuple;
1353         Form_pg_aggregate aggform;
1354         Oid                     aggtranstype;
1355         AclResult       aclresult;
1356         Oid                     transfn_oid,
1357                                 finalfn_oid;
1358         Expr       *transfnexpr,
1359                            *finalfnexpr;
1360         Datum           textInitVal;
1361         int                     i;
1362         ListCell   *lc;
1363
1364         numArguments = list_length(wfunc->args);
1365
1366         i = 0;
1367         foreach(lc, wfunc->args)
1368         {
1369                 inputTypes[i++] = exprType((Node *) lfirst(lc));
1370         }
1371
1372         aggTuple = SearchSysCache(AGGFNOID,
1373                                                           ObjectIdGetDatum(wfunc->winfnoid),
1374                                                           0, 0, 0);
1375         if (!HeapTupleIsValid(aggTuple))
1376                 elog(ERROR, "cache lookup failed for aggregate %u",
1377                          wfunc->winfnoid);
1378         aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
1379
1380         /*
1381          * ExecInitWindowAgg already checked permission to call aggregate function
1382          * ... but we still need to check the component functions
1383          */
1384
1385         peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
1386         peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
1387
1388         /* Check that aggregate owner has permission to call component fns */
1389         {
1390                 HeapTuple       procTuple;
1391                 Oid                     aggOwner;
1392
1393                 procTuple = SearchSysCache(PROCOID,
1394                                                                    ObjectIdGetDatum(wfunc->winfnoid),
1395                                                                    0, 0, 0);
1396                 if (!HeapTupleIsValid(procTuple))
1397                         elog(ERROR, "cache lookup failed for function %u",
1398                                  wfunc->winfnoid);
1399                 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
1400                 ReleaseSysCache(procTuple);
1401
1402                 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
1403                                                                          ACL_EXECUTE);
1404                 if (aclresult != ACLCHECK_OK)
1405                         aclcheck_error(aclresult, ACL_KIND_PROC,
1406                                                    get_func_name(transfn_oid));
1407                 if (OidIsValid(finalfn_oid))
1408                 {
1409                         aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
1410                                                                                  ACL_EXECUTE);
1411                         if (aclresult != ACLCHECK_OK)
1412                                 aclcheck_error(aclresult, ACL_KIND_PROC,
1413                                                            get_func_name(finalfn_oid));
1414                 }
1415         }
1416
1417         /* resolve actual type of transition state, if polymorphic */
1418         aggtranstype = aggform->aggtranstype;
1419         if (IsPolymorphicType(aggtranstype))
1420         {
1421                 /* have to fetch the agg's declared input types... */
1422                 Oid                *declaredArgTypes;
1423                 int                     agg_nargs;
1424
1425                 get_func_signature(wfunc->winfnoid,
1426                                                    &declaredArgTypes, &agg_nargs);
1427                 Assert(agg_nargs == numArguments);
1428                 aggtranstype = enforce_generic_type_consistency(inputTypes,
1429                                                                                                                 declaredArgTypes,
1430                                                                                                                 agg_nargs,
1431                                                                                                                 aggtranstype,
1432                                                                                                                 false);
1433                 pfree(declaredArgTypes);
1434         }
1435
1436         /* build expression trees using actual argument & result types */
1437         build_aggregate_fnexprs(inputTypes,
1438                                                         numArguments,
1439                                                         aggtranstype,
1440                                                         wfunc->wintype,
1441                                                         transfn_oid,
1442                                                         finalfn_oid,
1443                                                         &transfnexpr,
1444                                                         &finalfnexpr);
1445
1446         fmgr_info(transfn_oid, &peraggstate->transfn);
1447         peraggstate->transfn.fn_expr = (Node *) transfnexpr;
1448
1449         if (OidIsValid(finalfn_oid))
1450         {
1451                 fmgr_info(finalfn_oid, &peraggstate->finalfn);
1452                 peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
1453         }
1454
1455         get_typlenbyval(wfunc->wintype,
1456                                         &peraggstate->resulttypeLen,
1457                                         &peraggstate->resulttypeByVal);
1458         get_typlenbyval(aggtranstype,
1459                                         &peraggstate->transtypeLen,
1460                                         &peraggstate->transtypeByVal);
1461
1462         /*
1463          * initval is potentially null, so don't try to access it as a struct
1464          * field. Must do it the hard way with SysCacheGetAttr.
1465          */
1466         textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
1467                                                                   Anum_pg_aggregate_agginitval,
1468                                                                   &peraggstate->initValueIsNull);
1469
1470         if (peraggstate->initValueIsNull)
1471                 peraggstate->initValue = (Datum) 0;
1472         else
1473                 peraggstate->initValue = GetAggInitVal(textInitVal,
1474                                                                                            aggtranstype);
1475
1476         /*
1477          * If the transfn is strict and the initval is NULL, make sure input type
1478          * and transtype are the same (or at least binary-compatible), so that
1479          * it's OK to use the first input value as the initial transValue.  This
1480          * should have been checked at agg definition time, but just in case...
1481          */
1482         if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
1483         {
1484                 if (numArguments < 1 ||
1485                         !IsBinaryCoercible(inputTypes[0], aggtranstype))
1486                         ereport(ERROR,
1487                                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
1488                                          errmsg("aggregate %u needs to have compatible input type and transition type",
1489                                                         wfunc->winfnoid)));
1490         }
1491
1492         ReleaseSysCache(aggTuple);
1493
1494         return peraggstate;
1495 }
1496
1497 static Datum
1498 GetAggInitVal(Datum textInitVal, Oid transtype)
1499 {
1500         Oid                     typinput,
1501                                 typioparam;
1502         char       *strInitVal;
1503         Datum           initVal;
1504
1505         getTypeInputInfo(transtype, &typinput, &typioparam);
1506         strInitVal = TextDatumGetCString(textInitVal);
1507         initVal = OidInputFunctionCall(typinput, strInitVal,
1508                                                                    typioparam, -1);
1509         pfree(strInitVal);
1510         return initVal;
1511 }
1512
1513 /*
1514  * are_peers
1515  * compare two rows to see if they are equal according to the ORDER BY clause
1516  *
1517  * NB: this does not consider the window frame mode.
1518  */
1519 static bool
1520 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
1521                   TupleTableSlot *slot2)
1522 {
1523         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1524
1525         /* If no ORDER BY, all rows are peers with each other */
1526         if (node->ordNumCols == 0)
1527                 return true;
1528
1529         return execTuplesMatch(slot1, slot2,
1530                                                    node->ordNumCols, node->ordColIdx,
1531                                                    winstate->ordEqfunctions,
1532                                                    winstate->tmpcontext->ecxt_per_tuple_memory);
1533 }
1534
1535 /*
1536  * window_gettupleslot
1537  *      Fetch the pos'th tuple of the current partition into the slot,
1538  *      using the winobj's read pointer
1539  *
1540  * Returns true if successful, false if no such row
1541  */
1542 static bool
1543 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
1544 {
1545         WindowAggState *winstate = winobj->winstate;
1546         MemoryContext oldcontext;
1547
1548         /* Don't allow passing -1 to spool_tuples here */
1549         if (pos < 0)
1550                 return false;
1551
1552         /* If necessary, fetch the tuple into the spool */
1553         spool_tuples(winstate, pos);
1554
1555         if (pos >= winstate->spooled_rows)
1556                 return false;
1557
1558         if (pos < winobj->markpos)
1559                 elog(ERROR, "cannot fetch row before WindowObject's mark position");
1560
1561         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1562
1563         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1564
1565         /*
1566          * There's no API to refetch the tuple at the current position. We have to
1567          * move one tuple forward, and then one backward.  (We don't do it the
1568          * other way because we might try to fetch the row before our mark, which
1569          * isn't allowed.)
1570          */
1571         if (winobj->seekpos == pos)
1572         {
1573                 tuplestore_advance(winstate->buffer, true);
1574                 winobj->seekpos++;
1575         }
1576
1577         while (winobj->seekpos > pos)
1578         {
1579                 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
1580                         elog(ERROR, "unexpected end of tuplestore");
1581                 winobj->seekpos--;
1582         }
1583
1584         while (winobj->seekpos < pos)
1585         {
1586                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
1587                         elog(ERROR, "unexpected end of tuplestore");
1588                 winobj->seekpos++;
1589         }
1590
1591         MemoryContextSwitchTo(oldcontext);
1592
1593         return true;
1594 }
1595
1596
1597 /***********************************************************************
1598  * API exposed to window functions
1599  ***********************************************************************/
1600
1601
1602 /*
1603  * WinGetPartitionLocalMemory
1604  *              Get working memory that lives till end of partition processing
1605  *
1606  * On first call within a given partition, this allocates and zeroes the
1607  * requested amount of space.  Subsequent calls just return the same chunk.
1608  *
1609  * Memory obtained this way is normally used to hold state that should be
1610  * automatically reset for each new partition.  If a window function wants
1611  * to hold state across the whole query, fcinfo->fn_extra can be used in the
1612  * usual way for that.
1613  */
1614 void *
1615 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
1616 {
1617         Assert(WindowObjectIsValid(winobj));
1618         if (winobj->localmem == NULL)
1619                 winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext,
1620                                                                                                   sz);
1621         return winobj->localmem;
1622 }
1623
1624 /*
1625  * WinGetCurrentPosition
1626  *              Return the current row's position (counting from 0) within the current
1627  *              partition.
1628  */
1629 int64
1630 WinGetCurrentPosition(WindowObject winobj)
1631 {
1632         Assert(WindowObjectIsValid(winobj));
1633         return winobj->winstate->currentpos;
1634 }
1635
1636 /*
1637  * WinGetPartitionRowCount
1638  *              Return total number of rows contained in the current partition.
1639  *
1640  * Note: this is a relatively expensive operation because it forces the
1641  * whole partition to be "spooled" into the tuplestore at once.  Once
1642  * executed, however, additional calls within the same partition are cheap.
1643  */
1644 int64
1645 WinGetPartitionRowCount(WindowObject winobj)
1646 {
1647         Assert(WindowObjectIsValid(winobj));
1648         spool_tuples(winobj->winstate, -1);
1649         return winobj->winstate->spooled_rows;
1650 }
1651
1652 /*
1653  * WinSetMarkPosition
1654  *              Set the "mark" position for the window object, which is the oldest row
1655  *              number (counting from 0) it is allowed to fetch during all subsequent
1656  *              operations within the current partition.
1657  *
1658  * Window functions do not have to call this, but are encouraged to move the
1659  * mark forward when possible to keep the tuplestore size down and prevent
1660  * having to spill rows to disk.
1661  */
1662 void
1663 WinSetMarkPosition(WindowObject winobj, int64 markpos)
1664 {
1665         WindowAggState *winstate;
1666
1667         Assert(WindowObjectIsValid(winobj));
1668         winstate = winobj->winstate;
1669
1670         if (markpos < winobj->markpos)
1671                 elog(ERROR, "cannot move WindowObject's mark position backward");
1672         tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
1673         while (markpos > winobj->markpos)
1674         {
1675                 tuplestore_advance(winstate->buffer, true);
1676                 winobj->markpos++;
1677         }
1678         tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1679         while (markpos > winobj->seekpos)
1680         {
1681                 tuplestore_advance(winstate->buffer, true);
1682                 winobj->seekpos++;
1683         }
1684 }
1685
1686 /*
1687  * WinRowsArePeers
1688  *              Compare two rows (specified by absolute position in window) to see
1689  *              if they are equal according to the ORDER BY clause.
1690  *
1691  * NB: this does not consider the window frame mode.
1692  */
1693 bool
1694 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
1695 {
1696         WindowAggState *winstate;
1697         WindowAgg  *node;
1698         TupleTableSlot *slot1;
1699         TupleTableSlot *slot2;
1700         bool            res;
1701
1702         Assert(WindowObjectIsValid(winobj));
1703         winstate = winobj->winstate;
1704         node = (WindowAgg *) winstate->ss.ps.plan;
1705
1706         /* If no ORDER BY, all rows are peers; don't bother to fetch them */
1707         if (node->ordNumCols == 0)
1708                 return true;
1709
1710         slot1 = winstate->temp_slot_1;
1711         slot2 = winstate->temp_slot_2;
1712
1713         if (!window_gettupleslot(winobj, pos1, slot1))
1714                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
1715                          pos1);
1716         if (!window_gettupleslot(winobj, pos2, slot2))
1717                 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
1718                          pos2);
1719
1720         res = are_peers(winstate, slot1, slot2);
1721
1722         ExecClearTuple(slot1);
1723         ExecClearTuple(slot2);
1724
1725         return res;
1726 }
1727
1728 /*
1729  * WinGetFuncArgInPartition
1730  *              Evaluate a window function's argument expression on a specified
1731  *              row of the partition.  The row is identified in lseek(2) style,
1732  *              i.e. relative to the current, first, or last row.
1733  *
1734  * argno: argument number to evaluate (counted from 0)
1735  * relpos: signed rowcount offset from the seek position
1736  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
1737  * set_mark: If the row is found and set_mark is true, the mark is moved to
1738  *              the row as a side-effect.
1739  * isnull: output argument, receives isnull status of result
1740  * isout: output argument, set to indicate whether target row position
1741  *              is out of partition (can pass NULL if caller doesn't care about this)
1742  *
1743  * Specifying a nonexistent row is not an error, it just causes a null result
1744  * (plus setting *isout true, if isout isn't NULL).
1745  */
1746 Datum
1747 WinGetFuncArgInPartition(WindowObject winobj, int argno,
1748                                                  int relpos, int seektype, bool set_mark,
1749                                                  bool *isnull, bool *isout)
1750 {
1751         WindowAggState *winstate;
1752         ExprContext *econtext;
1753         TupleTableSlot *slot;
1754         bool            gottuple;
1755         int64           abs_pos;
1756
1757         Assert(WindowObjectIsValid(winobj));
1758         winstate = winobj->winstate;
1759         econtext = winstate->ss.ps.ps_ExprContext;
1760         slot = winstate->temp_slot_1;
1761
1762         switch (seektype)
1763         {
1764                 case WINDOW_SEEK_CURRENT:
1765                         abs_pos = winstate->currentpos + relpos;
1766                         break;
1767                 case WINDOW_SEEK_HEAD:
1768                         abs_pos = relpos;
1769                         break;
1770                 case WINDOW_SEEK_TAIL:
1771                         spool_tuples(winstate, -1);
1772                         abs_pos = winstate->spooled_rows - 1 + relpos;
1773                         break;
1774                 default:
1775                         elog(ERROR, "unrecognized window seek type: %d", seektype);
1776                         abs_pos = 0;            /* keep compiler quiet */
1777                         break;
1778         }
1779
1780         gottuple = window_gettupleslot(winobj, abs_pos, slot);
1781
1782         if (!gottuple)
1783         {
1784                 if (isout)
1785                         *isout = true;
1786                 *isnull = true;
1787                 return (Datum) 0;
1788         }
1789         else
1790         {
1791                 if (isout)
1792                         *isout = false;
1793                 if (set_mark)
1794                         WinSetMarkPosition(winobj, abs_pos);
1795                 econtext->ecxt_outertuple = slot;
1796                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1797                                                         econtext, isnull, NULL);
1798         }
1799 }
1800
1801 /*
1802  * WinGetFuncArgInFrame
1803  *              Evaluate a window function's argument expression on a specified
1804  *              row of the window frame.  The row is identified in lseek(2) style,
1805  *              i.e. relative to the current, first, or last row.
1806  *
1807  * argno: argument number to evaluate (counted from 0)
1808  * relpos: signed rowcount offset from the seek position
1809  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
1810  * set_mark: If the row is found and set_mark is true, the mark is moved to
1811  *              the row as a side-effect.
1812  * isnull: output argument, receives isnull status of result
1813  * isout: output argument, set to indicate whether target row position
1814  *              is out of frame (can pass NULL if caller doesn't care about this)
1815  *
1816  * Specifying a nonexistent row is not an error, it just causes a null result
1817  * (plus setting *isout true, if isout isn't NULL).
1818  */
1819 Datum
1820 WinGetFuncArgInFrame(WindowObject winobj, int argno,
1821                                          int relpos, int seektype, bool set_mark,
1822                                          bool *isnull, bool *isout)
1823 {
1824         WindowAggState *winstate;
1825         ExprContext *econtext;
1826         TupleTableSlot *slot;
1827         bool            gottuple;
1828         int64           abs_pos;
1829
1830         Assert(WindowObjectIsValid(winobj));
1831         winstate = winobj->winstate;
1832         econtext = winstate->ss.ps.ps_ExprContext;
1833         slot = winstate->temp_slot_1;
1834
1835         switch (seektype)
1836         {
1837                 case WINDOW_SEEK_CURRENT:
1838                         abs_pos = winstate->currentpos + relpos;
1839                         break;
1840                 case WINDOW_SEEK_HEAD:
1841                         abs_pos = relpos;
1842                         break;
1843                 case WINDOW_SEEK_TAIL:
1844                         update_frametailpos(winobj, slot);
1845                         abs_pos = winstate->frametailpos + relpos;
1846                         break;
1847                 default:
1848                         elog(ERROR, "unrecognized window seek type: %d", seektype);
1849                         abs_pos = 0;            /* keep compiler quiet */
1850                         break;
1851         }
1852
1853         gottuple = window_gettupleslot(winobj, abs_pos, slot);
1854         if (gottuple)
1855                 gottuple = row_is_in_frame(winstate, abs_pos, slot);
1856
1857         if (!gottuple)
1858         {
1859                 if (isout)
1860                         *isout = true;
1861                 *isnull = true;
1862                 return (Datum) 0;
1863         }
1864         else
1865         {
1866                 if (isout)
1867                         *isout = false;
1868                 if (set_mark)
1869                         WinSetMarkPosition(winobj, abs_pos);
1870                 econtext->ecxt_outertuple = slot;
1871                 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1872                                                         econtext, isnull, NULL);
1873         }
1874 }
1875
1876 /*
1877  * WinGetFuncArgCurrent
1878  *              Evaluate a window function's argument expression on the current row.
1879  *
1880  * argno: argument number to evaluate (counted from 0)
1881  * isnull: output argument, receives isnull status of result
1882  *
1883  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
1884  * WinGetFuncArgInFrame targeting the current row, because it will succeed
1885  * even if the WindowObject's mark has been set beyond the current row.
1886  * This should generally be used for "ordinary" arguments of a window
1887  * function, such as the offset argument of lead() or lag().
1888  */
1889 Datum
1890 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
1891 {
1892         WindowAggState *winstate;
1893         ExprContext *econtext;
1894
1895         Assert(WindowObjectIsValid(winobj));
1896         winstate = winobj->winstate;
1897
1898         econtext = winstate->ss.ps.ps_ExprContext;
1899
1900         econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1901         return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
1902                                                 econtext, isnull, NULL);
1903 }