OSDN Git Service

Fix bugs in relpersistence handling during table creation.
[pg-rex/syncrep.git] / src / backend / commands / view.c
index 6797121..b238199 100644 (file)
@@ -3,34 +3,89 @@
  * view.c
  *       use rewrite rules to construct views
  *
- * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/view.c,v 1.83 2004/06/18 06:13:23 tgl Exp $
+ *       src/backend/commands/view.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "access/heapam.h"
-#include "catalog/dependency.h"
+#include "access/xact.h"
 #include "catalog/namespace.h"
+#include "commands/defrem.h"
 #include "commands/tablecmds.h"
 #include "commands/view.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/analyze.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteManip.h"
 #include "rewrite/rewriteSupport.h"
 #include "utils/acl.h"
+#include "utils/builtins.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"
 
 
 static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
+static bool isViewOnTempTable_walker(Node *node, void *context);
 
+/*---------------------------------------------------------------------
+ * isViewOnTempTable
+ *
+ * Returns true iff any of the relations underlying this view are
+ * temporary tables.
+ *---------------------------------------------------------------------
+ */
+static bool
+isViewOnTempTable(Query *viewParse)
+{
+       return isViewOnTempTable_walker((Node *) viewParse, NULL);
+}
+
+static bool
+isViewOnTempTable_walker(Node *node, void *context)
+{
+       if (node == NULL)
+               return false;
+
+       if (IsA(node, Query))
+       {
+               Query      *query = (Query *) node;
+               ListCell   *rtable;
+
+               foreach(rtable, query->rtable)
+               {
+                       RangeTblEntry *rte = lfirst(rtable);
+
+                       if (rte->rtekind == RTE_RELATION)
+                       {
+                               Relation        rel = heap_open(rte->relid, AccessShareLock);
+                               char            relpersistence = rel->rd_rel->relpersistence;
+
+                               heap_close(rel, AccessShareLock);
+                               if (relpersistence == RELPERSISTENCE_TEMP)
+                                       return true;
+                       }
+               }
+
+               return query_tree_walker(query,
+                                                                isViewOnTempTable_walker,
+                                                                context,
+                                                                QTW_IGNORE_JOINALIASES);
+       }
+
+       return expression_tree_walker(node,
+                                                                 isViewOnTempTable_walker,
+                                                                 context);
+}
 
 /*---------------------------------------------------------------------
  * DefineVirtualRelation
@@ -42,42 +97,56 @@ static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
  *---------------------------------------------------------------------
  */
 static Oid
-DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
+DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
+                                         Oid namespaceId)
 {
-       Oid                     viewOid,
-                               namespaceId;
+       Oid                     viewOid;
        CreateStmt *createStmt = makeNode(CreateStmt);
        List       *attrList;
        ListCell   *t;
 
        /*
-        * create a list of ColumnDef nodes based on the names and types of
-        * the (non-junk) targetlist items from the view's SELECT list.
+        * create a list of ColumnDef nodes based on the names and types of the
+        * (non-junk) targetlist items from the view's SELECT list.
         */
        attrList = NIL;
        foreach(t, tlist)
        {
-               TargetEntry *entry = lfirst(t);
-               Resdom     *res = entry->resdom;
+               TargetEntry *tle = lfirst(t);
 
-               if (!res->resjunk)
+               if (!tle->resjunk)
                {
                        ColumnDef  *def = makeNode(ColumnDef);
-                       TypeName   *typename = makeNode(TypeName);
-
-                       def->colname = pstrdup(res->resname);
-
-                       typename->typeid = res->restype;
-                       typename->typmod = res->restypmod;
-                       def->typename = typename;
 
+                       def->colname = pstrdup(tle->resname);
+                       def->typeName = makeTypeNameFromOid(exprType((Node *) tle->expr),
+                                                                                        exprTypmod((Node *) tle->expr));
                        def->inhcount = 0;
                        def->is_local = true;
                        def->is_not_null = false;
+                       def->is_from_type = false;
+                       def->storage = 0;
                        def->raw_default = NULL;
                        def->cooked_default = NULL;
+                       def->collClause = NULL;
+                       def->collOid = exprCollation((Node *) tle->expr);
+
+                       /*
+                        * It's possible that the column is of a collatable type but the
+                        * collation could not be resolved, so double-check.
+                        */
+                       if (type_is_collatable(exprType((Node *) tle->expr)))
+                       {
+                               if (!OidIsValid(def->collOid))
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_INDETERMINATE_COLLATION),
+                                                        errmsg("could not determine which collation to use for view column \"%s\"",
+                                                                       def->colname),
+                                                        errhint("Use the COLLATE clause to set the collation explicitly.")));
+                       }
+                       else
+                               Assert(!OidIsValid(def->collOid));
                        def->constraints = NIL;
