OSDN Git Service

43059664b9348ae462dcb6b1166124f4f0fde2ee
[pg-rex/syncrep.git] / src / backend / executor / nodeMergeAppend.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *        routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *              ExecInitMergeAppend             - initialize the MergeAppend node
17  *              ExecMergeAppend                 - retrieve the next tuple from the node
18  *              ExecEndMergeAppend              - shut down the MergeAppend node
19  *              ExecReScanMergeAppend   - rescan the MergeAppend node
20  *
21  *       NOTES
22  *              A MergeAppend node contains a list of one or more subplans.
23  *              These are each expected to deliver tuples that are sorted according
24  *              to a common sort key.  The MergeAppend node merges these streams
25  *              to produce output sorted the same way.
26  *
27  *              MergeAppend nodes don't make use of their left and right
28  *              subtrees, rather they maintain a list of subplans so
29  *              a typical MergeAppend node looks like this in the plan tree:
30  *
31  *                                 ...
32  *                                 /
33  *                              MergeAppend---+------+------+--- nil
34  *                              /   \         |      |      |
35  *                            nil   nil      ...    ...    ...
36  *                                               subplans
37  */
38
39 #include "postgres.h"
40
41 #include "access/nbtree.h"
42 #include "executor/execdebug.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "utils/lsyscache.h"
45
46 /*
47  * It gets quite confusing having a heap array (indexed by integers) which
48  * contains integers which index into the slots array. These typedefs try to
49  * clear it up, but they're only documentation.
50  */
51 typedef int SlotNumber;         /* index into ms_slots[] (one entry per subplan) */
52 typedef int HeapPosition;       /* index into ms_heap[] */
53
54 static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);      /* add a slot holding a valid tuple to the heap */
55 static void heap_siftup_slot(MergeAppendState *node);   /* remove the heap top and restore heap order */
56 static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);    /* order two slots' tuples by the sort keys */
57
58
59 /* ----------------------------------------------------------------
60  *              ExecInitMergeAppend
61  *
62  *              Begin all of the subscans of the MergeAppend node.
63  * ----------------------------------------------------------------
64  */
65 MergeAppendState *
66 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
67 {
68         MergeAppendState *mergestate = makeNode(MergeAppendState);
69         PlanState **mergeplanstates;
70         int                     nplans;
71         int                     i;
72         ListCell   *lc;
73
74         /* check for unsupported flags */
75         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76
77         /*
78          * Set up empty vector of subplan states
79          */
80         nplans = list_length(node->mergeplans);
81
82         mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
83
84         /*
85          * create new MergeAppendState for our node
86          */
87         mergestate->ps.plan = (Plan *) node;
88         mergestate->ps.state = estate;
89         mergestate->mergeplans = mergeplanstates;
90         mergestate->ms_nplans = nplans;
91
92         mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
93         mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
94
95         /*
96          * Miscellaneous initialization
97          *
98          * MergeAppend plans don't have expression contexts because they never
99          * call ExecQual or ExecProject.
100          */
101
102         /*
103          * MergeAppend nodes do have Result slots, which hold pointers to tuples,
104          * so we have to initialize them.
105          */
106         ExecInitResultTupleSlot(estate, &mergestate->ps);
107
108         /*
109          * call ExecInitNode on each of the plans to be executed and save the
110          * results into the array "mergeplans".
111          */
112         i = 0;
113         foreach(lc, node->mergeplans)
114         {
115                 Plan       *initNode = (Plan *) lfirst(lc);
116
117                 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
118                 i++;
119         }
120
121         /*
122          * initialize output tuple type
123          */
124         ExecAssignResultTypeFromTL(&mergestate->ps);
125         mergestate->ps.ps_ProjInfo = NULL;
126
127         /*
128          * initialize sort-key information
129          */
130         mergestate->ms_nkeys = node->numCols;
131         mergestate->ms_scankeys = palloc0(sizeof(ScanKeyData) * node->numCols);
132
133         for (i = 0; i < node->numCols; i++)
134         {
135                 Oid                     sortFunction;
136                 bool            reverse;
137                 int                     flags;
138
139                 if (!get_compare_function_for_ordering_op(node->sortOperators[i],
140                                                                                                   &sortFunction, &reverse))
141                         elog(ERROR, "operator %u is not a valid ordering operator",
142                                  node->sortOperators[i]);
143
144                 /* We use btree's conventions for encoding directionality */
145                 flags = 0;
146                 if (reverse)
147                         flags |= SK_BT_DESC;
148                 if (node->nullsFirst[i])
149                         flags |= SK_BT_NULLS_FIRST;
150
151                 /*
152                  * We needn't fill in sk_strategy or sk_subtype since these scankeys
153                  * will never be passed to an index.
154                  */
155                 ScanKeyEntryInitialize(&mergestate->ms_scankeys[i],
156                                                            flags,
157                                                            node->sortColIdx[i],
158                                                            InvalidStrategy,
159                                                            InvalidOid,
160                                                            node->collations[i],
161                                                            sortFunction,
162                                                            (Datum) 0);
163         }
164
165         /*
166          * initialize to show we have not run the subplans yet
167          */
168         mergestate->ms_heap_size = 0;
169         mergestate->ms_initialized = false;
170         mergestate->ms_last_slot = -1;
171
172         return mergestate;
173 }
174
175 /* ----------------------------------------------------------------
176  *         ExecMergeAppend
177  *
178  *              Handles iteration over multiple subplans.
179  * ----------------------------------------------------------------
180  */
181 TupleTableSlot *
182 ExecMergeAppend(MergeAppendState *node)
183 {
184         TupleTableSlot *result;
185         SlotNumber      i;
186
187         if (!node->ms_initialized)
188         {
189                 /*
190                  * First time through: pull the first tuple from each subplan, and set
191                  * up the heap.
192                  */
193                 for (i = 0; i < node->ms_nplans; i++)
194                 {
195                         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
196                         if (!TupIsNull(node->ms_slots[i]))
197                                 heap_insert_slot(node, i);
198                 }
199                 node->ms_initialized = true;
200         }
201         else
202         {
203                 /*
204                  * Otherwise, pull the next tuple from whichever subplan we returned
205                  * from last time, and insert it into the heap.  (We could simplify
206                  * the logic a bit by doing this before returning from the prior call,
207                  * but it's better to not pull tuples until necessary.)
208                  */
209                 i = node->ms_last_slot;
210                 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
211                 if (!TupIsNull(node->ms_slots[i]))
212                         heap_insert_slot(node, i);
213         }
214
215         if (node->ms_heap_size > 0)
216         {
217                 /* Return the topmost heap node, and sift up the remaining nodes */
218                 i = node->ms_heap[0];
219                 result = node->ms_slots[i];
220                 node->ms_last_slot = i;
221                 heap_siftup_slot(node);
222         }
223         else
224         {
225                 /* All the subplans are exhausted, and so is the heap */
226                 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
227         }
228
229         return result;
230 }
231
232 /*
233  * Insert a new slot into the heap.  The slot must contain a valid tuple.
234  */
235 static void
236 heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
237 {
238         SlotNumber *heap = node->ms_heap;
239         HeapPosition j;
240
241         Assert(!TupIsNull(node->ms_slots[new_slot]));
242
243         j = node->ms_heap_size++;       /* j is where the "hole" is */
244         while (j > 0)
245         {
246                 int                     i = (j - 1) / 2;
247
248                 if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
249                         break;
250                 heap[j] = heap[i];
251                 j = i;
252         }
253         heap[j] = new_slot;
254 }
255
256 /*
257  * Delete the heap top (the slot in heap[0]), and sift up.
258  */
259 static void
260 heap_siftup_slot(MergeAppendState *node)
261 {
262         SlotNumber *heap = node->ms_heap;
263         HeapPosition i,
264                                 n;
265
266         if (--node->ms_heap_size <= 0)
267                 return;
268         n = node->ms_heap_size;         /* heap[n] needs to be reinserted */
269         i = 0;                                          /* i is where the "hole" is */
270         for (;;)
271         {
272                 int                     j = 2 * i + 1;
273
274                 if (j >= n)
275                         break;
276                 if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
277                         j++;
278                 if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
279                         break;
280                 heap[i] = heap[j];
281                 i = j;
282         }
283         heap[i] = heap[n];
284 }
285
286 /*
287  * Compare the tuples in the two given slots.
288  */
289 static int32
290 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
291 {
292         TupleTableSlot *s1 = node->ms_slots[slot1];
293         TupleTableSlot *s2 = node->ms_slots[slot2];
294         int                     nkey;
295
296         Assert(!TupIsNull(s1));
297         Assert(!TupIsNull(s2));
298
299         for (nkey = 0; nkey < node->ms_nkeys; nkey++)
300         {
301                 ScanKey         scankey = node->ms_scankeys + nkey;
302                 AttrNumber      attno = scankey->sk_attno;
303                 Datum           datum1,
304                                         datum2;
305                 bool            isNull1,
306                                         isNull2;
307                 int32           compare;
308
309                 datum1 = slot_getattr(s1, attno, &isNull1);
310                 datum2 = slot_getattr(s2, attno, &isNull2);
311
312                 if (isNull1)
313                 {
314                         if (isNull2)
315                                 continue;               /* NULL "=" NULL */
316                         else if (scankey->sk_flags & SK_BT_NULLS_FIRST)
317                                 return -1;              /* NULL "<" NOT_NULL */
318                         else
319                                 return 1;               /* NULL ">" NOT_NULL */
320                 }
321                 else if (isNull2)
322                 {
323                         if (scankey->sk_flags & SK_BT_NULLS_FIRST)
324                                 return 1;               /* NOT_NULL ">" NULL */
325                         else
326                                 return -1;              /* NOT_NULL "<" NULL */
327                 }
328                 else
329                 {
330                         compare = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
331                                                                                                           scankey->sk_collation,
332                                                                                                           datum1, datum2));
333                         if (compare != 0)
334                         {
335                                 if (scankey->sk_flags & SK_BT_DESC)
336                                         compare = -compare;
337                                 return compare;
338                         }
339                 }
340         }
341         return 0;
342 }
343
344 /* ----------------------------------------------------------------
345  *              ExecEndMergeAppend
346  *
347  *              Shuts down the subscans of the MergeAppend node.
348  *
349  *              Returns nothing of interest.
350  * ----------------------------------------------------------------
351  */
352 void
353 ExecEndMergeAppend(MergeAppendState *node)
354 {
355         PlanState **mergeplans;
356         int                     nplans;
357         int                     i;
358
359         /*
360          * get information from the node
361          */
362         mergeplans = node->mergeplans;
363         nplans = node->ms_nplans;
364
365         /*
366          * shut down each of the subscans
367          */
368         for (i = 0; i < nplans; i++)
369                 ExecEndNode(mergeplans[i]);
370 }
371
372 void
373 ExecReScanMergeAppend(MergeAppendState *node)
374 {
375         int                     i;
376
377         for (i = 0; i < node->ms_nplans; i++)
378         {
379                 PlanState  *subnode = node->mergeplans[i];
380
381                 /*
382                  * ExecReScan doesn't know about my subplans, so I have to do
383                  * changed-parameter signaling myself.
384                  */
385                 if (node->ps.chgParam != NULL)
386                         UpdateChangedParamSet(subnode, node->ps.chgParam);
387
388                 /*
389                  * If chgParam of subnode is not null then plan will be re-scanned by
390                  * first ExecProcNode.
391                  */
392                 if (subnode->chgParam == NULL)
393                         ExecReScan(subnode);
394         }
395         node->ms_heap_size = 0;
396         node->ms_initialized = false;
397         node->ms_last_slot = -1;
398 }