OSDN Git Service

Fix outdated comment
[pg-rex/syncrep.git] / src / backend / parser / parse_utilcmd.c
index eafc3b3..d40a963 100644 (file)
@@ -33,6 +33,7 @@
 #include "catalog/heap.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
@@ -111,6 +112,7 @@ static void transformOfType(CreateStmtContext *cxt,
 static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt);
 static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
                                                Relation parent_index, AttrNumber *attmap);
+static List *get_collation(Oid collation, Oid actual_datatype);
 static List *get_opclass(Oid opclass, Oid actual_datatype);
 static void transformIndexConstraints(CreateStmtContext *cxt);
 static IndexStmt *transformIndexConstraint(Constraint *constraint,
@@ -119,7 +121,7 @@ static void transformFKConstraints(CreateStmtContext *cxt,
                                           bool skipValidation,
                                           bool isAddConstraint);
 static void transformConstraintAttrs(CreateStmtContext *cxt,
-                                                                        List *constraintList);
+                                                List *constraintList);
 static void transformColumnType(CreateStmtContext *cxt, ColumnDef *column);
 static void setSchemaName(char *context_schema, char **stmt_schema_name);
 
@@ -146,6 +148,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
        List       *result;
        List       *save_alist;
        ListCell   *elements;
+       Oid                     namespaceid;
 
        /*
         * We must not scribble on the passed-in CreateStmt, so copy it.  (This is
@@ -154,6 +157,33 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
        stmt = (CreateStmt *) copyObject(stmt);
 
        /*
+        * Look up the creation namespace.      This also checks permissions on the
+        * target namespace, so that we throw any permissions error as early as
+        * possible.
+        */
+       namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
+
+       /*
+        * If the relation already exists and the user specified "IF NOT EXISTS",
+        * bail out with a NOTICE.
+        */
+       if (stmt->if_not_exists)
+       {
+               Oid                     existing_relid;
+
+               existing_relid = get_relname_relid(stmt->relation->relname,
+                                                                                  namespaceid);
+               if (existing_relid != InvalidOid)
+               {
+                       ereport(NOTICE,
+                                       (errcode(ERRCODE_DUPLICATE_TABLE),
+                                        errmsg("relation \"%s\" already exists, skipping",
+                                                       stmt->relation->relname)));
+                       return NIL;
+               }
+       }
+
+       /*
         * If the target relation name isn't schema-qualified, make it so.  This
         * prevents some corner cases in which added-on rewritten commands might
         * think they should apply to other relations that have the same name and
@@ -162,11 +192,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
         */
        if (stmt->relation->schemaname == NULL
                && stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
-       {
-               Oid                     namespaceid = RangeVarGetCreationNamespace(stmt->relation);
-
                stmt->relation->schemaname = get_namespace_name(namespaceid);
-       }
 
        /* Set up pstate and CreateStmtContext */
        pstate = make_parsestate(NULL);
@@ -281,7 +307,14 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
        {
                char       *typname = strVal(linitial(column->typeName->names));
 
-               if (strcmp(typname, "serial") == 0 ||
+               if (strcmp(typname, "smallserial") == 0 ||
+                       strcmp(typname, "serial2") == 0)
+               {
+                       is_serial = true;
+                       column->typeName->names = NIL;
+                       column->typeName->typeOid = INT2OID;
+               }
+               else if (strcmp(typname, "serial") == 0 ||
                        strcmp(typname, "serial4") == 0)
                {
                        is_serial = true;
@@ -366,8 +399,8 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
                 * If this is ALTER ADD COLUMN, make sure the sequence will be owned
                 * by the table's owner.  The current user might be someone else
                 * (perhaps a superuser, or someone who's only a member of the owning
-                * role), but the SEQUENCE OWNED BY mechanisms will bleat unless
-                * table and sequence have exactly the same owning role.
+                * role), but the SEQUENCE OWNED BY mechanisms will bleat unless table
+                * and sequence have exactly the same owning role.
                 */
                if (cxt->rel)
                        seqstmt->ownerId = cxt->rel->rd_rel->relowner;
@@ -730,7 +763,7 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
                        /* Copy comment on constraint */
                        if ((inhRelation->options & CREATE_TABLE_LIKE_COMMENTS) &&
                                (comment = GetComment(get_constraint_oid(RelationGetRelid(relation),
-                                                                                                                 n->conname, false),
+                                                                                                                n->conname, false),
                                                                          ConstraintRelationId,
                                                                          0)) != NULL)
                        {
@@ -819,7 +852,6 @@ static void
 transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
 {
        HeapTuple       tuple;
-       Form_pg_type typ;
        TupleDesc       tupdesc;
        int                     i;
        Oid                     ofTypeId;
@@ -827,16 +859,10 @@ transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
        AssertArg(ofTypename);
 
        tuple = typenameType(NULL, ofTypename, NULL);
-       typ = (Form_pg_type) GETSTRUCT(tuple);
+       check_of_type(tuple);
        ofTypeId = HeapTupleGetOid(tuple);
        ofTypename->typeOid = ofTypeId;         /* cached for later */
 
-       if (typ->typtype != TYPTYPE_COMPOSITE)
-               ereport(ERROR,
-                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                errmsg("type %s is not a composite type",
-                                               format_type_be(ofTypeId))));
-
        tupdesc = lookup_rowtype_tupdesc(ofTypeId, -1);
        for (i = 0; i < tupdesc->natts; i++)
        {
@@ -904,6 +930,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
        Form_pg_class idxrelrec;
        Form_pg_index idxrec;
        Form_pg_am      amrec;
+       oidvector  *indcollation;
        oidvector  *indclass;
        IndexStmt  *index;
        List       *indexprs;
@@ -931,6 +958,12 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
        /* Fetch pg_am tuple for source index from relcache entry */
        amrec = source_idx->rd_am;
 
+       /* Extract indcollation from the pg_index tuple */
+       datum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                                       Anum_pg_index_indcollation, &isnull);
+       Assert(!isnull);
+       indcollation = (oidvector *) DatumGetPointer(datum);
+
        /* Extract indclass from the pg_index tuple */
        datum = SysCacheGetAttr(INDEXRELID, ht_idx,
                                                        Anum_pg_index_indclass, &isnull);
@@ -1094,6 +1127,9 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                /* Copy the original index column name */
                iparam->indexcolname = pstrdup(NameStr(attrs[keyno]->attname));
 
+               /* Add the collation name, if non-default */
+               iparam->collation = get_collation(indcollation->values[keyno], keycoltype);
+
                /* Add the operator class name, if non-default */
                iparam->opclass = get_opclass(indclass->values[keyno], keycoltype);
 
@@ -1152,7 +1188,41 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
 }
 
 /*
- * get_opclass                 - fetch name of an index operator class
+ * get_collation               - fetch qualified name of a collation
+ *
+ * If collation is InvalidOid or is the default for the given actual_datatype,
+ * then the return value is NIL.
+ */
+static List *
+get_collation(Oid collation, Oid actual_datatype)
+{
+       List       *result;
+       HeapTuple       ht_coll;
+       Form_pg_collation coll_rec;
+       char       *nsp_name;
+       char       *coll_name;
+
+       if (!OidIsValid(collation))
+               return NIL;                             /* easy case */
+       if (collation == get_typcollation(actual_datatype))
+               return NIL;                             /* just let it default */
+
+       ht_coll = SearchSysCache1(COLLOID, ObjectIdGetDatum(collation));
+       if (!HeapTupleIsValid(ht_coll))
+               elog(ERROR, "cache lookup failed for collation %u", collation);
+       coll_rec = (Form_pg_collation) GETSTRUCT(ht_coll);
+
+       /* For simplicity, we always schema-qualify the name */
+       nsp_name = get_namespace_name(coll_rec->collnamespace);
+       coll_name = pstrdup(NameStr(coll_rec->collname));
+       result = list_make2(makeString(nsp_name), makeString(coll_name));
+
+       ReleaseSysCache(ht_coll);
+       return result;
+}
+
+/*
+ * get_opclass                 - fetch qualified name of an index operator class
  *
  * If the opclass is the default for the given actual_datatype, then
  * the return value is NIL.
@@ -1160,9 +1230,9 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
 static List *
 get_opclass(Oid opclass, Oid actual_datatype)
 {
+       List       *result = NIL;
        HeapTuple       ht_opc;
        Form_pg_opclass opc_rec;
-       List       *result = NIL;
 
        ht_opc = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
        if (!HeapTupleIsValid(ht_opc))
@@ -1344,8 +1414,8 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
        /*
         * If it's ALTER TABLE ADD CONSTRAINT USING INDEX, look up the index and
         * verify it's usable, then extract the implied column name list.  (We
-        * will not actually need the column name list at runtime, but we need
-        * it now to check for duplicate column entries below.)
+        * will not actually need the column name list at runtime, but we need it
+        * now to check for duplicate column entries below.)
         */
        if (constraint->indexname != NULL)
        {
@@ -1390,8 +1460,8 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                if (OidIsValid(get_index_constraint(index_oid)))
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                        errmsg("index \"%s\" is already associated with a constraint",
-                                                       index_name),
+                          errmsg("index \"%s\" is already associated with a constraint",
+                                         index_name),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                /* Perform validity checks on the index */
@@ -1418,26 +1488,26 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("\"%s\" is not a unique index", index_name),
-                                        errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
+                                        errdetail("Cannot create a primary key or unique constraint using such an index."),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                if (RelationGetIndexExpressions(index_rel) != NIL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("index \"%s\" contains expressions", index_name),
-                                        errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
+                                        errdetail("Cannot create a primary key or unique constraint using such an index."),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                if (RelationGetIndexPredicate(index_rel) != NIL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("\"%s\" is a partial index", index_name),
-                                        errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
+                                        errdetail("Cannot create a primary key or unique constraint using such an index."),
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                /*
-                * It's probably unsafe to change a deferred index to non-deferred.
-                * (A non-constraint index couldn't be deferred anyway, so this case
+                * It's probably unsafe to change a deferred index to non-deferred. (A
+                * non-constraint index couldn't be deferred anyway, so this case
                 * should never occur; no need to sweat, but let's check it.)
                 */
                if (!index_form->indimmediate && !constraint->deferrable)
@@ -1448,7 +1518,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                                         parser_errposition(cxt->pstate, constraint->location)));
 
                /*
-                * Insist on it being a btree.  That's the only kind that supports
+                * Insist on it being a btree.  That's the only kind that supports
                 * uniqueness at the moment anyway; but we must have an index that
                 * exactly matches what you'd get from plain ADD CONSTRAINT syntax,
                 * else dump and reload will produce a different index (breaking
@@ -1468,15 +1538,15 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
 
                for (i = 0; i < index_form->indnatts; i++)
                {
-                       int2    attnum = index_form->indkey.values[i];
+                       int2            attnum = index_form->indkey.values[i];
                        Form_pg_attribute attform;
-                       char   *attname;
-                       Oid             defopclass;
+                       char       *attname;
+                       Oid                     defopclass;
 
                        /*
                         * We shouldn't see attnum == 0 here, since we already rejected
-                        * expression indexes.  If we do, SystemAttributeDefinition
-                        * will throw an error.
+                        * expression indexes.  If we do, SystemAttributeDefinition will
+                        * throw an error.
                         */
                        if (attnum > 0)
                        {
@@ -1485,11 +1555,11 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                        }
                        else
                                attform = SystemAttributeDefinition(attnum,
-                                                                                                       heap_rel->rd_rel->relhasoids);
+                                                                                          heap_rel->rd_rel->relhasoids);
                        attname = pstrdup(NameStr(attform->attname));
 
                        /*
-                        * Insist on default opclass and sort options.  While the index
+                        * Insist on default opclass and sort options.  While the index
                         * would still work as a constraint with non-default settings, it
                         * might not provide exactly the same uniqueness semantics as
                         * you'd get from a normally-created constraint; and there's also
@@ -1502,8 +1572,8 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                                ereport(ERROR,
                                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                                 errmsg("index \"%s\" does not have default sorting behavior", index_name),
-                                                errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
-                                                parser_errposition(cxt->pstate, constraint->location)));
+                                                errdetail("Cannot create a primary key or unique constraint using such an index."),
+                                        parser_errposition(cxt->pstate, constraint->location)));
 
                        constraint->keys = lappend(constraint->keys, makeString(attname));
                }
@@ -1648,13 +1718,13 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                                                        (errcode(ERRCODE_DUPLICATE_COLUMN),
                                                         errmsg("column \"%s\" appears twice in primary key constraint",
                                                                        key),
-                                                        parser_errposition(cxt->pstate, constraint->location)));
+                                        parser_errposition(cxt->pstate, constraint->location)));
                                else
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_DUPLICATE_COLUMN),
                                        errmsg("column \"%s\" appears twice in unique constraint",
                                                   key),
-                                                        parser_errposition(cxt->pstate, constraint->location)));
+                                        parser_errposition(cxt->pstate, constraint->location)));
                        }
                }
 
@@ -1663,6 +1733,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                iparam->name = pstrdup(key);
                iparam->expr = NULL;
                iparam->indexcolname = NULL;
+               iparam->collation = NIL;
                iparam->opclass = NIL;
                iparam->ordering = SORTBY_DEFAULT;
                iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
@@ -1687,7 +1758,8 @@ transformFKConstraints(CreateStmtContext *cxt,
 
        /*
         * If CREATE TABLE or adding a column with NULL default, we can safely
-        * skip validation of the constraint.
+        * skip validation of FK constraints, and nonetheless mark them valid.
+        * (This will override any user-supplied NOT VALID flag.)
         */
        if (skipValidation)
        {
@@ -1696,7 +1768,7 @@ transformFKConstraints(CreateStmtContext *cxt,
                        Constraint *constraint = (Constraint *) lfirst(fkclist);
 
                        constraint->skip_validation = true;
-                       constraint->initially_valid  = true;
+                       constraint->initially_valid = true;
                }
        }
 
@@ -1782,9 +1854,13 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString)
 
        /* take care of the where clause */
        if (stmt->whereClause)
+       {
                stmt->whereClause = transformWhereClause(pstate,
                                                                                                 stmt->whereClause,
                                                                                                 "WHERE");
+               /* we have to fix its collations too */
+               assign_expr_collations(pstate, stmt->whereClause);
+       }
 
        /* take care of any index expressions */
        foreach(l, stmt->indexParams)
@@ -1912,6 +1988,8 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
        *whereClause = transformWhereClause(pstate,
                                                                          (Node *) copyObject(stmt->whereClause),
                                                                                "WHERE");
+       /* we have to fix its collations too */
+       assign_expr_collations(pstate, *whereClause);
 
        if (list_length(pstate->p_rtable) != 2)         /* naughty, naughty... */
                ereport(ERROR,
@@ -2067,18 +2145,18 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
                         * However, they were already in the outer rangetable when we
                         * analyzed the query, so we have to check.
                         *
-                        * Note that in the INSERT...SELECT case, we need to examine
-                        * the CTE lists of both top_subqry and sub_qry.
+                        * Note that in the INSERT...SELECT case, we need to examine the
+                        * CTE lists of both top_subqry and sub_qry.
                         *
-                        * Note that we aren't digging into the body of the query
-                        * looking for WITHs in nested sub-SELECTs.  A WITH down there
-                        * can legitimately refer to OLD/NEW, because it'd be an
+                        * Note that we aren't digging into the body of the query looking
+                        * for WITHs in nested sub-SELECTs.  A WITH down there can
+                        * legitimately refer to OLD/NEW, because it'd be an
                         * indirect-correlated outer reference.
                         */
                        if (rangeTableEntry_used((Node *) top_subqry->cteList,
                                                                         PRS2_OLD_VARNO, 0) ||
                                rangeTableEntry_used((Node *) sub_qry->cteList,
-                                                                         PRS2_OLD_VARNO, 0))
+                                                                        PRS2_OLD_VARNO, 0))
                                ereport(ERROR,
                                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                                 errmsg("cannot refer to OLD within WITH query")));
