OSDN Git Service

Allow CREATE OR REPLACE VIEW to add columns to the _end_ of the view.
[pg-rex/syncrep.git] / src / backend / commands / view.c
1 /*-------------------------------------------------------------------------
2  *
3  * view.c
4  *        use rewrite rules to construct views
5  *
6  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/view.c,v 1.108 2008/12/06 23:22:46 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "access/xact.h"
19 #include "catalog/namespace.h"
20 #include "commands/defrem.h"
21 #include "commands/tablecmds.h"
22 #include "commands/view.h"
23 #include "miscadmin.h"
24 #include "nodes/makefuncs.h"
25 #include "nodes/nodeFuncs.h"
26 #include "parser/analyze.h"
27 #include "parser/parse_relation.h"
28 #include "rewrite/rewriteDefine.h"
29 #include "rewrite/rewriteManip.h"
30 #include "rewrite/rewriteSupport.h"
31 #include "utils/acl.h"
32 #include "utils/lsyscache.h"
33 #include "utils/rel.h"
34
35
36 static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
37 static bool isViewOnTempTable_walker(Node *node, void *context);
38
39 /*---------------------------------------------------------------------
40  * isViewOnTempTable
41  *
42  * Returns true iff any of the relations underlying this view are
43  * temporary tables.
44  *---------------------------------------------------------------------
45  */
46 static bool
47 isViewOnTempTable(Query *viewParse)
48 {
49         return isViewOnTempTable_walker((Node *) viewParse, NULL);
50 }
51
52 static bool
53 isViewOnTempTable_walker(Node *node, void *context)
54 {
55         if (node == NULL)
56                 return false;
57
58         if (IsA(node, Query))
59         {
60                 Query      *query = (Query *) node;
61                 ListCell   *rtable;
62
63                 foreach(rtable, query->rtable)
64                 {
65                         RangeTblEntry *rte = lfirst(rtable);
66
67                         if (rte->rtekind == RTE_RELATION)
68                         {
69                                 Relation        rel = heap_open(rte->relid, AccessShareLock);
70                                 bool            istemp = rel->rd_istemp;
71
72                                 heap_close(rel, AccessShareLock);
73                                 if (istemp)
74                                         return true;
75                         }
76                 }
77
78                 return query_tree_walker(query,
79                                                                  isViewOnTempTable_walker,
80                                                                  context,
81                                                                  QTW_IGNORE_JOINALIASES);
82         }
83
84         return expression_tree_walker(node,
85                                                                   isViewOnTempTable_walker,
86                                                                   context);
87 }
88
89 /*---------------------------------------------------------------------
90  * DefineVirtualRelation
91  *
92  * Create the "view" relation. `DefineRelation' does all the work,
93  * we just provide the correct arguments ... at least when we're
94  * creating a view.  If we're updating an existing view, we have to
95  * work harder.
96  *---------------------------------------------------------------------
97  */
98 static Oid
99 DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
100 {
101         Oid                     viewOid,
102                                 namespaceId;
103         CreateStmt *createStmt = makeNode(CreateStmt);
104         List       *attrList;
105         ListCell   *t;
106
107         /*
108          * create a list of ColumnDef nodes based on the names and types of the
109          * (non-junk) targetlist items from the view's SELECT list.
110          */
111         attrList = NIL;
112         foreach(t, tlist)
113         {
114                 TargetEntry *tle = lfirst(t);
115
116                 if (!tle->resjunk)
117                 {
118                         ColumnDef  *def = makeNode(ColumnDef);
119
120                         def->colname = pstrdup(tle->resname);
121                         def->typename = makeTypeNameFromOid(exprType((Node *) tle->expr),
122                                                                                          exprTypmod((Node *) tle->expr));
123                         def->inhcount = 0;
124                         def->is_local = true;
125                         def->is_not_null = false;
126                         def->raw_default = NULL;
127                         def->cooked_default = NULL;
128                         def->constraints = NIL;
129
130                         attrList = lappend(attrList, def);
131                 }
132         }
133
134         if (attrList == NIL)
135                 ereport(ERROR,
136                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
137                                  errmsg("view must have at least one column")));
138
139         /*
140          * Check to see if we want to replace an existing view.
141          */
142         namespaceId = RangeVarGetCreationNamespace(relation);
143         viewOid = get_relname_relid(relation->relname, namespaceId);
144
145         if (OidIsValid(viewOid) && replace)
146         {
147                 Relation        rel;
148                 TupleDesc       descriptor;
149
150                 /*
151                  * Yes.  Get exclusive lock on the existing view ...
152                  */
153                 rel = relation_open(viewOid, AccessExclusiveLock);
154
155                 /*
156                  * Make sure it *is* a view, and do permissions checks.
157                  */
158                 if (rel->rd_rel->relkind != RELKIND_VIEW)
159                         ereport(ERROR,
160                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
161                                          errmsg("\"%s\" is not a view",
162                                                         RelationGetRelationName(rel))));
163
164                 if (!pg_class_ownercheck(viewOid, GetUserId()))
165                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
166                                                    RelationGetRelationName(rel));
167
168                 /*
169                  * Due to the namespace visibility rules for temporary objects, we
170                  * should only end up replacing a temporary view with another
171                  * temporary view, and vice versa.
172                  */
173                 Assert(relation->istemp == rel->rd_istemp);
174
175                 /*
176                  * If new attributes have been added, we must modify the pre-existing
177                  * view.
178                  */
179                 if (list_length(attrList) > rel->rd_att->natts) {
180                         List            *atcmds = NIL;
181                         ListCell        *c;
182                         int                     skip = rel->rd_att->natts;
183
184                         foreach(c, attrList) {
185                                 AlterTableCmd *atcmd;
186
187                                 if (skip > 0) {
188                                         --skip;
189                                         continue;
190                                 }
191                                 atcmd = makeNode(AlterTableCmd);
192                                 atcmd->subtype = AT_AddColumnToView;
193                                 atcmd->def = lfirst(c);
194                                 atcmds = lappend(atcmds, atcmd);
195                         }
196                         AlterTableInternal(viewOid, atcmds, true);
197                 }
198
199                 /*
200                  * Create a tuple descriptor to compare against the existing view, and
201                  * verify that the old column list is an initial prefix of the new
202                  * column list.
203                  */
204                 descriptor = BuildDescForRelation(attrList);
205                 checkViewTupleDesc(descriptor, rel->rd_att);
206
207                 /*
208                  * Seems okay, so return the OID of the pre-existing view.
209                  */
210                 relation_close(rel, NoLock);    /* keep the lock! */
211
212                 return viewOid;
213         }
214         else
215         {
216                 /*
217                  * now set the parameters for keys/inheritance etc. All of these are
218                  * uninteresting for views...
219                  */
220                 createStmt->relation = (RangeVar *) relation;
221                 createStmt->tableElts = attrList;
222                 createStmt->inhRelations = NIL;
223                 createStmt->constraints = NIL;
224                 createStmt->options = list_make1(defWithOids(false));
225                 createStmt->oncommit = ONCOMMIT_NOOP;
226                 createStmt->tablespacename = NULL;
227
228                 /*
229                  * finally create the relation (this will error out if there's an
230                  * existing view, so we don't need more code to complain if "replace"
231                  * is false).
232                  */
233                 return DefineRelation(createStmt, RELKIND_VIEW);
234         }
235 }
236
237 /*
238  * Verify that tupledesc associated with proposed new view definition
239  * matches tupledesc of old view.  This is basically a cut-down version
240  * of equalTupleDescs(), with code added to generate specific complaints.
241  */
242 static void
243 checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc)
244 {
245         int                     i;
246
247         if (newdesc->natts < olddesc->natts)
248                 ereport(ERROR,
249                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
250                                  errmsg("cannot drop columns from view")));
251         /* we can ignore tdhasoid */
252
253         for (i = 0; i < olddesc->natts; i++)
254         {
255                 Form_pg_attribute newattr = newdesc->attrs[i];
256                 Form_pg_attribute oldattr = olddesc->attrs[i];
257
258                 /* XXX not right, but we don't support DROP COL on view anyway */
259                 if (newattr->attisdropped != oldattr->attisdropped)
260                         ereport(ERROR,
261                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
262                                          errmsg("cannot drop columns from view")));
263
264                 if (strcmp(NameStr(newattr->attname), NameStr(oldattr->attname)) != 0)
265                         ereport(ERROR,
266                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
267                                          errmsg("cannot change name of view column \"%s\"",
268                                                         NameStr(oldattr->attname))));
269                 /* XXX would it be safe to allow atttypmod to change?  Not sure */
270                 if (newattr->atttypid != oldattr->atttypid ||
271                         newattr->atttypmod != oldattr->atttypmod)
272                         ereport(ERROR,
273                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
274                                          errmsg("cannot change data type of view column \"%s\"",
275                                                         NameStr(oldattr->attname))));
276                 /* We can ignore the remaining attributes of an attribute... */
277         }
278
279         /*
280          * We ignore the constraint fields.  The new view desc can't have any
281          * constraints, and the only ones that could be on the old view are
282          * defaults, which we are happy to leave in place.
283          */
284 }
285
286 static void
287 DefineViewRules(Oid viewOid, Query *viewParse, bool replace)
288 {
289         /*
290          * Set up the ON SELECT rule.  Since the query has already been through
291          * parse analysis, we use DefineQueryRewrite() directly.
292          */
293         DefineQueryRewrite(pstrdup(ViewSelectRuleName),
294                                            viewOid,
295                                            NULL,
296                                            CMD_SELECT,
297                                            true,
298                                            replace,
299                                            list_make1(viewParse));
300
301         /*
302          * Someday: automatic ON INSERT, etc
303          */
304 }
305
306 /*---------------------------------------------------------------
307  * UpdateRangeTableOfViewParse
308  *
309  * Update the range table of the given parsetree.
310  * This update consists of adding two new entries IN THE BEGINNING
311  * of the range table (otherwise the rule system will die a slow,
312  * horrible and painful death, and we do not want that now, do we?)
313  * one for the OLD relation and one for the NEW one (both of
314  * them refer in fact to the "view" relation).
315  *
316  * Of course we must also increase the 'varnos' of all the Var nodes
317  * by 2...
318  *
319  * These extra RT entries are not actually used in the query,
320  * except for run-time permission checking.
321  *---------------------------------------------------------------
322  */
323 static Query *
324 UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
325 {
326         Relation        viewRel;
327         List       *new_rt;
328         RangeTblEntry *rt_entry1,
329                            *rt_entry2;
330
331         /*
332          * Make a copy of the given parsetree.  It's not so much that we don't
333          * want to scribble on our input, it's that the parser has a bad habit of
334          * outputting multiple links to the same subtree for constructs like
335          * BETWEEN, and we mustn't have OffsetVarNodes increment the varno of a
336          * Var node twice.      copyObject will expand any multiply-referenced subtree
337          * into multiple copies.
338          */
339         viewParse = (Query *) copyObject(viewParse);
340
341         /* need to open the rel for addRangeTableEntryForRelation */
342         viewRel = relation_open(viewOid, AccessShareLock);
343
344         /*
345          * Create the 2 new range table entries and form the new range table...
346          * OLD first, then NEW....
347          */
348         rt_entry1 = addRangeTableEntryForRelation(NULL, viewRel,
349                                                                                           makeAlias("*OLD*", NIL),
350                                                                                           false, false);
351         rt_entry2 = addRangeTableEntryForRelation(NULL, viewRel,
352                                                                                           makeAlias("*NEW*", NIL),
353                                                                                           false, false);
354         /* Must override addRangeTableEntry's default access-check flags */
355         rt_entry1->requiredPerms = 0;
356         rt_entry2->requiredPerms = 0;
357
358         new_rt = lcons(rt_entry1, lcons(rt_entry2, viewParse->rtable));
359
360         viewParse->rtable = new_rt;
361
362         /*
363          * Now offset all var nodes by 2, and jointree RT indexes too.
364          */
365         OffsetVarNodes((Node *) viewParse, 2, 0);
366
367         relation_close(viewRel, AccessShareLock);
368
369         return viewParse;
370 }
371
372 /*
373  * DefineView
374  *              Execute a CREATE VIEW command.
375  */
376 void
377 DefineView(ViewStmt *stmt, const char *queryString)
378 {
379         Query      *viewParse;
380         Oid                     viewOid;
381         RangeVar   *view;
382
383         /*
384          * Run parse analysis to convert the raw parse tree to a Query.  Note this
385          * also acquires sufficient locks on the source table(s).
386          *
387          * Since parse analysis scribbles on its input, copy the raw parse tree;
388          * this ensures we don't corrupt a prepared statement, for example.
389          */
390         viewParse = parse_analyze((Node *) copyObject(stmt->query),
391                                                           queryString, NULL, 0);
392
393         /*
394          * The grammar should ensure that the result is a single SELECT Query.
395          */
396         if (!IsA(viewParse, Query) ||
397                 viewParse->commandType != CMD_SELECT)
398                 elog(ERROR, "unexpected parse analysis result");
399
400         /*
401          * If a list of column names was given, run through and insert these into
402          * the actual query tree. - thomas 2000-03-08
403          */
404         if (stmt->aliases != NIL)
405         {
406                 ListCell   *alist_item = list_head(stmt->aliases);
407                 ListCell   *targetList;
408
409                 foreach(targetList, viewParse->targetList)
410                 {
411                         TargetEntry *te = (TargetEntry *) lfirst(targetList);
412
413                         Assert(IsA(te, TargetEntry));
414                         /* junk columns don't get aliases */
415                         if (te->resjunk)
416                                 continue;
417                         te->resname = pstrdup(strVal(lfirst(alist_item)));
418                         alist_item = lnext(alist_item);
419                         if (alist_item == NULL)
420                                 break;                  /* done assigning aliases */
421                 }
422
423                 if (alist_item != NULL)
424                         ereport(ERROR,
425                                         (errcode(ERRCODE_SYNTAX_ERROR),
426                                          errmsg("CREATE VIEW specifies more column "
427                                                         "names than columns")));
428         }
429
430         /*
431          * If the user didn't explicitly ask for a temporary view, check whether
432          * we need one implicitly.      We allow TEMP to be inserted automatically as
433          * long as the CREATE command is consistent with that --- no explicit
434          * schema name.
435          */
436         view = stmt->view;
437         if (!view->istemp && isViewOnTempTable(viewParse))
438         {
439                 view = copyObject(view);        /* don't corrupt original command */
440                 view->istemp = true;
441                 ereport(NOTICE,
442                                 (errmsg("view \"%s\" will be a temporary view",
443                                                 view->relname)));
444         }
445
446         /*
447          * Create the view relation
448          *
449          * NOTE: if it already exists and replace is false, the xact will be
450          * aborted.
451          */
452         viewOid = DefineVirtualRelation(view, viewParse->targetList,
453                                                                         stmt->replace);
454
455         /*
456          * The relation we have just created is not visible to any other commands
457          * running with the same transaction & command id. So, increment the
458          * command id counter (but do NOT pfree any memory!!!!)
459          */
460         CommandCounterIncrement();
461
462         /*
463          * The range table of 'viewParse' does not contain entries for the "OLD"
464          * and "NEW" relations. So... add them!
465          */
466         viewParse = UpdateRangeTableOfViewParse(viewOid, viewParse);
467
468         /*
469          * Now create the rules associated with the view.
470          */
471         DefineViewRules(viewOid, viewParse, stmt->replace);
472 }