OSDN Git Service

Fix bugs in relpersistence handling during table creation.
[pg-rex/syncrep.git] / src / backend / commands / view.c
index bb4f218..b238199 100644 (file)
  * view.c
  *       use rewrite rules to construct views
  *
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- *     $Id: view.c,v 1.62 2002/04/15 05:22:03 tgl Exp $
+ *
+ * IDENTIFICATION
+ *       src/backend/commands/view.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include "access/heapam.h"
 #include "access/xact.h"
-#include "catalog/heap.h"
 #include "catalog/namespace.h"
+#include "commands/defrem.h"
 #include "commands/tablecmds.h"
 #include "commands/view.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/analyze.h"
 #include "parser/parse_relation.h"
-#include "parser/parse_type.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteManip.h"
-#include "rewrite/rewriteRemove.h"
 #include "rewrite/rewriteSupport.h"
-#include "utils/syscache.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+
 
+static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
+static bool isViewOnTempTable_walker(Node *node, void *context);
 
 /*---------------------------------------------------------------------
- * DefineVirtualRelation
+ * isViewOnTempTable
  *
- * Create the "view" relation.
- * `DefineRelation' does all the work, we just provide the correct
- * arguments!
+ * Returns true iff any of the relations underlying this view are
+ * temporary tables.
+ *---------------------------------------------------------------------
+ */
+static bool
+isViewOnTempTable(Query *viewParse)
+{
+       return isViewOnTempTable_walker((Node *) viewParse, NULL);
+}
+
+static bool
+isViewOnTempTable_walker(Node *node, void *context)
+{
+       if (node == NULL)
+               return false;
+
+       if (IsA(node, Query))
+       {
+               Query      *query = (Query *) node;
+               ListCell   *rtable;
+
+               foreach(rtable, query->rtable)
+               {
+                       RangeTblEntry *rte = lfirst(rtable);
+
+                       if (rte->rtekind == RTE_RELATION)
+                       {
+                               Relation        rel = heap_open(rte->relid, AccessShareLock);
+                               char            relpersistence = rel->rd_rel->relpersistence;
+
+                               heap_close(rel, AccessShareLock);
+                               if (relpersistence == RELPERSISTENCE_TEMP)
+                                       return true;
+                       }
+               }
+
+               return query_tree_walker(query,
+                                                                isViewOnTempTable_walker,
+                                                                context,
+                                                                QTW_IGNORE_JOINALIASES);
+       }
+
+       return expression_tree_walker(node,
+                                                                 isViewOnTempTable_walker,
+                                                                 context);
+}
+
+/*---------------------------------------------------------------------
+ * DefineVirtualRelation
  *
- * If the relation already exists, then 'DefineRelation' will abort
- * the xact...
+ * Create the "view" relation. `DefineRelation' does all the work,
+ * we just provide the correct arguments ... at least when we're
+ * creating a view.  If we're updating an existing view, we have to
+ * work harder.
  *---------------------------------------------------------------------
  */
 static Oid
-DefineVirtualRelation(const RangeVar *relation, List *tlist)
+DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
+                                         Oid namespaceId)
 {
+       Oid                     viewOid;
        CreateStmt *createStmt = makeNode(CreateStmt);
-       List       *attrList,
-                          *t;
+       List       *attrList;
+       ListCell   *t;
 
        /*
-        * create a list of ColumnDef nodes based on the names and types of
-        * the (non-junk) targetlist items from the view's SELECT list.
+        * create a list of ColumnDef nodes based on the names and types of the
+        * (non-junk) targetlist items from the view's SELECT list.
         */
        attrList = NIL;
        foreach(t, tlist)
        {
-               TargetEntry *entry = lfirst(t);
-               Resdom     *res = entry->resdom;
+               TargetEntry *tle = lfirst(t);
 
-               if (!res->resjunk)
+               if (!tle->resjunk)
                {
                        ColumnDef  *def = makeNode(ColumnDef);
-                       TypeName   *typename = makeNode(TypeName);
-
-                       def->colname = pstrdup(res->resname);
-
-                       typename->typeid = res->restype;
-                       typename->typmod = res->restypmod;
-                       def->typename = typename;
 
+                       def->colname = pstrdup(tle->resname);
+                       def->typeName = makeTypeNameFromOid(exprType((Node *) tle->expr),
+                                                                                        exprTypmod((Node *) tle->expr));
+                       def->inhcount = 0;
+                       def->is_local = true;
                        def->is_not_null = false;
+                       def->is_from_type = false;
+                       def->storage = 0;
                        def->raw_default = NULL;
                        def->cooked_default = NULL;
+                       def->collClause = NULL;
+                       def->collOid = exprCollation((Node *) tle->expr);
+
+                       /*
+                        * It's possible that the column is of a collatable type but the
+                        * collation could not be resolved, so double-check.
+                        */
+                       if (type_is_collatable(exprType((Node *) tle->expr)))
+                       {
+                               if (!OidIsValid(def->collOid))
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_INDETERMINATE_COLLATION),
+                                                        errmsg("could not determine which collation to use for view column \"%s\"",
+                                                                       def->colname),
+                                                        errhint("Use the COLLATE clause to set the collation explicitly.")));
+                       }
+                       else
+                               Assert(!OidIsValid(def->collOid));
                        def->constraints = NIL;
 
                        attrList = lappend(attrList, def);