-                       def->support = NULL;
 
                        attrList = lappend(attrList, def);
                }
@@ -91,7 +160,6 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
        /*
         * Check to see if we want to replace an existing view.
         */
-       namespaceId = RangeVarGetCreationNamespace(relation);
        viewOid = get_relname_relid(relation->relname, namespaceId);
 
        if (OidIsValid(viewOid) && replace)
@@ -117,14 +185,53 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
                        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
                                                   RelationGetRelationName(rel));
 
+               /* Also check it's not in use already */
+               CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");
+
                /*
-                * Create a tuple descriptor to compare against the existing view,
-                * and verify it matches.
+                * Due to the namespace visibility rules for temporary objects, we
+                * should only end up replacing a temporary view with another
+                * temporary view, and similarly for permanent views.
+                */
+               Assert(relation->relpersistence == rel->rd_rel->relpersistence);
+
+               /*
+                * Create a tuple descriptor to compare against the existing view, and
+                * verify that the old column list is an initial prefix of the new
+                * column list.
                 */
                descriptor = BuildDescForRelation(attrList);
                checkViewTupleDesc(descriptor, rel->rd_att);
 
                /*
+                * If new attributes have been added, we must add pg_attribute entries
+                * for them.  It is convenient (although overkill) to use the ALTER
+                * TABLE ADD COLUMN infrastructure for this.
+                */
+               if (list_length(attrList) > rel->rd_att->natts)
+               {
+                       List       *atcmds = NIL;
+                       ListCell   *c;
+                       int                     skip = rel->rd_att->natts;
+
+                       foreach(c, attrList)
+                       {
+                               AlterTableCmd *atcmd;
+
+                               if (skip > 0)
+                               {
+                                       skip--;
+                                       continue;
+                               }
+                               atcmd = makeNode(AlterTableCmd);
+                               atcmd->subtype = AT_AddColumnToView;
+                               atcmd->def = (Node *) lfirst(c);
+                               atcmds = lappend(atcmds, atcmd);
+                       }
+                       AlterTableInternal(viewOid, atcmds, true);
+               }
+
+               /*
                 * Seems okay, so return the OID of the pre-existing view.
                 */
                relation_close(rel, NoLock);    /* keep the lock! */
@@ -133,24 +240,29 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
        }
        else
        {
+               Oid                     relid;
+
                /*
-                * now set the parameters for keys/inheritance etc. All of these
-                * are uninteresting for views...
+                * now set the parameters for keys/inheritance etc. All of these are
+                * uninteresting for views...
                 */
                createStmt->relation = (RangeVar *) relation;
                createStmt->tableElts = attrList;
                createStmt->inhRelations = NIL;
                createStmt->constraints = NIL;
-               createStmt->hasoids = MUST_NOT_HAVE_OIDS;
+               createStmt->options = list_make1(defWithOids(false));
                createStmt->oncommit = ONCOMMIT_NOOP;
                createStmt->tablespacename = NULL;
+               createStmt->if_not_exists = false;
 
                /*
                 * finally create the relation (this will error out if there's an
-                * existing view, so we don't need more code to complain if
-                * "replace" is false).
+                * existing view, so we don't need more code to complain if "replace"
+                * is false).
                 */
-               return DefineRelation(createStmt, RELKIND_VIEW);
+               relid = DefineRelation(createStmt, RELKIND_VIEW, InvalidOid);
+               Assert(relid != InvalidOid);
+               return relid;
        }
 }
 
@@ -158,41 +270,47 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
  * Verify that tupledesc associated with proposed new view definition
  * matches tupledesc of old view.  This is basically a cut-down version
  * of equalTupleDescs(), with code added to generate specific complaints.
