OSDN Git Service

Fix a couple of misbehaviors rooted in the fact that the default creation
[pg-rex/syncrep.git] / src / backend / commands / view.c
1 /*-------------------------------------------------------------------------
2  *
3  * view.c
4  *        use rewrite rules to construct views
5  *
6  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/view.c,v 1.102 2007/08/27 03:36:08 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "access/xact.h"
19 #include "catalog/dependency.h"
20 #include "catalog/namespace.h"
21 #include "commands/defrem.h"
22 #include "commands/tablecmds.h"
23 #include "commands/view.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "optimizer/clauses.h"
27 #include "parser/analyze.h"
28 #include "parser/parse_expr.h"
29 #include "parser/parse_relation.h"
30 #include "rewrite/rewriteDefine.h"
31 #include "rewrite/rewriteManip.h"
32 #include "rewrite/rewriteSupport.h"
33 #include "utils/acl.h"
34 #include "utils/lsyscache.h"
35
36
37 static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
38 static bool isViewOnTempTable_walker(Node *node, void *context);
39
40 /*---------------------------------------------------------------------
41  * isViewOnTempTable
42  *
43  * Returns true iff any of the relations underlying this view are
44  * temporary tables.
45  *---------------------------------------------------------------------
46  */
47 static bool
48 isViewOnTempTable(Query *viewParse)
49 {
50         return isViewOnTempTable_walker((Node *) viewParse, NULL);
51 }
52
53 static bool
54 isViewOnTempTable_walker(Node *node, void *context)
55 {
56         if (node == NULL)
57                 return false;
58
59         if (IsA(node, Query))
60         {
61                 Query      *query = (Query *) node;
62                 ListCell   *rtable;
63
64                 foreach(rtable, query->rtable)
65                 {
66                         RangeTblEntry *rte = lfirst(rtable);
67
68                         if (rte->rtekind == RTE_RELATION)
69                         {
70                                 Relation        rel = heap_open(rte->relid, AccessShareLock);
71                                 bool            istemp = rel->rd_istemp;
72
73                                 heap_close(rel, AccessShareLock);
74                                 if (istemp)
75                                         return true;
76                         }
77                 }
78
79                 return query_tree_walker(query,
80                                                                  isViewOnTempTable_walker,
81                                                                  context,
82                                                                  QTW_IGNORE_JOINALIASES);
83         }
84
85         return expression_tree_walker(node,
86                                                                   isViewOnTempTable_walker,
87                                                                   context);
88 }
89
90 /*---------------------------------------------------------------------
91  * DefineVirtualRelation
92  *
93  * Create the "view" relation. `DefineRelation' does all the work,
94  * we just provide the correct arguments ... at least when we're
95  * creating a view.  If we're updating an existing view, we have to
96  * work harder.
97  *---------------------------------------------------------------------
98  */
99 static Oid
100 DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
101 {
102         Oid                     viewOid,
103                                 namespaceId;
104         CreateStmt *createStmt = makeNode(CreateStmt);
105         List       *attrList;
106         ListCell   *t;
107
108         /*
109          * create a list of ColumnDef nodes based on the names and types of the
110          * (non-junk) targetlist items from the view's SELECT list.
111          */
112         attrList = NIL;
113         foreach(t, tlist)
114         {
115                 TargetEntry *tle = lfirst(t);
116
117                 if (!tle->resjunk)
118                 {
119                         ColumnDef  *def = makeNode(ColumnDef);
120
121                         def->colname = pstrdup(tle->resname);
122                         def->typename = makeTypeNameFromOid(exprType((Node *) tle->expr),
123                                                                                          exprTypmod((Node *) tle->expr));
124                         def->inhcount = 0;
125                         def->is_local = true;
126                         def->is_not_null = false;
127                         def->raw_default = NULL;
128                         def->cooked_default = NULL;
129                         def->constraints = NIL;
130
131                         attrList = lappend(attrList, def);
132                 }
133         }
134
135         if (attrList == NIL)
136                 ereport(ERROR,
137                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
138                                  errmsg("view must have at least one column")));
139
140         /*
141          * Check to see if we want to replace an existing view.
142          */
143         namespaceId = RangeVarGetCreationNamespace(relation);
144         viewOid = get_relname_relid(relation->relname, namespaceId);
145
146         if (OidIsValid(viewOid) && replace)
147         {
148                 Relation        rel;
149                 TupleDesc       descriptor;
150
151                 /*
152                  * Yes.  Get exclusive lock on the existing view ...
153                  */
154                 rel = relation_open(viewOid, AccessExclusiveLock);
155
156                 /*
157                  * Make sure it *is* a view, and do permissions checks.
158                  */
159                 if (rel->rd_rel->relkind != RELKIND_VIEW)
160                         ereport(ERROR,
161                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
162                                          errmsg("\"%s\" is not a view",
163                                                         RelationGetRelationName(rel))));
164
165                 if (!pg_class_ownercheck(viewOid, GetUserId()))
166                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
167                                                    RelationGetRelationName(rel));
168
169                 /*
170                  * Due to the namespace visibility rules for temporary objects, we
171                  * should only end up replacing a temporary view with another
172                  * temporary view, and vice versa.
173                  */
174                 Assert(relation->istemp == rel->rd_istemp);
175
176                 /*
177                  * Create a tuple descriptor to compare against the existing view, and
178                  * verify it matches.
179                  */
180                 descriptor = BuildDescForRelation(attrList);
181                 checkViewTupleDesc(descriptor, rel->rd_att);
182
183                 /*
184                  * Seems okay, so return the OID of the pre-existing view.
185                  */
186                 relation_close(rel, NoLock);    /* keep the lock! */
187
188                 return viewOid;
189         }
190         else
191         {
192                 /*
193                  * now set the parameters for keys/inheritance etc. All of these are
194                  * uninteresting for views...
195                  */
196                 createStmt->relation = (RangeVar *) relation;
197                 createStmt->tableElts = attrList;
198                 createStmt->inhRelations = NIL;
199                 createStmt->constraints = NIL;
200                 createStmt->options = list_make1(defWithOids(false));
201                 createStmt->oncommit = ONCOMMIT_NOOP;
202                 createStmt->tablespacename = NULL;
203
204                 /*
205                  * finally create the relation (this will error out if there's an
206                  * existing view, so we don't need more code to complain if "replace"
207                  * is false).
208                  */
209                 return DefineRelation(createStmt, RELKIND_VIEW);
210         }
211 }
212
213 /*
214  * Verify that tupledesc associated with proposed new view definition
215  * matches tupledesc of old view.  This is basically a cut-down version
216  * of equalTupleDescs(), with code added to generate specific complaints.
217  */
218 static void
219 checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc)
220 {
221         int                     i;
222
223         if (newdesc->natts != olddesc->natts)
224                 ereport(ERROR,
225                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
226                                  errmsg("cannot change number of columns in view")));
227         /* we can ignore tdhasoid */
228
229         for (i = 0; i < newdesc->natts; i++)
230         {
231                 Form_pg_attribute newattr = newdesc->attrs[i];
232                 Form_pg_attribute oldattr = olddesc->attrs[i];
233
234                 /* XXX not right, but we don't support DROP COL on view anyway */
235                 if (newattr->attisdropped != oldattr->attisdropped)
236                         ereport(ERROR,
237                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
238                                          errmsg("cannot change number of columns in view")));
239
240                 if (strcmp(NameStr(newattr->attname), NameStr(oldattr->attname)) != 0)
241                         ereport(ERROR,
242                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
243                                          errmsg("cannot change name of view column \"%s\"",
244                                                         NameStr(oldattr->attname))));
245                 /* XXX would it be safe to allow atttypmod to change?  Not sure */
246                 if (newattr->atttypid != oldattr->atttypid ||
247                         newattr->atttypmod != oldattr->atttypmod)
248                         ereport(ERROR,
249                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
250                                          errmsg("cannot change data type of view column \"%s\"",
251                                                         NameStr(oldattr->attname))));
252                 /* We can ignore the remaining attributes of an attribute... */
253         }
254
255         /*
256          * We ignore the constraint fields.  The new view desc can't have any
257          * constraints, and the only ones that could be on the old view are
258          * defaults, which we are happy to leave in place.
259          */
260 }
261
262 static void
263 DefineViewRules(Oid viewOid, Query *viewParse, bool replace)
264 {
265         /*
266          * Set up the ON SELECT rule.  Since the query has already been through
267          * parse analysis, we use DefineQueryRewrite() directly.
268          */
269         DefineQueryRewrite(pstrdup(ViewSelectRuleName),
270                                            viewOid,
271                                            NULL,
272                                            CMD_SELECT,
273                                            true,
274                                            replace,
275                                            list_make1(viewParse));
276         /*
277          * Someday: automatic ON INSERT, etc
278          */
279 }
280
281 /*---------------------------------------------------------------
282  * UpdateRangeTableOfViewParse
283  *
284  * Update the range table of the given parsetree.
285  * This update consists of adding two new entries IN THE BEGINNING
286  * of the range table (otherwise the rule system will die a slow,
287  * horrible and painful death, and we do not want that now, do we?)
288  * one for the OLD relation and one for the NEW one (both of
289  * them refer in fact to the "view" relation).
290  *
291  * Of course we must also increase the 'varnos' of all the Var nodes
292  * by 2...
293  *
294  * These extra RT entries are not actually used in the query,
295  * except for run-time permission checking.
296  *---------------------------------------------------------------
297  */
298 static Query *
299 UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
300 {
301         Relation        viewRel;
302         List       *new_rt;
303         RangeTblEntry *rt_entry1,
304                            *rt_entry2;
305
306         /*
307          * Make a copy of the given parsetree.  It's not so much that we don't
308          * want to scribble on our input, it's that the parser has a bad habit of
309          * outputting multiple links to the same subtree for constructs like
310          * BETWEEN, and we mustn't have OffsetVarNodes increment the varno of a
311          * Var node twice.      copyObject will expand any multiply-referenced subtree
312          * into multiple copies.
313          */
314         viewParse = (Query *) copyObject(viewParse);
315
316         /* need to open the rel for addRangeTableEntryForRelation */
317         viewRel = relation_open(viewOid, AccessShareLock);
318
319         /*
320          * Create the 2 new range table entries and form the new range table...
321          * OLD first, then NEW....
322          */
323         rt_entry1 = addRangeTableEntryForRelation(NULL, viewRel,
324                                                                                           makeAlias("*OLD*", NIL),
325                                                                                           false, false);
326         rt_entry2 = addRangeTableEntryForRelation(NULL, viewRel,
327                                                                                           makeAlias("*NEW*", NIL),
328                                                                                           false, false);
329         /* Must override addRangeTableEntry's default access-check flags */
330         rt_entry1->requiredPerms = 0;
331         rt_entry2->requiredPerms = 0;
332
333         new_rt = lcons(rt_entry1, lcons(rt_entry2, viewParse->rtable));
334
335         viewParse->rtable = new_rt;
336
337         /*
338          * Now offset all var nodes by 2, and jointree RT indexes too.
339          */
340         OffsetVarNodes((Node *) viewParse, 2, 0);
341
342         relation_close(viewRel, AccessShareLock);
343
344         return viewParse;
345 }
346
347 /*
348  * DefineView
349  *              Execute a CREATE VIEW command.
350  */
351 void
352 DefineView(ViewStmt *stmt, const char *queryString)
353 {
354         Query      *viewParse;
355         Oid                     viewOid;
356         RangeVar   *view;
357
358         /*
359          * Run parse analysis to convert the raw parse tree to a Query.  Note
360          * this also acquires sufficient locks on the source table(s).
361          *
362          * Since parse analysis scribbles on its input, copy the raw parse tree;
363          * this ensures we don't corrupt a prepared statement, for example.
364          */
365         viewParse = parse_analyze((Node *) copyObject(stmt->query),
366                                                           queryString, NULL, 0);
367
368         /*
369          * The grammar should ensure that the result is a single SELECT Query.
370          */
371         if (!IsA(viewParse, Query) ||
372                 viewParse->commandType != CMD_SELECT)
373                 elog(ERROR, "unexpected parse analysis result");
374
375         /*
376          * If a list of column names was given, run through and insert these into
377          * the actual query tree. - thomas 2000-03-08
378          */
379         if (stmt->aliases != NIL)
380         {
381                 ListCell   *alist_item = list_head(stmt->aliases);
382                 ListCell   *targetList;
383
384                 foreach(targetList, viewParse->targetList)
385                 {
386                         TargetEntry *te = (TargetEntry *) lfirst(targetList);
387
388                         Assert(IsA(te, TargetEntry));
389                         /* junk columns don't get aliases */
390                         if (te->resjunk)
391                                 continue;
392                         te->resname = pstrdup(strVal(lfirst(alist_item)));
393                         alist_item = lnext(alist_item);
394                         if (alist_item == NULL)
395                                 break;                  /* done assigning aliases */
396                 }
397
398                 if (alist_item != NULL)
399                         ereport(ERROR,
400                                         (errcode(ERRCODE_SYNTAX_ERROR),
401                                          errmsg("CREATE VIEW specifies more column "
402                                                         "names than columns")));
403         }
404
405         /*
406          * If the user didn't explicitly ask for a temporary view, check whether
407          * we need one implicitly.  We allow TEMP to be inserted automatically
408          * as long as the CREATE command is consistent with that --- no explicit
409          * schema name.
410          */
411         view = stmt->view;
412         if (!view->istemp && isViewOnTempTable(viewParse))
413         {
414                 view = copyObject(view); /* don't corrupt original command */
415                 view->istemp = true;
416                 ereport(NOTICE,
417                                 (errmsg("view \"%s\" will be a temporary view",
418                                                 view->relname)));
419         }
420
421         /*
422          * Create the view relation
423          *
424          * NOTE: if it already exists and replace is false, the xact will be
425          * aborted.
426          */
427         viewOid = DefineVirtualRelation(view, viewParse->targetList,
428                                                                         stmt->replace);
429
430         /*
431          * The relation we have just created is not visible to any other commands
432          * running with the same transaction & command id. So, increment the
433          * command id counter (but do NOT pfree any memory!!!!)
434          */
435         CommandCounterIncrement();
436
437         /*
438          * The range table of 'viewParse' does not contain entries for the "OLD"
439          * and "NEW" relations. So... add them!
440          */
441         viewParse = UpdateRangeTableOfViewParse(viewOid, viewParse);
442
443         /*
444          * Now create the rules associated with the view.
445          */
446         DefineViewRules(viewOid, viewParse, stmt->replace);
447 }
448
449 /*
450  * RemoveView
451  *
452  * Remove a view given its name
453  *
454  * We just have to drop the relation; the associated rules will be
455  * cleaned up automatically.
456  */
457 void
458 RemoveView(const RangeVar *view, DropBehavior behavior)
459 {
460         Oid                     viewOid;
461         ObjectAddress object;
462
463         viewOid = RangeVarGetRelid(view, false);
464
465         object.classId = RelationRelationId;
466         object.objectId = viewOid;
467         object.objectSubId = 0;
468
469         performDeletion(&object, behavior);
470 }