@@ -77,75 +153,192 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist)
        }
 
        if (attrList == NIL)
-               elog(ERROR, "attempted to define virtual relation with no attrs");
-
-       /*
-        * now create the parameters for keys/inheritance etc. All of them are
-        * nil...
-        */
-       createStmt->relation = (RangeVar *) relation;
-       createStmt->tableElts = attrList;
-       createStmt->inhRelations = NIL;
-       createStmt->constraints = NIL;
-       createStmt->hasoids = false;
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                errmsg("view must have at least one column")));
 
        /*
-        * finally create the relation...
+        * Check to see if we want to replace an existing view.
         */
-       return DefineRelation(createStmt, RELKIND_VIEW);
-}
+       viewOid = get_relname_relid(relation->relname, namespaceId);
 
-static RuleStmt *
-FormViewRetrieveRule(const RangeVar *view, Query *viewParse)
-{
-       RuleStmt   *rule;
-       char       *rname;
-
-       /*
-        * Create a RuleStmt that corresponds to the suitable rewrite rule
-        * args for DefineQueryRewrite();
-        */
-       rname = MakeRetrieveViewRuleName(view->relname);
+       if (OidIsValid(viewOid) && replace)
+       {
+               Relation        rel;
+               TupleDesc       descriptor;
+
+               /*
+                * Yes.  Get exclusive lock on the existing view ...
+                */
+               rel = relation_open(viewOid, AccessExclusiveLock);
+
+               /*
+                * Make sure it *is* a view, and do permissions checks.
+                */
+               if (rel->rd_rel->relkind != RELKIND_VIEW)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("\"%s\" is not a view",
+                                                       RelationGetRelationName(rel))));
+
+               if (!pg_class_ownercheck(viewOid, GetUserId()))
+                       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                                                  RelationGetRelationName(rel));
+
+               /* Also check it's not in use already */
+               CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");
+
+               /*
+                * Due to the namespace visibility rules for temporary objects, we
+                * should only end up replacing a temporary view with another
+                * temporary view, and similarly for permanent views.
+                */
+               Assert(relation->relpersistence == rel->rd_rel->relpersistence);
+
+               /*
+                * Create a tuple descriptor to compare against the existing view, and
+                * verify that the old column list is an initial prefix of the new
+                * column list.
+                */
+               descriptor = BuildDescForRelation(attrList);
+               checkViewTupleDesc(descriptor, rel->rd_att);
+
+               /*
+                * If new attributes have been added, we must add pg_attribute entries
+                * for them.  It is convenient (although overkill) to use the ALTER
+                * TABLE ADD COLUMN infrastructure for this.
+                */
+               if (list_length(attrList) > rel->rd_att->natts)
+               {
+                       List       *atcmds = NIL;
+                       ListCell   *c;
+                       int                     skip = rel->rd_att->natts;
+
+                       foreach(c, attrList)
+                       {
+                               AlterTableCmd *atcmd;
+
+                               if (skip > 0)
+                               {
+                                       skip--;
+                                       continue;
+                               }
+                               atcmd = makeNode(AlterTableCmd);
+                               atcmd->subtype = AT_AddColumnToView;
+                               atcmd->def = (Node *) lfirst(c);
+                               atcmds = lappend(atcmds, atcmd);
+                       }
+                       AlterTableInternal(viewOid, atcmds, true);
+               }
 