+ * Also, we allow the new tupledesc to have more columns than the old.
  */
 static void
 checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc)
 {
        int                     i;
 
-       if (newdesc->natts != olddesc->natts)
+       if (newdesc->natts < olddesc->natts)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                                errmsg("cannot change number of columns in view")));
+                                errmsg("cannot drop columns from view")));
        /* we can ignore tdhasoid */
 
-       for (i = 0; i < newdesc->natts; i++)
+       for (i = 0; i < olddesc->natts; i++)
        {
                Form_pg_attribute newattr = newdesc->attrs[i];
                Form_pg_attribute oldattr = olddesc->attrs[i];
 
-               /* XXX not right, but we don't support DROP COL on view anyway */
+               /* XXX msg not right, but we don't support DROP COL on view anyway */
                if (newattr->attisdropped != oldattr->attisdropped)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                                        errmsg("cannot change number of columns in view")));
+                                        errmsg("cannot drop columns from view")));
 
                if (strcmp(NameStr(newattr->attname), NameStr(oldattr->attname)) != 0)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                                        errmsg("cannot change name of view column \"%s\"",
-                                                       NameStr(oldattr->attname))));
+                                errmsg("cannot change name of view column \"%s\" to \"%s\"",
+                                               NameStr(oldattr->attname),
+                                               NameStr(newattr->attname))));
                /* XXX would it be safe to allow atttypmod to change?  Not sure */
                if (newattr->atttypid != oldattr->atttypid ||
                        newattr->atttypmod != oldattr->atttypmod)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                                  errmsg("cannot change data type of view column \"%s\"",
-                                                 NameStr(oldattr->attname))));
+                                        errmsg("cannot change data type of view column \"%s\" from %s to %s",
+                                                       NameStr(oldattr->attname),
+                                                       format_type_with_typemod(oldattr->atttypid,
+                                                                                                        oldattr->atttypmod),
+                                                       format_type_with_typemod(newattr->atttypid,
+                                                                                                        newattr->atttypmod))));
                /* We can ignore the remaining attributes of an attribute... */
        }
 
@@ -203,54 +321,24 @@ checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc)
         */
 }
 
-static RuleStmt *
-FormViewRetrieveRule(const RangeVar *view, Query *viewParse, bool replace)
+static void
+DefineViewRules(Oid viewOid, Query *viewParse, bool replace)
 {
-       RuleStmt   *rule;
-
        /*
-        * Create a RuleStmt that corresponds to the suitable rewrite rule
-        * args for DefineQueryRewrite();
+        * Set up the ON SELECT rule.  Since the query has already been through
+        * parse analysis, we use DefineQueryRewrite() directly.
         */
-       rule = makeNode(RuleStmt);
-       rule->relation = copyObject((RangeVar *) view);
-       rule->rulename = pstrdup(ViewSelectRuleName);
-       rule->whereClause = NULL;
-       rule->event = CMD_SELECT;
-       rule->instead = true;
-       rule->actions = list_make1(viewParse);
-       rule->replace = replace;
-
-       return rule;
-}
-
-static void
-DefineViewRules(const RangeVar *view, Query *viewParse, bool replace)
-{
-       RuleStmt   *retrieve_rule;
-
-#ifdef NOTYET
-       RuleStmt   *replace_rule;
-       RuleStmt   *append_rule;
-       RuleStmt   *delete_rule;
-#endif
-
-       retrieve_rule = FormViewRetrieveRule(view, viewParse, replace);
-
-#ifdef NOTYET
-       replace_rule = FormViewReplaceRule(view, viewParse);
-       append_rule = FormViewAppendRule(view, viewParse);
-       delete_rule = FormViewDeleteRule(view, viewParse);
-#endif
-
-       DefineQueryRewrite(retrieve_rule);
-
-#ifdef NOTYET
-       DefineQueryRewrite(replace_rule);
-       DefineQueryRewrite(append_rule);
-       DefineQueryRewrite(delete_rule);
-#endif
+       DefineQueryRewrite(pstrdup(ViewSelectRuleName),
+                                          viewOid,
+                                          NULL,
+                                          CMD_SELECT,
+                                          true,
+                                          replace,
+                                          list_make1(viewParse));
 
+       /*
+        * Someday: automatic ON INSERT, etc
+        */
 }
 
 /*---------------------------------------------------------------
@@ -273,29 +361,33 @@ DefineViewRules(const RangeVar *view, Query *viewParse, bool replace)
 static Query *
 UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
 {
+       Relation        viewRel;
        List       *new_rt;
        RangeTblEntry *rt_entry1,
                           *rt_entry2;
 
        /*
         * Make a copy of the given parsetree.  It's not so much that we don't
-        * want to scribble on our input, it's that the parser has a bad habit
-        * of outputting multiple links to the same subtree for constructs
-        * like BETWEEN, and we mustn't have OffsetVarNodes increment the
-        * varno of a Var node twice.  copyObject will expand any
-        * multiply-referenced subtree into multiple copies.
+        * want to scribble on our input, it's that the parser has a bad habit of
+        * outputting multiple links to the same subtree for constructs like
+        * BETWEEN, and we mustn't have OffsetVarNodes increment the varno of a
+        * Var node twice.      copyObject will expand any multiply-referenced subtree
+        * into multiple copies.
         */
        viewParse = (Query *) copyObject(viewParse);
 
