OSDN Git Service

Move Trigger and TriggerDesc structs out of rel.h into a new reltrigger.h
[pg-rex/syncrep.git] / src / backend / executor / nodeMergeAppend.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *        routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *              ExecInitMergeAppend             - initialize the MergeAppend node
17  *              ExecMergeAppend                 - retrieve the next tuple from the node
18  *              ExecEndMergeAppend              - shut down the MergeAppend node
19  *              ExecReScanMergeAppend   - rescan the MergeAppend node
20  *
21  *       NOTES
22  *              A MergeAppend node contains a list of one or more subplans.
23  *              These are each expected to deliver tuples that are sorted according
24  *              to a common sort key.  The MergeAppend node merges these streams
25  *              to produce output sorted the same way.
26  *
27  *              MergeAppend nodes don't make use of their left and right
28  *              subtrees, rather they maintain a list of subplans so
29  *              a typical MergeAppend node looks like this in the plan tree:
30  *
31  *                                 ...
32  *                                 /
33  *                              MergeAppend---+------+------+--- nil
34  *                              /       \                 |              |              |
35  *                        nil   nil              ...    ...    ...
36  *                                                               subplans
37  */
38
39 #include "postgres.h"
40
41 #include "access/nbtree.h"
42 #include "executor/execdebug.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "utils/lsyscache.h"
45 #include "utils/rel.h"
46
47 /*
48  * It gets quite confusing having a heap array (indexed by integers) which
49  * contains integers which index into the slots array. These typedefs try to
50  * clear it up, but they're only documentation.
51  */
52 typedef int SlotNumber;
53 typedef int HeapPosition;
54
55 static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
56 static void heap_siftup_slot(MergeAppendState *node);
57 static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
58
59
60 /* ----------------------------------------------------------------
61  *              ExecInitMergeAppend
62  *
63  *              Begin all of the subscans of the MergeAppend node.
64  * ----------------------------------------------------------------
65  */
66 MergeAppendState *
67 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
68 {
69         MergeAppendState *mergestate = makeNode(MergeAppendState);
70         PlanState **mergeplanstates;
71         int                     nplans;
72         int                     i;
73         ListCell   *lc;
74
75         /* check for unsupported flags */
76         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
77
78         /*
79          * Set up empty vector of subplan states
80          */
81         nplans = list_length(node->mergeplans);
82
83         mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
84
85         /*
86          * create new MergeAppendState for our node
87          */
88         mergestate->ps.plan = (Plan *) node;
89         mergestate->ps.state = estate;
90         mergestate->mergeplans = mergeplanstates;
91         mergestate->ms_nplans = nplans;
92
93         mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
94         mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
95
96         /*
97          * Miscellaneous initialization
98          *
99          * MergeAppend plans don't have expression contexts because they never
100          * call ExecQual or ExecProject.
101          */
102
103         /*
104          * MergeAppend nodes do have Result slots, which hold pointers to tuples,
105          * so we have to initialize them.
106          */
107         ExecInitResultTupleSlot(estate, &mergestate->ps);
108
109         /*
110          * call ExecInitNode on each of the plans to be executed and save the
111          * results into the array "mergeplans".
112          */
113         i = 0;
114         foreach(lc, node->mergeplans)
115         {
116                 Plan       *initNode = (Plan *) lfirst(lc);
117
118                 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
119                 i++;
120         }
121
122         /*
123          * initialize output tuple type
124          */
125         ExecAssignResultTypeFromTL(&mergestate->ps);
126         mergestate->ps.ps_ProjInfo = NULL;
127
128         /*
129          * initialize sort-key information
130          */
131         mergestate->ms_nkeys = node->numCols;
132         mergestate->ms_scankeys = palloc0(sizeof(ScanKeyData) * node->numCols);
133
134         for (i = 0; i < node->numCols; i++)
135         {
136                 Oid                     sortFunction;
137                 bool            reverse;
138                 int                     flags;
139
140                 if (!get_compare_function_for_ordering_op(node->sortOperators[i],
141                                                                                                   &sortFunction, &reverse))
142                         elog(ERROR, "operator %u is not a valid ordering operator",
143                                  node->sortOperators[i]);
144
145                 /* We use btree's conventions for encoding directionality */
146                 flags = 0;
147                 if (reverse)
148                         flags |= SK_BT_DESC;
149                 if (node->nullsFirst[i])
150                         flags |= SK_BT_NULLS_FIRST;
151
152                 /*
153                  * We needn't fill in sk_strategy or sk_subtype since these scankeys
154                  * will never be passed to an index.
155                  */
156                 ScanKeyEntryInitialize(&mergestate->ms_scankeys[i],
157                                                            flags,
158                                                            node->sortColIdx[i],
159                                                            InvalidStrategy,
160                                                            InvalidOid,
161                                                            node->collations[i],
162                                                            sortFunction,
163                                                            (Datum) 0);
164         }
165
166         /*
167          * initialize to show we have not run the subplans yet
168          */
169         mergestate->ms_heap_size = 0;
170         mergestate->ms_initialized = false;
171         mergestate->ms_last_slot = -1;
172
173         return mergestate;
174 }
175
176 /* ----------------------------------------------------------------
177  *         ExecMergeAppend
178  *
179  *              Handles iteration over multiple subplans.
180  * ----------------------------------------------------------------
181  */
182 TupleTableSlot *
183 ExecMergeAppend(MergeAppendState *node)
184 {
185         TupleTableSlot *result;
186         SlotNumber      i;
187
188         if (!node->ms_initialized)
189         {
190                 /*
191                  * First time through: pull the first tuple from each subplan, and set
192                  * up the heap.
193                  */
194                 for (i = 0; i < node->ms_nplans; i++)
195                 {
196                         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
197                         if (!TupIsNull(node->ms_slots[i]))
198                                 heap_insert_slot(node, i);
199                 }
200                 node->ms_initialized = true;
201         }
202         else
203         {
204                 /*
205                  * Otherwise, pull the next tuple from whichever subplan we returned
206                  * from last time, and insert it into the heap.  (We could simplify
207                  * the logic a bit by doing this before returning from the prior call,
208                  * but it's better to not pull tuples until necessary.)
209                  */
210                 i = node->ms_last_slot;
211                 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
212                 if (!TupIsNull(node->ms_slots[i]))
213                         heap_insert_slot(node, i);
214         }
215
216         if (node->ms_heap_size > 0)
217         {
218                 /* Return the topmost heap node, and sift up the remaining nodes */
219                 i = node->ms_heap[0];
220                 result = node->ms_slots[i];
221                 node->ms_last_slot = i;
222                 heap_siftup_slot(node);
223         }
224         else
225         {
226                 /* All the subplans are exhausted, and so is the heap */
227                 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
228         }
229
230         return result;
231 }
232
233 /*
234  * Insert a new slot into the heap.  The slot must contain a valid tuple.
235  */
236 static void
237 heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
238 {
239         SlotNumber *heap = node->ms_heap;
240         HeapPosition j;
241
242         Assert(!TupIsNull(node->ms_slots[new_slot]));
243
244         j = node->ms_heap_size++;       /* j is where the "hole" is */
245         while (j > 0)
246         {
247                 int                     i = (j - 1) / 2;
248
249                 if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
250                         break;
251                 heap[j] = heap[i];
252                 j = i;
253         }
254         heap[j] = new_slot;
255 }
256
257 /*
258  * Delete the heap top (the slot in heap[0]), and sift up.
259  */
260 static void
261 heap_siftup_slot(MergeAppendState *node)
262 {
263         SlotNumber *heap = node->ms_heap;
264         HeapPosition i,
265                                 n;
266
267         if (--node->ms_heap_size <= 0)
268                 return;
269         n = node->ms_heap_size;         /* heap[n] needs to be reinserted */
270         i = 0;                                          /* i is where the "hole" is */
271         for (;;)
272         {
273                 int                     j = 2 * i + 1;
274
275                 if (j >= n)
276                         break;
277                 if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
278                         j++;
279                 if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
280                         break;
281                 heap[i] = heap[j];
282                 i = j;
283         }
284         heap[i] = heap[n];
285 }
286
287 /*
288  * Compare the tuples in the two given slots.
289  */
290 static int32
291 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
292 {
293         TupleTableSlot *s1 = node->ms_slots[slot1];
294         TupleTableSlot *s2 = node->ms_slots[slot2];
295         int                     nkey;
296
297         Assert(!TupIsNull(s1));
298         Assert(!TupIsNull(s2));
299
300         for (nkey = 0; nkey < node->ms_nkeys; nkey++)
301         {
302                 ScanKey         scankey = node->ms_scankeys + nkey;
303                 AttrNumber      attno = scankey->sk_attno;
304                 Datum           datum1,
305                                         datum2;
306                 bool            isNull1,
307                                         isNull2;
308                 int32           compare;
309
310                 datum1 = slot_getattr(s1, attno, &isNull1);
311                 datum2 = slot_getattr(s2, attno, &isNull2);
312
313                 if (isNull1)
314                 {
315                         if (isNull2)
316                                 continue;               /* NULL "=" NULL */
317                         else if (scankey->sk_flags & SK_BT_NULLS_FIRST)
318                                 return -1;              /* NULL "<" NOT_NULL */
319                         else
320                                 return 1;               /* NULL ">" NOT_NULL */
321                 }
322                 else if (isNull2)
323                 {
324                         if (scankey->sk_flags & SK_BT_NULLS_FIRST)
325                                 return 1;               /* NOT_NULL ">" NULL */
326                         else
327                                 return -1;              /* NOT_NULL "<" NULL */
328                 }
329                 else
330                 {
331                         compare = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
332                                                                                                           scankey->sk_collation,
333                                                                                                           datum1, datum2));
334                         if (compare != 0)
335                         {
336                                 if (scankey->sk_flags & SK_BT_DESC)
337                                         compare = -compare;
338                                 return compare;
339                         }
340                 }
341         }
342         return 0;
343 }
344
345 /* ----------------------------------------------------------------
346  *              ExecEndMergeAppend
347  *
348  *              Shuts down the subscans of the MergeAppend node.
349  *
350  *              Returns nothing of interest.
351  * ----------------------------------------------------------------
352  */
353 void
354 ExecEndMergeAppend(MergeAppendState *node)
355 {
356         PlanState **mergeplans;
357         int                     nplans;
358         int                     i;
359
360         /*
361          * get information from the node
362          */
363         mergeplans = node->mergeplans;
364         nplans = node->ms_nplans;
365
366         /*
367          * shut down each of the subscans
368          */
369         for (i = 0; i < nplans; i++)
370                 ExecEndNode(mergeplans[i]);
371 }
372
373 void
374 ExecReScanMergeAppend(MergeAppendState *node)
375 {
376         int                     i;
377
378         for (i = 0; i < node->ms_nplans; i++)
379         {
380                 PlanState  *subnode = node->mergeplans[i];
381
382                 /*
383                  * ExecReScan doesn't know about my subplans, so I have to do
384                  * changed-parameter signaling myself.
385                  */
386                 if (node->ps.chgParam != NULL)
387                         UpdateChangedParamSet(subnode, node->ps.chgParam);
388
389                 /*
390                  * If chgParam of subnode is not null then plan will be re-scanned by
391                  * first ExecProcNode.
392                  */
393                 if (subnode->chgParam == NULL)
394                         ExecReScan(subnode);
395         }
396         node->ms_heap_size = 0;
397         node->ms_initialized = false;
398         node->ms_last_slot = -1;
399 }