OSDN Git Service

First phase of SCHEMA changes, concentrating on fixing the grammar and
[pg-rex/syncrep.git] / src / backend / commands / creatinh.c
1 /*-------------------------------------------------------------------------
2  *
3  * creatinh.c
4  *        POSTGRES create/destroy relation with inheritance utility code.
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.89 2002/03/21 16:00:31 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "catalog/catalog.h"
20 #include "catalog/catname.h"
21 #include "catalog/indexing.h"
22 #include "catalog/heap.h"
23 #include "catalog/pg_inherits.h"
24 #include "catalog/pg_type.h"
25 #include "commands/creatinh.h"
26 #include "miscadmin.h"
27 #include "optimizer/clauses.h"
28 #include "utils/acl.h"
29 #include "utils/syscache.h"
30 #include "utils/temprel.h"
31
32 /* ----------------
33  *              local stuff
34  * ----------------
35  */
36
37 static List *MergeAttributes(List *schema, List *supers, bool istemp,
38                                 List **supOids, List **supconstr, bool *supHasOids);
39 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
40 static void StoreCatalogInheritance(Oid relationId, List *supers);
41 static int      findAttrByName(const char *attributeName, List *schema);
42 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
43 static List *MergeDomainAttributes(List *schema);
44
45
46 /* ----------------------------------------------------------------
47  *              DefineRelation
48  *                              Creates a new relation.
49  * ----------------------------------------------------------------
50  */
51 void
52 DefineRelation(CreateStmt *stmt, char relkind)
53 {
54         char       *relname = palloc(NAMEDATALEN);
55         List       *schema = stmt->tableElts;
56         int                     numberOfAttributes;
57         Oid                     relationId;
58         Relation        rel;
59         TupleDesc       descriptor;
60         List       *inheritOids;
61         List       *old_constraints;
62         bool            parentHasOids;
63         List       *rawDefaults;
64         List       *listptr;
65         int                     i;
66         AttrNumber      attnum;
67
68         /*
69          * Truncate relname to appropriate length (probably a waste of time,
70          * as parser should have done this already).
71          */
72         StrNCpy(relname, (stmt->relation)->relname, NAMEDATALEN);
73
74         /*
75          * Merge domain attributes into the known columns before processing table
76          * inheritance.  Otherwise we risk adding double constraints to a
77          * domain-type column that's inherited.
78          */
79         schema = MergeDomainAttributes(schema);
80
81         /*
82          * Look up inheritance ancestors and generate relation schema,
83          * including inherited attributes.
84          */
85         schema = MergeAttributes(schema, stmt->inhRelations,
86                                                          stmt->relation->istemp,
87                                                          &inheritOids, &old_constraints, &parentHasOids);
88
89         numberOfAttributes = length(schema);
90         if (numberOfAttributes <= 0)
91                 elog(ERROR, "DefineRelation: please inherit from a relation or define an attribute");
92
93         /*
94          * Create a relation descriptor from the relation schema and create
95          * the relation.  Note that in this stage only inherited (pre-cooked)
96          * defaults and constraints will be included into the new relation.
97          * (BuildDescForRelation takes care of the inherited defaults, but we
98          * have to copy inherited constraints here.)
99          */
100         descriptor = BuildDescForRelation(schema, relname);
101
102         if (old_constraints != NIL)
103         {
104                 ConstrCheck *check = (ConstrCheck *) palloc(length(old_constraints) *
105                                                                                                         sizeof(ConstrCheck));
106                 int                     ncheck = 0;
107
108                 foreach(listptr, old_constraints)
109                 {
110                         Constraint *cdef = (Constraint *) lfirst(listptr);
111
112                         if (cdef->contype != CONSTR_CHECK)
113                                 continue;
114
115                         if (cdef->name != NULL)
116                         {
117                                 for (i = 0; i < ncheck; i++)
118                                 {
119                                         if (strcmp(check[i].ccname, cdef->name) == 0)
120                                                 elog(ERROR, "Duplicate CHECK constraint name: '%s'",
121                                                          cdef->name);
122                                 }
123                                 check[ncheck].ccname = cdef->name;
124                         }
125                         else
126                         {
127                                 check[ncheck].ccname = (char *) palloc(NAMEDATALEN);
128                                 snprintf(check[ncheck].ccname, NAMEDATALEN, "$%d", ncheck + 1);
129                         }
130                         Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
131                         check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
132                         ncheck++;
133                 }
134                 if (ncheck > 0)
135                 {
136                         if (descriptor->constr == NULL)
137                         {
138                                 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
139                                 descriptor->constr->defval = NULL;
140                                 descriptor->constr->num_defval = 0;
141                                 descriptor->constr->has_not_null = false;
142                         }
143                         descriptor->constr->num_check = ncheck;
144                         descriptor->constr->check = check;
145                 }
146         }
147
148         relationId = heap_create_with_catalog(relname, descriptor,
149                                                                                   relkind,
150                                                                                   stmt->hasoids || parentHasOids,
151                                                                                   stmt->relation->istemp,
152                                                                                   allowSystemTableMods);
153
154         StoreCatalogInheritance(relationId, inheritOids);
155
156         /*
157          * We must bump the command counter to make the newly-created relation
158          * tuple visible for opening.
159          */
160         CommandCounterIncrement();
161
162         /*
163          * Open the new relation and acquire exclusive lock on it.      This isn't
164          * really necessary for locking out other backends (since they can't
165          * see the new rel anyway until we commit), but it keeps the lock
166          * manager from complaining about deadlock risks.
167          */
168         rel = heap_openr(relname, AccessExclusiveLock);
169
170         /*
171          * Now add any newly specified column default values and CHECK
172          * constraints to the new relation.  These are passed to us in the
173          * form of raw parsetrees; we need to transform them to executable
174          * expression trees before they can be added. The most convenient way
175          * to do that is to apply the parser's transformExpr routine, but
176          * transformExpr doesn't work unless we have a pre-existing relation.
177          * So, the transformation has to be postponed to this final step of
178          * CREATE TABLE.
179          *
180          * First, scan schema to find new column defaults.
181          */
182         rawDefaults = NIL;
183         attnum = 0;
184
185         foreach(listptr, schema)
186         {
187                 ColumnDef  *colDef = lfirst(listptr);
188                 RawColumnDefault *rawEnt;
189
190                 attnum++;
191
192                 if (colDef->raw_default == NULL)
193                         continue;
194                 Assert(colDef->cooked_default == NULL);
195
196                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
197                 rawEnt->attnum = attnum;
198                 rawEnt->raw_default = colDef->raw_default;
199                 rawDefaults = lappend(rawDefaults, rawEnt);
200         }
201
202         /*
203          * Parse and add the defaults/constraints, if any.
204          */
205         if (rawDefaults || stmt->constraints)
206                 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
207
208         /*
209          * Clean up.  We keep lock on new relation (although it shouldn't be
210          * visible to anyone else anyway, until commit).
211          */
212         heap_close(rel, NoLock);
213 }
214
215 /*
216  * RemoveRelation
217  *              Deletes a new relation.
218  *
219  * Exceptions:
220  *              BadArg if name is invalid.
221  *
222  * Note:
223  *              If the relation has indices defined on it, then the index relations
224  * themselves will be destroyed, too.
225  */
226 void
227 RemoveRelation(const char *name)
228 {
229         AssertArg(name);
230         heap_drop_with_catalog(name, allowSystemTableMods);
231 }
232
233 /*
234  * TruncateRelation
235  *                                Removes all the rows from a relation
236  *
237  * Exceptions:
238  *                                BadArg if name is invalid
239  *
240  * Note:
241  *                                Rows are removed, indices are truncated and reconstructed.
242  */
243 void
244 TruncateRelation(const char *relname)
245 {
246         Relation        rel;
247
248         AssertArg(relname);
249
250         if (!allowSystemTableMods && IsSystemRelationName(relname))
251                 elog(ERROR, "TRUNCATE cannot be used on system tables. '%s' is a system table",
252                          relname);
253
254         if (!pg_ownercheck(GetUserId(), relname, RELNAME))
255                 elog(ERROR, "you do not own relation \"%s\"", relname);
256
257         /* Grab exclusive lock in preparation for truncate */
258         rel = heap_openr(relname, AccessExclusiveLock);
259
260         if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
261                 elog(ERROR, "TRUNCATE cannot be used on sequences. '%s' is a sequence",
262                          relname);
263
264         if (rel->rd_rel->relkind == RELKIND_VIEW)
265                 elog(ERROR, "TRUNCATE cannot be used on views. '%s' is a view",
266                          relname);
267
268         /* Keep the lock until transaction commit */
269         heap_close(rel, NoLock);
270
271         heap_truncate(relname);
272 }
273
274
275 /*
276  * MergeDomainAttributes
277  *      Returns a new table schema with the constraints, types, and other
278  *      attributes of domains resolved for fields using a domain as
279  *      their type.
280  */
281 static List *
282 MergeDomainAttributes(List *schema)
283 {
284         List       *entry;
285
286         /*
287          * Loop through the table elements supplied. These should
288          * never include inherited domains else they'll be
289          * double (or more) processed.
290          */
291         foreach(entry, schema)
292         {
293                 ColumnDef  *coldef = lfirst(entry);
294                 HeapTuple  tuple;
295                 Form_pg_type typeTup;
296
297                 tuple = SearchSysCache(TYPENAME,
298                                                            CStringGetDatum(coldef->typename->name),
299                                                            0,0,0);
300                 if (!HeapTupleIsValid(tuple))
301                         elog(ERROR, "MergeDomainAttributes: Type %s does not exist",
302                                  coldef->typename->name);
303                 typeTup = (Form_pg_type) GETSTRUCT(tuple);
304
305                 if (typeTup->typtype == 'd')
306                 {
307                         /* Force the column to have the correct typmod. */
308                         coldef->typename->typmod = typeTup->typtypmod;
309                         /* XXX more to do here? */
310                 }
311
312                 /* Enforce type NOT NULL || column definition NOT NULL -> NOT NULL */
313                 /* Currently only used for domains, but could be valid for all */
314                 coldef->is_not_null |= typeTup->typnotnull;
315
316                 ReleaseSysCache(tuple);
317         }
318
319         return schema;
320 }
321
322 /*----------
323  * MergeAttributes
324  *              Returns new schema given initial schema and superclasses.
325  *
326  * Input arguments:
327  * 'schema' is the column/attribute definition for the table. (It's a list
328  *              of ColumnDef's.) It is destructively changed.
329  * 'supers' is a list of names (as Value objects) of parent relations.
330  * 'istemp' is TRUE if we are creating a temp relation.
331  *
332  * Output arguments:
333  * 'supOids' receives an integer list of the OIDs of the parent relations.
334  * 'supconstr' receives a list of constraints belonging to the parents,
335  *              updated as necessary to be valid for the child.
336  * 'supHasOids' is set TRUE if any parent has OIDs, else it is set FALSE.
337  *
338  * Return value:
339  * Completed schema list.
340  *
341  * Notes:
342  *        The order in which the attributes are inherited is very important.
343  *        Intuitively, the inherited attributes should come first. If a table
344  *        inherits from multiple parents, the order of those attributes are
345  *        according to the order of the parents specified in CREATE TABLE.
346  *
347  *        Here's an example:
348  *
349  *              create table person (name text, age int4, location point);
350  *              create table emp (salary int4, manager text) inherits(person);
351  *              create table student (gpa float8) inherits (person);
352  *              create table stud_emp (percent int4) inherits (emp, student);
353  *
354  *        The order of the attributes of stud_emp is:
355  *
356  *                                                      person {1:name, 2:age, 3:location}
357  *                                                      /        \
358  *                         {6:gpa}      student   emp {4:salary, 5:manager}
359  *                                                      \        /
360  *                                                 stud_emp {7:percent}
361  *
362  *         If the same attribute name appears multiple times, then it appears
363  *         in the result table in the proper location for its first appearance.
364  *
365  *         Constraints (including NOT NULL constraints) for the child table
366  *         are the union of all relevant constraints, from both the child schema
367  *         and parent tables.
368  *
369  *         The default value for a child column is defined as:
370  *              (1) If the child schema specifies a default, that value is used.
371  *              (2) If neither the child nor any parent specifies a default, then
372  *                      the column will not have a default.
373  *              (3) If conflicting defaults are inherited from different parents
374  *                      (and not overridden by the child), an error is raised.
375  *              (4) Otherwise the inherited default is used.
376  *              Rule (3) is new in Postgres 7.1; in earlier releases you got a
377  *              rather arbitrary choice of which parent default to use.
378  *----------
379  */
380 static List *
381 MergeAttributes(List *schema, List *supers, bool istemp,
382                                 List **supOids, List **supconstr, bool *supHasOids)
383 {
384         List       *entry;
385         List       *inhSchema = NIL;
386         List       *parentOids = NIL;
387         List       *constraints = NIL;
388         bool            parentHasOids = false;
389         bool            have_bogus_defaults = false;
390         char       *bogus_marker = "Bogus!";            /* marks conflicting
391                                                                                                  * defaults */
392         int                     child_attno;
393
394         /*
395          * Check for duplicate names in the explicit list of attributes.
396          *
397          * Although we might consider merging such entries in the same way that
398          * we handle name conflicts for inherited attributes, it seems to make
399          * more sense to assume such conflicts are errors.
400          */
401         foreach(entry, schema)
402         {
403                 ColumnDef  *coldef = lfirst(entry);
404                 List       *rest;
405
406                 foreach(rest, lnext(entry))
407                 {
408                         ColumnDef  *restdef = lfirst(rest);
409
410                         if (strcmp(coldef->colname, restdef->colname) == 0)
411                                 elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated",
412                                          coldef->colname);
413                 }
414         }
415
416         /*
417          * Reject duplicate names in the list of parents, too.
418          *
419          * XXX needs to be smarter about schema-qualified table names.
420          */
421         foreach(entry, supers)
422         {
423                 List       *rest;
424
425                 foreach(rest, lnext(entry))
426                 {
427                         if (strcmp(((RangeVar *) lfirst(entry))->relname, 
428                                            ((RangeVar *) lfirst(rest))->relname) == 0)
429                                 elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated",
430                                          ((RangeVar *) lfirst(entry))->relname);
431                 }
432         }
433
434         /*
435          * Scan the parents left-to-right, and merge their attributes to form
436          * a list of inherited attributes (inhSchema).  Also check to see if
437          * we need to inherit an OID column.
438          */
439         child_attno = 0;
440         foreach(entry, supers)
441         {
442                 char       *name = ((RangeVar *) lfirst(entry))->relname;
443                 Relation        relation;
444                 TupleDesc       tupleDesc;
445                 TupleConstr *constr;
446                 AttrNumber *newattno;
447                 AttrNumber      parent_attno;
448
449                 relation = heap_openr(name, AccessShareLock);
450
451                 if (relation->rd_rel->relkind != RELKIND_RELATION)
452                         elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name);
453                 /* Permanent rels cannot inherit from temporary ones */
454                 if (!istemp && is_temp_rel_name(name))
455                         elog(ERROR, "CREATE TABLE: cannot inherit from temp relation \"%s\"", name);
456
457                 /*
458                  * We should have an UNDER permission flag for this, but for now,
459                  * demand that creator of a child table own the parent.
460                  */
461                 if (!pg_ownercheck(GetUserId(), name, RELNAME))
462                         elog(ERROR, "you do not own table \"%s\"", name);
463
464                 parentOids = lappendi(parentOids, relation->rd_id);
465                 setRelhassubclassInRelation(relation->rd_id, true);
466
467                 parentHasOids |= relation->rd_rel->relhasoids;
468
469                 tupleDesc = RelationGetDescr(relation);
470                 constr = tupleDesc->constr;
471
472                 /*
473                  * newattno[] will contain the child-table attribute numbers for
474                  * the attributes of this parent table.  (They are not the same
475                  * for parents after the first one.)
476                  */
477                 newattno = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber));
478
479                 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
480                          parent_attno++)
481                 {
482                         Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
483                         char       *attributeName;
484                         char       *attributeType;
485                         HeapTuple       tuple;
486                         int                     exist_attno;
487                         ColumnDef  *def;
488                         TypeName   *typename;
489
490                         /*
491                          * Get name and type name of attribute
492                          */
493                         attributeName = NameStr(attribute->attname);
494                         tuple = SearchSysCache(TYPEOID,
495                                                                    ObjectIdGetDatum(attribute->atttypid),
496                                                                    0, 0, 0);
497                         if (!HeapTupleIsValid(tuple))
498                                 elog(ERROR, "CREATE TABLE: cache lookup failed for type %u",
499                                          attribute->atttypid);
500                         attributeType = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname));
501                         ReleaseSysCache(tuple);
502
503                         /*
504                          * Does it conflict with some previously inherited column?
505                          */
506                         exist_attno = findAttrByName(attributeName, inhSchema);
507                         if (exist_attno > 0)
508                         {
509                                 /*
510                                  * Yes, try to merge the two column definitions. They must
511                                  * have the same type and typmod.
512                                  */
513                                 elog(NOTICE, "CREATE TABLE: merging multiple inherited definitions of attribute \"%s\"",
514                                          attributeName);
515                                 def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
516                                 if (strcmp(def->typename->name, attributeType) != 0 ||
517                                         def->typename->typmod != attribute->atttypmod)
518                                         elog(ERROR, "CREATE TABLE: inherited attribute \"%s\" type conflict (%s and %s)",
519                                           attributeName, def->typename->name, attributeType);
520                                 /* Merge of NOT NULL constraints = OR 'em together */
521                                 def->is_not_null |= attribute->attnotnull;
522                                 /* Default and other constraints are handled below */
523                                 newattno[parent_attno - 1] = exist_attno;
524                         }
525                         else
526                         {
527                                 /*
528                                  * No, create a new inherited column
529                                  */
530                                 def = makeNode(ColumnDef);
531                                 def->colname = pstrdup(attributeName);
532                                 typename = makeNode(TypeName);
533                                 typename->name = attributeType;
534                                 typename->typmod = attribute->atttypmod;
535                                 def->typename = typename;
536                                 def->is_not_null = attribute->attnotnull;
537                                 def->raw_default = NULL;
538                                 def->cooked_default = NULL;
539                                 def->constraints = NIL;
540                                 inhSchema = lappend(inhSchema, def);
541                                 newattno[parent_attno - 1] = ++child_attno;
542                         }
543
544                         /*
545                          * Copy default if any
546                          */
547                         if (attribute->atthasdef)
548                         {
549                                 char       *this_default = NULL;
550                                 AttrDefault *attrdef;
551                                 int                     i;
552
553                                 /* Find default in constraint structure */
554                                 Assert(constr != NULL);
555                                 attrdef = constr->defval;
556                                 for (i = 0; i < constr->num_defval; i++)
557                                 {
558                                         if (attrdef[i].adnum == parent_attno)
559                                         {
560                                                 this_default = attrdef[i].adbin;
561                                                 break;
562                                         }
563                                 }
564                                 Assert(this_default != NULL);
565
566                                 /*
567                                  * If default expr could contain any vars, we'd need to
568                                  * fix 'em, but it can't; so default is ready to apply to
569                                  * child.
570                                  *
571                                  * If we already had a default from some prior parent, check
572                                  * to see if they are the same.  If so, no problem; if
573                                  * not, mark the column as having a bogus default. Below,
574                                  * we will complain if the bogus default isn't overridden
575                                  * by the child schema.
576                                  */
577                                 Assert(def->raw_default == NULL);
578                                 if (def->cooked_default == NULL)
579                                         def->cooked_default = pstrdup(this_default);
580                                 else if (strcmp(def->cooked_default, this_default) != 0)
581                                 {
582                                         def->cooked_default = bogus_marker;
583                                         have_bogus_defaults = true;
584                                 }
585                         }
586                 }
587
588                 /*
589                  * Now copy the constraints of this parent, adjusting attnos using
590                  * the completed newattno[] map
591                  */
592                 if (constr && constr->num_check > 0)
593                 {
594                         ConstrCheck *check = constr->check;
595                         int                     i;
596
597                         for (i = 0; i < constr->num_check; i++)
598                         {
599                                 Constraint *cdef = makeNode(Constraint);
600                                 Node       *expr;
601
602                                 cdef->contype = CONSTR_CHECK;
603                                 if (check[i].ccname[0] == '$')
604                                         cdef->name = NULL;
605                                 else
606                                         cdef->name = pstrdup(check[i].ccname);
607                                 cdef->raw_expr = NULL;
608                                 /* adjust varattnos of ccbin here */
609                                 expr = stringToNode(check[i].ccbin);
610                                 change_varattnos_of_a_node(expr, newattno);
611                                 cdef->cooked_expr = nodeToString(expr);
612                                 constraints = lappend(constraints, cdef);
613                         }
614                 }
615
616                 pfree(newattno);
617
618                 /*
619                  * Close the parent rel, but keep our AccessShareLock on it until
620                  * xact commit.  That will prevent someone else from deleting or
621                  * ALTERing the parent before the child is committed.
622                  */
623                 heap_close(relation, NoLock);
624         }
625
626         /*
627          * If we had no inherited attributes, the result schema is just the
628          * explicitly declared columns.  Otherwise, we need to merge the
629          * declared columns into the inherited schema list.
630          */
631         if (inhSchema != NIL)
632         {
633                 foreach(entry, schema)
634                 {
635                         ColumnDef  *newdef = lfirst(entry);
636                         char       *attributeName = newdef->colname;
637                         char       *attributeType = newdef->typename->name;
638                         int                     exist_attno;
639
640                         /*
641                          * Does it conflict with some previously inherited column?
642                          */
643                         exist_attno = findAttrByName(attributeName, inhSchema);
644                         if (exist_attno > 0)
645                         {
646                                 ColumnDef  *def;
647
648                                 /*
649                                  * Yes, try to merge the two column definitions. They must
650                                  * have the same type and typmod.
651                                  */
652                                 elog(NOTICE, "CREATE TABLE: merging attribute \"%s\" with inherited definition",
653                                          attributeName);
654                                 def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
655                                 if (strcmp(def->typename->name, attributeType) != 0 ||
656                                         def->typename->typmod != newdef->typename->typmod)
657                                         elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
658                                           attributeName, def->typename->name, attributeType);
659                                 /* Merge of NOT NULL constraints = OR 'em together */
660                                 def->is_not_null |= newdef->is_not_null;
661                                 /* If new def has a default, override previous default */
662                                 if (newdef->raw_default != NULL)
663                                 {
664                                         def->raw_default = newdef->raw_default;
665                                         def->cooked_default = newdef->cooked_default;
666                                 }
667                         }
668                         else
669                         {
670                                 /*
671                                  * No, attach new column to result schema
672                                  */
673                                 inhSchema = lappend(inhSchema, newdef);
674                         }
675                 }
676
677                 schema = inhSchema;
678         }
679
680         /*
681          * If we found any conflicting parent default values, check to make
682          * sure they were overridden by the child.
683          */
684         if (have_bogus_defaults)
685         {
686                 foreach(entry, schema)
687                 {
688                         ColumnDef  *def = lfirst(entry);
689
690                         if (def->cooked_default == bogus_marker)
691                                 elog(ERROR, "CREATE TABLE: attribute \"%s\" inherits conflicting default values"
692                                          "\n\tTo resolve the conflict, specify a default explicitly",
693                                          def->colname);
694                 }
695         }
696
697         *supOids = parentOids;
698         *supconstr = constraints;
699         *supHasOids = parentHasOids;
700         return schema;
701 }
702
703 /*
704  * complementary static functions for MergeAttributes().
705  *
706  * Varattnos of pg_relcheck.rcbin must be rewritten when subclasses inherit
707  * constraints from parent classes, since the inherited attributes could
708  * be given different column numbers in multiple-inheritance cases.
709  *
710  * Note that the passed node tree is modified in place!
711  */
712 static bool
713 change_varattnos_walker(Node *node, const AttrNumber *newattno)
714 {
715         if (node == NULL)
716                 return false;
717         if (IsA(node, Var))
718         {
719                 Var                *var = (Var *) node;
720
721                 if (var->varlevelsup == 0 && var->varno == 1 &&
722                         var->varattno > 0)
723                 {
724                         /*
725                          * ??? the following may be a problem when the node is
726                          * multiply referenced though stringToNode() doesn't create
727                          * such a node currently.
728                          */
729                         Assert(newattno[var->varattno - 1] > 0);
730                         var->varattno = newattno[var->varattno - 1];
731                 }
732                 return false;
733         }
734         return expression_tree_walker(node, change_varattnos_walker,
735                                                                   (void *) newattno);
736 }
737
738 static bool
739 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
740 {
741         return change_varattnos_walker(node, newattno);
742 }
743
744 /*
745  * StoreCatalogInheritance
746  *              Updates the system catalogs with proper inheritance information.
747  *
748  * supers is an integer list of the OIDs of the new relation's direct
749  * ancestors.  NB: it is destructively changed to include indirect ancestors.
750  */
751 static void
752 StoreCatalogInheritance(Oid relationId, List *supers)
753 {
754         Relation        relation;
755         TupleDesc       desc;
756         int16           seqNumber;
757         List       *entry;
758         HeapTuple       tuple;
759
760         /*
761          * sanity checks
762          */
763         AssertArg(OidIsValid(relationId));
764
765         if (supers == NIL)
766                 return;
767
768         /*
769          * Catalog INHERITS information using direct ancestors only.
770          */
771         relation = heap_openr(InheritsRelationName, RowExclusiveLock);
772         desc = RelationGetDescr(relation);
773
774         seqNumber = 1;
775         foreach(entry, supers)
776         {
777                 Oid                     entryOid = lfirsti(entry);
778                 Datum           datum[Natts_pg_inherits];
779                 char            nullarr[Natts_pg_inherits];
780
781                 datum[0] = ObjectIdGetDatum(relationId);                /* inhrel */
782                 datum[1] = ObjectIdGetDatum(entryOid);  /* inhparent */
783                 datum[2] = Int16GetDatum(seqNumber);    /* inhseqno */
784
785                 nullarr[0] = ' ';
786                 nullarr[1] = ' ';
787                 nullarr[2] = ' ';
788
789                 tuple = heap_formtuple(desc, datum, nullarr);
790
791                 heap_insert(relation, tuple);
792
793                 if (RelationGetForm(relation)->relhasindex)
794                 {
795                         Relation        idescs[Num_pg_inherits_indices];
796
797                         CatalogOpenIndices(Num_pg_inherits_indices, Name_pg_inherits_indices, idescs);
798                         CatalogIndexInsert(idescs, Num_pg_inherits_indices, relation, tuple);
799                         CatalogCloseIndices(Num_pg_inherits_indices, idescs);
800                 }
801
802                 heap_freetuple(tuple);
803
804                 seqNumber += 1;
805         }
806
807         heap_close(relation, RowExclusiveLock);
808
809         /* ----------------
810          * Expand supers list to include indirect ancestors as well.
811          *
812          * Algorithm:
813          *      0. begin with list of direct superclasses.
814          *      1. append after each relationId, its superclasses, recursively.
815          *      2. remove all but last of duplicates.
816          * ----------------
817          */
818
819         /*
820          * 1. append after each relationId, its superclasses, recursively.
821          */
822         foreach(entry, supers)
823         {
824                 HeapTuple       tuple;
825                 Oid                     id;
826                 int16           number;
827                 List       *next;
828                 List       *current;
829
830                 id = (Oid) lfirsti(entry);
831                 current = entry;
832                 next = lnext(entry);
833
834                 for (number = 1;; number += 1)
835                 {
836                         tuple = SearchSysCache(INHRELID,
837                                                                    ObjectIdGetDatum(id),
838                                                                    Int16GetDatum(number),
839                                                                    0, 0);
840                         if (!HeapTupleIsValid(tuple))
841                                 break;
842
843                         lnext(current) = lconsi(((Form_pg_inherits)
844                                                                          GETSTRUCT(tuple))->inhparent,
845                                                                         NIL);
846
847                         ReleaseSysCache(tuple);
848
849                         current = lnext(current);
850                 }
851                 lnext(current) = next;
852         }
853
854         /*
855          * 2. remove all but last of duplicates.
856          */
857         foreach(entry, supers)
858         {
859                 Oid                     thisone;
860                 bool            found;
861                 List       *rest;
862
863 again:
864                 thisone = lfirsti(entry);
865                 found = false;
866                 foreach(rest, lnext(entry))
867                 {
868                         if (thisone == lfirsti(rest))
869                         {
870                                 found = true;
871                                 break;
872                         }
873                 }
874                 if (found)
875                 {
876                         /*
877                          * found a later duplicate, so remove this entry.
878                          */
879                         lfirsti(entry) = lfirsti(lnext(entry));
880                         lnext(entry) = lnext(lnext(entry));
881
882                         goto again;
883                 }
884         }
885 }
886
887 /*
888  * Look for an existing schema entry with the given name.
889  *
890  * Returns the index (starting with 1) if attribute already exists in schema,
891  * 0 if it doesn't.
892  */
893 static int
894 findAttrByName(const char *attributeName, List *schema)
895 {
896         List       *s;
897         int                     i = 0;
898
899         foreach(s, schema)
900         {
901                 ColumnDef  *def = lfirst(s);
902
903                 ++i;
904                 if (strcmp(attributeName, def->colname) == 0)
905                         return i;
906         }
907         return 0;
908 }
909
910 /*
911  * Update a relation's pg_class.relhassubclass entry to the given value
912  */
913 static void
914 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
915 {
916         Relation        relationRelation;
917         HeapTuple       tuple;
918         Relation        idescs[Num_pg_class_indices];
919
920         /*
921          * Fetch a modifiable copy of the tuple, modify it, update pg_class.
922          */
923         relationRelation = heap_openr(RelationRelationName, RowExclusiveLock);
924         tuple = SearchSysCacheCopy(RELOID,
925                                                            ObjectIdGetDatum(relationId),
926                                                            0, 0, 0);
927         if (!HeapTupleIsValid(tuple))
928                 elog(ERROR, "setRelhassubclassInRelation: cache lookup failed for relation %u", relationId);
929
930         ((Form_pg_class) GETSTRUCT(tuple))->relhassubclass = relhassubclass;
931         simple_heap_update(relationRelation, &tuple->t_self, tuple);
932
933         /* keep the catalog indices up to date */
934         CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, idescs);
935         CatalogIndexInsert(idescs, Num_pg_class_indices, relationRelation, tuple);
936         CatalogCloseIndices(Num_pg_class_indices, idescs);
937
938         heap_freetuple(tuple);
939         heap_close(relationRelation, RowExclusiveLock);
940 }