@@ -2173,12 +2251,13 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
        lockmode = AlterTableGetLockLevel(stmt->cmds);
 
        /*
-        * Acquire appropriate lock on the target relation, which will be held until
-        * end of transaction.  This ensures any decisions we make here based on
-        * the state of the relation will still be good at execution. We must get
-        * lock now because execution will later require it; taking a lower grade lock
-        * now and trying to upgrade later risks deadlock.  Any new commands we add
-        * after this must not upgrade the lock level requested here.
+        * Acquire appropriate lock on the target relation, which will be held
+        * until end of transaction.  This ensures any decisions we make here
+        * based on the state of the relation will still be good at execution. We
+        * must get lock now because execution will later require it; taking a
+        * lower grade lock now and trying to upgrade later risks deadlock.  Any
+        * new commands we add after this must not upgrade the lock level
+        * requested here.
         */
        rel = relation_openrv(stmt->relation, lockmode);
 
@@ -2339,8 +2418,8 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
  * and detect inconsistent/misplaced constraint attributes.
  *
  * NOTE: currently, attributes are only supported for FOREIGN KEY, UNIQUE,
- * and PRIMARY KEY constraints, but someday they ought to be supported
- * for other constraint types.
+ * EXCLUSION, and PRIMARY KEY constraints, but someday they ought to be
+ * supported for other constraint types.
  */
 static void
 transformConstraintAttrs(CreateStmtContext *cxt, List *constraintList)
@@ -2469,11 +2548,10 @@ transformColumnType(CreateStmtContext *cxt, ColumnDef *column)
        if (column->collClause)
        {
                Form_pg_type typtup = (Form_pg_type) GETSTRUCT(ctype);
-               Oid             collOid;
 
-               collOid = LookupCollation(cxt->pstate,
-                                                                 column->collClause->collname,
-                                                                 column->collClause->location);
+               LookupCollation(cxt->pstate,
+                                               column->collClause->collname,
+                                               column->collClause->location);
                /* Complain if COLLATE is applied to an uncollatable type */
                if (!OidIsValid(typtup->typcollation))
                        ereport(ERROR,