OSDN Git Service

Improve reporting of run-time-detected indeterminate-collation errors.
[pg-rex/syncrep.git] / src / backend / commands / view.c
1 /*-------------------------------------------------------------------------
2  *
3  * view.c
4  *        use rewrite rules to construct views
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/commands/view.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "access/xact.h"
19 #include "catalog/namespace.h"
20 #include "commands/defrem.h"
21 #include "commands/tablecmds.h"
22 #include "commands/view.h"
23 #include "miscadmin.h"
24 #include "nodes/makefuncs.h"
25 #include "nodes/nodeFuncs.h"
26 #include "parser/analyze.h"
27 #include "parser/parse_relation.h"
28 #include "rewrite/rewriteDefine.h"
29 #include "rewrite/rewriteManip.h"
30 #include "rewrite/rewriteSupport.h"
31 #include "utils/acl.h"
32 #include "utils/builtins.h"
33 #include "utils/lsyscache.h"
34 #include "utils/rel.h"
35
36
37 static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
38 static bool isViewOnTempTable_walker(Node *node, void *context);
39
40 /*---------------------------------------------------------------------
41  * isViewOnTempTable
42  *
43  * Returns true iff any of the relations underlying this view are
44  * temporary tables.
45  *---------------------------------------------------------------------
46  */
47 static bool
48 isViewOnTempTable(Query *viewParse)
49 {
50         return isViewOnTempTable_walker((Node *) viewParse, NULL);
51 }
52
53 static bool
54 isViewOnTempTable_walker(Node *node, void *context)
55 {
56         if (node == NULL)
57                 return false;
58
59         if (IsA(node, Query))
60         {
61                 Query      *query = (Query *) node;
62                 ListCell   *rtable;
63
64                 foreach(rtable, query->rtable)
65                 {
66                         RangeTblEntry *rte = lfirst(rtable);
67
68                         if (rte->rtekind == RTE_RELATION)
69                         {
70                                 Relation        rel = heap_open(rte->relid, AccessShareLock);
71                                 char            relpersistence = rel->rd_rel->relpersistence;
72
73                                 heap_close(rel, AccessShareLock);
74                                 if (relpersistence == RELPERSISTENCE_TEMP)
75                                         return true;
76                         }
77                 }
78
79                 return query_tree_walker(query,
80                                                                  isViewOnTempTable_walker,
81                                                                  context,
82                                                                  QTW_IGNORE_JOINALIASES);
83         }
84
85         return expression_tree_walker(node,
86                                                                   isViewOnTempTable_walker,
87                                                                   context);
88 }
89
90 /*---------------------------------------------------------------------
91  * DefineVirtualRelation
92  *
93  * Create the "view" relation. `DefineRelation' does all the work,
94  * we just provide the correct arguments ... at least when we're
95  * creating a view.  If we're updating an existing view, we have to
96  * work harder.
97  *---------------------------------------------------------------------
98  */
99 static Oid
100 DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
101 {
102         Oid                     viewOid,
103                                 namespaceId;
104         CreateStmt *createStmt = makeNode(CreateStmt);
105         List       *attrList;
106         ListCell   *t;
107
108         /*
109          * create a list of ColumnDef nodes based on the names and types of the
110          * (non-junk) targetlist items from the view's SELECT list.
111          */
112         attrList = NIL;
113         foreach(t, tlist)
114         {
115                 TargetEntry *tle = lfirst(t);
116
117                 if (!tle->resjunk)
118                 {
119                         ColumnDef  *def = makeNode(ColumnDef);
120
121                         def->colname = pstrdup(tle->resname);
122                         def->typeName = makeTypeNameFromOid(exprType((Node *) tle->expr),
123                                                                                                 exprTypmod((Node *) tle->expr));
124                         def->inhcount = 0;
125                         def->is_local = true;
126                         def->is_not_null = false;
127                         def->is_from_type = false;
128                         def->storage = 0;
129                         def->raw_default = NULL;
130                         def->cooked_default = NULL;
131                         def->collClause = NULL;
132                         def->collOid = exprCollation((Node *) tle->expr);
133                         /*
134                          * It's possible that the column is of a collatable type but the
135                          * collation could not be resolved, so double-check.
136                          */
137                         if (type_is_collatable(exprType((Node *) tle->expr)))
138                         {
139                                 if (!OidIsValid(def->collOid))
140                                         ereport(ERROR,
141                                                         (errcode(ERRCODE_INDETERMINATE_COLLATION),
142                                                          errmsg("could not determine which collation to use for view column \"%s\"",
143                                                                         def->colname),
144                                                          errhint("Use the COLLATE clause to set the collation explicitly.")));
145                         }
146                         else
147                                 Assert(!OidIsValid(def->collOid));
148                         def->constraints = NIL;
149
150                         attrList = lappend(attrList, def);
151                 }
152         }
153
154         if (attrList == NIL)
155                 ereport(ERROR,
156                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
157                                  errmsg("view must have at least one column")));
158
159         /*
160          * Check to see if we want to replace an existing view.
161          */
162         namespaceId = RangeVarGetCreationNamespace(relation);
163         viewOid = get_relname_relid(relation->relname, namespaceId);
164
165         if (OidIsValid(viewOid) && replace)
166         {
167                 Relation        rel;
168                 TupleDesc       descriptor;
169
170                 /*
171                  * Yes.  Get exclusive lock on the existing view ...
172                  */
173                 rel = relation_open(viewOid, AccessExclusiveLock);
174
175                 /*
176                  * Make sure it *is* a view, and do permissions checks.
177                  */
178                 if (rel->rd_rel->relkind != RELKIND_VIEW)
179                         ereport(ERROR,
180                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
181                                          errmsg("\"%s\" is not a view",
182                                                         RelationGetRelationName(rel))));
183
184                 if (!pg_class_ownercheck(viewOid, GetUserId()))
185                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
186                                                    RelationGetRelationName(rel));
187
188                 /* Also check it's not in use already */
189                 CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");
190
191                 /*
192                  * Due to the namespace visibility rules for temporary objects, we
193                  * should only end up replacing a temporary view with another
194                  * temporary view, and similarly for permanent views.
195                  */
196                 Assert(relation->relpersistence == rel->rd_rel->relpersistence);
197
198                 /*
199                  * Create a tuple descriptor to compare against the existing view, and
200                  * verify that the old column list is an initial prefix of the new
201                  * column list.
202                  */
203                 descriptor = BuildDescForRelation(attrList);
204                 checkViewTupleDesc(descriptor, rel->rd_att);
205
206                 /*
207                  * If new attributes have been added, we must add pg_attribute entries
208                  * for them.  It is convenient (although overkill) to use the ALTER
209                  * TABLE ADD COLUMN infrastructure for this.
210                  */
211                 if (list_length(attrList) > rel->rd_att->natts)
212                 {
213                         List       *atcmds = NIL;
214                         ListCell   *c;
215                         int                     skip = rel->rd_att->natts;
216
217                         foreach(c, attrList)
218                         {
219                                 AlterTableCmd *atcmd;
220
221                                 if (skip > 0)
222                                 {
223                                         skip--;
224                                         continue;
225                                 }
226                                 atcmd = makeNode(AlterTableCmd);
227                                 atcmd->subtype = AT_AddColumnToView;
228                                 atcmd->def = (Node *) lfirst(c);
229                                 atcmds = lappend(atcmds, atcmd);
230                         }
231                         AlterTableInternal(viewOid, atcmds, true);
232                 }
233
234                 /*
235                  * Seems okay, so return the OID of the pre-existing view.
236                  */
237                 relation_close(rel, NoLock);    /* keep the lock! */
238
239                 return viewOid;
240         }
241         else
242         {
243                 Oid             relid;
244
245                 /*
246                  * now set the parameters for keys/inheritance etc. All of these are
247                  * uninteresting for views...
248                  */
249                 createStmt->relation = (RangeVar *) relation;
250                 createStmt->tableElts = attrList;
251                 createStmt->inhRelations = NIL;
252                 createStmt->constraints = NIL;
253                 createStmt->options = list_make1(defWithOids(false));
254                 createStmt->oncommit = ONCOMMIT_NOOP;
255                 createStmt->tablespacename = NULL;
256                 createStmt->if_not_exists = false;
257
258                 /*
259                  * finally create the relation (this will error out if there's an
260                  * existing view, so we don't need more code to complain if "replace"
261                  * is false).
262                  */
263                 relid = DefineRelation(createStmt, RELKIND_VIEW, InvalidOid);
264                 Assert(relid != InvalidOid);
265                 return relid;
266         }
267 }
268
269 /*
270  * Verify that tupledesc associated with proposed new view definition
271  * matches tupledesc of old view.  This is basically a cut-down version
272  * of equalTupleDescs(), with code added to generate specific complaints.
273  * Also, we allow the new tupledesc to have more columns than the old.
274  */
275 static void
276 checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc)
277 {
278         int                     i;
279
280         if (newdesc->natts < olddesc->natts)
281                 ereport(ERROR,
282                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
283                                  errmsg("cannot drop columns from view")));
284         /* we can ignore tdhasoid */
285
286         for (i = 0; i < olddesc->natts; i++)
287         {
288                 Form_pg_attribute newattr = newdesc->attrs[i];
289                 Form_pg_attribute oldattr = olddesc->attrs[i];
290
291                 /* XXX msg not right, but we don't support DROP COL on view anyway */
292                 if (newattr->attisdropped != oldattr->attisdropped)
293                         ereport(ERROR,
294                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
295                                          errmsg("cannot drop columns from view")));
296
297                 if (strcmp(NameStr(newattr->attname), NameStr(oldattr->attname)) != 0)
298                         ereport(ERROR,
299                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
300                                  errmsg("cannot change name of view column \"%s\" to \"%s\"",
301                                                 NameStr(oldattr->attname),
302                                                 NameStr(newattr->attname))));
303                 /* XXX would it be safe to allow atttypmod to change?  Not sure */
304                 if (newattr->atttypid != oldattr->atttypid ||
305                         newattr->atttypmod != oldattr->atttypmod)
306                         ereport(ERROR,
307                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
308                                          errmsg("cannot change data type of view column \"%s\" from %s to %s",
309                                                         NameStr(oldattr->attname),
310                                                         format_type_with_typemod(oldattr->atttypid,
311                                                                                                          oldattr->atttypmod),
312                                                         format_type_with_typemod(newattr->atttypid,
313                                                                                                          newattr->atttypmod))));
314                 /* We can ignore the remaining attributes of an attribute... */
315         }
316
317         /*
318          * We ignore the constraint fields.  The new view desc can't have any
319          * constraints, and the only ones that could be on the old view are
320          * defaults, which we are happy to leave in place.
321          */
322 }
323
324 static void
325 DefineViewRules(Oid viewOid, Query *viewParse, bool replace)
326 {
327         /*
328          * Set up the ON SELECT rule.  Since the query has already been through
329          * parse analysis, we use DefineQueryRewrite() directly.
330          */
331         DefineQueryRewrite(pstrdup(ViewSelectRuleName),
332                                            viewOid,
333                                            NULL,
334                                            CMD_SELECT,
335                                            true,
336                                            replace,
337                                            list_make1(viewParse));
338
339         /*
340          * Someday: automatic ON INSERT, etc
341          */
342 }
343
344 /*---------------------------------------------------------------
345  * UpdateRangeTableOfViewParse
346  *
347  * Update the range table of the given parsetree.
348  * This update consists of adding two new entries IN THE BEGINNING
349  * of the range table (otherwise the rule system will die a slow,
350  * horrible and painful death, and we do not want that now, do we?)
351  * one for the OLD relation and one for the NEW one (both of
352  * them refer in fact to the "view" relation).
353  *
354  * Of course we must also increase the 'varnos' of all the Var nodes
355  * by 2...
356  *
357  * These extra RT entries are not actually used in the query,
358  * except for run-time permission checking.
359  *---------------------------------------------------------------
360  */
361 static Query *
362 UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
363 {
364         Relation        viewRel;
365         List       *new_rt;
366         RangeTblEntry *rt_entry1,
367                            *rt_entry2;
368
369         /*
370          * Make a copy of the given parsetree.  It's not so much that we don't
371          * want to scribble on our input, it's that the parser has a bad habit of
372          * outputting multiple links to the same subtree for constructs like
373          * BETWEEN, and we mustn't have OffsetVarNodes increment the varno of a
374          * Var node twice.      copyObject will expand any multiply-referenced subtree
375          * into multiple copies.
376          */
377         viewParse = (Query *) copyObject(viewParse);
378
379         /* need to open the rel for addRangeTableEntryForRelation */
380         viewRel = relation_open(viewOid, AccessShareLock);
381
382         /*
383          * Create the 2 new range table entries and form the new range table...
384          * OLD first, then NEW....
385          */
386         rt_entry1 = addRangeTableEntryForRelation(NULL, viewRel,
387                                                                                           makeAlias("old", NIL),
388                                                                                           false, false);
389         rt_entry2 = addRangeTableEntryForRelation(NULL, viewRel,
390                                                                                           makeAlias("new", NIL),
391                                                                                           false, false);
392         /* Must override addRangeTableEntry's default access-check flags */
393         rt_entry1->requiredPerms = 0;
394         rt_entry2->requiredPerms = 0;
395
396         new_rt = lcons(rt_entry1, lcons(rt_entry2, viewParse->rtable));
397
398         viewParse->rtable = new_rt;
399
400         /*
401          * Now offset all var nodes by 2, and jointree RT indexes too.
402          */
403         OffsetVarNodes((Node *) viewParse, 2, 0);
404
405         relation_close(viewRel, AccessShareLock);
406
407         return viewParse;
408 }
409
410 /*
411  * DefineView
412  *              Execute a CREATE VIEW command.
413  */
414 void
415 DefineView(ViewStmt *stmt, const char *queryString)
416 {
417         Query      *viewParse;
418         Oid                     viewOid;
419         RangeVar   *view;
420
421         /*
422          * Run parse analysis to convert the raw parse tree to a Query.  Note this
423          * also acquires sufficient locks on the source table(s).
424          *
425          * Since parse analysis scribbles on its input, copy the raw parse tree;
426          * this ensures we don't corrupt a prepared statement, for example.
427          */
428         viewParse = parse_analyze((Node *) copyObject(stmt->query),
429                                                           queryString, NULL, 0);
430
431         /*
432          * The grammar should ensure that the result is a single SELECT Query.
433          */
434         if (!IsA(viewParse, Query) ||
435                 viewParse->commandType != CMD_SELECT)
436                 elog(ERROR, "unexpected parse analysis result");
437
438         /*
439          * Check for unsupported cases.  These tests are redundant with ones in
440          * DefineQueryRewrite(), but that function will complain about a bogus
441          * ON SELECT rule, and we'd rather the message complain about a view.
442          */
443         if (viewParse->intoClause != NULL)
444                 ereport(ERROR,
445                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
446                                  errmsg("views must not contain SELECT INTO")));
447         if (viewParse->hasModifyingCTE)
448                 ereport(ERROR,
449                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
450                                  errmsg("views must not contain data-modifying statements in WITH")));
451
452         /*
453          * If a list of column names was given, run through and insert these into
454          * the actual query tree. - thomas 2000-03-08
455          */
456         if (stmt->aliases != NIL)
457         {
458                 ListCell   *alist_item = list_head(stmt->aliases);
459                 ListCell   *targetList;
460
461                 foreach(targetList, viewParse->targetList)
462                 {
463                         TargetEntry *te = (TargetEntry *) lfirst(targetList);
464
465                         Assert(IsA(te, TargetEntry));
466                         /* junk columns don't get aliases */
467                         if (te->resjunk)
468                                 continue;
469                         te->resname = pstrdup(strVal(lfirst(alist_item)));
470                         alist_item = lnext(alist_item);
471                         if (alist_item == NULL)
472                                 break;                  /* done assigning aliases */
473                 }
474
475                 if (alist_item != NULL)
476                         ereport(ERROR,
477                                         (errcode(ERRCODE_SYNTAX_ERROR),
478                                          errmsg("CREATE VIEW specifies more column "
479                                                         "names than columns")));
480         }
481
482         /*
483          * If the user didn't explicitly ask for a temporary view, check whether
484          * we need one implicitly.      We allow TEMP to be inserted automatically as
485          * long as the CREATE command is consistent with that --- no explicit
486          * schema name.
487          */
488         view = stmt->view;
489         if (view->relpersistence == RELPERSISTENCE_PERMANENT
490                 && isViewOnTempTable(viewParse))
491         {
492                 view = copyObject(view);        /* don't corrupt original command */
493                 view->relpersistence = RELPERSISTENCE_TEMP;
494                 ereport(NOTICE,
495                                 (errmsg("view \"%s\" will be a temporary view",
496                                                 view->relname)));
497         }
498
499         /* Unlogged views are not sensible. */
500         if (view->relpersistence == RELPERSISTENCE_UNLOGGED)
501                 ereport(ERROR,
502                                 (errcode(ERRCODE_SYNTAX_ERROR),
503                                  errmsg("views cannot be unlogged because they do not have storage")));
504
505         /*
506          * Create the view relation
507          *
508          * NOTE: if it already exists and replace is false, the xact will be
509          * aborted.
510          */
511         viewOid = DefineVirtualRelation(view, viewParse->targetList,
512                                                                         stmt->replace);
513
514         /*
515          * The relation we have just created is not visible to any other commands
516          * running with the same transaction & command id. So, increment the
517          * command id counter (but do NOT pfree any memory!!!!)
518          */
519         CommandCounterIncrement();
520
521         /*
522          * The range table of 'viewParse' does not contain entries for the "OLD"
523          * and "NEW" relations. So... add them!
524          */
525         viewParse = UpdateRangeTableOfViewParse(viewOid, viewParse);
526
527         /*
528          * Now create the rules associated with the view.
529          */
530         DefineViewRules(viewOid, viewParse, stmt->replace);
531 }