OSDN Git Service

Standard pgindent run for 8.1.
[pg-rex/syncrep.git] / src / backend / optimizer / prep / preptlist.c
1 /*-------------------------------------------------------------------------
2  *
3  * preptlist.c
4  *        Routines to preprocess the parse tree target list
5  *
6  * This module takes care of altering the query targetlist as needed for
7  * INSERT, UPDATE, and DELETE queries.  For INSERT and UPDATE queries,
8  * the targetlist must contain an entry for each attribute of the target
9  * relation in the correct order.  For both UPDATE and DELETE queries,
10  * we need a junk targetlist entry holding the CTID attribute --- the
11  * executor relies on this to find the tuple to be replaced/deleted.
12  *
13  *
14  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
15  * Portions Copyright (c) 1994, Regents of the University of California
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/optimizer/prep/preptlist.c,v 1.78 2005/10/15 02:49:21 momjian Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22
23 #include "postgres.h"
24
25 #include "access/heapam.h"
26 #include "catalog/pg_type.h"
27 #include "nodes/makefuncs.h"
28 #include "optimizer/prep.h"
29 #include "optimizer/subselect.h"
30 #include "parser/analyze.h"
31 #include "parser/parsetree.h"
32 #include "parser/parse_coerce.h"
33
34
35 static List *expand_targetlist(List *tlist, int command_type,
36                                   Index result_relation, List *range_table);
37
38
39 /*
40  * preprocess_targetlist
41  *        Driver for preprocessing the parse tree targetlist.
42  *
43  *        Returns the new targetlist.
44  */
45 List *
46 preprocess_targetlist(PlannerInfo *root, List *tlist)
47 {
48         Query      *parse = root->parse;
49         int                     result_relation = parse->resultRelation;
50         List       *range_table = parse->rtable;
51         CmdType         command_type = parse->commandType;
52
53         /*
54          * Sanity check: if there is a result relation, it'd better be a real
55          * relation not a subquery.  Else parser or rewriter messed up.
56          */
57         if (result_relation)
58         {
59                 RangeTblEntry *rte = rt_fetch(result_relation, range_table);
60
61                 if (rte->subquery != NULL || rte->relid == InvalidOid)
62                         elog(ERROR, "subquery cannot be result relation");
63         }
64
65         /*
66          * for heap_formtuple to work, the targetlist must match the exact order
67          * of the attributes. We also need to fill in any missing attributes.
68          * -ay 10/94
69          */
70         if (command_type == CMD_INSERT || command_type == CMD_UPDATE)
71                 tlist = expand_targetlist(tlist, command_type,
72                                                                   result_relation, range_table);
73
74         /*
75          * for "update" and "delete" queries, add ctid of the result relation into
76          * the target list so that the ctid will propagate through execution and
77          * ExecutePlan() will be able to identify the right tuple to replace or
78          * delete.      This extra field is marked "junk" so that it is not stored
79          * back into the tuple.
80          */
81         if (command_type == CMD_UPDATE || command_type == CMD_DELETE)
82         {
83                 TargetEntry *tle;
84                 Var                *var;
85
86                 var = makeVar(result_relation, SelfItemPointerAttributeNumber,
87                                           TIDOID, -1, 0);
88
89                 tle = makeTargetEntry((Expr *) var,
90                                                           list_length(tlist) + 1,
91                                                           pstrdup("ctid"),
92                                                           true);
93
94                 /*
95                  * For an UPDATE, expand_targetlist already created a fresh tlist. For
96                  * DELETE, better do a listCopy so that we don't destructively modify
97                  * the original tlist (is this really necessary?).
98                  */
99                 if (command_type == CMD_DELETE)
100                         tlist = list_copy(tlist);
101
102                 tlist = lappend(tlist, tle);
103         }
104
105         /*
106          * Add TID targets for rels selected FOR UPDATE/SHARE.  The executor uses
107          * the TID to know which rows to lock, much as for UPDATE or DELETE.
108          */
109         if (parse->rowMarks)
110         {
111                 ListCell   *l;
112
113                 /*
114                  * We've got trouble if the FOR UPDATE/SHARE appears inside grouping,
115                  * since grouping renders a reference to individual tuple CTIDs
116                  * invalid.  This is also checked at parse time, but that's
117                  * insufficient because of rule substitution, query pullup, etc.
118                  */
119                 CheckSelectLocking(parse, parse->forUpdate);
120
121                 /*
122                  * Currently the executor only supports FOR UPDATE/SHARE at top level
123                  */
124                 if (PlannerQueryLevel > 1)
125                         ereport(ERROR,
126                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
127                         errmsg("SELECT FOR UPDATE/SHARE is not allowed in subqueries")));
128
129                 foreach(l, parse->rowMarks)
130                 {
131                         Index           rti = lfirst_int(l);
132                         Var                *var;
133                         char       *resname;
134                         TargetEntry *tle;
135
136                         var = makeVar(rti,
137                                                   SelfItemPointerAttributeNumber,
138                                                   TIDOID,
139                                                   -1,
140                                                   0);
141
142                         resname = (char *) palloc(32);
143                         snprintf(resname, 32, "ctid%u", rti);
144
145                         tle = makeTargetEntry((Expr *) var,
146                                                                   list_length(tlist) + 1,
147                                                                   resname,
148                                                                   true);
149
150                         tlist = lappend(tlist, tle);
151                 }
152         }
153
154         return tlist;
155 }
156
157 /*****************************************************************************
158  *
159  *              TARGETLIST EXPANSION
160  *
161  *****************************************************************************/
162
163 /*
164  * expand_targetlist
165  *        Given a target list as generated by the parser and a result relation,
166  *        add targetlist entries for any missing attributes, and ensure the
167  *        non-junk attributes appear in proper field order.
168  *
169  * NOTE: if you are tempted to put more processing here, consider whether
170  * it shouldn't go in the rewriter's rewriteTargetList() instead.
171  */
172 static List *
173 expand_targetlist(List *tlist, int command_type,
174                                   Index result_relation, List *range_table)
175 {
176         List       *new_tlist = NIL;
177         ListCell   *tlist_item;
178         Relation        rel;
179         int                     attrno,
180                                 numattrs;
181
182         tlist_item = list_head(tlist);
183
184         /*
185          * The rewriter should have already ensured that the TLEs are in correct
186          * order; but we have to insert TLEs for any missing attributes.
187          *
188          * Scan the tuple description in the relation's relcache entry to make sure
189          * we have all the user attributes in the right order.  We assume that the
190          * rewriter already acquired at least AccessShareLock on the relation, so
191          * we need no lock here.
192          */
193         rel = heap_open(getrelid(result_relation, range_table), NoLock);
194
195         numattrs = RelationGetNumberOfAttributes(rel);
196
197         for (attrno = 1; attrno <= numattrs; attrno++)
198         {
199                 Form_pg_attribute att_tup = rel->rd_att->attrs[attrno - 1];
200                 TargetEntry *new_tle = NULL;
201
202                 if (tlist_item != NULL)
203                 {
204                         TargetEntry *old_tle = (TargetEntry *) lfirst(tlist_item);
205
206                         if (!old_tle->resjunk && old_tle->resno == attrno)
207                         {
208                                 new_tle = old_tle;
209                                 tlist_item = lnext(tlist_item);
210                         }
211                 }
212
213                 if (new_tle == NULL)
214                 {
215                         /*
216                          * Didn't find a matching tlist entry, so make one.
217                          *
218                          * For INSERT, generate a NULL constant.  (We assume the rewriter
219                          * would have inserted any available default value.) Also, if the
220                          * column isn't dropped, apply any domain constraints that might
221                          * exist --- this is to catch domain NOT NULL.
222                          *
223                          * For UPDATE, generate a Var reference to the existing value of the
224                          * attribute, so that it gets copied to the new tuple. But
225                          * generate a NULL for dropped columns (we want to drop any old
226                          * values).
227                          *
228                          * When generating a NULL constant for a dropped column, we label it
229                          * INT4 (any other guaranteed-to-exist datatype would do as well).
230                          * We can't label it with the dropped column's datatype since that
231                          * might not exist anymore.  It does not really matter what we
232                          * claim the type is, since NULL is NULL --- its representation is
233                          * datatype-independent.  This could perhaps confuse code
234                          * comparing the finished plan to the target relation, however.
235                          */
236                         Oid                     atttype = att_tup->atttypid;
237                         int32           atttypmod = att_tup->atttypmod;
238                         Node       *new_expr;
239
240                         switch (command_type)
241                         {
242                                 case CMD_INSERT:
243                                         if (!att_tup->attisdropped)
244                                         {
245                                                 new_expr = (Node *) makeConst(atttype,
246                                                                                                           att_tup->attlen,
247                                                                                                           (Datum) 0,
248                                                                                                           true,         /* isnull */
249                                                                                                           att_tup->attbyval);
250                                                 new_expr = coerce_to_domain(new_expr,
251                                                                                                         InvalidOid,
252                                                                                                         atttype,
253                                                                                                         COERCE_IMPLICIT_CAST,
254                                                                                                         false,
255                                                                                                         false);
256                                         }
257                                         else
258                                         {
259                                                 /* Insert NULL for dropped column */
260                                                 new_expr = (Node *) makeConst(INT4OID,
261                                                                                                           sizeof(int32),
262                                                                                                           (Datum) 0,
263                                                                                                           true,         /* isnull */
264                                                                                                           true /* byval */ );
265                                         }
266                                         break;
267                                 case CMD_UPDATE:
268                                         if (!att_tup->attisdropped)
269                                         {
270                                                 new_expr = (Node *) makeVar(result_relation,
271                                                                                                         attrno,
272                                                                                                         atttype,
273                                                                                                         atttypmod,
274                                                                                                         0);
275                                         }
276                                         else
277                                         {
278                                                 /* Insert NULL for dropped column */
279                                                 new_expr = (Node *) makeConst(INT4OID,
280                                                                                                           sizeof(int32),
281                                                                                                           (Datum) 0,
282                                                                                                           true,         /* isnull */
283                                                                                                           true /* byval */ );
284                                         }
285                                         break;
286                                 default:
287                                         elog(ERROR, "unrecognized command_type: %d",
288                                                  (int) command_type);
289                                         new_expr = NULL;        /* keep compiler quiet */
290                                         break;
291                         }
292
293                         new_tle = makeTargetEntry((Expr *) new_expr,
294                                                                           attrno,
295                                                                           pstrdup(NameStr(att_tup->attname)),
296                                                                           false);
297                 }
298
299                 new_tlist = lappend(new_tlist, new_tle);
300         }
301
302         /*
303          * The remaining tlist entries should be resjunk; append them all to the
304          * end of the new tlist, making sure they have resnos higher than the last
305          * real attribute.      (Note: although the rewriter already did such
306          * renumbering, we have to do it again here in case we are doing an UPDATE
307          * in a table with dropped columns, or an inheritance child table with
308          * extra columns.)
309          */
310         while (tlist_item)
311         {
312                 TargetEntry *old_tle = (TargetEntry *) lfirst(tlist_item);
313
314                 if (!old_tle->resjunk)
315                         elog(ERROR, "targetlist is not sorted correctly");
316                 /* Get the resno right, but don't copy unnecessarily */
317                 if (old_tle->resno != attrno)
318                 {
319                         old_tle = flatCopyTargetEntry(old_tle);
320                         old_tle->resno = attrno;
321                 }
322                 new_tlist = lappend(new_tlist, old_tle);
323                 attrno++;
324                 tlist_item = lnext(tlist_item);
325         }
326
327         heap_close(rel, NoLock);
328
329         return new_tlist;
330 }