-       rule = makeNode(RuleStmt);
-       rule->relation = copyObject((RangeVar *) view);
-       rule->rulename = pstrdup(rname);
-       rule->whereClause = NULL;
-       rule->event = CMD_SELECT;
-       rule->instead = true;
-       rule->actions = makeList1(viewParse);
+               /*
+                * Seems okay, so return the OID of the pre-existing view.
+                */
+               relation_close(rel, NoLock);    /* keep the lock! */
 
-       return rule;
+               return viewOid;
+       }
+       else
+       {
+               Oid                     relid;
+
+               /*
+                * now set the parameters for keys/inheritance etc. All of these are
+                * uninteresting for views...
+                */
+               createStmt->relation = (RangeVar *) relation;
+               createStmt->tableElts = attrList;
+               createStmt->inhRelations = NIL;
+               createStmt->constraints = NIL;
+               createStmt->options = list_make1(defWithOids(false));
+               createStmt->oncommit = ONCOMMIT_NOOP;
+               createStmt->tablespacename = NULL;
+               createStmt->if_not_exists = false;
+
+               /*
+                * finally create the relation (this will error out if there's an
+                * existing view, so we don't need more code to complain if "replace"
+                * is false).
+                */
+               relid = DefineRelation(createStmt, RELKIND_VIEW, InvalidOid);
+               Assert(relid != InvalidOid);
+               return relid;
+       }
 }
 
+/*
+ * Verify that tupledesc associated with proposed new view definition
+ * matches tupledesc of old view.  This is basically a cut-down version
+ * of equalTupleDescs(), with code added to generate specific complaints.
+ * Also, we allow the new tupledesc to have more columns than the old.
+ */
 static void
-DefineViewRules(const RangeVar *view, Query *viewParse)
+checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc)
 {
-       RuleStmt   *retrieve_rule;
-
-#ifdef NOTYET
-       RuleStmt   *replace_rule;
-       RuleStmt   *append_rule;
-       RuleStmt   *delete_rule;
-#endif
+       int                     i;
 
-       retrieve_rule = FormViewRetrieveRule(view, viewParse);
+       if (newdesc->natts < olddesc->natts)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                errmsg("cannot drop columns from view")));
+       /* we can ignore tdhasoid */
 
-#ifdef NOTYET
-
-       replace_rule = FormViewReplaceRule(view, viewParse);
-       append_rule = FormViewAppendRule(view, viewParse);
-       delete_rule = FormViewDeleteRule(view, viewParse);
-#endif
+       for (i = 0; i < olddesc->natts; i++)
+       {
+               Form_pg_attribute newattr = newdesc->attrs[i];
+               Form_pg_attribute oldattr = olddesc->attrs[i];
+
+               /* XXX msg not right, but we don't support DROP COL on view anyway */
+               if (newattr->attisdropped != oldattr->attisdropped)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                        errmsg("cannot drop columns from view")));
+
+               if (strcmp(NameStr(newattr->attname), NameStr(oldattr->attname)) != 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                errmsg("cannot change name of view column \"%s\" to \"%s\"",
+                                               NameStr(oldattr->attname),
+                                               NameStr(newattr->attname))));
+               /* XXX would it be safe to allow atttypmod to change?  Not sure */
+               if (newattr->atttypid != oldattr->atttypid ||
+                       newattr->atttypmod != oldattr->atttypmod)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                        errmsg("cannot change data type of view column \"%s\" from %s to %s",
+                                                       NameStr(oldattr->attname),
+                                                       format_type_with_typemod(oldattr->atttypid,
+                                                                                                        oldattr->atttypmod),
+                                                       format_type_with_typemod(newattr->atttypid,
+                                                                                                        newattr->atttypmod))));
+               /* We can ignore the remaining attributes of an attribute... */
+       }
 
-       DefineQueryRewrite(retrieve_rule);
+       /*
+        * We ignore the constraint fields.  The new view desc can't have any
+        * constraints, and the only ones that could be on the old view are
+        * defaults, which we are happy to leave in place.
+        */
+}
 