+       /* need to open the rel for addRangeTableEntryForRelation */
+       viewRel = relation_open(viewOid, AccessShareLock);
+
        /*
-        * Create the 2 new range table entries and form the new range
-        * table... OLD first, then NEW....
+        * Create the 2 new range table entries and form the new range table...
+        * OLD first, then NEW....
         */
-       rt_entry1 = addRangeTableEntryForRelation(NULL, viewOid,
-                                                                                         makeAlias("*OLD*", NIL),
+       rt_entry1 = addRangeTableEntryForRelation(NULL, viewRel,
+                                                                                         makeAlias("old", NIL),
                                                                                          false, false);
-       rt_entry2 = addRangeTableEntryForRelation(NULL, viewOid,
-                                                                                         makeAlias("*NEW*", NIL),
+       rt_entry2 = addRangeTableEntryForRelation(NULL, viewRel,
+                                                                                         makeAlias("new", NIL),
                                                                                          false, false);
        /* Must override addRangeTableEntry's default access-check flags */
        rt_entry1->requiredPerms = 0;
@@ -310,25 +402,109 @@ UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
         */
        OffsetVarNodes((Node *) viewParse, 2, 0);
 
+       relation_close(viewRel, AccessShareLock);
+
        return viewParse;
 }
 
-/*-------------------------------------------------------------------
+/*
  * DefineView
- *
- *             - takes a "viewname", "parsetree" pair and then
- *             1)              construct the "virtual" relation
- *             2)              commit the command but NOT the transaction,
- *                             so that the relation exists
- *                             before the rules are defined.
- *             2)              define the "n" rules specified in the PRS2 paper
- *                             over the "virtual" relation
- *-------------------------------------------------------------------
+ *             Execute a CREATE VIEW command.
  */
 void
-DefineView(const RangeVar *view, Query *viewParse, bool replace)
+DefineView(ViewStmt *stmt, const char *queryString)
 {
+       Query      *viewParse;
        Oid                     viewOid;
+       Oid                     namespaceId;
+       RangeVar   *view;
+
+       /*
+        * Run parse analysis to convert the raw parse tree to a Query.  Note this
+        * also acquires sufficient locks on the source table(s).
+        *
+        * Since parse analysis scribbles on its input, copy the raw parse tree;
+        * this ensures we don't corrupt a prepared statement, for example.
+        */
+       viewParse = parse_analyze((Node *) copyObject(stmt->query),
+                                                         queryString, NULL, 0);
+
+       /*
+        * The grammar should ensure that the result is a single SELECT Query.
+        */
+       if (!IsA(viewParse, Query) ||
+               viewParse->commandType != CMD_SELECT)
+               elog(ERROR, "unexpected parse analysis result");
+
+       /*
+        * Check for unsupported cases.  These tests are redundant with ones in
+        * DefineQueryRewrite(), but that function will complain about a bogus ON
+        * SELECT rule, and we'd rather the message complain about a view.
+        */
+       if (viewParse->intoClause != NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("views must not contain SELECT INTO")));
+       if (viewParse->hasModifyingCTE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+               errmsg("views must not contain data-modifying statements in WITH")));
+
+       /*
+        * If a list of column names was given, run through and insert these into
+        * the actual query tree. - thomas 2000-03-08
+        */
+       if (stmt->aliases != NIL)
+       {
+               ListCell   *alist_item = list_head(stmt->aliases);
+               ListCell   *targetList;
+
+               foreach(targetList, viewParse->targetList)
+               {
+                       TargetEntry *te = (TargetEntry *) lfirst(targetList);
+
+                       Assert(IsA(te, TargetEntry));
+                       /* junk columns don't get aliases */
+                       if (te->resjunk)
+                               continue;
+                       te->resname = pstrdup(strVal(lfirst(alist_item)));
+                       alist_item = lnext(alist_item);
+                       if (alist_item == NULL)
+                               break;                  /* done assigning aliases */
+               }
+
+               if (alist_item != NULL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("CREATE VIEW specifies more column "
+                                                       "names than columns")));
+       }
+
+       /* Unlogged views are not sensible. */
+       if (stmt->view->relpersistence == RELPERSISTENCE_UNLOGGED)
+               ereport(ERROR,
+                               (errcode(ERRCODE_SYNTAX_ERROR),
+               errmsg("views cannot be unlogged because they do not have storage")));
+
+       /*
+        * If the user didn't explicitly ask for a temporary view, check whether
+        * we need one implicitly.      We allow TEMP to be inserted automatically as
+        * long as the CREATE command is consistent with that --- no explicit
+        * schema name.
+        */
+       view = copyObject(stmt->view);  /* don't corrupt original command */
+       if (view->relpersistence == RELPERSISTENCE_PERMANENT
+               && isViewOnTempTable(viewParse))
+       {
+               view->relpersistence = RELPERSISTENCE_TEMP;
+               ereport(NOTICE,
+                               (errmsg("view \"%s\" will be a temporary view",
+                                               view->relname)));
+       }
+
+       /* Might also need to make it temporary if placed in temp schema. */
+       namespaceId = RangeVarGetCreationNamespace(view);
+       RangeVarAdjustRelationPersistence(view, namespaceId);
 
        /*
         * Create the view relation
@@ -336,47 +512,24 @@ DefineView(const RangeVar *view, Query *viewParse, bool replace)
         * NOTE: if it already exists and replace is false, the xact will be
         * aborted.
         */
