1 /*-------------------------------------------------------------------------
4 * POSTGRES create/destroy relation with inheritance utility code.
6 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.89 2002/03/21 16:00:31 tgl Exp $
13 *-------------------------------------------------------------------------
18 #include "access/heapam.h"
19 #include "catalog/catalog.h"
20 #include "catalog/catname.h"
21 #include "catalog/indexing.h"
22 #include "catalog/heap.h"
23 #include "catalog/pg_inherits.h"
24 #include "catalog/pg_type.h"
25 #include "commands/creatinh.h"
26 #include "miscadmin.h"
27 #include "optimizer/clauses.h"
28 #include "utils/acl.h"
29 #include "utils/syscache.h"
30 #include "utils/temprel.h"
37 static List *MergeAttributes(List *schema, List *supers, bool istemp,
38 List **supOids, List **supconstr, bool *supHasOids);
39 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
40 static void StoreCatalogInheritance(Oid relationId, List *supers);
41 static int findAttrByName(const char *attributeName, List *schema);
42 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
43 static List *MergeDomainAttributes(List *schema);
46 /* ----------------------------------------------------------------
48 * Creates a new relation.
49 * ----------------------------------------------------------------
52 DefineRelation(CreateStmt *stmt, char relkind)
54 char *relname = palloc(NAMEDATALEN);
55 List *schema = stmt->tableElts;
56 int numberOfAttributes;
61 List *old_constraints;
69 * Truncate relname to appropriate length (probably a waste of time,
70 * as parser should have done this already).
72 StrNCpy(relname, (stmt->relation)->relname, NAMEDATALEN);
75 * Merge domain attributes into the known columns before processing table
76 * inheritance. Otherwise we risk adding double constraints to a
77 * domain-type column that's inherited.
79 schema = MergeDomainAttributes(schema);
82 * Look up inheritance ancestors and generate relation schema,
83 * including inherited attributes.
85 schema = MergeAttributes(schema, stmt->inhRelations,
86 stmt->relation->istemp,
87 &inheritOids, &old_constraints, &parentHasOids);
89 numberOfAttributes = length(schema);
90 if (numberOfAttributes <= 0)
91 elog(ERROR, "DefineRelation: please inherit from a relation or define an attribute");
94 * Create a relation descriptor from the relation schema and create
95 * the relation. Note that in this stage only inherited (pre-cooked)
96 * defaults and constraints will be included into the new relation.
97 * (BuildDescForRelation takes care of the inherited defaults, but we
98 * have to copy inherited constraints here.)
100 descriptor = BuildDescForRelation(schema, relname);
102 if (old_constraints != NIL)
104 ConstrCheck *check = (ConstrCheck *) palloc(length(old_constraints) *
105 sizeof(ConstrCheck));
108 foreach(listptr, old_constraints)
110 Constraint *cdef = (Constraint *) lfirst(listptr);
112 if (cdef->contype != CONSTR_CHECK)
115 if (cdef->name != NULL)
117 for (i = 0; i < ncheck; i++)
119 if (strcmp(check[i].ccname, cdef->name) == 0)
120 elog(ERROR, "Duplicate CHECK constraint name: '%s'",
123 check[ncheck].ccname = cdef->name;
127 check[ncheck].ccname = (char *) palloc(NAMEDATALEN);
128 snprintf(check[ncheck].ccname, NAMEDATALEN, "$%d", ncheck + 1);
130 Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
131 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
136 if (descriptor->constr == NULL)
138 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
139 descriptor->constr->defval = NULL;
140 descriptor->constr->num_defval = 0;
141 descriptor->constr->has_not_null = false;
143 descriptor->constr->num_check = ncheck;
144 descriptor->constr->check = check;
148 relationId = heap_create_with_catalog(relname, descriptor,
150 stmt->hasoids || parentHasOids,
151 stmt->relation->istemp,
152 allowSystemTableMods);
154 StoreCatalogInheritance(relationId, inheritOids);
157 * We must bump the command counter to make the newly-created relation
158 * tuple visible for opening.
160 CommandCounterIncrement();
163 * Open the new relation and acquire exclusive lock on it. This isn't
164 * really necessary for locking out other backends (since they can't
165 * see the new rel anyway until we commit), but it keeps the lock
166 * manager from complaining about deadlock risks.
168 rel = heap_openr(relname, AccessExclusiveLock);
171 * Now add any newly specified column default values and CHECK
172 * constraints to the new relation. These are passed to us in the
173 * form of raw parsetrees; we need to transform them to executable
174 * expression trees before they can be added. The most convenient way
175 * to do that is to apply the parser's transformExpr routine, but
176 * transformExpr doesn't work unless we have a pre-existing relation.
177 * So, the transformation has to be postponed to this final step of
180 * First, scan schema to find new column defaults.
185 foreach(listptr, schema)
187 ColumnDef *colDef = lfirst(listptr);
188 RawColumnDefault *rawEnt;
192 if (colDef->raw_default == NULL)
194 Assert(colDef->cooked_default == NULL);
196 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
197 rawEnt->attnum = attnum;
198 rawEnt->raw_default = colDef->raw_default;
199 rawDefaults = lappend(rawDefaults, rawEnt);
203 * Parse and add the defaults/constraints, if any.
205 if (rawDefaults || stmt->constraints)
206 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
209 * Clean up. We keep lock on new relation (although it shouldn't be
210 * visible to anyone else anyway, until commit).
212 heap_close(rel, NoLock);
217 * Deletes a new relation.
220 * BadArg if name is invalid.
223 * If the relation has indices defined on it, then the index relations
224 * themselves will be destroyed, too.
227 RemoveRelation(const char *name)
230 heap_drop_with_catalog(name, allowSystemTableMods);
235 * Removes all the rows from a relation
238 * BadArg if name is invalid
241 * Rows are removed, indices are truncated and reconstructed.
244 TruncateRelation(const char *relname)
250 if (!allowSystemTableMods && IsSystemRelationName(relname))
251 elog(ERROR, "TRUNCATE cannot be used on system tables. '%s' is a system table",
254 if (!pg_ownercheck(GetUserId(), relname, RELNAME))
255 elog(ERROR, "you do not own relation \"%s\"", relname);
257 /* Grab exclusive lock in preparation for truncate */
258 rel = heap_openr(relname, AccessExclusiveLock);
260 if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
261 elog(ERROR, "TRUNCATE cannot be used on sequences. '%s' is a sequence",
264 if (rel->rd_rel->relkind == RELKIND_VIEW)
265 elog(ERROR, "TRUNCATE cannot be used on views. '%s' is a view",
268 /* Keep the lock until transaction commit */
269 heap_close(rel, NoLock);
271 heap_truncate(relname);
276 * MergeDomainAttributes
277 * Returns a new table schema with the constraints, types, and other
278 * attributes of domains resolved for fields using a domain as
282 MergeDomainAttributes(List *schema)
287 * Loop through the table elements supplied. These should
288 * never include inherited domains else they'll be
289 * double (or more) processed.
291 foreach(entry, schema)
293 ColumnDef *coldef = lfirst(entry);
295 Form_pg_type typeTup;
297 tuple = SearchSysCache(TYPENAME,
298 CStringGetDatum(coldef->typename->name),
300 if (!HeapTupleIsValid(tuple))
301 elog(ERROR, "MergeDomainAttributes: Type %s does not exist",
302 coldef->typename->name);
303 typeTup = (Form_pg_type) GETSTRUCT(tuple);
305 if (typeTup->typtype == 'd')
307 /* Force the column to have the correct typmod. */
308 coldef->typename->typmod = typeTup->typtypmod;
309 /* XXX more to do here? */
312 /* Enforce type NOT NULL || column definition NOT NULL -> NOT NULL */
313 /* Currently only used for domains, but could be valid for all */
314 coldef->is_not_null |= typeTup->typnotnull;
316 ReleaseSysCache(tuple);
324 * Returns new schema given initial schema and superclasses.
327 * 'schema' is the column/attribute definition for the table. (It's a list
328 * of ColumnDef's.) It is destructively changed.
329 * 'supers' is a list of names (as Value objects) of parent relations.
330 * 'istemp' is TRUE if we are creating a temp relation.
333 * 'supOids' receives an integer list of the OIDs of the parent relations.
334 * 'supconstr' receives a list of constraints belonging to the parents,
335 * updated as necessary to be valid for the child.
336 * 'supHasOids' is set TRUE if any parent has OIDs, else it is set FALSE.
339 * Completed schema list.
342 * The order in which the attributes are inherited is very important.
343 * Intuitively, the inherited attributes should come first. If a table
344 * inherits from multiple parents, the order of those attributes are
345 * according to the order of the parents specified in CREATE TABLE.
349 * create table person (name text, age int4, location point);
350 * create table emp (salary int4, manager text) inherits(person);
351 * create table student (gpa float8) inherits (person);
352 * create table stud_emp (percent int4) inherits (emp, student);
354 * The order of the attributes of stud_emp is:
356 * person {1:name, 2:age, 3:location}
358 * {6:gpa} student emp {4:salary, 5:manager}
360 * stud_emp {7:percent}
362 * If the same attribute name appears multiple times, then it appears
363 * in the result table in the proper location for its first appearance.
365 * Constraints (including NOT NULL constraints) for the child table
366 * are the union of all relevant constraints, from both the child schema
369 * The default value for a child column is defined as:
370 * (1) If the child schema specifies a default, that value is used.
371 * (2) If neither the child nor any parent specifies a default, then
372 * the column will not have a default.
373 * (3) If conflicting defaults are inherited from different parents
374 * (and not overridden by the child), an error is raised.
375 * (4) Otherwise the inherited default is used.
376 * Rule (3) is new in Postgres 7.1; in earlier releases you got a
377 * rather arbitrary choice of which parent default to use.
381 MergeAttributes(List *schema, List *supers, bool istemp,
382 List **supOids, List **supconstr, bool *supHasOids)
385 List *inhSchema = NIL;
386 List *parentOids = NIL;
387 List *constraints = NIL;
388 bool parentHasOids = false;
389 bool have_bogus_defaults = false;
390 char *bogus_marker = "Bogus!"; /* marks conflicting
395 * Check for duplicate names in the explicit list of attributes.
397 * Although we might consider merging such entries in the same way that
398 * we handle name conflicts for inherited attributes, it seems to make
399 * more sense to assume such conflicts are errors.
401 foreach(entry, schema)
403 ColumnDef *coldef = lfirst(entry);
406 foreach(rest, lnext(entry))
408 ColumnDef *restdef = lfirst(rest);
410 if (strcmp(coldef->colname, restdef->colname) == 0)
411 elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated",
417 * Reject duplicate names in the list of parents, too.
419 * XXX needs to be smarter about schema-qualified table names.
421 foreach(entry, supers)
425 foreach(rest, lnext(entry))
427 if (strcmp(((RangeVar *) lfirst(entry))->relname,
428 ((RangeVar *) lfirst(rest))->relname) == 0)
429 elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated",
430 ((RangeVar *) lfirst(entry))->relname);
435 * Scan the parents left-to-right, and merge their attributes to form
436 * a list of inherited attributes (inhSchema). Also check to see if
437 * we need to inherit an OID column.
440 foreach(entry, supers)
442 char *name = ((RangeVar *) lfirst(entry))->relname;
446 AttrNumber *newattno;
447 AttrNumber parent_attno;
449 relation = heap_openr(name, AccessShareLock);
451 if (relation->rd_rel->relkind != RELKIND_RELATION)
452 elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name);
453 /* Permanent rels cannot inherit from temporary ones */
454 if (!istemp && is_temp_rel_name(name))
455 elog(ERROR, "CREATE TABLE: cannot inherit from temp relation \"%s\"", name);
458 * We should have an UNDER permission flag for this, but for now,
459 * demand that creator of a child table own the parent.
461 if (!pg_ownercheck(GetUserId(), name, RELNAME))
462 elog(ERROR, "you do not own table \"%s\"", name);
464 parentOids = lappendi(parentOids, relation->rd_id);
465 setRelhassubclassInRelation(relation->rd_id, true);
467 parentHasOids |= relation->rd_rel->relhasoids;
469 tupleDesc = RelationGetDescr(relation);
470 constr = tupleDesc->constr;
473 * newattno[] will contain the child-table attribute numbers for
474 * the attributes of this parent table. (They are not the same
475 * for parents after the first one.)
477 newattno = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber));
479 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
482 Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
491 * Get name and type name of attribute
493 attributeName = NameStr(attribute->attname);
494 tuple = SearchSysCache(TYPEOID,
495 ObjectIdGetDatum(attribute->atttypid),
497 if (!HeapTupleIsValid(tuple))
498 elog(ERROR, "CREATE TABLE: cache lookup failed for type %u",
499 attribute->atttypid);
500 attributeType = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname));
501 ReleaseSysCache(tuple);
504 * Does it conflict with some previously inherited column?
506 exist_attno = findAttrByName(attributeName, inhSchema);
510 * Yes, try to merge the two column definitions. They must
511 * have the same type and typmod.
513 elog(NOTICE, "CREATE TABLE: merging multiple inherited definitions of attribute \"%s\"",
515 def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
516 if (strcmp(def->typename->name, attributeType) != 0 ||
517 def->typename->typmod != attribute->atttypmod)
518 elog(ERROR, "CREATE TABLE: inherited attribute \"%s\" type conflict (%s and %s)",
519 attributeName, def->typename->name, attributeType);
520 /* Merge of NOT NULL constraints = OR 'em together */
521 def->is_not_null |= attribute->attnotnull;
522 /* Default and other constraints are handled below */
523 newattno[parent_attno - 1] = exist_attno;
528 * No, create a new inherited column
530 def = makeNode(ColumnDef);
531 def->colname = pstrdup(attributeName);
532 typename = makeNode(TypeName);
533 typename->name = attributeType;
534 typename->typmod = attribute->atttypmod;
535 def->typename = typename;
536 def->is_not_null = attribute->attnotnull;
537 def->raw_default = NULL;
538 def->cooked_default = NULL;
539 def->constraints = NIL;
540 inhSchema = lappend(inhSchema, def);
541 newattno[parent_attno - 1] = ++child_attno;
545 * Copy default if any
547 if (attribute->atthasdef)
549 char *this_default = NULL;
550 AttrDefault *attrdef;
553 /* Find default in constraint structure */
554 Assert(constr != NULL);
555 attrdef = constr->defval;
556 for (i = 0; i < constr->num_defval; i++)
558 if (attrdef[i].adnum == parent_attno)
560 this_default = attrdef[i].adbin;
564 Assert(this_default != NULL);
567 * If default expr could contain any vars, we'd need to
568 * fix 'em, but it can't; so default is ready to apply to
571 * If we already had a default from some prior parent, check
572 * to see if they are the same. If so, no problem; if
573 * not, mark the column as having a bogus default. Below,
574 * we will complain if the bogus default isn't overridden
575 * by the child schema.
577 Assert(def->raw_default == NULL);
578 if (def->cooked_default == NULL)
579 def->cooked_default = pstrdup(this_default);
580 else if (strcmp(def->cooked_default, this_default) != 0)
582 def->cooked_default = bogus_marker;
583 have_bogus_defaults = true;
589 * Now copy the constraints of this parent, adjusting attnos using
590 * the completed newattno[] map
592 if (constr && constr->num_check > 0)
594 ConstrCheck *check = constr->check;
597 for (i = 0; i < constr->num_check; i++)
599 Constraint *cdef = makeNode(Constraint);
602 cdef->contype = CONSTR_CHECK;
603 if (check[i].ccname[0] == '$')
606 cdef->name = pstrdup(check[i].ccname);
607 cdef->raw_expr = NULL;
608 /* adjust varattnos of ccbin here */
609 expr = stringToNode(check[i].ccbin);
610 change_varattnos_of_a_node(expr, newattno);
611 cdef->cooked_expr = nodeToString(expr);
612 constraints = lappend(constraints, cdef);
619 * Close the parent rel, but keep our AccessShareLock on it until
620 * xact commit. That will prevent someone else from deleting or
621 * ALTERing the parent before the child is committed.
623 heap_close(relation, NoLock);
627 * If we had no inherited attributes, the result schema is just the
628 * explicitly declared columns. Otherwise, we need to merge the
629 * declared columns into the inherited schema list.
631 if (inhSchema != NIL)
633 foreach(entry, schema)
635 ColumnDef *newdef = lfirst(entry);
636 char *attributeName = newdef->colname;
637 char *attributeType = newdef->typename->name;
641 * Does it conflict with some previously inherited column?
643 exist_attno = findAttrByName(attributeName, inhSchema);
649 * Yes, try to merge the two column definitions. They must
650 * have the same type and typmod.
652 elog(NOTICE, "CREATE TABLE: merging attribute \"%s\" with inherited definition",
654 def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
655 if (strcmp(def->typename->name, attributeType) != 0 ||
656 def->typename->typmod != newdef->typename->typmod)
657 elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
658 attributeName, def->typename->name, attributeType);
659 /* Merge of NOT NULL constraints = OR 'em together */
660 def->is_not_null |= newdef->is_not_null;
661 /* If new def has a default, override previous default */
662 if (newdef->raw_default != NULL)
664 def->raw_default = newdef->raw_default;
665 def->cooked_default = newdef->cooked_default;
671 * No, attach new column to result schema
673 inhSchema = lappend(inhSchema, newdef);
681 * If we found any conflicting parent default values, check to make
682 * sure they were overridden by the child.
684 if (have_bogus_defaults)
686 foreach(entry, schema)
688 ColumnDef *def = lfirst(entry);
690 if (def->cooked_default == bogus_marker)
691 elog(ERROR, "CREATE TABLE: attribute \"%s\" inherits conflicting default values"
692 "\n\tTo resolve the conflict, specify a default explicitly",
697 *supOids = parentOids;
698 *supconstr = constraints;
699 *supHasOids = parentHasOids;
704 * complementary static functions for MergeAttributes().
706 * Varattnos of pg_relcheck.rcbin must be rewritten when subclasses inherit
707 * constraints from parent classes, since the inherited attributes could
708 * be given different column numbers in multiple-inheritance cases.
710 * Note that the passed node tree is modified in place!
713 change_varattnos_walker(Node *node, const AttrNumber *newattno)
719 Var *var = (Var *) node;
721 if (var->varlevelsup == 0 && var->varno == 1 &&
725 * ??? the following may be a problem when the node is
726 * multiply referenced though stringToNode() doesn't create
727 * such a node currently.
729 Assert(newattno[var->varattno - 1] > 0);
730 var->varattno = newattno[var->varattno - 1];
734 return expression_tree_walker(node, change_varattnos_walker,
739 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
741 return change_varattnos_walker(node, newattno);
745 * StoreCatalogInheritance
746 * Updates the system catalogs with proper inheritance information.
748 * supers is an integer list of the OIDs of the new relation's direct
749 * ancestors. NB: it is destructively changed to include indirect ancestors.
752 StoreCatalogInheritance(Oid relationId, List *supers)
763 AssertArg(OidIsValid(relationId));
769 * Catalog INHERITS information using direct ancestors only.
771 relation = heap_openr(InheritsRelationName, RowExclusiveLock);
772 desc = RelationGetDescr(relation);
775 foreach(entry, supers)
777 Oid entryOid = lfirsti(entry);
778 Datum datum[Natts_pg_inherits];
779 char nullarr[Natts_pg_inherits];
781 datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
782 datum[1] = ObjectIdGetDatum(entryOid); /* inhparent */
783 datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
789 tuple = heap_formtuple(desc, datum, nullarr);
791 heap_insert(relation, tuple);
793 if (RelationGetForm(relation)->relhasindex)
795 Relation idescs[Num_pg_inherits_indices];
797 CatalogOpenIndices(Num_pg_inherits_indices, Name_pg_inherits_indices, idescs);
798 CatalogIndexInsert(idescs, Num_pg_inherits_indices, relation, tuple);
799 CatalogCloseIndices(Num_pg_inherits_indices, idescs);
802 heap_freetuple(tuple);
807 heap_close(relation, RowExclusiveLock);
810 * Expand supers list to include indirect ancestors as well.
813 * 0. begin with list of direct superclasses.
814 * 1. append after each relationId, its superclasses, recursively.
815 * 2. remove all but last of duplicates.
820 * 1. append after each relationId, its superclasses, recursively.
822 foreach(entry, supers)
830 id = (Oid) lfirsti(entry);
834 for (number = 1;; number += 1)
836 tuple = SearchSysCache(INHRELID,
837 ObjectIdGetDatum(id),
838 Int16GetDatum(number),
840 if (!HeapTupleIsValid(tuple))
843 lnext(current) = lconsi(((Form_pg_inherits)
844 GETSTRUCT(tuple))->inhparent,
847 ReleaseSysCache(tuple);
849 current = lnext(current);
851 lnext(current) = next;
855 * 2. remove all but last of duplicates.
857 foreach(entry, supers)
864 thisone = lfirsti(entry);
866 foreach(rest, lnext(entry))
868 if (thisone == lfirsti(rest))
877 * found a later duplicate, so remove this entry.
879 lfirsti(entry) = lfirsti(lnext(entry));
880 lnext(entry) = lnext(lnext(entry));
888 * Look for an existing schema entry with the given name.
890 * Returns the index (starting with 1) if attribute already exists in schema,
894 findAttrByName(const char *attributeName, List *schema)
901 ColumnDef *def = lfirst(s);
904 if (strcmp(attributeName, def->colname) == 0)
911 * Update a relation's pg_class.relhassubclass entry to the given value
914 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
916 Relation relationRelation;
918 Relation idescs[Num_pg_class_indices];
921 * Fetch a modifiable copy of the tuple, modify it, update pg_class.
923 relationRelation = heap_openr(RelationRelationName, RowExclusiveLock);
924 tuple = SearchSysCacheCopy(RELOID,
925 ObjectIdGetDatum(relationId),
927 if (!HeapTupleIsValid(tuple))
928 elog(ERROR, "setRelhassubclassInRelation: cache lookup failed for relation %u", relationId);
930 ((Form_pg_class) GETSTRUCT(tuple))->relhassubclass = relhassubclass;
931 simple_heap_update(relationRelation, &tuple->t_self, tuple);
933 /* keep the catalog indices up to date */
934 CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, idescs);
935 CatalogIndexInsert(idescs, Num_pg_class_indices, relationRelation, tuple);
936 CatalogCloseIndices(Num_pg_class_indices, idescs);
938 heap_freetuple(tuple);
939 heap_close(relationRelation, RowExclusiveLock);