-#ifdef NOTYET
-       DefineQueryRewrite(replace_rule);
-       DefineQueryRewrite(append_rule);
-       DefineQueryRewrite(delete_rule);
-#endif
+static void
+DefineViewRules(Oid viewOid, Query *viewParse, bool replace)
+{
+       /*
+        * Set up the ON SELECT rule.  Since the query has already been through
+        * parse analysis, we use DefineQueryRewrite() directly.
+        */
+       DefineQueryRewrite(pstrdup(ViewSelectRuleName),
+                                          viewOid,
+                                          NULL,
+                                          CMD_SELECT,
+                                          true,
+                                          replace,
+                                          list_make1(viewParse));
 
+       /*
+        * Someday: automatic ON INSERT, etc
+        */
 }
 
 /*---------------------------------------------------------------
@@ -168,33 +361,37 @@ DefineViewRules(const RangeVar *view, Query *viewParse)
 static Query *
 UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
 {
+       Relation        viewRel;
        List       *new_rt;
        RangeTblEntry *rt_entry1,
                           *rt_entry2;
 
        /*
         * Make a copy of the given parsetree.  It's not so much that we don't
-        * want to scribble on our input, it's that the parser has a bad habit
-        * of outputting multiple links to the same subtree for constructs
-        * like BETWEEN, and we mustn't have OffsetVarNodes increment the
-        * varno of a Var node twice.  copyObject will expand any
-        * multiply-referenced subtree into multiple copies.
+        * want to scribble on our input, it's that the parser has a bad habit of
+        * outputting multiple links to the same subtree for constructs like
+        * BETWEEN, and we mustn't have OffsetVarNodes increment the varno of a
+        * Var node twice.      copyObject will expand any multiply-referenced subtree
+        * into multiple copies.
         */
        viewParse = (Query *) copyObject(viewParse);
 
+       /* need to open the rel for addRangeTableEntryForRelation */
+       viewRel = relation_open(viewOid, AccessShareLock);
+
        /*
-        * Create the 2 new range table entries and form the new range
-        * table... OLD first, then NEW....
+        * Create the 2 new range table entries and form the new range table...
+        * OLD first, then NEW....
         */
-       rt_entry1 = addRangeTableEntryForRelation(NULL, viewOid,
-                                                                                         makeAlias("*OLD*", NIL),
+       rt_entry1 = addRangeTableEntryForRelation(NULL, viewRel,
+                                                                                         makeAlias("old", NIL),
                                                                                          false, false);
-       rt_entry2 = addRangeTableEntryForRelation(NULL, viewOid,
-                                                                                         makeAlias("*NEW*", NIL),
+       rt_entry2 = addRangeTableEntryForRelation(NULL, viewRel,
+                                                                                         makeAlias("new", NIL),
                                                                                          false, false);
        /* Must override addRangeTableEntry's default access-check flags */
-       rt_entry1->checkForRead = false;
-       rt_entry2->checkForRead = false;
+       rt_entry1->requiredPerms = 0;
+       rt_entry2->requiredPerms = 0;
 
        new_rt = lcons(rt_entry1, lcons(rt_entry2, viewParse->rtable));
 
@@ -205,67 +402,134 @@ UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
         */
        OffsetVarNodes((Node *) viewParse, 2, 0);
 
+       relation_close(viewRel, AccessShareLock);
+
        return viewParse;
 }
 
-/*-------------------------------------------------------------------
+/*
  * DefineView
- *
- *             - takes a "viewname", "parsetree" pair and then
- *             1)              construct the "virtual" relation
- *             2)              commit the command but NOT the transaction,
- *                             so that the relation exists
- *                             before the rules are defined.
- *             2)              define the "n" rules specified in the PRS2 paper
- *                             over the "virtual" relation
- *-------------------------------------------------------------------
+ *             Execute a CREATE VIEW command.
  */
 void
-DefineView(const RangeVar *view, Query *viewParse)
+DefineView(ViewStmt *stmt, const char *queryString)
 {
+       Query      *viewParse;
        Oid                     viewOid;
+       Oid                     namespaceId;
+       RangeVar   *view;
 
        /*
-        * Create the view relation
+        * Run parse analysis to convert the raw parse tree to a Query.  Note this
+        * also acquires sufficient locks on the source table(s).
         *
-        * NOTE: if it already exists, the xact will be aborted.
+        * Since parse analysis scribbles on its input, copy the raw parse tree;
+        * this ensures we don't corrupt a prepared statement, for example.
         */
-       viewOid = DefineVirtualRelation(view, viewParse->targetList);
+       viewParse = parse_analyze((Node *) copyObject(stmt->query),
+                                                         queryString, NULL, 0);
 
        /*
-        * The relation we have just created is not visible to any other
-        * commands running with the same transaction & command id. So,
-        * increment the command id counter (but do NOT pfree any memory!!!!)
+        * The grammar should ensure that the result is a single SELECT Query.
         */
-       CommandCounterIncrement();
+       if (!IsA(viewParse, Query) ||
+               viewParse->commandType != CMD_SELECT)
+               elog(ERROR, "unexpected parse analysis result");
 
        /*
-        * The range table of 'viewParse' does not contain entries for the
-        * "OLD" and "NEW" relations. So... add them!
+        * Check for unsupported cases.  These tests are redundant with ones in
+        * DefineQueryRewrite(), but that function will complain about a bogus ON
+        * SELECT rule, and we'd rather the message complain about a view.
         */
-       viewParse = UpdateRangeTableOfViewParse(viewOid, viewParse);
+       if (viewParse->intoClause != NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("views must not contain SELECT INTO")));
+       if (viewParse->hasModifyingCTE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+               errmsg("views must not contain data-modifying statements in WITH")));
 
        /*
-        * Now create the rules associated with the view.
+        * If a list of column names was given, run through and insert these into
+        * the actual query tree. - thomas 2000-03-08
         */