-
-       viewOid = DefineVirtualRelation(view, viewParse->targetList, replace);
+       viewOid = DefineVirtualRelation(view, viewParse->targetList,
+                                                                       stmt->replace, namespaceId);
 
        /*
-        * The relation we have just created is not visible to any other
-        * commands running with the same transaction & command id. So,
-        * increment the command id counter (but do NOT pfree any memory!!!!)
+        * The relation we have just created is not visible to any other commands
+        * running with the same transaction & command id. So, increment the
+        * command id counter (but do NOT pfree any memory!!!!)
         */
        CommandCounterIncrement();
 
        /*
-        * The range table of 'viewParse' does not contain entries for the
-        * "OLD" and "NEW" relations. So... add them!
+        * The range table of 'viewParse' does not contain entries for the "OLD"
+        * and "NEW" relations. So... add them!
         */
        viewParse = UpdateRangeTableOfViewParse(viewOid, viewParse);
 
        /*
         * Now create the rules associated with the view.
         */
-       DefineViewRules(view, viewParse, replace);
-}
-
-/*
- * RemoveView
- *
- * Remove a view given its name
- *
- * We just have to drop the relation; the associated rules will be
- * cleaned up automatically.
- */
-void
-RemoveView(const RangeVar *view, DropBehavior behavior)
-{
-       Oid                     viewOid;
-       ObjectAddress object;
-
-       viewOid = RangeVarGetRelid(view, false);
-
-       object.classId = RelOid_pg_class;
-       object.objectId = viewOid;
-       object.objectSubId = 0;
-
-       performDeletion(&object, behavior);
+       DefineViewRules(viewOid, viewParse, stmt->replace);
 }