-       DefineViewRules(view, viewParse);
-}
+       if (stmt->aliases != NIL)
+       {
+               ListCell   *alist_item = list_head(stmt->aliases);
+               ListCell   *targetList;
 
-/*------------------------------------------------------------------
- * RemoveView
- *
- * Remove a view given its name
- *------------------------------------------------------------------
- */
-void
-RemoveView(const RangeVar *view)
-{
-       Oid                     viewOid;
+               foreach(targetList, viewParse->targetList)
+               {
+                       TargetEntry *te = (TargetEntry *) lfirst(targetList);
+
+                       Assert(IsA(te, TargetEntry));
+                       /* junk columns don't get aliases */
+                       if (te->resjunk)
+                               continue;
+                       te->resname = pstrdup(strVal(lfirst(alist_item)));
+                       alist_item = lnext(alist_item);
+                       if (alist_item == NULL)
+                               break;                  /* done assigning aliases */
+               }
+
+               if (alist_item != NULL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("CREATE VIEW specifies more column "
+                                                       "names than columns")));
+       }
+
+       /* Unlogged views are not sensible. */
+       if (stmt->view->relpersistence == RELPERSISTENCE_UNLOGGED)
+               ereport(ERROR,
+                               (errcode(ERRCODE_SYNTAX_ERROR),
+               errmsg("views cannot be unlogged because they do not have storage")));
+
+       /*
+        * If the user didn't explicitly ask for a temporary view, check whether
+        * we need one implicitly.      We allow TEMP to be inserted automatically as
+        * long as the CREATE command is consistent with that --- no explicit
+        * schema name.
+        */
+       view = copyObject(stmt->view);  /* don't corrupt original command */
+       if (view->relpersistence == RELPERSISTENCE_PERMANENT
+               && isViewOnTempTable(viewParse))
+       {
+               view->relpersistence = RELPERSISTENCE_TEMP;
+               ereport(NOTICE,
+                               (errmsg("view \"%s\" will be a temporary view",
+                                               view->relname)));
+       }
+
+       /* Might also need to make it temporary if placed in temp schema. */
+       namespaceId = RangeVarGetCreationNamespace(view);
+       RangeVarAdjustRelationPersistence(view, namespaceId);
 
-       viewOid = RangeVarGetRelid(view, false);
        /*
-        * We just have to drop the relation; the associated rules will be
-        * cleaned up automatically.
+        * Create the view relation
+        *
+        * NOTE: if it already exists and replace is false, the xact will be
+        * aborted.
+        */
+       viewOid = DefineVirtualRelation(view, viewParse->targetList,
+                                                                       stmt->replace, namespaceId);
+
+       /*
+        * The relation we have just created is not visible to any other commands
+        * running with the same transaction & command id. So, increment the
+        * command id counter (but do NOT pfree any memory!!!!)
+        */
+       CommandCounterIncrement();
+
+       /*
+        * The range table of 'viewParse' does not contain entries for the "OLD"
+        * and "NEW" relations. So... add them!
+        */
+       viewParse = UpdateRangeTableOfViewParse(viewOid, viewParse);
+
+       /*
+        * Now create the rules associated with the view.
         */
-       heap_drop_with_catalog(viewOid, allowSystemTableMods);
+       DefineViewRules(viewOid, viewParse, stmt->replace);
 }