1 /*-------------------------------------------------------------------------
4 * Commands for creating and altering table structures and settings
6 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.176 2005/11/22 18:17:09 momjian Exp $
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/tuptoaster.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_constraint.h"
26 #include "catalog/pg_depend.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_opclass.h"
30 #include "catalog/pg_trigger.h"
31 #include "catalog/pg_type.h"
32 #include "commands/cluster.h"
33 #include "commands/defrem.h"
34 #include "commands/tablecmds.h"
35 #include "commands/tablespace.h"
36 #include "commands/trigger.h"
37 #include "commands/typecmds.h"
38 #include "executor/executor.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/clauses.h"
43 #include "optimizer/plancat.h"
44 #include "optimizer/prep.h"
45 #include "parser/analyze.h"
46 #include "parser/gramparse.h"
47 #include "parser/parser.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "rewrite/rewriteHandler.h"
55 #include "storage/smgr.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
67 * ON COMMIT action list
69 typedef struct OnCommitItem
71 Oid relid; /* relid of relation */
72 OnCommitAction oncommit; /* what to do at end of xact */
75 * If this entry was created during the current transaction,
76 * creating_subid is the ID of the creating subxact; if created in a prior
77 * transaction, creating_subid is zero. If deleted during the current
78 * transaction, deleting_subid is the ID of the deleting subxact; if no
79 * deletion request is pending, deleting_subid is zero.
/* Both IDs use zero as their "not applicable" value, per the note above. */
81 SubTransactionId creating_subid;
82 SubTransactionId deleting_subid;
/*
 * File-local list, initially empty (NIL).  Presumably holds the
 * OnCommitItem entries -- confirm against the code that appends to it.
 */
85 static List *on_commits = NIL;
89 * State information for ALTER TABLE
91 * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
92 * structs, one for each table modified by the operation (the named table
93 * plus any child tables that are affected). We save lists of subcommands
94 * to apply to this table (possibly modified by parse transformation steps);
95 * these lists will be executed in Phase 2. If a Phase 3 step is needed,
96 * necessary information is stored in the constraints and newvals lists.
98 * Phase 2 is divided into multiple passes; subcommands are executed in
99 * a pass determined by subcommand type.
/*
 * Pass numbers for Phase 2 of ALTER TABLE.  The values are consecutive from
 * zero so they can index an array of AT_NUM_PASSES per-pass lists; the
 * numeric order is presumably the execution order -- confirm in the Phase 2
 * driver.
 */
102 #define AT_PASS_DROP 0 /* DROP (all flavors) */
103 #define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */
104 #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */
105 #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */
106 #define AT_PASS_COL_ATTRS 4 /* set other column attributes */
107 /* We could support a RENAME COLUMN pass here, but not currently used */
108 #define AT_PASS_ADD_COL 5 /* ADD COLUMN */
109 #define AT_PASS_ADD_INDEX 6 /* ADD indexes */
110 #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */
111 #define AT_PASS_MISC 8 /* other stuff */
112 #define AT_NUM_PASSES 9
/*
 * One work-queue entry per table touched by an ALTER TABLE operation.  The
 * fields are grouped by the phase that fills them in and the phase that
 * consumes them, as the section comments below indicate.
 */
114 typedef struct AlteredTableInfo
116 /* Information saved before any work commences: */
117 Oid relid; /* Relation to work on */
118 char relkind; /* Its relkind */
119 TupleDesc oldDesc; /* Pre-modification tuple descriptor */
120 /* Information saved by Phase 1 for Phase 2: */
/* One list per AT_PASS_* value; the pass number is the array index. */
121 List *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
122 /* Information saved by Phases 1/2 for Phase 3: */
123 List *constraints; /* List of NewConstraint */
124 List *newvals; /* List of NewColumnValue */
125 Oid newTableSpace; /* new tablespace; 0 means no change */
126 /* Objects to rebuild after completing ALTER TYPE operations */
/* The Oids/Defs lists come in pairs; presumably kept in matching order -- confirm. */
127 List *changedConstraintOids; /* OIDs of constraints to rebuild */
128 List *changedConstraintDefs; /* string definitions of same */
129 List *changedIndexOids; /* OIDs of indexes to rebuild */
130 List *changedIndexDefs; /* string definitions of same */
133 /* Struct describing one new constraint to check in Phase 3 scan */
/*
 * contype selects which of the remaining fields are meaningful: attnum for
 * NOT_NULL, refrelid for FOREIGN, qualstate for CHECK (see per-field notes).
 */
134 typedef struct NewConstraint
136 char *name; /* Constraint name, or NULL if none */
137 ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */
138 AttrNumber attnum; /* only relevant for NOT_NULL */
139 Oid refrelid; /* PK rel, if FOREIGN */
140 Node *qual; /* Check expr or FkConstraint struct */
141 List *qualstate; /* Execution state for CHECK */
145 * Struct describing one new column value that needs to be computed during
146 * Phase 3 copy (this could be either a new column with a non-null default, or
147 * a column that we're changing the type of). Columns without such an entry
148 * are just copied from the old table during ATRewriteTable. Note that the
149 * expr is an expression over *old* table values.
/*
 * One entry per column whose new value is computed from an expression
 * rather than copied; entries are kept in AlteredTableInfo.newvals.
 */
151 typedef struct NewColumnValue
153 AttrNumber attnum; /* which column */
154 Expr *expr; /* expression to compute */
155 ExprState *exprstate; /* execution state */
/*
 * Forward declarations of this file's static (file-local) helpers.  The
 * AT-prefixed names belong to the ALTER TABLE machinery described above:
 * ATPrep* for preparation steps and ATExec* for execution steps (naming
 * inferred from the prototypes; confirm against the definitions).
 */
159 static List *MergeAttributes(List *schema, List *supers, bool istemp,
160 List **supOids, List **supconstr, int *supOidCount);
161 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
162 static void StoreCatalogInheritance(Oid relationId, List *supers);
163 static int findAttrByName(const char *attributeName, List *schema);
164 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
165 static bool needs_toast_table(Relation rel);
166 static void AlterIndexNamespaces(Relation classRel, Relation rel,
167 Oid oldNspOid, Oid newNspOid);
168 static void AlterSeqNamespaces(Relation classRel, Relation rel,
169 Oid oldNspOid, Oid newNspOid,
170 const char *newNspName);
171 static int transformColumnNameList(Oid relId, List *colList,
172 int16 *attnums, Oid *atttypids);
173 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
175 int16 *attnums, Oid *atttypids,
177 static Oid transformFkeyCheckAttrs(Relation pkrel,
178 int numattrs, int16 *attnums,
180 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
181 Relation rel, Relation pkrel);
182 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
184 static char *fkMatchTypeToString(char match_type);
185 static void ATController(Relation rel, List *cmds, bool recurse);
186 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
187 bool recurse, bool recursing);
188 static void ATRewriteCatalogs(List **wqueue);
189 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
190 static void ATRewriteTables(List **wqueue);
191 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
192 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
193 static void ATSimplePermissions(Relation rel, bool allowView);
194 static void ATSimpleRecursion(List **wqueue, Relation rel,
195 AlterTableCmd *cmd, bool recurse);
196 static void ATOneLevelRecursion(List **wqueue, Relation rel,
198 static void find_composite_type_dependencies(Oid typeOid,
199 const char *origTblName);
200 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
202 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
204 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
205 static void add_column_support_dependency(Oid relid, int32 attnum,
207 static void ATExecDropNotNull(Relation rel, const char *colName);
208 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
209 const char *colName);
210 static void ATExecColumnDefault(Relation rel, const char *colName,
212 static void ATPrepSetStatistics(Relation rel, const char *colName,
214 static void ATExecSetStatistics(Relation rel, const char *colName,
216 static void ATExecSetStorage(Relation rel, const char *colName,
218 static void ATExecDropColumn(Relation rel, const char *colName,
219 DropBehavior behavior,
220 bool recurse, bool recursing);
221 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
222 IndexStmt *stmt, bool is_rebuild);
223 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
224 Node *newConstraint);
225 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
226 FkConstraint *fkconstraint);
227 static void ATPrepDropConstraint(List **wqueue, Relation rel,
228 bool recurse, AlterTableCmd *cmd);
229 static void ATExecDropConstraint(Relation rel, const char *constrName,
230 DropBehavior behavior, bool quiet);
231 static void ATPrepAlterColumnType(List **wqueue,
232 AlteredTableInfo *tab, Relation rel,
233 bool recurse, bool recursing,
235 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
236 const char *colName, TypeName *typename);
237 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
238 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
239 static void change_owner_recurse_to_sequences(Oid relationOid,
241 static void ATExecClusterOn(Relation rel, const char *indexName);
242 static void ATExecDropCluster(Relation rel);
243 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
244 char *tablespacename);
245 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
246 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
247 bool enable, bool skip_system);
248 static void copy_relation_data(Relation rel, SMgrRelation dst);
249 static void update_ri_trigger_args(Oid relid,
253 bool update_relname);
256 /* ----------------------------------------------------------------
258 * Creates a new relation.
260 * If successful, returns the OID of the new relation.
261 * ----------------------------------------------------------------
264 DefineRelation(CreateStmt *stmt, char relkind)
266 char relname[NAMEDATALEN];
268 List *schema = stmt->tableElts;
272 TupleDesc descriptor;
274 List *old_constraints;
283 * Truncate relname to appropriate length (probably a waste of time, as
284 * parser should have done this already).
286 StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
289 * Check consistency of arguments
291 if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
293 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
294 errmsg("ON COMMIT can only be used on temporary tables")));
297 * Look up the namespace in which we are supposed to create the relation.
298 * Check we have permission to create there. Skip check if bootstrapping,
299 * since permissions machinery may not be working yet.
301 namespaceId = RangeVarGetCreationNamespace(stmt->relation);
303 if (!IsBootstrapProcessingMode())
307 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
309 if (aclresult != ACLCHECK_OK)
310 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
311 get_namespace_name(namespaceId));
315 * Select tablespace to use. If not specified, use default_tablespace
316 * (which may in turn default to database's default).
318 if (stmt->tablespacename)
320 tablespaceId = get_tablespace_oid(stmt->tablespacename);
321 if (!OidIsValid(tablespaceId))
323 (errcode(ERRCODE_UNDEFINED_OBJECT),
324 errmsg("tablespace \"%s\" does not exist",
325 stmt->tablespacename)));
329 tablespaceId = GetDefaultTablespace();
330 /* note InvalidOid is OK in this case */
333 /* Check permissions except when using database's default */
334 if (OidIsValid(tablespaceId))
338 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
340 if (aclresult != ACLCHECK_OK)
341 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
342 get_tablespace_name(tablespaceId));
346 * Look up inheritance ancestors and generate relation schema, including
347 * inherited attributes.
/*
 * MergeAttributes returns the merged column list and fills in three outputs:
 * parent OIDs, inherited constraints, and the count of parents with OIDs.
 */
349 schema = MergeAttributes(schema, stmt->inhRelations,
350 stmt->relation->istemp,
351 &inheritOids, &old_constraints, &parentOidCount);
354 * Create a relation descriptor from the relation schema and create the
355 * relation. Note that in this stage only inherited (pre-cooked) defaults
356 * and constraints will be included into the new relation.
357 * (BuildDescForRelation takes care of the inherited defaults, but we have
358 * to copy inherited constraints here.)
360 descriptor = BuildDescForRelation(schema);
/*
 * The new table has OIDs if requested locally or if at least one parent
 * has them (parentOidCount > 0).
 */
362 localHasOids = interpretOidsOption(stmt->hasoids);
363 descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
/*
 * Build the descriptor's CHECK array from the inherited constraints.  The
 * array is sized for the worst case (one slot per list entry); ncheck
 * counts the slots actually used.
 */
365 if (old_constraints != NIL)
367 ConstrCheck *check = (ConstrCheck *)
368 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
371 foreach(listptr, old_constraints)
373 Constraint *cdef = (Constraint *) lfirst(listptr);
376 if (cdef->contype != CONSTR_CHECK)
378 Assert(cdef->name != NULL);
379 Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
382 * In multiple-inheritance situations, it's possible to inherit
383 * the same grandparent constraint through multiple parents.
384 * Hence, discard inherited constraints that match as to both name
385 * and expression. Otherwise, gripe if the names conflict.
387 for (i = 0; i < ncheck; i++)
389 if (strcmp(check[i].ccname, cdef->name) != 0)
391 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
397 (errcode(ERRCODE_DUPLICATE_OBJECT),
398 errmsg("duplicate check constraint name \"%s\"",
/* ccname shares cdef's string; ccbin is a fresh copy of the cooked expression. */
403 check[ncheck].ccname = cdef->name;
404 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
/* Create an empty TupleConstr first if the descriptor has none yet. */
410 if (descriptor->constr == NULL)
412 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
413 descriptor->constr->defval = NULL;
414 descriptor->constr->num_defval = 0;
415 descriptor->constr->has_not_null = false;
417 descriptor->constr->num_check = ncheck;
418 descriptor->constr->check = check;
422 relationId = heap_create_with_catalog(relname,
433 allowSystemTableMods);
/* Record the parent links for the new relation (inheritOids came from MergeAttributes). */
435 StoreCatalogInheritance(relationId, inheritOids);
438 * We must bump the command counter to make the newly-created relation
439 * tuple visible for opening.
441 CommandCounterIncrement();
444 * Open the new relation and acquire exclusive lock on it. This isn't
445 * really necessary for locking out other backends (since they can't see
446 * the new rel anyway until we commit), but it keeps the lock manager from
447 * complaining about deadlock risks.
449 rel = relation_open(relationId, AccessExclusiveLock);
452 * Now add any newly specified column default values and CHECK constraints
453 * to the new relation. These are passed to us in the form of raw
454 * parsetrees; we need to transform them to executable expression trees
455 * before they can be added. The most convenient way to do that is to
456 * apply the parser's transformExpr routine, but transformExpr doesn't
457 * work unless we have a pre-existing relation. So, the transformation has
458 * to be postponed to this final step of CREATE TABLE.
460 * Another task that's conveniently done at this step is to add dependency
461 * links between columns and supporting relations (such as SERIAL
464 * First, scan schema to find new column defaults.
469 foreach(listptr, schema)
471 ColumnDef *colDef = lfirst(listptr);
/*
 * Queue each raw default together with its column number.  attnum is
 * presumably advanced once per column on a line not visible in this
 * excerpt -- confirm.
 */
475 if (colDef->raw_default != NULL)
477 RawColumnDefault *rawEnt;
479 Assert(colDef->cooked_default == NULL);
481 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
482 rawEnt->attnum = attnum;
483 rawEnt->raw_default = colDef->raw_default;
484 rawDefaults = lappend(rawDefaults, rawEnt);
487 /* Create dependency for supporting relation for this column */
488 if (colDef->support != NULL)
489 add_column_support_dependency(relationId, attnum, colDef->support);
493 * Parse and add the defaults/constraints, if any.
/* Skipped entirely when there are neither raw defaults nor statement-level constraints. */
495 if (rawDefaults || stmt->constraints)
496 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
499 * Clean up. We keep lock on new relation (although it shouldn't be
500 * visible to anyone else anyway, until commit).
502 relation_close(rel, NoLock);
509 * Deletes a relation.
/*
 * RemoveRelation: resolve 'relation' to its OID, wrap it in an ObjectAddress
 * of class RelationRelationId, and hand it to performDeletion with the
 * caller's drop behavior.
 */
512 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
515 ObjectAddress object;
/* NOTE(review): the 'false' argument presumably means a missing relation is an error -- confirm. */
517 relOid = RangeVarGetRelid(relation, false);
519 object.classId = RelationRelationId;
520 object.objectId = relOid;
/* NOTE(review): sub-id 0 presumably addresses the whole relation rather than one column -- confirm. */
521 object.objectSubId = 0;
523 performDeletion(&object, behavior);
528 * Executes a TRUNCATE command.
530 * This is a multi-relation truncate. It first opens and grabs exclusive
531 * locks on all relations involved, checking permissions and otherwise
532 * verifying that the relation is OK for truncation. When they are all
533 * open, it checks foreign key references on them, namely that FK references
534 * are all internal to the group that's being truncated. Finally all
535 * relations are truncated and reindexed.
/*
 * ExecuteTruncate: truncate every relation named in 'relations' (a list of
 * RangeVar).  The first loop opens each relation under AccessExclusiveLock
 * and validates it, collecting the open relations in 'rels'.  Foreign keys
 * are then checked across the whole set.  The second loop gives each heap
 * (and its toast table, if any) a new relfilenode and rebuilds its indexes.
 */
538 ExecuteTruncate(List *relations)
543 foreach(cell, relations)
545 RangeVar *rv = lfirst(cell);
548 /* Grab exclusive lock in preparation for truncate */
549 rel = heap_openrv(rv, AccessExclusiveLock);
551 /* Only allow truncate on regular tables */
552 if (rel->rd_rel->relkind != RELKIND_RELATION)
554 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
555 errmsg("\"%s\" is not a table",
556 RelationGetRelationName(rel))));
558 /* Permissions checks */
/* Ownership is required; system catalogs additionally need allowSystemTableMods. */
559 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
560 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
561 RelationGetRelationName(rel));
563 if (!allowSystemTableMods && IsSystemRelation(rel))
565 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
566 errmsg("permission denied: \"%s\" is a system catalog",
567 RelationGetRelationName(rel))));
570 * We can never allow truncation of shared or nailed-in-cache
571 * relations, because we can't support changing their relfilenode
574 if (rel->rd_rel->relisshared || rel->rd_isnailed)
576 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
577 errmsg("cannot truncate system relation \"%s\"",
578 RelationGetRelationName(rel))));
581 * Don't allow truncate on temp tables of other backends ... their
582 * local buffer manager is not going to cope.
584 if (isOtherTempNamespace(RelationGetNamespace(rel)))
586 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
587 errmsg("cannot truncate temporary tables of other sessions")));
589 /* Save it into the list of rels to truncate */
590 rels = lappend(rels, rel);
594 * Check foreign key references.
/* Runs only after every relation has been opened and validated, so the check sees the full set. */
596 heap_truncate_check_FKs(rels, false);
599 * OK, truncate each table.
603 Relation rel = lfirst(cell);
608 * Create a new empty storage file for the relation, and assign it as
609 * the relfilenode value. The old storage file is scheduled for
610 * deletion at commit.
612 setNewRelfilenode(rel);
/* Capture both OIDs now: they are still needed after rel is closed below. */
614 heap_relid = RelationGetRelid(rel);
615 toast_relid = rel->rd_rel->reltoastrelid;
/* Closed with NoLock, the same call form this file uses where it keeps the lock held. */
617 heap_close(rel, NoLock);
620 * The same for the toast table, if any.
622 if (OidIsValid(toast_relid))
624 rel = relation_open(toast_relid, AccessExclusiveLock);
625 setNewRelfilenode(rel);
626 heap_close(rel, NoLock);
630 * Reconstruct the indexes to match, and we're done.
/* NOTE(review): the 'true' argument presumably extends the reindex to the toast table -- confirm. */
632 reindex_relation(heap_relid, true);
638 * Returns new schema given initial schema and superclasses.
641 * 'schema' is the column/attribute definition for the table. (It's a list
642 * of ColumnDef's.) It is destructively changed.
643 * 'supers' is a list of names (as RangeVar nodes) of parent relations.
644 * 'istemp' is TRUE if we are creating a temp relation.
647 * 'supOids' receives a list of the OIDs of the parent relations.
648 * 'supconstr' receives a list of constraints belonging to the parents,
649 * updated as necessary to be valid for the child.
650 * 'supOidCount' is set to the number of parents that have OID columns.
653 * Completed schema list.
656 * The order in which the attributes are inherited is very important.
657 * Intuitively, the inherited attributes should come first. If a table
658 * inherits from multiple parents, the order of those attributes follows
659 * the order of the parents specified in CREATE TABLE.
663 * create table person (name text, age int4, location point);
664 * create table emp (salary int4, manager text) inherits(person);
665 * create table student (gpa float8) inherits (person);
666 * create table stud_emp (percent int4) inherits (emp, student);
668 * The order of the attributes of stud_emp is:
670 * person {1:name, 2:age, 3:location}
672 * {6:gpa} student emp {4:salary, 5:manager}
674 * stud_emp {7:percent}
676 * If the same attribute name appears multiple times, then it appears
677 * in the result table in the proper location for its first appearance.
679 * Constraints (including NOT NULL constraints) for the child table
680 * are the union of all relevant constraints, from both the child schema
683 * The default value for a child column is defined as:
684 * (1) If the child schema specifies a default, that value is used.
685 * (2) If neither the child nor any parent specifies a default, then
686 * the column will not have a default.
687 * (3) If conflicting defaults are inherited from different parents
688 * (and not overridden by the child), an error is raised.
689 * (4) Otherwise the inherited default is used.
690 * Rule (3) is new in Postgres 7.1; in earlier releases you got a
691 * rather arbitrary choice of which parent default to use.
/*
 * MergeAttributes: combine the locally declared columns in 'schema' with the
 * columns of each parent in 'supers'.  Inherited columns are accumulated in
 * inhSchema; local columns are then merged into, or appended to, that list.
 * Results are also reported through *supOids, *supconstr and *supOidCount.
 */
695 MergeAttributes(List *schema, List *supers, bool istemp,
696 List **supOids, List **supconstr, int *supOidCount)
699 List *inhSchema = NIL;
700 List *parentOids = NIL;
701 List *constraints = NIL;
702 int parentsWithOids = 0;
703 bool have_bogus_defaults = false;
704 char *bogus_marker = "Bogus!"; /* marks conflicting defaults */
708 * Check for and reject tables with too many columns. We perform this
709 * check relatively early for two reasons: (a) we don't run the risk of
710 * overflowing an AttrNumber in subsequent code (b) an O(n^2) algorithm is
711 * okay if we're processing <= 1600 columns, but could take minutes to
712 * execute if the user attempts to create a table with hundreds of
713 * thousands of columns.
715 * Note that we also need to check that any we do not exceed this figure
716 * after including columns from inherited relations.
718 if (list_length(schema) > MaxHeapAttributeNumber)
720 (errcode(ERRCODE_TOO_MANY_COLUMNS),
721 errmsg("tables can have at most %d columns",
722 MaxHeapAttributeNumber)));
725 * Check for duplicate names in the explicit list of attributes.
727 * Although we might consider merging such entries in the same way that we
728 * handle name conflicts for inherited attributes, it seems to make more
729 * sense to assume such conflicts are errors.
731 foreach(entry, schema)
733 ColumnDef *coldef = lfirst(entry);
/* Only later entries are compared; earlier pairs were covered by previous outer iterations. */
736 for_each_cell(rest, lnext(entry))
738 ColumnDef *restdef = lfirst(rest);
740 if (strcmp(coldef->colname, restdef->colname) == 0)
742 (errcode(ERRCODE_DUPLICATE_COLUMN),
743 errmsg("column \"%s\" duplicated",
749 * Scan the parents left-to-right, and merge their attributes to form a
750 * list of inherited attributes (inhSchema). Also check to see if we need
751 * to inherit an OID column.
754 foreach(entry, supers)
756 RangeVar *parent = (RangeVar *) lfirst(entry);
760 AttrNumber *newattno;
761 AttrNumber parent_attno;
763 relation = heap_openrv(parent, AccessShareLock);
765 if (relation->rd_rel->relkind != RELKIND_RELATION)
767 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
768 errmsg("inherited relation \"%s\" is not a table",
770 /* Permanent rels cannot inherit from temporary ones */
771 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
773 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
774 errmsg("cannot inherit from temporary relation \"%s\"",
778 * We should have an UNDER permission flag for this, but for now,
779 * demand that creator of a child table own the parent.
781 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
782 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
783 RelationGetRelationName(relation));
786 * Reject duplications in the list of parents.
788 if (list_member_oid(parentOids, RelationGetRelid(relation)))
790 (errcode(ERRCODE_DUPLICATE_TABLE),
791 errmsg("inherited relation \"%s\" duplicated",
794 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
/*
 * NOTE(review): the statement guarded by this test is not visible in this
 * excerpt; presumably it bumps parentsWithOids -- confirm.
 */
796 if (relation->rd_rel->relhasoids)
799 tupleDesc = RelationGetDescr(relation);
800 constr = tupleDesc->constr;
803 * newattno[] will contain the child-table attribute numbers for the
804 * attributes of this parent table. (They are not the same for
805 * parents after the first one, nor if we have dropped columns.)
/* Attribute numbers are 1-based while newattno[] is 0-based, hence the "- 1" below. */
807 newattno = (AttrNumber *)
808 palloc(tupleDesc->natts * sizeof(AttrNumber));
810 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
813 Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
814 char *attributeName = NameStr(attribute->attname);
820 * Ignore dropped columns in the parent.
822 if (attribute->attisdropped)
825 * change_varattnos_of_a_node asserts that this is greater
826 * than zero, so if anything tries to use it, we should find
829 newattno[parent_attno - 1] = 0;
834 * Does it conflict with some previously inherited column?
836 exist_attno = findAttrByName(attributeName, inhSchema);
840 * Yes, try to merge the two column definitions. They must
841 * have the same type and typmod.
844 (errmsg("merging multiple inherited definitions of column \"%s\"",
/* exist_attno is 1-based, while list_nth takes a 0-based position. */
846 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
847 if (typenameTypeId(def->typename) != attribute->atttypid ||
848 def->typename->typmod != attribute->atttypmod)
850 (errcode(ERRCODE_DATATYPE_MISMATCH),
851 errmsg("inherited column \"%s\" has a type conflict",
853 errdetail("%s versus %s",
854 TypeNameToString(def->typename),
855 format_type_be(attribute->atttypid))));
857 /* Merge of NOT NULL constraints = OR 'em together */
858 def->is_not_null |= attribute->attnotnull;
859 /* Default and other constraints are handled below */
860 newattno[parent_attno - 1] = exist_attno;
865 * No, create a new inherited column
867 def = makeNode(ColumnDef);
868 def->colname = pstrdup(attributeName);
869 typename = makeNode(TypeName);
870 typename->typeid = attribute->atttypid;
871 typename->typmod = attribute->atttypmod;
872 def->typename = typename;
874 def->is_local = false;
875 def->is_not_null = attribute->attnotnull;
876 def->raw_default = NULL;
877 def->cooked_default = NULL;
878 def->constraints = NIL;
/* The new column takes the next child attribute number (pre-increment of child_attno). */
880 inhSchema = lappend(inhSchema, def);
881 newattno[parent_attno - 1] = ++child_attno;
885 * Copy default if any
887 if (attribute->atthasdef)
889 char *this_default = NULL;
890 AttrDefault *attrdef;
893 /* Find default in constraint structure */
894 Assert(constr != NULL);
895 attrdef = constr->defval;
896 for (i = 0; i < constr->num_defval; i++)
898 if (attrdef[i].adnum == parent_attno)
900 this_default = attrdef[i].adbin;
904 Assert(this_default != NULL);
907 * If default expr could contain any vars, we'd need to fix
908 * 'em, but it can't; so default is ready to apply to child.
910 * If we already had a default from some prior parent, check
911 * to see if they are the same. If so, no problem; if not,
912 * mark the column as having a bogus default. Below, we will
913 * complain if the bogus default isn't overridden by the child
916 Assert(def->raw_default == NULL);
917 if (def->cooked_default == NULL)
918 def->cooked_default = pstrdup(this_default);
919 else if (strcmp(def->cooked_default, this_default) != 0)
/*
 * The marker pointer itself is stored, not a copy: the final check in
 * this function recognizes it by pointer comparison.
 */
921 def->cooked_default = bogus_marker;
922 have_bogus_defaults = true;
928 * Now copy the constraints of this parent, adjusting attnos using the
929 * completed newattno[] map
931 if (constr && constr->num_check > 0)
933 ConstrCheck *check = constr->check;
936 for (i = 0; i < constr->num_check; i++)
938 Constraint *cdef = makeNode(Constraint);
941 cdef->contype = CONSTR_CHECK;
942 cdef->name = pstrdup(check[i].ccname);
943 cdef->raw_expr = NULL;
944 /* adjust varattnos of ccbin here */
/* String -> node tree -> renumber Vars in place -> back to string. */
945 expr = stringToNode(check[i].ccbin);
946 change_varattnos_of_a_node(expr, newattno);
947 cdef->cooked_expr = nodeToString(expr);
948 constraints = lappend(constraints, cdef);
955 * Close the parent rel, but keep our AccessShareLock on it until xact
956 * commit. That will prevent someone else from deleting or ALTERing
957 * the parent before the child is committed.
959 heap_close(relation, NoLock);
963 * If we had no inherited attributes, the result schema is just the
964 * explicitly declared columns. Otherwise, we need to merge the declared
965 * columns into the inherited schema list.
967 if (inhSchema != NIL)
969 foreach(entry, schema)
971 ColumnDef *newdef = lfirst(entry);
972 char *attributeName = newdef->colname;
976 * Does it conflict with some previously inherited column?
978 exist_attno = findAttrByName(attributeName, inhSchema);
984 * Yes, try to merge the two column definitions. They must
985 * have the same type and typmod.
988 (errmsg("merging column \"%s\" with inherited definition",
990 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
991 if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
992 def->typename->typmod != newdef->typename->typmod)
994 (errcode(ERRCODE_DATATYPE_MISMATCH),
995 errmsg("column \"%s\" has a type conflict",
997 errdetail("%s versus %s",
998 TypeNameToString(def->typename),
999 TypeNameToString(newdef->typename))));
1000 /* Mark the column as locally defined */
1001 def->is_local = true;
1002 /* Merge of NOT NULL constraints = OR 'em together */
1003 def->is_not_null |= newdef->is_not_null;
1004 /* If new def has a default, override previous default */
/* This also replaces a bogus_marker left behind by conflicting parent defaults. */
1005 if (newdef->raw_default != NULL)
1007 def->raw_default = newdef->raw_default;
1008 def->cooked_default = newdef->cooked_default;
1014 * No, attach new column to result schema
1016 inhSchema = lappend(inhSchema, newdef);
1023 * Check that we haven't exceeded the legal # of columns after merging
1024 * in inherited columns.
/*
 * NOTE(review): this test reads 'schema', so it assumes schema has already
 * been repointed at the merged list; that assignment is not visible in
 * this excerpt -- confirm.
 */
1026 if (list_length(schema) > MaxHeapAttributeNumber)
1028 (errcode(ERRCODE_TOO_MANY_COLUMNS),
1029 errmsg("tables can have at most %d columns",
1030 MaxHeapAttributeNumber)));
1034 * If we found any conflicting parent default values, check to make sure
1035 * they were overridden by the child.
1037 if (have_bogus_defaults)
1039 foreach(entry, schema)
1041 ColumnDef *def = lfirst(entry);
/* Pointer comparison against the marker, not a string comparison. */
1043 if (def->cooked_default == bogus_marker)
1045 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1046 errmsg("column \"%s\" inherits conflicting default values",
1048 errhint("To resolve the conflict, specify a default explicitly.")));
/* Hand the accumulated parent OIDs, constraints and OID-parent count back to the caller. */
1052 *supOids = parentOids;
1053 *supconstr = constraints;
1054 *supOidCount = parentsWithOids;
1059 * complementary static functions for MergeAttributes().
1061 * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1062 * constraints from parent classes, since the inherited attributes could
1063 * be given different column numbers in multiple-inheritance cases.
1065 * Note that the passed node tree is modified in place!
/*
 * change_varattnos_walker: tree-walker callback.  For a Var with
 * varlevelsup == 0 and varno == 1 (plus a further condition on a line not
 * visible in this excerpt), it replaces varattno with
 * newattno[varattno - 1].  Other nodes are passed to expression_tree_walker
 * with this function as the callback.
 */
1068 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1074 Var *var = (Var *) node;
1076 if (var->varlevelsup == 0 && var->varno == 1 &&
1080 * ??? the following may be a problem when the node is multiply
1081 * referenced though stringToNode() doesn't create such a node
/* A non-positive map entry fails this assertion rather than being written into the Var. */
1084 Assert(newattno[var->varattno - 1] > 0);
1085 var->varattno = newattno[var->varattno - 1];
1089 return expression_tree_walker(node, change_varattnos_walker,
/*
 * change_varattnos_of_a_node: entry point for the renumbering; it forwards
 * both arguments to change_varattnos_walker and returns its result.
 */
1094 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1096 return change_varattnos_walker(node, newattno);
1100 * StoreCatalogInheritance
1101 * Updates the system catalogs with proper inheritance information.
1103 * supers is a list of the OIDs of the new relation's direct ancestors.
/*
 * relationId: OID of the new (child) relation; asserted to be valid.
 * supers: list of OIDs of its direct parents.
 *
 * For every parent in supers this inserts one pg_inherits row
 * (inhrel = relationId, inhparent = parent, inhseqno = seqNumber), records
 * a DEPENDENCY_NORMAL dependency of the child on the parent, and sets the
 * parent's relhassubclass flag through setRelhassubclassInRelation().
 */
1106 StoreCatalogInheritance(Oid relationId, List *supers)
1117 AssertArg(OidIsValid(relationId));
1123 * Store INHERITS information in pg_inherits using direct ancestors only.
1124 * Also enter dependencies on the direct ancestors, and make sure they are
1125 * marked with relhassubclass = true.
1127 * (Once upon a time, both direct and indirect ancestors were found here
1128 * and then entered into pg_ipl. Since that catalog doesn't exist
1129 * anymore, there's no need to look for indirect ancestors.)
1131 relation = heap_open(InheritsRelationId, RowExclusiveLock);
1132 desc = RelationGetDescr(relation);
1135 foreach(entry, supers)
1137 Oid parentOid = lfirst_oid(entry);
1138 Datum datum[Natts_pg_inherits];
1139 char nullarr[Natts_pg_inherits];
1140 ObjectAddress childobject,
1143 datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
1144 datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
1145 datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
1151 tuple = heap_formtuple(desc, datum, nullarr); /* build the pg_inherits row */
1153 simple_heap_insert(relation, tuple);
1155 CatalogUpdateIndexes(relation, tuple);
1157 heap_freetuple(tuple);
1160 * Store a dependency too
1162 parentobject.classId = RelationRelationId;
1163 parentobject.objectId = parentOid;
1164 parentobject.objectSubId = 0;
1165 childobject.classId = RelationRelationId;
1166 childobject.objectId = relationId;
1167 childobject.objectSubId = 0;
1169 recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL); /* child depends on parent */
1172 * Mark the parent as having subclasses.
1174 setRelhassubclassInRelation(parentOid, true);
1179 heap_close(relation, RowExclusiveLock);
1183 * Look for an existing schema entry with the given name.
1185 * Returns the index (starting with 1) if attribute already exists in schema,
/*
 * attributeName: column name to search for.
 * schema: list whose cells are treated as ColumnDef pointers.
 *
 * Each entry's colname is compared with attributeName using strcmp(), so
 * the match is exact and case-sensitive.
 */
1189 findAttrByName(const char *attributeName, List *schema)
1196 ColumnDef *def = lfirst(s);
1198 if (strcmp(attributeName, def->colname) == 0)
1207 * Update a relation's pg_class.relhassubclass entry to the given value
/*
 * relationId: OID of the pg_class row to touch.
 * relhassubclass: value that row's relhassubclass field should hold.
 *
 * Works on a copy of the syscache tuple.  When the stored flag differs from
 * the requested value, the copy is modified and written back with
 * simple_heap_update() followed by CatalogUpdateIndexes(); when it already
 * matches, only CacheInvalidateRelcacheByTuple() is issued.  Raises an
 * ERROR if no pg_class row exists for relationId.
 */
1210 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1212 Relation relationRelation;
1214 Form_pg_class classtuple;
1217 * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1219 * If the tuple already has the right relhassubclass setting, we don't
1220 * need to update it, but we still need to issue an SI inval message.
1222 relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1223 tuple = SearchSysCacheCopy(RELOID,
1224 ObjectIdGetDatum(relationId),
1226 if (!HeapTupleIsValid(tuple))
1227 elog(ERROR, "cache lookup failed for relation %u", relationId);
1228 classtuple = (Form_pg_class) GETSTRUCT(tuple);
1230 if (classtuple->relhassubclass != relhassubclass)
1232 classtuple->relhassubclass = relhassubclass;
1233 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1235 /* keep the catalog indexes up to date */
1236 CatalogUpdateIndexes(relationRelation, tuple);
1240 /* no need to change tuple, but force relcache rebuild anyway */
1241 CacheInvalidateRelcacheByTuple(tuple);
1244 heap_freetuple(tuple); /* release the copy made by SearchSysCacheCopy */
1245 heap_close(relationRelation, RowExclusiveLock);
1250 * renameatt - changes the name of an attribute in a relation
1252 * Attname attribute is changed in attribute catalog.
1253 * No record of the previous attname is kept (correct?).
1255 * get proper relrelation from relation catalog (if not arg)
1256 * scan attribute catalog
1257 * for name conflict (within rel)
1258 * for original attribute (if not arg)
1259 * modify attname in attribute tuple
1260 * insert modified attribute in attribute catalog
1261 * delete original attribute from attribute catalog
/*
 * myrelid: OID of the relation that owns the column.
 * oldattname / newattname: current and desired column names.
 * NOTE(review): the recursive call in the body passes two further boolean
 * arguments and the body tests 'recursing'; presumably these are the
 * recurse/recursing flags -- confirm against the full parameter list.
 *
 * Order of work as written below:
 *   - open the relation with AccessExclusiveLock (closed at the end with
 *     NoLock, so the lock is kept);
 *   - ownership check and system-catalog check;
 *   - rename in inheriting relations found by find_all_inheritors(), or
 *     complain when find_inheritance_children() reports children;
 *   - look up the pg_attribute row for oldattname and reject a missing
 *     column, a system column, an inherited column (unless recursing) and
 *     a newattname that already exists in the relation;
 *   - store the new name in pg_attribute and update its indexes;
 *   - rename matching simple columns of the relation's indexes;
 *   - update RI trigger arguments when the relation has triggers.
 */
1264 renameatt(Oid myrelid,
1265 const char *oldattname,
1266 const char *newattname,
1270 Relation targetrelation;
1271 Relation attrelation;
1273 Form_pg_attribute attform;
1276 ListCell *indexoidscan;
1279 * Grab an exclusive lock on the target table, which we will NOT release
1280 * until end of transaction.
1282 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1285 * permissions checking. this would normally be done in utility.c, but
1286 * this particular routine is recursive.
1288 * normally, only the owner of a class can change its schema.
1290 if (!pg_class_ownercheck(myrelid, GetUserId()))
1291 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1292 RelationGetRelationName(targetrelation));
1293 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1295 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1296 errmsg("permission denied: \"%s\" is a system catalog",
1297 RelationGetRelationName(targetrelation))));
1300 * if the 'recurse' flag is set then we are supposed to rename this
1301 * attribute in all classes that inherit from 'relname' (as well as in
1304 * any permissions or problems with duplicate attributes will cause the
1305 * whole transaction to abort, which is what we want -- all or nothing.
1312 /* this routine is actually in the planner */
1313 children = find_all_inheritors(myrelid);
1316 * find_all_inheritors does the recursive search of the inheritance
1317 * hierarchy, so all we have to do is process all of the relids in the
1318 * list that it returns.
1320 foreach(child, children)
1322 Oid childrelid = lfirst_oid(child);
1324 if (childrelid == myrelid)
1326 /* note we need not recurse again */
1327 renameatt(childrelid, oldattname, newattname, false, true);
1333 * If we are told not to recurse, there had better not be any child
1334 * tables; else the rename would put them out of step.
1337 find_inheritance_children(myrelid) != NIL)
1339 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1340 errmsg("inherited column \"%s\" must be renamed in child tables too",
1344 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1346 atttup = SearchSysCacheCopyAttName(myrelid, oldattname); /* modifiable copy of the column's row */
1347 if (!HeapTupleIsValid(atttup))
1349 (errcode(ERRCODE_UNDEFINED_COLUMN),
1350 errmsg("column \"%s\" does not exist",
1352 attform = (Form_pg_attribute) GETSTRUCT(atttup);
1354 attnum = attform->attnum; /* remembered for the index-column scan further down */
1357 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1358 errmsg("cannot rename system column \"%s\"",
1362 * if the attribute is inherited, forbid the renaming, unless we are
1363 * already inside a recursive rename.
1365 if (attform->attinhcount > 0 && !recursing)
1367 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1368 errmsg("cannot rename inherited column \"%s\"",
1371 /* should not already exist */
1372 /* this test is deliberately not attisdropped-aware */
1373 if (SearchSysCacheExists(ATTNAME,
1374 ObjectIdGetDatum(myrelid),
1375 PointerGetDatum(newattname),
1378 (errcode(ERRCODE_DUPLICATE_COLUMN),
1379 errmsg("column \"%s\" of relation \"%s\" already exists",
1380 newattname, RelationGetRelationName(targetrelation))));
1382 namestrcpy(&(attform->attname), newattname);
1384 simple_heap_update(attrelation, &atttup->t_self, atttup);
1386 /* keep system catalog indexes current */
1387 CatalogUpdateIndexes(attrelation, atttup);
1389 heap_freetuple(atttup);
1392 * Update column names of indexes that refer to the column being renamed.
1394 indexoidlist = RelationGetIndexList(targetrelation);
1396 foreach(indexoidscan, indexoidlist)
1398 Oid indexoid = lfirst_oid(indexoidscan);
1400 Form_pg_index indexform;
1404 * Scan through index columns to see if there's any simple index
1405 * entries for this attribute. We ignore expressional entries.
1407 indextup = SearchSysCache(INDEXRELID,
1408 ObjectIdGetDatum(indexoid),
1410 if (!HeapTupleIsValid(indextup))
1411 elog(ERROR, "cache lookup failed for index %u", indexoid);
1412 indexform = (Form_pg_index) GETSTRUCT(indextup);
1414 for (i = 0; i < indexform->indnatts; i++)
1416 if (attnum != indexform->indkey.values[i])
1420 * Found one, rename it.
1422 atttup = SearchSysCacheCopy(ATTNUM,
1423 ObjectIdGetDatum(indexoid),
1424 Int16GetDatum(i + 1),
1426 if (!HeapTupleIsValid(atttup))
1427 continue; /* should we raise an error? */
1430 * Update the (copied) attribute tuple.
1432 namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1435 simple_heap_update(attrelation, &atttup->t_self, atttup);
1437 /* keep system catalog indexes current */
1438 CatalogUpdateIndexes(attrelation, atttup);
1440 heap_freetuple(atttup);
1443 ReleaseSysCache(indextup); /* pairs with the SearchSysCache(INDEXRELID, ...) above */
1446 list_free(indexoidlist);
1448 heap_close(attrelation, RowExclusiveLock);
1451 * Update att name in any RI triggers associated with the relation.
1453 if (targetrelation->rd_rel->reltriggers > 0)
1455 /* update tgargs column reference where att is primary key */
1456 update_ri_trigger_args(RelationGetRelid(targetrelation),
1457 oldattname, newattname,
1459 /* update tgargs column reference where att is foreign key */
1460 update_ri_trigger_args(RelationGetRelid(targetrelation),
1461 oldattname, newattname,
1465 relation_close(targetrelation, NoLock); /* close rel but keep lock */
1469 * renamerel - change the name of a relation
1471 * XXX - When renaming sequences, we don't bother to modify the
1472 * sequence name that is stored within the sequence itself
1473 * (this would cause problems with MVCC). In the future,
1474 * the sequence name should probably be removed from the
1475 * sequence, AFAIK there's no need for it to be there.
/*
 * myrelid: OID of the relation (table or index) to rename.
 * newrelname: the new relation name.
 *
 * The relation is opened with AccessExclusiveLock and closed at the end
 * with NoLock, so the lock is kept.  System catalogs are refused unless
 * allowSystemTableMods is set, and a name already present in the same
 * namespace is rejected.  The pg_class row is updated through a copied
 * tuple; for anything that is not an index the associated type is renamed
 * with TypeRename(); RI trigger arguments are updated via
 * update_ri_trigger_args().
 */
1478 renamerel(Oid myrelid, const char *newrelname)
1480 Relation targetrelation;
1481 Relation relrelation; /* for RELATION relation */
1486 bool relhastriggers;
1489 * Grab an exclusive lock on the target table or index, which we will NOT
1490 * release until end of transaction.
1492 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1494 oldrelname = pstrdup(RelationGetRelationName(targetrelation)); /* private copy of the old name, used after the pg_class update */
1495 namespaceId = RelationGetNamespace(targetrelation);
1497 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1499 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1500 errmsg("permission denied: \"%s\" is a system catalog",
1501 RelationGetRelationName(targetrelation))));
1503 relkind = targetrelation->rd_rel->relkind;
1504 relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1507 * Find relation's pg_class tuple, and make sure newrelname isn't in use.
1509 relrelation = heap_open(RelationRelationId, RowExclusiveLock);
/*
 * NOTE(review): myrelid is an Oid but is converted with PointerGetDatum()
 * in the lookup below; the RELOID lookup in setRelhassubclassInRelation()
 * uses ObjectIdGetDatum() -- confirm and consider switching for
 * consistency.
 */
1511 reltup = SearchSysCacheCopy(RELOID,
1512 PointerGetDatum(myrelid),
1514 if (!HeapTupleIsValid(reltup)) /* shouldn't happen */
1515 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1517 if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1519 (errcode(ERRCODE_DUPLICATE_TABLE),
1520 errmsg("relation \"%s\" already exists",
1524 * Update pg_class tuple with new relname. (Scribbling on reltup is OK
1525 * because it's a copy...)
1527 namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1529 simple_heap_update(relrelation, &reltup->t_self, reltup);
1531 /* keep the system catalog indexes current */
1532 CatalogUpdateIndexes(relrelation, reltup);
1534 heap_freetuple(reltup);
1535 heap_close(relrelation, RowExclusiveLock);
1538 * Also rename the associated type, if any.
1540 if (relkind != RELKIND_INDEX)
1541 TypeRename(oldrelname, namespaceId, newrelname);
1544 * Update rel name in any RI triggers associated with the relation.
1548 /* update tgargs where relname is primary key */
1549 update_ri_trigger_args(myrelid,
1553 /* update tgargs where relname is foreign key */
1554 update_ri_trigger_args(myrelid,
1561 * Close rel, but keep exclusive lock!
1563 relation_close(targetrelation, NoLock);
1567 * Scan pg_trigger for RI triggers that are on the specified relation
1568 * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1569 * is true). Update RI trigger args fields matching oldname to contain
1570 * newname instead. If update_relname is true, examine the relname
1571 * fields; otherwise examine the attname fields.
/*
 * relid: OID of the relation whose RI triggers are examined.
 * oldname / newname: name to look for in the trigger arguments and its
 *		replacement.
 * update_relname: selects the relation-name argument (true) or the
 *		attribute-name arguments (false) for comparison.
 * The body also reads 'fk_scan', which selects between a scan keyed on
 * tgconstrrelid and one keyed on tgrelid.
 *
 * For every RI trigger found, the tgargs bytea is split into its
 * NUL-terminated argument strings; when a relevant argument equals
 * oldname a new bytea is assembled and stored with heap_modifytuple(),
 * simple_heap_update() and CatalogUpdateIndexes().  Triggers whose tgrelid
 * differs from relid additionally get a relcache invalidation.  The
 * function ends with CommandCounterIncrement() so a later pass can see the
 * updated rows.
 */
1574 update_ri_trigger_args(Oid relid,
1575 const char *oldname,
1576 const char *newname,
1578 bool update_relname)
1581 ScanKeyData skey[1];
1582 SysScanDesc trigscan;
1584 Datum values[Natts_pg_trigger];
1585 char nulls[Natts_pg_trigger];
1586 char replaces[Natts_pg_trigger];
1588 tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1591 ScanKeyInit(&skey[0],
1592 Anum_pg_trigger_tgconstrrelid,
1593 BTEqualStrategyNumber, F_OIDEQ,
1594 ObjectIdGetDatum(relid));
1595 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1601 ScanKeyInit(&skey[0],
1602 Anum_pg_trigger_tgrelid,
1603 BTEqualStrategyNumber, F_OIDEQ,
1604 ObjectIdGetDatum(relid));
1605 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1610 while ((tuple = systable_getnext(trigscan)) != NULL)
1612 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1622 const char *arga[RI_MAX_ARGUMENTS];
1625 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1626 if (tg_type == RI_TRIGGER_NONE)
1628 /* Not an RI trigger, forget it */
1633 * It is an RI trigger, so parse the tgargs bytea.
1635 * NB: we assume the field will never be compressed or moved out of
1636 * line; so does trigger.c ...
1638 tgnargs = pg_trigger->tgnargs;
1640 DatumGetPointer(fastgetattr(tuple,
1641 Anum_pg_trigger_tgargs,
1642 tgrel->rd_att, &isnull));
1643 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1644 tgnargs > RI_MAX_ARGUMENTS)
1646 /* This probably shouldn't happen, but ignore busted triggers */
1649 argp = (const char *) VARDATA(val);
1650 for (i = 0; i < tgnargs; i++)
1653 argp += strlen(argp) + 1; /* step over this argument and its terminating NUL */
1657 * Figure out which item(s) to look at. If the trigger is primary-key
1658 * type and attached to my rel, I should look at the PK fields; if it
1659 * is foreign-key type and attached to my rel, I should look at the FK
1660 * fields. But the opposite rule holds when examining triggers found
1661 * by tgconstrrel search.
1663 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1668 /* Change the relname if needed */
1669 i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1670 if (strcmp(arga[i], oldname) == 0)
1678 /* Change attname(s) if needed */
1679 i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1680 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1681 for (; i < tgnargs; i += 2)
1683 if (strcmp(arga[i], oldname) == 0)
1693 /* Don't need to update this tuple */
1698 * Construct modified tgargs bytea.
1701 for (i = 0; i < tgnargs; i++)
1702 newlen += strlen(arga[i]) + 1;
1703 newtgargs = (bytea *) palloc(newlen);
1704 VARATT_SIZEP(newtgargs) = newlen;
1706 for (i = 0; i < tgnargs; i++)
1708 strcpy(((char *) newtgargs) + newlen, arga[i]); /* newlen is reused here as the running write offset */
1709 newlen += strlen(arga[i]) + 1;
1713 * Build modified tuple.
1715 for (i = 0; i < Natts_pg_trigger; i++)
1717 values[i] = (Datum) 0;
1721 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1722 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1724 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1727 * Update pg_trigger and its indexes
1729 simple_heap_update(tgrel, &tuple->t_self, tuple);
1731 CatalogUpdateIndexes(tgrel, tuple);
1734 * Invalidate trigger's relation's relcache entry so that other
1735 * backends (and this one too!) are sent SI message to make them
1736 * rebuild relcache entries. (Ideally this should happen
1739 * We can skip this for triggers on relid itself, since that relcache
1740 * flush will happen anyway due to the table or column rename. We
1741 * just need to catch the far ends of RI relationships.
1743 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1744 if (pg_trigger->tgrelid != relid)
1745 CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1747 /* free up our scratch memory */
1749 heap_freetuple(tuple);
1752 systable_endscan(trigscan);
1754 heap_close(tgrel, RowExclusiveLock);
1757 * Increment cmd counter to make updates visible; this is needed in case
1758 * the same tuple has to be updated again by next pass (can happen in case
1759 * of a self-referential FK relationship).
1761 CommandCounterIncrement();
1766 * Execute ALTER TABLE, which can be a list of subcommands
1768 * ALTER TABLE is performed in three phases:
1769 * 1. Examine subcommands and perform pre-transformation checking.
1770 * 2. Update system catalogs.
1771 * 3. Scan table(s) to check new constraints, and optionally recopy
1772 * the data into new table(s).
1773 * Phase 3 is not performed unless one or more of the subcommands requires
1774 * it. The intention of this design is to allow multiple independent
1775 * updates of the table schema to be performed with only one pass over the
1778 * ATPrepCmd performs phase 1. A "work queue" entry is created for
1779 * each table to be affected (there may be multiple affected tables if the
1780 * commands traverse a table inheritance hierarchy). Also we do preliminary
1781 * validation of the subcommands, including parse transformation of those
1782 * expressions that need to be evaluated with respect to the old table
1785 * ATRewriteCatalogs performs phase 2 for each affected table (note that
1786 * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1787 * Certain subcommands need to be performed before others to avoid
1788 * unnecessary conflicts; for example, DROP COLUMN should come before
1789 * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
1790 * lists, one for each logical "pass" of phase 2.
1792 * ATRewriteTables performs phase 3 for those tables that need it.
1794 * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1795 * the whole operation; we don't have to do anything special to clean up.
/*
 * stmt: the parsed ALTER TABLE statement.
 *
 * Opens stmt->relation with AccessExclusiveLock and passes it to
 * ATController(); the recursion flag is derived from
 * interpretInhOption(stmt->relation->inhOpt).
 */
1798 AlterTable(AlterTableStmt *stmt)
1800 ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1802 interpretInhOption(stmt->relation->inhOpt));
1806 * AlterTableInternal
1808 * ALTER TABLE with target specified by OID
/*
 * relid: OID of the table to alter.
 * cmds: list of subcommands to apply.
 * recurse: whether the subcommands should recurse.
 *
 * Opens the relation by OID with AccessExclusiveLock and passes it to
 * ATController().
 */
1811 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1813 ATController(relation_open(relid, AccessExclusiveLock),
/*
 * rel: the already-opened target relation.
 * cmds: list of AlterTableCmd subcommands.
 * recurse: passed through to ATPrepCmd() for each subcommand.
 *
 * Drives the three phases in order: ATPrepCmd() for every subcommand
 * (filling the work queue), then ATRewriteCatalogs(), then
 * ATRewriteTables().  The relation is closed with NoLock after phase 1, so
 * the lock taken by the caller is kept.
 */
1819 ATController(Relation rel, List *cmds, bool recurse)
1824 /* Phase 1: preliminary examination of commands, create work queue */
1827 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1829 ATPrepCmd(&wqueue, rel, cmd, recurse, false); /* top-level call, so recursing = false */
1832 /* Close the relation, but keep lock until commit */
1833 relation_close(rel, NoLock);
1835 /* Phase 2: update system catalogs */
1836 ATRewriteCatalogs(&wqueue);
1838 /* Phase 3: scan/rewrite tables as needed */
1839 ATRewriteTables(&wqueue);
1845 * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1846 * recursion and permission checks.
1848 * Caller must have acquired AccessExclusiveLock on relation already.
1849 * This lock should be held until commit.
/*
 * wqueue: work queue of per-table entries, extended as needed.
 * rel: relation the subcommand applies to.
 * cmd: the subcommand; a copy is made and the copy is what gets queued.
 * recurse: whether the subcommand should be propagated to child tables.
 * recursing: passed on to ATPrepAlterColumnType().
 *
 * Looks up (or creates) the queue entry for rel, then dispatches on
 * cmd->subtype.  Each arm performs its permission check and recursion (or
 * delegates them to a subtype-specific prep routine) and selects the
 * phase-2 pass; the copied subcommand is finally appended to
 * tab->subcmds[pass].  An unknown subtype raises an ERROR.
 */
1852 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1853 bool recurse, bool recursing)
1855 AlteredTableInfo *tab;
1858 /* Find or create work queue entry for this table */
1859 tab = ATGetQueueEntry(wqueue, rel);
1862 * Copy the original subcommand for each table. This avoids conflicts
1863 * when different child tables need to make different parse
1864 * transformations (for example, the same column may have different column
1865 * numbers in different children).
1867 cmd = copyObject(cmd);
1870 * Do permissions checking, recursion to child tables if needed, and any
1871 * additional phase-1 processing needed.
1873 switch (cmd->subtype)
1875 case AT_AddColumn: /* ADD COLUMN */
1876 ATSimplePermissions(rel, false);
1877 /* Performs own recursion */
1878 ATPrepAddColumn(wqueue, rel, recurse, cmd);
1879 pass = AT_PASS_ADD_COL;
1881 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
1884 * We allow defaults on views so that INSERT into a view can have
1885 * default-ish behavior. This works because the rewriter
1886 * substitutes default values into INSERTs before it expands
1889 ATSimplePermissions(rel, true);
1890 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1891 /* No command-specific prep needed */
1892 pass = AT_PASS_ADD_CONSTR;
1894 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
1895 ATSimplePermissions(rel, false);
1896 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1897 /* No command-specific prep needed */
1898 pass = AT_PASS_DROP;
1900 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
1901 ATSimplePermissions(rel, false);
1902 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1903 /* No command-specific prep needed */
1904 pass = AT_PASS_ADD_CONSTR;
1906 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
1907 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1908 /* Performs own permission checks */
1909 ATPrepSetStatistics(rel, cmd->name, cmd->def);
1910 pass = AT_PASS_COL_ATTRS;
1912 case AT_SetStorage: /* ALTER COLUMN STORAGE */
1913 ATSimplePermissions(rel, false);
1914 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1915 /* No command-specific prep needed */
1916 pass = AT_PASS_COL_ATTRS;
1918 case AT_DropColumn: /* DROP COLUMN */
1919 ATSimplePermissions(rel, false);
1920 /* Recursion occurs during execution phase */
1921 /* No command-specific prep needed except saving recurse flag */
1923 cmd->subtype = AT_DropColumnRecurse; /* ATExecCmd runs this subtype as a recursive ATExecDropColumn() */
1924 pass = AT_PASS_DROP;
1926 case AT_AddIndex: /* ADD INDEX */
1927 ATSimplePermissions(rel, false);
1928 /* This command never recurses */
1929 /* No command-specific prep needed */
1930 pass = AT_PASS_ADD_INDEX;
1932 case AT_AddConstraint: /* ADD CONSTRAINT */
1933 ATSimplePermissions(rel, false);
1936 * Currently we recurse only for CHECK constraints, never for
1937 * foreign-key constraints. UNIQUE/PKEY constraints won't be seen
1940 if (IsA(cmd->def, Constraint))
1941 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1942 /* No command-specific prep needed */
1943 pass = AT_PASS_ADD_CONSTR;
1945 case AT_DropConstraint: /* DROP CONSTRAINT */
1946 ATSimplePermissions(rel, false);
1947 /* Performs own recursion */
1948 ATPrepDropConstraint(wqueue, rel, recurse, cmd);
1949 pass = AT_PASS_DROP;
1951 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
1952 ATSimplePermissions(rel, false);
1953 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1954 /* No command-specific prep needed */
1955 pass = AT_PASS_DROP;
1957 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
1958 ATSimplePermissions(rel, false);
1959 /* Performs own recursion */
1960 ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
1961 pass = AT_PASS_ALTER_TYPE;
1963 case AT_ToastTable: /* CREATE TOAST TABLE */
1964 ATSimplePermissions(rel, false);
1965 /* This command never recurses */
1966 /* No command-specific prep needed */
1967 pass = AT_PASS_MISC;
1969 case AT_ChangeOwner: /* ALTER OWNER */
1970 /* This command never recurses */
1971 /* No command-specific prep needed */
1972 pass = AT_PASS_MISC;
1974 case AT_ClusterOn: /* CLUSTER ON */
1975 case AT_DropCluster: /* SET WITHOUT CLUSTER */
1976 ATSimplePermissions(rel, false);
1977 /* These commands never recurse */
1978 /* No command-specific prep needed */
1979 pass = AT_PASS_MISC;
1981 case AT_DropOids: /* SET WITHOUT OIDS */
1982 ATSimplePermissions(rel, false);
1983 /* Performs own recursion */
1984 if (rel->rd_rel->relhasoids)
1986 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
1988 dropCmd->subtype = AT_DropColumn;
1989 dropCmd->name = pstrdup("oid");
1990 dropCmd->behavior = cmd->behavior;
1991 ATPrepCmd(wqueue, rel, dropCmd, recurse, false); /* queue the generated DROP COLUMN subcommand */
1993 pass = AT_PASS_DROP;
1995 case AT_SetTableSpace: /* SET TABLESPACE */
1996 /* This command never recurses */
1997 ATPrepSetTableSpace(tab, rel, cmd->name);
1998 pass = AT_PASS_MISC; /* doesn't actually matter */
2000 case AT_EnableTrig: /* ENABLE TRIGGER variants */
2001 case AT_EnableTrigAll:
2002 case AT_EnableTrigUser:
2003 case AT_DisableTrig: /* DISABLE TRIGGER variants */
2004 case AT_DisableTrigAll:
2005 case AT_DisableTrigUser:
2006 ATSimplePermissions(rel, false);
2007 /* These commands never recurse */
2008 /* No command-specific prep needed */
2009 pass = AT_PASS_MISC;
2012 elog(ERROR, "unrecognized alter table type: %d",
2013 (int) cmd->subtype);
2014 pass = 0; /* keep compiler quiet */
2018 /* Add the subcommand to the appropriate list for phase 2 */
2019 tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2025 * Traffic cop for ALTER TABLE Phase 2 operations. Subcommands are
2026 * dispatched in a "safe" execution order (designed to avoid unnecessary
/*
 * wqueue: work queue built in phase 1.
 *
 * Outer loop over passes, inner loop over queued tables: every subcommand
 * stored for the current pass is run through ATExecCmd().  After the
 * AT_PASS_ALTER_TYPE pass, ATPostAlterTypeCleanup() runs for the table.
 * A final loop calls AlterTableCreateToastTable() for plain relations that
 * had any ADD COLUMN, ALTER TYPE or column-attribute subcommands.
 */
2030 ATRewriteCatalogs(List **wqueue)
2036 * We process all the tables "in parallel", one pass at a time. This is
2037 * needed because we may have to propagate work from one table to another
2038 * (specifically, ALTER TYPE on a foreign key's PK has to dispatch the
2039 * re-adding of the foreign key constraint to the other table). Work can
2040 * only be propagated into later passes, however.
2042 for (pass = 0; pass < AT_NUM_PASSES; pass++)
2044 /* Go through each table that needs to be processed */
2045 foreach(ltab, *wqueue)
2047 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2048 List *subcmds = tab->subcmds[pass];
2056 * Exclusive lock was obtained by phase 1, needn't get it again
2058 rel = relation_open(tab->relid, NoLock);
2060 foreach(lcmd, subcmds)
2061 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2064 * After the ALTER TYPE pass, do cleanup work (this is not done in
2065 * ATExecAlterColumnType since it should be done only once if
2066 * multiple columns of a table are altered).
2068 if (pass == AT_PASS_ALTER_TYPE)
2069 ATPostAlterTypeCleanup(wqueue, tab);
2071 relation_close(rel, NoLock);
2076 * Do an implicit CREATE TOAST TABLE if we executed any subcommands that
2077 * might have added a column or changed column storage.
2079 foreach(ltab, *wqueue)
2081 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2083 if (tab->relkind == RELKIND_RELATION &&
2084 (tab->subcmds[AT_PASS_ADD_COL] ||
2085 tab->subcmds[AT_PASS_ALTER_TYPE] ||
2086 tab->subcmds[AT_PASS_COL_ATTRS]))
2087 AlterTableCreateToastTable(tab->relid, true);
2092 * ATExecCmd: dispatch a subcommand to appropriate execution routine
/*
 * tab: work-queue entry of the table being altered.
 * rel: the opened relation.
 * cmd: the subcommand to execute.
 *
 * Pure dispatcher: each cmd->subtype is routed to its ATExec* routine (or
 * to AlterTableCreateToastTable() for AT_ToastTable).  AT_DropOids and
 * AT_SetTableSpace do nothing here.  An unknown subtype raises an ERROR.
 * CommandCounterIncrement() is called at the end so the next subcommand
 * sees the changes made so far.
 */
2095 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2097 switch (cmd->subtype)
2099 case AT_AddColumn: /* ADD COLUMN */
2100 ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2102 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
2103 ATExecColumnDefault(rel, cmd->name, cmd->def);
2105 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
2106 ATExecDropNotNull(rel, cmd->name);
2108 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
2109 ATExecSetNotNull(tab, rel, cmd->name);
2111 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
2112 ATExecSetStatistics(rel, cmd->name, cmd->def);
2114 case AT_SetStorage: /* ALTER COLUMN STORAGE */
2115 ATExecSetStorage(rel, cmd->name, cmd->def);
2117 case AT_DropColumn: /* DROP COLUMN */
2118 ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2120 case AT_DropColumnRecurse: /* DROP COLUMN with recursion */
2121 ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2123 case AT_AddIndex: /* ADD INDEX */
2124 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2126 case AT_ReAddIndex: /* ADD INDEX */
2127 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2129 case AT_AddConstraint: /* ADD CONSTRAINT */
2130 ATExecAddConstraint(tab, rel, cmd->def);
2132 case AT_DropConstraint: /* DROP CONSTRAINT */
2133 ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2135 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
2136 ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2138 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
2139 ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2141 case AT_ToastTable: /* CREATE TOAST TABLE */
2142 AlterTableCreateToastTable(RelationGetRelid(rel), false);
2144 case AT_ChangeOwner: /* ALTER OWNER */
2145 ATExecChangeOwner(RelationGetRelid(rel),
2146 get_roleid_checked(cmd->name),
2149 case AT_ClusterOn: /* CLUSTER ON */
2150 ATExecClusterOn(rel, cmd->name);
2152 case AT_DropCluster: /* SET WITHOUT CLUSTER */
2153 ATExecDropCluster(rel);
2155 case AT_DropOids: /* SET WITHOUT OIDS */
2158 * Nothing to do here; we'll have generated a DropColumn
2159 * subcommand to do the real work
2162 case AT_SetTableSpace: /* SET TABLESPACE */
2165 * Nothing to do here; Phase 3 does the work
2168 case AT_EnableTrig: /* ENABLE TRIGGER name */
2169 ATExecEnableDisableTrigger(rel, cmd->name, true, false);
2171 case AT_DisableTrig: /* DISABLE TRIGGER name */
2172 ATExecEnableDisableTrigger(rel, cmd->name, false, false);
2174 case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
2175 ATExecEnableDisableTrigger(rel, NULL, true, false);
2177 case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
2178 ATExecEnableDisableTrigger(rel, NULL, false, false);
2180 case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
2181 ATExecEnableDisableTrigger(rel, NULL, true, true);
2183 case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
2184 ATExecEnableDisableTrigger(rel, NULL, false, true);
2187 elog(ERROR, "unrecognized alter table type: %d",
2188 (int) cmd->subtype);
2193 * Bump the command counter to ensure the next subcommand in the sequence
2194 * can see the changes so far
2196 CommandCounterIncrement();
2200 * ATRewriteTables: ALTER TABLE phase 3
/*
 * wqueue: work queue built in phase 1.
 *
 * First loop, per queued table:
 *   - when tab->newvals is non-empty the table is rewritten: shared or
 *     nailed relations and other sessions' temp tables are refused, a new
 *     heap named "pg_temp_<relid>" is created in the chosen tablespace,
 *     ATRewriteTable() fills it, swap_relation_files() exchanges the
 *     physical files, the scratch relation is dropped with
 *     performDeletion(), and reindex_relation() rebuilds the indexes;
 *   - otherwise, pending constraints are checked with
 *     ATRewriteTable(tab, InvalidOid), and a requested tablespace change
 *     is carried out by ATExecSetTableSpace().
 * Second loop: every queued constraint of type CONSTR_FOREIGN is validated
 * with validateForeignKeyConstraint(), opening the referenced table with
 * RowShareLock.
 */
2203 ATRewriteTables(List **wqueue)
2207 /* Go through each table that needs to be checked or rewritten */
2208 foreach(ltab, *wqueue)
2210 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2213 * We only need to rewrite the table if at least one column needs to
2216 if (tab->newvals != NIL)
2218 /* Build a temporary relation and copy data */
2220 char NewHeapName[NAMEDATALEN];
2223 ObjectAddress object;
2225 OldHeap = heap_open(tab->relid, NoLock);
2228 * We can never allow rewriting of shared or nailed-in-cache
2229 * relations, because we can't support changing their relfilenode
2232 if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2234 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2235 errmsg("cannot rewrite system relation \"%s\"",
2236 RelationGetRelationName(OldHeap))));
2239 * Don't allow rewrite on temp tables of other backends ... their
2240 * local buffer manager is not going to cope.
2242 if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2244 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2245 errmsg("cannot rewrite temporary tables of other sessions")));
2248 * Select destination tablespace (same as original unless user
2249 * requested a change)
2251 if (tab->newTableSpace)
2252 NewTableSpace = tab->newTableSpace;
2254 NewTableSpace = OldHeap->rd_rel->reltablespace;
2256 heap_close(OldHeap, NoLock);
2259 * Create the new heap, using a temporary name in the same
2260 * namespace as the existing table. NOTE: there is some risk of
2261 * collision with user relnames. Working around this seems more
2262 * trouble than it's worth; in particular, we can't create the new
2263 * heap in a different namespace from the old, or we will have
2264 * problems with the TEMP status of temp tables.
2266 snprintf(NewHeapName, sizeof(NewHeapName),
2267 "pg_temp_%u", tab->relid);
2269 OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2272 * Copy the heap data into the new table with the desired
2273 * modifications, and test the current data within the table
2274 * against new constraints generated by ALTER TABLE commands.
2276 ATRewriteTable(tab, OIDNewHeap);
2278 /* Swap the physical files of the old and new heaps. */
2279 swap_relation_files(tab->relid, OIDNewHeap);
2281 CommandCounterIncrement();
2283 /* Destroy new heap with old filenode */
2284 object.classId = RelationRelationId;
2285 object.objectId = OIDNewHeap;
2286 object.objectSubId = 0;
2289 * The new relation is local to our transaction and we know
2290 * nothing depends on it, so DROP_RESTRICT should be OK.
2292 performDeletion(&object, DROP_RESTRICT);
2293 /* performDeletion does CommandCounterIncrement at end */
2296 * Rebuild each index on the relation (but not the toast table,
2297 * which is all-new anyway). We do not need
2298 * CommandCounterIncrement() because reindex_relation does it.
2300 reindex_relation(tab->relid, false);
2305 * Test the current data within the table against new constraints
2306 * generated by ALTER TABLE commands, but don't rebuild data.
2308 if (tab->constraints != NIL)
2309 ATRewriteTable(tab, InvalidOid);
2312 * If we had SET TABLESPACE but no reason to reconstruct tuples,
2313 * just do a block-by-block copy.
2315 if (tab->newTableSpace)
2316 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2321 * Foreign key constraints are checked in a final pass, since (a) it's
2322 * generally best to examine each one separately, and (b) it's at least
2323 * theoretically possible that we have changed both relations of the
2324 * foreign key, and we'd better have finished both rewrites before we try
2325 * to read the tables.
2327 foreach(ltab, *wqueue)
2329 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2330 Relation rel = NULL;
2333 foreach(lcon, tab->constraints)
2335 NewConstraint *con = lfirst(lcon);
2337 if (con->contype == CONSTR_FOREIGN)
2339 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2344 /* Long since locked, no need for another */
2345 rel = heap_open(tab->relid, NoLock);
2348 refrel = heap_open(con->refrelid, RowShareLock);
2350 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2352 heap_close(refrel, NoLock); /* NoLock: the RowShareLock taken above is not released here */
2357 heap_close(rel, NoLock);
2362 * ATRewriteTable: scan or rewrite one table
2364 * OIDNewHeap is InvalidOid if we don't need to rewrite
2367 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2371 TupleDesc oldTupDesc;
2372 TupleDesc newTupDesc;
2373 bool needscan = false;
2379 * Open the relation(s). We have surely already locked the existing
2382 oldrel = heap_open(tab->relid, NoLock);
2383 oldTupDesc = tab->oldDesc;
2384 newTupDesc = RelationGetDescr(oldrel); /* includes all mods */
2386 if (OidIsValid(OIDNewHeap))
2387 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2392 * If we need to rewrite the table, the operation has to be propagated to
2393 * tables that use this table's rowtype as a column type.
2395 * (Eventually this will probably become true for scans as well, but at
2396 * the moment a composite type does not enforce any constraints, so it's
2397 * not necessary/appropriate to enforce them just during ALTER.)
2400 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2401 RelationGetRelationName(oldrel));
2404 * Generate the constraint and default execution states
2407 estate = CreateExecutorState();
2409 /* Build the needed expression execution states */
2410 foreach(l, tab->constraints)
2412 NewConstraint *con = lfirst(l);
2414 switch (con->contype)
2418 con->qualstate = (List *)
2419 ExecPrepareExpr((Expr *) con->qual, estate);
2421 case CONSTR_FOREIGN:
2422 /* Nothing to do here */
2424 case CONSTR_NOTNULL:
2428 elog(ERROR, "unrecognized constraint type: %d",
2429 (int) con->contype);
2433 foreach(l, tab->newvals)
2435 NewColumnValue *ex = lfirst(l);
2439 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2444 ExprContext *econtext;
2447 TupleTableSlot *oldslot;
2448 TupleTableSlot *newslot;
2451 MemoryContext oldCxt;
2452 List *dropped_attrs = NIL;
2455 econtext = GetPerTupleExprContext(estate);
2458 * Make tuple slots for old and new tuples. Note that even when the
2459 * tuples are the same, the tupDescs might not be (consider ADD COLUMN
2460 * without a default).
2462 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2463 newslot = MakeSingleTupleTableSlot(newTupDesc);
2465 /* Preallocate values/isnull arrays */
2466 i = Max(newTupDesc->natts, oldTupDesc->natts);
2467 values = (Datum *) palloc(i * sizeof(Datum));
2468 isnull = (bool *) palloc(i * sizeof(bool));
2469 memset(values, 0, i * sizeof(Datum));
2470 memset(isnull, true, i * sizeof(bool));
2473 * Any attributes that are dropped according to the new tuple
2474 * descriptor can be set to NULL. We precompute the list of dropped
2475 * attributes to avoid needing to do so in the per-tuple loop.
2477 for (i = 0; i < newTupDesc->natts; i++)
2479 if (newTupDesc->attrs[i]->attisdropped)
2480 dropped_attrs = lappend_int(dropped_attrs, i);
2484 * Scan through the rows, generating a new row if needed and then
2485 * checking all the constraints.
2487 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2490 * Switch to per-tuple memory context and reset it for each tuple
2491 * produced, so we don't leak memory.
2493 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2495 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2499 Oid tupOid = InvalidOid;
2501 /* Extract data from old tuple */
2502 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2503 if (oldTupDesc->tdhasoid)
2504 tupOid = HeapTupleGetOid(tuple);
2506 /* Set dropped attributes to null in new tuple */
2507 foreach(lc, dropped_attrs)
2508 isnull[lfirst_int(lc)] = true;
2511 * Process supplied expressions to replace selected columns.
2512 * Expression inputs come from the old tuple.
2514 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2515 econtext->ecxt_scantuple = oldslot;
2517 foreach(l, tab->newvals)
2519 NewColumnValue *ex = lfirst(l);
2521 values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2523 &isnull[ex->attnum - 1],
2528 * Form the new tuple. Note that we don't explicitly pfree it,
2529 * since the per-tuple memory context will be reset shortly.
2531 tuple = heap_form_tuple(newTupDesc, values, isnull);
2533 /* Preserve OID, if any */
2534 if (newTupDesc->tdhasoid)
2535 HeapTupleSetOid(tuple, tupOid);
2538 /* Now check any constraints on the possibly-changed tuple */
2539 ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2540 econtext->ecxt_scantuple = newslot;
2542 foreach(l, tab->constraints)
2544 NewConstraint *con = lfirst(l);
2546 switch (con->contype)
2549 if (!ExecQual(con->qualstate, econtext, true))
2551 (errcode(ERRCODE_CHECK_VIOLATION),
2552 errmsg("check constraint \"%s\" is violated by some row",
2555 case CONSTR_NOTNULL:
2560 d = heap_getattr(tuple, con->attnum, newTupDesc,
2564 (errcode(ERRCODE_NOT_NULL_VIOLATION),
2565 errmsg("column \"%s\" contains null values",
2566 get_attname(tab->relid,
2570 case CONSTR_FOREIGN:
2571 /* Nothing to do here */
2574 elog(ERROR, "unrecognized constraint type: %d",
2575 (int) con->contype);
2579 /* Write the tuple out to the new relation */
2581 simple_heap_insert(newrel, tuple);
2583 ResetExprContext(econtext);
2585 CHECK_FOR_INTERRUPTS();
2588 MemoryContextSwitchTo(oldCxt);
2592 FreeExecutorState(estate);
2594 heap_close(oldrel, NoLock);
2596 heap_close(newrel, NoLock);
2600 * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2602 static AlteredTableInfo *
2603 ATGetQueueEntry(List **wqueue, Relation rel)
2605 Oid relid = RelationGetRelid(rel);
2606 AlteredTableInfo *tab;
2609 foreach(ltab, *wqueue)
2611 tab = (AlteredTableInfo *) lfirst(ltab);
2612 if (tab->relid == relid)
2617 * Not there, so add it. Note that we make a copy of the relation's
2618 * existing descriptor before anything interesting can happen to it.
2620 tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2622 tab->relkind = rel->rd_rel->relkind;
2623 tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2625 *wqueue = lappend(*wqueue, tab);
2631 * ATSimplePermissions
2633 * - Ensure that it is a relation (or possibly a view)
2634 * - Ensure this user is the owner
2635 * - Ensure that it is not a system table
2638 ATSimplePermissions(Relation rel, bool allowView)
2640 if (rel->rd_rel->relkind != RELKIND_RELATION)
2644 if (rel->rd_rel->relkind != RELKIND_VIEW)
2646 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2647 errmsg("\"%s\" is not a table or view",
2648 RelationGetRelationName(rel))));
2652 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2653 errmsg("\"%s\" is not a table",
2654 RelationGetRelationName(rel))));
2657 /* Permissions checks */
2658 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2659 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2660 RelationGetRelationName(rel));
2662 if (!allowSystemTableMods && IsSystemRelation(rel))
2664 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2665 errmsg("permission denied: \"%s\" is a system catalog",
2666 RelationGetRelationName(rel))));
2672 * Simple table recursion sufficient for most ALTER TABLE operations.
2673 * All direct and indirect children are processed in an unspecified order.
2674 * Note that if a child inherits from the original table via multiple
2675 * inheritance paths, it will be visited just once.
2678 ATSimpleRecursion(List **wqueue, Relation rel,
2679 AlterTableCmd *cmd, bool recurse)
2682 * Propagate to children if desired. Non-table relations never have
2683 * children, so no need to search in that case.
2685 if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2687 Oid relid = RelationGetRelid(rel);
2691 /* this routine is actually in the planner */
2692 children = find_all_inheritors(relid);
2695 * find_all_inheritors does the recursive search of the inheritance
2696 * hierarchy, so all we have to do is process all of the relids in the
2697 * list that it returns.
2699 foreach(child, children)
2701 Oid childrelid = lfirst_oid(child);
2704 if (childrelid == relid)
2706 childrel = relation_open(childrelid, AccessExclusiveLock);
2707 ATPrepCmd(wqueue, childrel, cmd, false, true);
2708 relation_close(childrel, NoLock);
2714 * ATOneLevelRecursion
2716 * Here, we visit only direct inheritance children. It is expected that
2717 * the command's prep routine will recurse again to find indirect children.
2718 * When using this technique, a multiply-inheriting child will be visited
2722 ATOneLevelRecursion(List **wqueue, Relation rel,
2725 Oid relid = RelationGetRelid(rel);
2729 /* this routine is actually in the planner */
2730 children = find_inheritance_children(relid);
2732 foreach(child, children)
2734 Oid childrelid = lfirst_oid(child);
2737 childrel = relation_open(childrelid, AccessExclusiveLock);
2738 ATPrepCmd(wqueue, childrel, cmd, true, true);
2739 relation_close(childrel, NoLock);
2745 * find_composite_type_dependencies
2747 * Check to see if a table's rowtype is being used as a column in some
2748 * other table (possibly nested several levels deep in composite types!).
2749 * Eventually, we'd like to propagate the check or rewrite operation
2750 * into other such tables, but for now, just error out if we find any.
2752 * We assume that functions and views depending on the type are not reasons
2753 * to reject the ALTER. (How safe is this really?)
2756 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2760 SysScanDesc depScan;
2764 * We scan pg_depend to find those things that depend on the rowtype. (We
2765 * assume we can ignore refobjsubid for a rowtype.)
2767 depRel = heap_open(DependRelationId, AccessShareLock);
2769 ScanKeyInit(&key[0],
2770 Anum_pg_depend_refclassid,
2771 BTEqualStrategyNumber, F_OIDEQ,
2772 ObjectIdGetDatum(TypeRelationId));
2773 ScanKeyInit(&key[1],
2774 Anum_pg_depend_refobjid,
2775 BTEqualStrategyNumber, F_OIDEQ,
2776 ObjectIdGetDatum(typeOid));
2778 depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2779 SnapshotNow, 2, key);
2781 while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2783 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2785 Form_pg_attribute att;
2787 /* Ignore dependees that aren't user columns of relations */
2788 /* (we assume system columns are never of rowtypes) */
2789 if (pg_depend->classid != RelationRelationId ||
2790 pg_depend->objsubid <= 0)
2793 rel = relation_open(pg_depend->objid, AccessShareLock);
2794 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2796 if (rel->rd_rel->relkind == RELKIND_RELATION)
2799 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2800 errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2802 RelationGetRelationName(rel),
2803 NameStr(att->attname))));
2805 else if (OidIsValid(rel->rd_rel->reltype))
2808 * A view or composite type itself isn't a problem, but we must
2809 * recursively check for indirect dependencies via its rowtype.
2811 find_composite_type_dependencies(rel->rd_rel->reltype,
2815 relation_close(rel, AccessShareLock);
2818 systable_endscan(depScan);
2820 relation_close(depRel, AccessShareLock);
2825 * ALTER TABLE ADD COLUMN
2827 * Adds an additional attribute to a relation making the assumption that
2828 * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2829 * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2833 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
2837 * Recurse to add the column to child classes, if requested.
2839 * We must recurse one level at a time, so that multiply-inheriting
2840 * children are visited the right number of times and end up with the
2841 * right attinhcount.
2845 AlterTableCmd *childCmd = copyObject(cmd);
2846 ColumnDef *colDefChild = (ColumnDef *) childCmd->def;
2848 /* Child should see column as singly inherited */
2849 colDefChild->inhcount = 1;
2850 colDefChild->is_local = false;
2851 /* and don't make a support dependency on the child */
2852 colDefChild->support = NULL;
2854 ATOneLevelRecursion(wqueue, rel, childCmd);
2859 * If we are told not to recurse, there had better not be any child
2860 * tables; else the addition would put them out of step.
2862 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
2864 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2865 errmsg("column must be added to child tables too")));
2870 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
2873 Oid myrelid = RelationGetRelid(rel);
2877 HeapTuple attributeTuple;
2878 Form_pg_attribute attribute;
2879 FormData_pg_attribute attributeD;
2883 HeapTuple typeTuple;
2888 attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
2891 * Are we adding the column to a recursion child? If so, check whether to
2892 * merge with an existing definition for the column.
2894 if (colDef->inhcount > 0)
2898 /* Does child already have a column by this name? */
2899 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
2900 if (HeapTupleIsValid(tuple))
2902 Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
2904 /* Okay if child matches by type */
2905 if (typenameTypeId(colDef->typename) != childatt->atttypid ||
2906 colDef->typename->typmod != childatt->atttypmod)
2908 (errcode(ERRCODE_DATATYPE_MISMATCH),
2909 errmsg("child table \"%s\" has different type for column \"%s\"",
2910 RelationGetRelationName(rel), colDef->colname)));
2912 /* Bump the existing child att's inhcount */
2913 childatt->attinhcount++;
2914 simple_heap_update(attrdesc, &tuple->t_self, tuple);
2915 CatalogUpdateIndexes(attrdesc, tuple);
2917 heap_freetuple(tuple);
2919 /* Inform the user about the merge */
2921 (errmsg("merging definition of column \"%s\" for child \"%s\"",
2922 colDef->colname, RelationGetRelationName(rel))));
2924 heap_close(attrdesc, RowExclusiveLock);
2929 pgclass = heap_open(RelationRelationId, RowExclusiveLock);
2931 reltup = SearchSysCacheCopy(RELOID,
2932 ObjectIdGetDatum(myrelid),
2934 if (!HeapTupleIsValid(reltup))
2935 elog(ERROR, "cache lookup failed for relation %u", myrelid);
2938 * this test is deliberately not attisdropped-aware, since if one tries to
2939 * add a column matching a dropped column name, it's gonna fail anyway.
2941 if (SearchSysCacheExists(ATTNAME,
2942 ObjectIdGetDatum(myrelid),
2943 PointerGetDatum(colDef->colname),
2946 (errcode(ERRCODE_DUPLICATE_COLUMN),
2947 errmsg("column \"%s\" of relation \"%s\" already exists",
2948 colDef->colname, RelationGetRelationName(rel))));
2950 minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
2951 maxatts = minattnum + 1;
2952 if (maxatts > MaxHeapAttributeNumber)
2954 (errcode(ERRCODE_TOO_MANY_COLUMNS),
2955 errmsg("tables can have at most %d columns",
2956 MaxHeapAttributeNumber)));
2959 typeTuple = typenameType(colDef->typename);
2960 tform = (Form_pg_type) GETSTRUCT(typeTuple);
2961 typeOid = HeapTupleGetOid(typeTuple);
2963 /* make sure datatype is legal for a column */
2964 CheckAttributeType(colDef->colname, typeOid);
2966 attributeTuple = heap_addheader(Natts_pg_attribute,
2968 ATTRIBUTE_TUPLE_SIZE,
2969 (void *) &attributeD);
2971 attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
2973 attribute->attrelid = myrelid;
2974 namestrcpy(&(attribute->attname), colDef->colname);
2975 attribute->atttypid = typeOid;
2976 attribute->attstattarget = -1;
2977 attribute->attlen = tform->typlen;
2978 attribute->attcacheoff = -1;
2979 attribute->atttypmod = colDef->typename->typmod;
2980 attribute->attnum = i;
2981 attribute->attbyval = tform->typbyval;
2982 attribute->attndims = list_length(colDef->typename->arrayBounds);
2983 attribute->attstorage = tform->typstorage;
2984 attribute->attalign = tform->typalign;
2985 attribute->attnotnull = colDef->is_not_null;
2986 attribute->atthasdef = false;
2987 attribute->attisdropped = false;
2988 attribute->attislocal = colDef->is_local;
2989 attribute->attinhcount = colDef->inhcount;
2991 ReleaseSysCache(typeTuple);
2993 simple_heap_insert(attrdesc, attributeTuple);
2995 /* Update indexes on pg_attribute */
2996 CatalogUpdateIndexes(attrdesc, attributeTuple);
2998 heap_close(attrdesc, RowExclusiveLock);
3001 * Update number of attributes in pg_class tuple
3003 ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
3005 simple_heap_update(pgclass, &reltup->t_self, reltup);
3007 /* keep catalog indexes current */
3008 CatalogUpdateIndexes(pgclass, reltup);
3010 heap_freetuple(reltup);
3012 heap_close(pgclass, RowExclusiveLock);
3014 /* Make the attribute's catalog entry visible */
3015 CommandCounterIncrement();
3018 * Store the DEFAULT, if any, in the catalogs
3020 if (colDef->raw_default)
3022 RawColumnDefault *rawEnt;
3024 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3025 rawEnt->attnum = attribute->attnum;
3026 rawEnt->raw_default = copyObject(colDef->raw_default);
3029 * This function is intended for CREATE TABLE, so it processes a
3030 * _list_ of defaults, but we just do one.
3032 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3034 /* Make the additional catalog changes visible */
3035 CommandCounterIncrement();
3039 * Tell Phase 3 to fill in the default expression, if there is one.
3041 * If there is no default, Phase 3 doesn't have to do anything, because
3042 * that effectively means that the default is NULL. The heap tuple access
3043 * routines always check for attnum > # of attributes in tuple, and return
3044 * NULL if so, so without any modification of the tuple data we will get
3045 * the effect of NULL values in the new column.
3047 * An exception occurs when the new column is of a domain type: the domain
3048 * might have a NOT NULL constraint, or a check constraint that indirectly
3049 * rejects nulls. If there are any domain constraints then we construct
3050 * an explicit NULL default value that will be passed through
3051 * CoerceToDomain processing. (This is a tad inefficient, since it causes
3052 * rewriting the table which we really don't have to do, but the present
3053 * design of domain processing doesn't offer any simple way of checking
3054 * the constraints more directly.)
3056 * Note: we use build_column_default, and not just the cooked default
3057 * returned by AddRelationRawConstraints, so that the right thing happens
3058 * when a datatype's default applies.
3060 defval = (Expr *) build_column_default(rel, attribute->attnum);
3062 if (!defval && GetDomainConstraints(typeOid) != NIL)
3064 Oid basetype = getBaseType(typeOid);
3066 defval = (Expr *) makeNullConst(basetype);
3067 defval = (Expr *) coerce_to_target_type(NULL,
3071 colDef->typename->typmod,
3072 COERCION_ASSIGNMENT,
3073 COERCE_IMPLICIT_CAST);
3074 if (defval == NULL) /* should not happen */
3075 elog(ERROR, "failed to coerce base type to domain");
3080 NewColumnValue *newval;
3082 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3083 newval->attnum = attribute->attnum;
3084 newval->expr = defval;
3086 tab->newvals = lappend(tab->newvals, newval);
3090 * Add needed dependency entries for the new column.
3092 add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3093 if (colDef->support != NULL)
3094 add_column_support_dependency(myrelid, i, colDef->support);
3098 * Install a column's dependency on its datatype.
3101 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3103 ObjectAddress myself,
3106 myself.classId = RelationRelationId;
3107 myself.objectId = relid;
3108 myself.objectSubId = attnum;
3109 referenced.classId = TypeRelationId;
3110 referenced.objectId = typid;
3111 referenced.objectSubId = 0;
3112 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3116 * Install a dependency for a column's supporting relation (serial sequence).
3119 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3121 ObjectAddress colobject,
3124 colobject.classId = RelationRelationId;
3125 colobject.objectId = relid;
3126 colobject.objectSubId = attnum;
3127 suppobject.classId = RelationRelationId;
3128 suppobject.objectId = RangeVarGetRelid(support, false);
3129 suppobject.objectSubId = 0;
3130 recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
3134 * ALTER TABLE ALTER COLUMN DROP NOT NULL
3137 ATExecDropNotNull(Relation rel, const char *colName)
3143 ListCell *indexoidscan;
3146 * lookup the attribute
3148 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3150 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3152 if (!HeapTupleIsValid(tuple))
3154 (errcode(ERRCODE_UNDEFINED_COLUMN),
3155 errmsg("column \"%s\" of relation \"%s\" does not exist",
3156 colName, RelationGetRelationName(rel))));
3158 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3160 /* Prevent them from altering a system attribute */
3163 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3164 errmsg("cannot alter system column \"%s\"",
3168 * Check that the attribute is not in a primary key
3171 /* Loop over all indexes on the relation */
3172 indexoidlist = RelationGetIndexList(rel);
3174 foreach(indexoidscan, indexoidlist)
3176 Oid indexoid = lfirst_oid(indexoidscan);
3177 HeapTuple indexTuple;
3178 Form_pg_index indexStruct;
3181 indexTuple = SearchSysCache(INDEXRELID,
3182 ObjectIdGetDatum(indexoid),
3184 if (!HeapTupleIsValid(indexTuple))
3185 elog(ERROR, "cache lookup failed for index %u", indexoid);
3186 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3188 /* If the index is not a primary key, skip the check */
3189 if (indexStruct->indisprimary)
3192 * Loop over each attribute in the primary key and see if it
3193 * matches the to-be-altered attribute
3195 for (i = 0; i < indexStruct->indnatts; i++)
3197 if (indexStruct->indkey.values[i] == attnum)
3199 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3200 errmsg("column \"%s\" is in a primary key",
3205 ReleaseSysCache(indexTuple);
3208 list_free(indexoidlist);
3211 * Okay, actually perform the catalog change ... if needed
3213 if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3215 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3217 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3219 /* keep the system catalog indexes current */
3220 CatalogUpdateIndexes(attr_rel, tuple);
3223 heap_close(attr_rel, RowExclusiveLock);
3227 * ALTER TABLE ALTER COLUMN SET NOT NULL
3230 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3231 const char *colName)
3236 NewConstraint *newcon;
3239 * lookup the attribute
3241 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3243 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3245 if (!HeapTupleIsValid(tuple))
3247 (errcode(ERRCODE_UNDEFINED_COLUMN),
3248 errmsg("column \"%s\" of relation \"%s\" does not exist",
3249 colName, RelationGetRelationName(rel))));
3251 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3253 /* Prevent them from altering a system attribute */
3256 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3257 errmsg("cannot alter system column \"%s\"",
3261 * Okay, actually perform the catalog change ... if needed
3263 if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3265 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3267 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3269 /* keep the system catalog indexes current */
3270 CatalogUpdateIndexes(attr_rel, tuple);
3272 /* Tell Phase 3 to test the constraint */
3273 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3274 newcon->contype = CONSTR_NOTNULL;
3275 newcon->attnum = attnum;
3276 newcon->name = "NOT NULL";
3278 tab->constraints = lappend(tab->constraints, newcon);
3281 heap_close(attr_rel, RowExclusiveLock);
3285 * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3288 ATExecColumnDefault(Relation rel, const char *colName,
3294 * get the number of the attribute
3296 attnum = get_attnum(RelationGetRelid(rel), colName);
3297 if (attnum == InvalidAttrNumber)
3299 (errcode(ERRCODE_UNDEFINED_COLUMN),
3300 errmsg("column \"%s\" of relation \"%s\" does not exist",
3301 colName, RelationGetRelationName(rel))));
3303 /* Prevent them from altering a system attribute */
3306 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3307 errmsg("cannot alter system column \"%s\"",
3311 * Remove any old default for the column. We use RESTRICT here for
3312 * safety, but at present we do not expect anything to depend on the
3315 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3320 RawColumnDefault *rawEnt;
3322 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3323 rawEnt->attnum = attnum;
3324 rawEnt->raw_default = newDefault;
3327 * This function is intended for CREATE TABLE, so it processes a
3328 * _list_ of defaults, but we just do one.
3330 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3335 * ALTER TABLE ALTER COLUMN SET STATISTICS
3338 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3341 * We do our own permission checking because (a) we want to allow SET
3342 * STATISTICS on indexes (for expressional index columns), and (b) we want
3343 * to allow SET STATISTICS on system catalogs without requiring
3344 * allowSystemTableMods to be turned on.
3346 if (rel->rd_rel->relkind != RELKIND_RELATION &&
3347 rel->rd_rel->relkind != RELKIND_INDEX)
3349 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3350 errmsg("\"%s\" is not a table or index",
3351 RelationGetRelationName(rel))));
3353 /* Permissions checks */
3354 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3355 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3356 RelationGetRelationName(rel));
3360 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3363 Relation attrelation;
3365 Form_pg_attribute attrtuple;
3367 Assert(IsA(newValue, Integer));
3368 newtarget = intVal(newValue);
3371 * Limit target to a sane range
3376 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3377 errmsg("statistics target %d is too low",
3380 else if (newtarget > 1000)
3384 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3385 errmsg("lowering statistics target to %d",
3389 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3391 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3393 if (!HeapTupleIsValid(tuple))
3395 (errcode(ERRCODE_UNDEFINED_COLUMN),
3396 errmsg("column \"%s\" of relation \"%s\" does not exist",
3397 colName, RelationGetRelationName(rel))));
3398 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3400 if (attrtuple->attnum <= 0)
3402 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3403 errmsg("cannot alter system column \"%s\"",
3406 attrtuple->attstattarget = newtarget;
3408 simple_heap_update(attrelation, &tuple->t_self, tuple);
3410 /* keep system catalog indexes current */
3411 CatalogUpdateIndexes(attrelation, tuple);
3413 heap_freetuple(tuple);
3415 heap_close(attrelation, RowExclusiveLock);
3419 * ALTER TABLE ALTER COLUMN SET STORAGE
3422 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3426 Relation attrelation;
3428 Form_pg_attribute attrtuple;
3430 Assert(IsA(newValue, String));
3431 storagemode = strVal(newValue);
3433 if (pg_strcasecmp(storagemode, "plain") == 0)
3435 else if (pg_strcasecmp(storagemode, "external") == 0)
3437 else if (pg_strcasecmp(storagemode, "extended") == 0)
3439 else if (pg_strcasecmp(storagemode, "main") == 0)
3444 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3445 errmsg("invalid storage type \"%s\"",
3447 newstorage = 0; /* keep compiler quiet */
3450 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3452 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3454 if (!HeapTupleIsValid(tuple))
3456 (errcode(ERRCODE_UNDEFINED_COLUMN),
3457 errmsg("column \"%s\" of relation \"%s\" does not exist",
3458 colName, RelationGetRelationName(rel))));
3459 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3461 if (attrtuple->attnum <= 0)
3463 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3464 errmsg("cannot alter system column \"%s\"",
3468 * safety check: do not allow toasted storage modes unless column datatype
3471 if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3472 attrtuple->attstorage = newstorage;
3475 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3476 errmsg("column data type %s can only have storage PLAIN",
3477 format_type_be(attrtuple->atttypid))));
3479 simple_heap_update(attrelation, &tuple->t_self, tuple);
3481 /* keep system catalog indexes current */
3482 CatalogUpdateIndexes(attrelation, tuple);
3484 heap_freetuple(tuple);
3486 heap_close(attrelation, RowExclusiveLock);
3491 * ALTER TABLE DROP COLUMN
3493 * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3494 * because we have to decide at runtime whether to recurse or not depending
3495 * on whether attinhcount goes to zero or not. (We can't check this in a
3496 * static pre-pass because it won't handle multiple inheritance situations
3497 * correctly.) Since DROP COLUMN doesn't need to create any work queue
3498 * entries for Phase 3, it's okay to recurse internally in this routine
3499 * without considering the work queue.
3502 ATExecDropColumn(Relation rel, const char *colName,
3503 DropBehavior behavior,
3504 bool recurse, bool recursing)
3507 Form_pg_attribute targetatt;
3510 ObjectAddress object;
3512 /* At top level, permission check was done in ATPrepCmd, else do it */
3514 ATSimplePermissions(rel, false);
3517 * get the number of the attribute
3519 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3520 if (!HeapTupleIsValid(tuple))
3522 (errcode(ERRCODE_UNDEFINED_COLUMN),
3523 errmsg("column \"%s\" of relation \"%s\" does not exist",
3524 colName, RelationGetRelationName(rel))));
3525 targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
3527 attnum = targetatt->attnum;
3529 /* Can't drop a system attribute, except OID */
3530 if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3532 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3533 errmsg("cannot drop system column \"%s\"",
3536 /* Don't drop inherited columns */
3537 if (targetatt->attinhcount > 0 && !recursing)
3539 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3540 errmsg("cannot drop inherited column \"%s\"",
3543 ReleaseSysCache(tuple);
3546 * Propagate to children as appropriate. Unlike most other ALTER
3547 * routines, we have to do this one level of recursion at a time; we can't
3548 * use find_all_inheritors to do it in one pass.
3550 children = find_inheritance_children(RelationGetRelid(rel));
3557 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3558 foreach(child, children)
3560 Oid childrelid = lfirst_oid(child);
3562 Form_pg_attribute childatt;
3564 childrel = heap_open(childrelid, AccessExclusiveLock);
3566 tuple = SearchSysCacheCopyAttName(childrelid, colName);
3567 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
3568 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3569 colName, childrelid);
3570 childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3572 if (childatt->attinhcount <= 0) /* shouldn't happen */
3573 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3574 childrelid, colName);
3579 * If the child column has other definition sources, just
3580 * decrement its inheritance count; if not, recurse to delete
3583 if (childatt->attinhcount == 1 && !childatt->attislocal)
3585 /* Time to delete this child column, too */
3586 ATExecDropColumn(childrel, colName, behavior, true, true);
3590 /* Child column must survive my deletion */
3591 childatt->attinhcount--;
3593 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3595 /* keep the system catalog indexes current */
3596 CatalogUpdateIndexes(attr_rel, tuple);
3598 /* Make update visible */
3599 CommandCounterIncrement();
3605 * If we were told to drop ONLY in this table (no recursion),
3606 * we need to mark the inheritors' attribute as locally
3607 * defined rather than inherited.
3609 childatt->attinhcount--;
3610 childatt->attislocal = true;
3612 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3614 /* keep the system catalog indexes current */
3615 CatalogUpdateIndexes(attr_rel, tuple);
3617 /* Make update visible */
3618 CommandCounterIncrement();
3621 heap_freetuple(tuple);
3623 heap_close(childrel, NoLock);
3625 heap_close(attr_rel, RowExclusiveLock);
3629 * Perform the actual column deletion
3631 object.classId = RelationRelationId;
3632 object.objectId = RelationGetRelid(rel);
3633 object.objectSubId = attnum;
3635 performDeletion(&object, behavior);
3638 * If we dropped the OID column, must adjust pg_class.relhasoids
3640 if (attnum == ObjectIdAttributeNumber)
3643 Form_pg_class tuple_class;
3645 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
3647 tuple = SearchSysCacheCopy(RELOID,
3648 ObjectIdGetDatum(RelationGetRelid(rel)),
3650 if (!HeapTupleIsValid(tuple))
3651 elog(ERROR, "cache lookup failed for relation %u",
3652 RelationGetRelid(rel));
3653 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3655 tuple_class->relhasoids = false;
3656 simple_heap_update(class_rel, &tuple->t_self, tuple);
3658 /* Keep the catalog indexes up to date */
3659 CatalogUpdateIndexes(class_rel, tuple);
3661 heap_close(class_rel, RowExclusiveLock);
3666 * ALTER TABLE ADD INDEX
3668 * There is no such command in the grammar, but the parser converts UNIQUE
3669 * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us
3670 * schedule creation of the index at the appropriate time during ALTER.
/*
 * ATExecAddIndex
 *		Execute an AT_AddIndex subcommand by handing the fields of the
 *		IndexStmt to DefineIndex.
 *
 * tab		  - work-queue entry for the table; when tab->newvals is not NIL
 *				the index build is flagged to be skipped
 * rel		  - relation being altered (NOTE(review): not referenced in the
 *				lines visible here -- confirm)
 * stmt		  - statement to execute; asserted to be an IndexStmt node
 * is_rebuild - true when an existing index is being rebuilt; the schema
 *				rights check is then suppressed (check_rights = !is_rebuild)
 */
3673 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3674 IndexStmt *stmt, bool is_rebuild)
3680 Assert(IsA(stmt, IndexStmt));
3682 /* suppress schema rights check when rebuilding existing index */
3683 check_rights = !is_rebuild;
3684 /* skip index build if phase 3 will have to rewrite table anyway */
3685 skip_build = (tab->newvals != NIL);
3686 /* suppress notices when rebuilding existing index */
/* Pass the statement through to DefineIndex, marked as an ALTER TABLE call */
3689 DefineIndex(stmt->relation, /* relation */
3690 stmt->idxname, /* index name */
3691 InvalidOid, /* no predefined OID */
3692 stmt->accessMethod, /* am name */
3694 stmt->indexParams, /* parameters */
3695 (Expr *) stmt->whereClause,
3700 true, /* is_alter_table */
3707 * ALTER TABLE ADD CONSTRAINT
/*
 * ATExecAddConstraint
 *		Dispatch on the node type of newConstraint.
 *
 * Constraint node: a second switch on constr->contype passes the constraint
 *		to AddRelationRawConstraints; every cooked constraint it returns is
 *		copied into a zeroed NewConstraint (name, contype, attnum, and the
 *		expression converted with make_ands_implicit) and appended to
 *		tab->constraints for Phase 3.  An unexpected contype raises
 *		"unrecognized constraint type".
 *
 * T_FkConstraint: when constr_name is set it is checked with
 *		ConstraintNameIsUsed, and a clash is reported with
 *		ERRCODE_DUPLICATE_OBJECT; a name is generated with
 *		ChooseConstraintName from the relation name and the first
 *		referencing column (presumably only when no name was supplied --
 *		the branch line is not visible here).  The constraint is then passed
 *		to ATAddForeignKeyConstraint.
 *
 * Any other node type raises "unrecognized node type".
 */
3710 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3712 switch (nodeTag(newConstraint))
3716 Constraint *constr = (Constraint *) newConstraint;
3719 * Currently, we only expect to see CONSTR_CHECK nodes
3720 * arriving here (see the preprocessing done in
3721 * parser/analyze.c). Use a switch anyway to make it easier
3722 * to add more code later.
3724 switch (constr->contype)
3732 * Call AddRelationRawConstraints to do the work.
3733 * It returns a list of cooked constraints.
3735 newcons = AddRelationRawConstraints(rel, NIL,
3736 list_make1(constr));
3737 /* Add each constraint to Phase 3's queue */
3738 foreach(lcon, newcons)
3740 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3741 NewConstraint *newcon;
/* palloc0 leaves every field not assigned below (e.g. refrelid) zeroed */
3743 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3744 newcon->name = ccon->name;
3745 newcon->contype = ccon->contype;
3746 newcon->attnum = ccon->attnum;
3747 /* ExecQual wants implicit-AND format */
3748 newcon->qual = (Node *)
3749 make_ands_implicit((Expr *) ccon->expr);
3751 tab->constraints = lappend(tab->constraints,
3757 elog(ERROR, "unrecognized constraint type: %d",
3758 (int) constr->contype);
3762 case T_FkConstraint:
3764 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3767 * Assign or validate constraint name
3769 if (fkconstraint->constr_name)
3771 if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3772 RelationGetRelid(rel),
3773 RelationGetNamespace(rel),
3774 fkconstraint->constr_name))
3776 (errcode(ERRCODE_DUPLICATE_OBJECT),
3777 errmsg("constraint \"%s\" for relation \"%s\" already exists",
3778 fkconstraint->constr_name,
3779 RelationGetRelationName(rel))));
3782 fkconstraint->constr_name =
3783 ChooseConstraintName(RelationGetRelationName(rel),
3784 strVal(linitial(fkconstraint->fk_attrs)),
3786 RelationGetNamespace(rel),
3789 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3794 elog(ERROR, "unrecognized node type: %d",
3795 (int) nodeTag(newConstraint));
3800 * Add a foreign-key constraint to a single table
3802 * Subroutine for ATExecAddConstraint. Must already hold exclusive
3803 * lock on the rel, and have done appropriate validity/permissions checks
/*
 * ATAddForeignKeyConstraint
 *
 * tab			- ALTER TABLE work-queue entry; a NewConstraint describing the
 *				  FK is appended to tab->constraints unless
 *				  fkconstraint->skip_validation is set
 * rel			- the referencing table (its columns are resolved from
 *				  fkconstraint->fk_attrs)
 * fkconstraint - the foreign-key definition; when pk_attrs is NIL it is
 *				  filled in from the primary key of the referenced table
 *
 * Sequence of operations visible in this function:
 *	1. Open the referenced table (fkconstraint->pktable) with
 *	   AccessExclusiveLock.
 *	2. Require it to have relkind RELKIND_RELATION, run pg_class_aclcheck
 *	   for the current user on both tables, refuse system catalogs unless
 *	   allowSystemTableMods is set, and refuse references between temporary
 *	   and permanent tables in either direction.
 *	3. Zero the five INDEX_MAX_KEYS-sized work arrays, then resolve the
 *	   referencing columns with transformColumnNameList and the referenced
 *	   columns either via transformFkeyGetPrimaryKey (no column list given)
 *	   or via transformColumnNameList plus transformFkeyCheckAttrs.
 *	4. Fail with ERRCODE_INVALID_FOREIGN_KEY when the two column counts
 *	   differ.  For each column pair, look up an "=" operator with the
 *	   referenced type on the left; the "cannot be implemented" error and
 *	   the "will require costly sequential scans" message are issued from
 *	   this loop, the latter when the operator is not a member of the
 *	   opclass recorded for that index column.
 *	5. Queue the validation work item, record the constraint with
 *	   CreateConstraintEntry, and build the enforcing triggers with
 *	   createForeignKeyTriggers using the new constraint OID.
 *	6. Close the referenced table with NoLock, so the lock taken in step 1
 *	   is retained.
 */
3807 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3808 FkConstraint *fkconstraint)
3811 AclResult aclresult;
3812 int16 pkattnum[INDEX_MAX_KEYS];
3813 int16 fkattnum[INDEX_MAX_KEYS];
3814 Oid pktypoid[INDEX_MAX_KEYS];
3815 Oid fktypoid[INDEX_MAX_KEYS];
3816 Oid opclasses[INDEX_MAX_KEYS];
3824 * Grab an exclusive lock on the pk table, so that someone doesn't delete
3825 * rows out from under us. (Although a lesser lock would do for that
3826 * purpose, we'll need exclusive lock anyway to add triggers to the pk
3827 * table; trying to start with a lesser lock will just create a risk of
3830 pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3833 * Validity and permissions checks
3835 * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3836 * but we may as well error out sooner instead of later.
3838 if (pkrel->rd_rel->relkind != RELKIND_RELATION)
3840 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3841 errmsg("referenced relation \"%s\" is not a table",
3842 RelationGetRelationName(pkrel))));
3844 aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
3846 if (aclresult != ACLCHECK_OK)
3847 aclcheck_error(aclresult, ACL_KIND_CLASS,
3848 RelationGetRelationName(pkrel));
3850 if (!allowSystemTableMods && IsSystemRelation(pkrel))
3852 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
3853 errmsg("permission denied: \"%s\" is a system catalog",
3854 RelationGetRelationName(pkrel))));
3856 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
3858 if (aclresult != ACLCHECK_OK)
3859 aclcheck_error(aclresult, ACL_KIND_CLASS,
3860 RelationGetRelationName(rel));
3863 * Disallow reference from permanent table to temp table or vice versa.
3864 * (The ban on perm->temp is for fairly obvious reasons. The ban on
3865 * temp->perm is because other backends might need to run the RI triggers
3866 * on the perm table, but they can't reliably see tuples the owning
3867 * backend has created in the temp table, because non-shared buffers are
3868 * used for temp tables.)
3870 if (isTempNamespace(RelationGetNamespace(pkrel)))
3872 if (!isTempNamespace(RelationGetNamespace(rel)))
3874 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3875 errmsg("cannot reference temporary table from permanent table constraint")));
3879 if (isTempNamespace(RelationGetNamespace(rel)))
3881 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3882 errmsg("cannot reference permanent table from temporary table constraint")));
3886 * Look up the referencing attributes to make sure they exist, and record
3887 * their attnums and type OIDs.
3889 MemSet(pkattnum, 0, sizeof(pkattnum));
3890 MemSet(fkattnum, 0, sizeof(fkattnum));
3891 MemSet(pktypoid, 0, sizeof(pktypoid));
3892 MemSet(fktypoid, 0, sizeof(fktypoid));
3893 MemSet(opclasses, 0, sizeof(opclasses));
3895 numfks = transformColumnNameList(RelationGetRelid(rel),
3896 fkconstraint->fk_attrs,
3897 fkattnum, fktypoid);
3900 * If the attribute list for the referenced table was omitted, lookup the
3901 * definition of the primary key and use it. Otherwise, validate the
3902 * supplied attribute list. In either case, discover the index OID and
3903 * index opclasses, and the attnums and type OIDs of the attributes.
3905 if (fkconstraint->pk_attrs == NIL)
3907 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
3908 &fkconstraint->pk_attrs,
3914 numpks = transformColumnNameList(RelationGetRelid(pkrel),
3915 fkconstraint->pk_attrs,
3916 pkattnum, pktypoid);
3917 /* Look for an index matching the column list */
3918 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
3922 /* Be sure referencing and referenced column types are comparable */
3923 if (numfks != numpks)
3925 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
3926 errmsg("number of referencing and referenced columns for foreign key disagree")));
3928 for (i = 0; i < numpks; i++)
3931 * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
3932 * is the foreign key table's i'th key's type
3934 * Note that we look for an operator with the PK type on the left;
3935 * when the types are different this is critical because the PK index
3936 * will need operators with the indexkey on the left. (Ordinarily both
3937 * commutator operators will exist if either does, but we won't get
3938 * the right answer from the test below on opclass membership unless
3939 * we select the proper operator.)
3941 Operator o = oper(list_make1(makeString("=")),
3942 pktypoid[i], fktypoid[i], true);
3946 (errcode(ERRCODE_UNDEFINED_FUNCTION),
3947 errmsg("foreign key constraint \"%s\" "
3948 "cannot be implemented",
3949 fkconstraint->constr_name),
3950 errdetail("Key columns \"%s\" and \"%s\" "
3951 "are of incompatible types: %s and %s.",
3952 strVal(list_nth(fkconstraint->fk_attrs, i)),
3953 strVal(list_nth(fkconstraint->pk_attrs, i)),
3954 format_type_be(fktypoid[i]),
3955 format_type_be(pktypoid[i]))));
3958 * Check that the found operator is compatible with the PK index, and
3959 * generate a warning if not, since otherwise costly seqscans will be
3960 * incurred to check FK validity.
3962 if (!op_in_opclass(oprid(o), opclasses[i]))
3964 (errmsg("foreign key constraint \"%s\" "
3965 "will require costly sequential scans",
3966 fkconstraint->constr_name),
3967 errdetail("Key columns \"%s\" and \"%s\" "
3968 "are of different types: %s and %s.",
3969 strVal(list_nth(fkconstraint->fk_attrs, i)),
3970 strVal(list_nth(fkconstraint->pk_attrs, i)),
3971 format_type_be(fktypoid[i]),
3972 format_type_be(pktypoid[i]))));
3978 * Tell Phase 3 to check that the constraint is satisfied by existing rows
3979 * (we can skip this during table creation).
3981 if (!fkconstraint->skip_validation)
3983 NewConstraint *newcon;
3985 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3986 newcon->name = fkconstraint->constr_name;
3987 newcon->contype = CONSTR_FOREIGN;
3988 newcon->refrelid = RelationGetRelid(pkrel);
3989 newcon->qual = (Node *) fkconstraint;
3991 tab->constraints = lappend(tab->constraints, newcon);
3995 * Record the FK constraint in pg_constraint.
3997 constrOid = CreateConstraintEntry(fkconstraint->constr_name,
3998 RelationGetNamespace(rel),
4000 fkconstraint->deferrable,
4001 fkconstraint->initdeferred,
4002 RelationGetRelid(rel),
4005 InvalidOid, /* not a domain
4007 RelationGetRelid(pkrel),
4010 fkconstraint->fk_upd_action,
4011 fkconstraint->fk_del_action,
4012 fkconstraint->fk_matchtype,
4014 NULL, /* no check constraint */
4019 * Create the triggers that will enforce the constraint.
4021 createForeignKeyTriggers(rel, fkconstraint, constrOid);
4024 * Close pk table, but keep lock until we've committed.
4026 heap_close(pkrel, NoLock);
4031 * transformColumnNameList - transform list of column names
4033 * Lookup each name and return its attnum and type OID
/*
 * transformColumnNameList
 *
 * relId	 - OID of the relation whose columns are looked up
 * colList	 - list of column names, read with strVal()
 * attnums	 - output array: attribute number of each listed column
 * atttypids - output array: type OID of each listed column
 *
 * Each name is resolved through SearchSysCacheAttName; an unknown column
 * is reported with ERRCODE_UNDEFINED_COLUMN and a list position at or past
 * INDEX_MAX_KEYS with ERRCODE_TOO_MANY_COLUMNS.  The syscache reference is
 * released once both output slots have been filled.
 *
 * Callers in this file use the return value as the number of columns
 * processed (the return statement itself is not visible here).
 */
4036 transformColumnNameList(Oid relId, List *colList,
4037 int16 *attnums, Oid *atttypids)
4045 char *attname = strVal(lfirst(l));
4048 atttuple = SearchSysCacheAttName(relId, attname);
4049 if (!HeapTupleIsValid(atttuple))
4051 (errcode(ERRCODE_UNDEFINED_COLUMN),
4052 errmsg("column \"%s\" referenced in foreign key constraint does not exist",
/* Here attnum is the position in the output arrays, not a column number */
4054 if (attnum >= INDEX_MAX_KEYS)
4056 (errcode(ERRCODE_TOO_MANY_COLUMNS),
4057 errmsg("cannot have more than %d keys in a foreign key",
4059 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4060 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4061 ReleaseSysCache(atttuple);
4069 * transformFkeyGetPrimaryKey -
4071 * Look up the names, attnums, and types of the primary key attributes
4072 * for the pkrel. Also return the index OID and index opclasses of the
4073 * index supporting the primary key.
4075 * All parameters except pkrel are output parameters. Also, the function
4076 * return value is the number of attributes in the primary key.
4078 * Used when the column list in the REFERENCES specification is omitted.
/*
 * transformFkeyGetPrimaryKey
 *
 * pkrel	   - the referenced table (input)
 * indexOid	   - output: OID of the index marked indisprimary; it is preset
 *				 to InvalidOid, and if it is still invalid after the search
 *				 the "there is no primary key" error is raised with
 *				 ERRCODE_UNDEFINED_OBJECT
 * attnamelist - output: one String node per key column is appended, holding
 *				 a copy of the column name
 * attnums	   - output: attribute number of each key column
 * atttypids   - output: type OID of each key column (from attnumTypeId)
 * opclasses   - output: opclass OID of each key column, taken from the
 *				 indclass vector of the index
 *
 * The index list comes from RelationGetIndexList and is freed with
 * list_free after the search.  The pg_index syscache entry of the chosen
 * index is still read after the search loop (indclass, indnatts, indkey)
 * and is released only at the end of the function.
 *
 * The caller in this file uses the return value as the number of key
 * columns (the return statement itself is not visible here).
 */
4081 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4083 int16 *attnums, Oid *atttypids,
4087 ListCell *indexoidscan;
4088 HeapTuple indexTuple = NULL;
4089 Form_pg_index indexStruct = NULL;
4090 Datum indclassDatum;
4092 oidvector *indclass;
4096 * Get the list of index OIDs for the table from the relcache, and look up
4097 * each one in the pg_index syscache until we find one marked primary key
4098 * (hopefully there isn't more than one such).
4100 *indexOid = InvalidOid;
4102 indexoidlist = RelationGetIndexList(pkrel);
4104 foreach(indexoidscan, indexoidlist)
4106 Oid indexoid = lfirst_oid(indexoidscan);
4108 indexTuple = SearchSysCache(INDEXRELID,
4109 ObjectIdGetDatum(indexoid),
4111 if (!HeapTupleIsValid(indexTuple))
4112 elog(ERROR, "cache lookup failed for index %u", indexoid);
4113 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4114 if (indexStruct->indisprimary)
4116 *indexOid = indexoid;
4119 ReleaseSysCache(indexTuple);
4122 list_free(indexoidlist);
4125 * Check that we found it
4127 if (!OidIsValid(*indexOid))
4129 (errcode(ERRCODE_UNDEFINED_OBJECT),
4130 errmsg("there is no primary key for referenced table \"%s\"",
4131 RelationGetRelationName(pkrel))));
4133 /* Must get indclass the hard way */
4134 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4135 Anum_pg_index_indclass, &isnull);
4137 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4140 * Now build the list of PK attributes from the indkey definition (we
4141 * assume a primary key cannot have expressional elements)
4144 for (i = 0; i < indexStruct->indnatts; i++)
4146 int pkattno = indexStruct->indkey.values[i];
4148 attnums[i] = pkattno;
4149 atttypids[i] = attnumTypeId(pkrel, pkattno);
4150 opclasses[i] = indclass->values[i];
4151 *attnamelist = lappend(*attnamelist,
4152 makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4155 ReleaseSysCache(indexTuple);
4161 * transformFkeyCheckAttrs -
4163 * Make sure that the attributes of a referenced table belong to a unique
4164 * (or primary key) constraint. Return the OID of the index supporting
4165 * the constraint, as well as the opclasses associated with the index
/*
 * transformFkeyCheckAttrs
 *
 * pkrel	 - the referenced table
 * numattrs	 - number of entries in attnums
 * attnums	 - attribute numbers of the referenced columns, in the order
 *			   given by the user
 * opclasses - output: for each entry of attnums, the opclass of the index
 *			   column that matches it
 *
 * Every index of pkrel is examined.  A candidate must have exactly
 * numattrs columns, be unique, and have null indpred and indexprs (no
 * predicate, no expressions).  Its columns are then compared with attnums
 * without regard to order.
 *
 * The caller in this file stores the return value as the OID of the
 * supporting index.  The "there is no unique constraint matching given
 * keys" error (ERRCODE_INVALID_FOREIGN_KEY) is raised from here; the test
 * guarding it is not visible in this listing.
 */
4169 transformFkeyCheckAttrs(Relation pkrel,
4170 int numattrs, int16 *attnums,
4171 Oid *opclasses) /* output parameter */
4173 Oid indexoid = InvalidOid;
4176 ListCell *indexoidscan;
4179 * Get the list of index OIDs for the table from the relcache, and look up
4180 * each one in the pg_index syscache, and match unique indexes to the list
4181 * of attnums we are given.
4183 indexoidlist = RelationGetIndexList(pkrel);
4185 foreach(indexoidscan, indexoidlist)
4187 HeapTuple indexTuple;
4188 Form_pg_index indexStruct;
4192 indexoid = lfirst_oid(indexoidscan);
4193 indexTuple = SearchSysCache(INDEXRELID,
4194 ObjectIdGetDatum(indexoid),
4196 if (!HeapTupleIsValid(indexTuple))
4197 elog(ERROR, "cache lookup failed for index %u", indexoid);
4198 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4201 * Must have the right number of columns; must be unique and not a
4202 * partial index; forget it if there are any expressions, too
4204 if (indexStruct->indnatts == numattrs &&
4205 indexStruct->indisunique &&
4206 heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4207 heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4209 /* Must get indclass the hard way */
4210 Datum indclassDatum;
4212 oidvector *indclass;
4214 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4215 Anum_pg_index_indclass, &isnull);
4217 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4220 * The given attnum list may match the index columns in any order.
4221 * Check that each list is a subset of the other.
/* Direction 1: search the index columns for each requested attnum */
4223 for (i = 0; i < numattrs; i++)
4226 for (j = 0; j < numattrs; j++)
4228 if (attnums[i] == indexStruct->indkey.values[j])
/*
 * Direction 2: search the requested attnums for each index column, and
 * store that index column's opclass at the requested column's position
 */
4239 for (i = 0; i < numattrs; i++)
4242 for (j = 0; j < numattrs; j++)
4244 if (attnums[j] == indexStruct->indkey.values[i])
4246 opclasses[j] = indclass->values[i];
4256 ReleaseSysCache(indexTuple);
4263 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4264 errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4265 RelationGetRelationName(pkrel))));
4267 list_free(indexoidlist);
4273 * Scan the existing rows in a table to verify they meet a proposed FK
4276 * Caller must have opened and locked both relations.
/*
 * validateForeignKeyConstraint
 *
 * fkconstraint - the foreign-key definition to verify
 * (The remaining parameters are not visible in this listing; the body uses
 * "rel" as the table that is scanned and "pkrel" as the referenced table.)
 *
 * RI_Initial_Check(fkconstraint, rel, pkrel) is tried first.  The code
 * that follows builds a stand-in Trigger whose tgargs array holds
 *		[0] constraint name, [1] name of rel, [2] name of pkrel,
 *		[3] match-type string from fkMatchTypeToString,
 * followed by the column names from fk_attrs and pk_attrs; the array is
 * allocated with room for 4 + length(fk_attrs) + length(pk_attrs) entries.
 * rel is then scanned with SnapshotNow and RI_FKey_check_ins is invoked
 * for every tuple, with a TriggerData describing a row-level INSERT event
 * passed as the call context.
 */
4279 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4290 * See if we can do it with a single LEFT JOIN query. A FALSE result
4291 * indicates we must proceed with the fire-the-trigger method.
4293 if (RI_Initial_Check(fkconstraint, rel, pkrel))
4297 * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
4298 * if that tuple had just been inserted. If any of those fail, it should
4299 * ereport(ERROR) and that's that.
4301 MemSet(&trig, 0, sizeof(trig));
4302 trig.tgoid = InvalidOid;
4303 trig.tgname = fkconstraint->constr_name;
4304 trig.tgenabled = TRUE;
4305 trig.tgisconstraint = TRUE;
4306 trig.tgconstrrelid = RelationGetRelid(pkrel);
4307 trig.tgdeferrable = FALSE;
4308 trig.tginitdeferred = FALSE;
4310 trig.tgargs = (char **) palloc(sizeof(char *) *
4311 (4 + list_length(fkconstraint->fk_attrs)
4312 + list_length(fkconstraint->pk_attrs)));
4314 trig.tgargs[0] = trig.tgname;
4315 trig.tgargs[1] = RelationGetRelationName(rel);
4316 trig.tgargs[2] = RelationGetRelationName(pkrel);
4317 trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4319 foreach(list, fkconstraint->fk_attrs)
4321 char *fk_at = strVal(lfirst(list));
4323 trig.tgargs[count] = fk_at;
4327 foreach(list, fkconstraint->pk_attrs)
4329 char *pk_at = strVal(lfirst(list));
4331 trig.tgargs[count] = pk_at;
4334 trig.tgnargs = count - 1;
4336 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4338 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4340 FunctionCallInfoData fcinfo;
4341 TriggerData trigdata;
4344 * Make a call to the trigger function
4346 * No parameters are passed, but we do set a context
4348 MemSet(&fcinfo, 0, sizeof(fcinfo));
4351 * We assume RI_FKey_check_ins won't look at flinfo...
4353 trigdata.type = T_TriggerData;
4354 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4355 trigdata.tg_relation = rel;
4356 trigdata.tg_trigtuple = tuple;
4357 trigdata.tg_newtuple = NULL;
4358 trigdata.tg_trigger = &trig;
4359 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4360 trigdata.tg_newtuplebuf = InvalidBuffer;
4362 fcinfo.context = (Node *) &trigdata;
4364 RI_FKey_check_ins(&fcinfo);
/*
 * CreateFKCheckTrigger
 *		Build and create one AFTER ROW constraint trigger on the referencing
 *		table, for either INSERT or UPDATE.
 *
 * myRel		- RangeVar naming the referencing table; the trigger is
 *				  created on it
 * fkconstraint - the foreign-key definition; supplies the trigger name,
 *				  the deferrability flags and the referenced table
 * constrobj	- address of the constraint the trigger depends on
 * trigobj		- address of the trigger; its objectId is filled in here
 *				  from the result of CreateTrigger
 * (A fifth argument is passed by the callers in this file -- true for the
 * first call, false for the second -- but its declaration is not visible
 * here; presumably it selects between the insert and update variants.)
 *
 * The insert variant uses RI_FKey_check_ins with action 'i', the update
 * variant RI_FKey_check_upd with action 'u'.  The trigger arguments are the
 * constraint name, the two table names, the match-type string, and then
 * the column names.  After creation an internal dependency of the trigger
 * on the constraint is recorded and the command counter is advanced.
 */
4373 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4374 ObjectAddress *constrobj, ObjectAddress *trigobj,
4377 CreateTrigStmt *fk_trigger;
4381 fk_trigger = makeNode(CreateTrigStmt);
4382 fk_trigger->trigname = fkconstraint->constr_name;
4383 fk_trigger->relation = myRel;
4384 fk_trigger->before = false;
4385 fk_trigger->row = true;
4387 /* Either ON INSERT or ON UPDATE */
4390 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4391 fk_trigger->actions[0] = 'i';
4395 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4396 fk_trigger->actions[0] = 'u';
4398 fk_trigger->actions[1] = '\0';
4400 fk_trigger->isconstraint = true;
4401 fk_trigger->deferrable = fkconstraint->deferrable;
4402 fk_trigger->initdeferred = fkconstraint->initdeferred;
4403 fk_trigger->constrrel = fkconstraint->pktable;
4405 fk_trigger->args = NIL;
4406 fk_trigger->args = lappend(fk_trigger->args,
4407 makeString(fkconstraint->constr_name));
4408 fk_trigger->args = lappend(fk_trigger->args,
4409 makeString(myRel->relname));
4410 fk_trigger->args = lappend(fk_trigger->args,
4411 makeString(fkconstraint->pktable->relname));
4412 fk_trigger->args = lappend(fk_trigger->args,
4413 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4414 if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4416 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4417 errmsg("number of referencing and referenced columns for foreign key disagree")));
/* Column names are appended as alternating (referencing, referenced) pairs */
4419 forboth(fk_attr, fkconstraint->fk_attrs,
4420 pk_attr, fkconstraint->pk_attrs)
4422 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4423 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4426 trigobj->objectId = CreateTrigger(fk_trigger, true);
4428 /* Register dependency from trigger to constraint */
4429 recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4431 /* Make changes-so-far visible */
4432 CommandCounterIncrement();
4436 * Create the triggers that implement an FK constraint.
/*
 * createForeignKeyTriggers
 *
 * rel			- the referencing table; a RangeVar for it is rebuilt from
 *				  its namespace name and relation name
 * fkconstraint - the foreign-key definition
 * (The body also uses constrOid, the OID of the constraint, whose
 * declaration is not visible in this listing.)
 *
 * Four triggers are created, each with an internal dependency on the
 * constraint:
 *	- two check triggers on the referencing table via CreateFKCheckTrigger;
 *	- an AFTER ROW DELETE trigger on the referenced table, whose function
 *	  is chosen from fkconstraint->fk_del_action;
 *	- an AFTER ROW UPDATE trigger on the referenced table, whose function
 *	  is chosen from fkconstraint->fk_upd_action.
 * An action code outside the five handled values raises
 * "unrecognized FK action type".
 *
 * Both referenced-table triggers receive the same argument list as the
 * check triggers: constraint name, referencing table name, referenced
 * table name, match-type string, then alternating column-name pairs.
 */
4439 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4443 CreateTrigStmt *fk_trigger;
4446 ObjectAddress trigobj,
4450 * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
4452 myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4453 pstrdup(RelationGetRelationName(rel)));
4456 * Preset objectAddress fields
4458 constrobj.classId = ConstraintRelationId;
4459 constrobj.objectId = constrOid;
4460 constrobj.objectSubId = 0;
4461 trigobj.classId = TriggerRelationId;
4462 trigobj.objectSubId = 0;
4464 /* Make changes-so-far visible */
4465 CommandCounterIncrement();
4468 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
4469 * action for both INSERTs and UPDATEs on the referencing table.
4471 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4472 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4475 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4476 * DELETE action on the referenced table.
4478 fk_trigger = makeNode(CreateTrigStmt);
4479 fk_trigger->trigname = fkconstraint->constr_name;
4480 fk_trigger->relation = fkconstraint->pktable;
4481 fk_trigger->before = false;
4482 fk_trigger->row = true;
4483 fk_trigger->actions[0] = 'd';
4484 fk_trigger->actions[1] = '\0';
4486 fk_trigger->isconstraint = true;
4487 fk_trigger->constrrel = myRel;
/*
 * Only NO ACTION takes its deferrability from the constraint; every other
 * delete action is created non-deferrable.
 */
4488 switch (fkconstraint->fk_del_action)
4490 case FKCONSTR_ACTION_NOACTION:
4491 fk_trigger->deferrable = fkconstraint->deferrable;
4492 fk_trigger->initdeferred = fkconstraint->initdeferred;
4493 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4495 case FKCONSTR_ACTION_RESTRICT:
4496 fk_trigger->deferrable = false;
4497 fk_trigger->initdeferred = false;
4498 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4500 case FKCONSTR_ACTION_CASCADE:
4501 fk_trigger->deferrable = false;
4502 fk_trigger->initdeferred = false;
4503 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4505 case FKCONSTR_ACTION_SETNULL:
4506 fk_trigger->deferrable = false;
4507 fk_trigger->initdeferred = false;
4508 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4510 case FKCONSTR_ACTION_SETDEFAULT:
4511 fk_trigger->deferrable = false;
4512 fk_trigger->initdeferred = false;
4513 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4516 elog(ERROR, "unrecognized FK action type: %d",
4517 (int) fkconstraint->fk_del_action);
4521 fk_trigger->args = NIL;
4522 fk_trigger->args = lappend(fk_trigger->args,
4523 makeString(fkconstraint->constr_name));
4524 fk_trigger->args = lappend(fk_trigger->args,
4525 makeString(myRel->relname));
4526 fk_trigger->args = lappend(fk_trigger->args,
4527 makeString(fkconstraint->pktable->relname));
4528 fk_trigger->args = lappend(fk_trigger->args,
4529 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4530 forboth(fk_attr, fkconstraint->fk_attrs,
4531 pk_attr, fkconstraint->pk_attrs)
4533 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4534 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4537 trigobj.objectId = CreateTrigger(fk_trigger, true);
4539 /* Register dependency from trigger to constraint */
4540 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4542 /* Make changes-so-far visible */
4543 CommandCounterIncrement();
4546 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4547 * UPDATE action on the referenced table.
4549 fk_trigger = makeNode(CreateTrigStmt);
4550 fk_trigger->trigname = fkconstraint->constr_name;
4551 fk_trigger->relation = fkconstraint->pktable;
4552 fk_trigger->before = false;
4553 fk_trigger->row = true;
4554 fk_trigger->actions[0] = 'u';
4555 fk_trigger->actions[1] = '\0';
4556 fk_trigger->isconstraint = true;
4557 fk_trigger->constrrel = myRel;
/* Same deferrability rule as for the delete trigger above */
4558 switch (fkconstraint->fk_upd_action)
4560 case FKCONSTR_ACTION_NOACTION:
4561 fk_trigger->deferrable = fkconstraint->deferrable;
4562 fk_trigger->initdeferred = fkconstraint->initdeferred;
4563 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4565 case FKCONSTR_ACTION_RESTRICT:
4566 fk_trigger->deferrable = false;
4567 fk_trigger->initdeferred = false;
4568 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4570 case FKCONSTR_ACTION_CASCADE:
4571 fk_trigger->deferrable = false;
4572 fk_trigger->initdeferred = false;
4573 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4575 case FKCONSTR_ACTION_SETNULL:
4576 fk_trigger->deferrable = false;
4577 fk_trigger->initdeferred = false;
4578 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4580 case FKCONSTR_ACTION_SETDEFAULT:
4581 fk_trigger->deferrable = false;
4582 fk_trigger->initdeferred = false;
4583 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4586 elog(ERROR, "unrecognized FK action type: %d",
4587 (int) fkconstraint->fk_upd_action);
4591 fk_trigger->args = NIL;
4592 fk_trigger->args = lappend(fk_trigger->args,
4593 makeString(fkconstraint->constr_name));
4594 fk_trigger->args = lappend(fk_trigger->args,
4595 makeString(myRel->relname));
4596 fk_trigger->args = lappend(fk_trigger->args,
4597 makeString(fkconstraint->pktable->relname));
4598 fk_trigger->args = lappend(fk_trigger->args,
4599 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4600 forboth(fk_attr, fkconstraint->fk_attrs,
4601 pk_attr, fkconstraint->pk_attrs)
4603 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4604 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4607 trigobj.objectId = CreateTrigger(fk_trigger, true);
4609 /* Register dependency from trigger to constraint */
4610 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4614 * fkMatchTypeToString -
4615 * convert FKCONSTR_MATCH_xxx code to string to use in trigger args
/*
 * fkMatchTypeToString
 *
 * match_type - one of the FKCONSTR_MATCH_xxx codes
 *
 * Returns a freshly pstrdup'd string: "FULL" for FKCONSTR_MATCH_FULL,
 * "PARTIAL" for FKCONSTR_MATCH_PARTIAL, "UNSPECIFIED" for
 * FKCONSTR_MATCH_UNSPECIFIED.  The "unrecognized match type" error is
 * raised with elog(ERROR); the trailing "return NULL" exists only for the
 * compiler.
 */
4618 fkMatchTypeToString(char match_type)
4622 case FKCONSTR_MATCH_FULL:
4623 return pstrdup("FULL");
4624 case FKCONSTR_MATCH_PARTIAL:
4625 return pstrdup("PARTIAL");
4626 case FKCONSTR_MATCH_UNSPECIFIED:
4627 return pstrdup("UNSPECIFIED");
4629 elog(ERROR, "unrecognized match type: %d",
4632 return NULL; /* can't get here */
4636 * ALTER TABLE DROP CONSTRAINT
/*
 * ATPrepDropConstraint
 *
 * wqueue  - ALTER TABLE work queue, passed through to ATSimpleRecursion
 * rel	   - the table named in the command
 * recurse - recursion flag, passed through to ATSimpleRecursion
 * cmd	   - the DROP CONSTRAINT subcommand; it is not modified, a copy is
 *
 * A copy of cmd is made with copyObject, its subtype is changed to
 * AT_DropConstraintQuietly, and the copy is handed to ATSimpleRecursion.
 * NOTE(review): the condition guarding this code is not visible in this
 * listing -- presumably it runs only when recurse is true; confirm.
 */
4639 ATPrepDropConstraint(List **wqueue, Relation rel,
4640 bool recurse, AlterTableCmd *cmd)
4643 * We don't want errors or noise from child tables, so we have to pass
4644 * down a modified command.
4648 AlterTableCmd *childCmd = copyObject(cmd);
4650 childCmd->subtype = AT_DropConstraintQuietly;
4651 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
/*
 * ATExecDropConstraint
 *
 * rel		  - the table to drop the constraint from
 * constrName - name of the constraint(s) to remove
 * behavior	  - drop behavior, passed through to RemoveRelConstraints
 * quiet	  - NOTE(review): its use is not visible in this listing;
 *				presumably it suppresses the reports below -- confirm
 *
 * RemoveRelConstraints does the work and its result is kept in "deleted".
 * The code reports "constraint does not exist" (ERRCODE_UNDEFINED_OBJECT)
 * for the zero-deletions case and "multiple constraints named ... were
 * dropped" when deleted is greater than one.
 */
4656 ATExecDropConstraint(Relation rel, const char *constrName,
4657 DropBehavior behavior, bool quiet)
4661 deleted = RemoveRelConstraints(rel, constrName, behavior);
4665 /* If zero constraints deleted, complain */
4668 (errcode(ERRCODE_UNDEFINED_OBJECT),
4669 errmsg("constraint \"%s\" does not exist",
4671 /* Otherwise if more than one constraint deleted, notify */
4672 else if (deleted > 1)
4674 (errmsg("multiple constraints named \"%s\" were dropped",
/*
 * ATPrepAlterColumnType
 *		Preparation step for ALTER COLUMN TYPE: validate the request and
 *		queue the expression that computes the new column value.
 *
 * wqueue	 - ALTER TABLE work queue, passed to ATSimpleRecursion
 * tab		 - work-queue entry of rel; a NewColumnValue is appended to
 *			   tab->newvals
 * rel		 - the table being altered
 * recurse	 - recursion flag, passed to ATSimpleRecursion
 * recursing - true when called for a child table; inherited columns may
 *			   then be altered
 * (The body also uses cmd, the subcommand: cmd->name is the column name,
 * cmd->def the TypeName, cmd->transform the expression that is passed to
 * transformExpr.  Its declaration is not visible in this listing.)
 *
 * Errors raised here, in order: unknown column; system column; inherited
 * column when not recursing; unknown target type; set-returning, subquery
 * or aggregate use in the transform expression; no assignment coercion to
 * the target type; and, on the non-recursing path, child tables that would
 * be left out of step.
 *
 * The syscache reference on the attribute tuple is held until after the
 * work item has been queued, because attTup points into it.
 */
4683 ATPrepAlterColumnType(List **wqueue,
4684 AlteredTableInfo *tab, Relation rel,
4685 bool recurse, bool recursing,
4688 char *colName = cmd->name;
4689 TypeName *typename = (TypeName *) cmd->def;
4691 Form_pg_attribute attTup;
4695 NewColumnValue *newval;
4696 ParseState *pstate = make_parsestate(NULL);
4698 /* lookup the attribute so we can check inheritance status */
4699 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4700 if (!HeapTupleIsValid(tuple))
4702 (errcode(ERRCODE_UNDEFINED_COLUMN),
4703 errmsg("column \"%s\" of relation \"%s\" does not exist",
4704 colName, RelationGetRelationName(rel))));
4705 attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4706 attnum = attTup->attnum;
4708 /* Can't alter a system attribute */
4711 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4712 errmsg("cannot alter system column \"%s\"",
4715 /* Don't alter inherited columns */
4716 if (attTup->attinhcount > 0 && !recursing)
4718 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4719 errmsg("cannot alter inherited column \"%s\"",
4722 /* Look up the target type */
4723 targettype = LookupTypeName(typename);
4724 if (!OidIsValid(targettype))
4726 (errcode(ERRCODE_UNDEFINED_OBJECT),
4727 errmsg("type \"%s\" does not exist",
4728 TypeNameToString(typename))));
4730 /* make sure datatype is legal for a column */
4731 CheckAttributeType(colName, targettype);
4734 * Set up an expression to transform the old data value to the new type.
4735 * If a USING option was given, transform and use that expression, else
4736 * just take the old value and try to coerce it. We do this first so that
4737 * type incompatibility can be detected before we waste effort, and
4738 * because we need the expression to be parsed against the original table
4745 /* Expression must be able to access vars of old table */
4746 rte = addRangeTableEntryForRelation(pstate,
4751 addRTEtoQuery(pstate, rte, false, true, true);
4753 transform = transformExpr(pstate, cmd->transform);
4755 /* It can't return a set */
4756 if (expression_returns_set(transform))
4758 (errcode(ERRCODE_DATATYPE_MISMATCH),
4759 errmsg("transform expression must not return a set")));
4761 /* No subplans or aggregates, either... */
4762 if (pstate->p_hasSubLinks)
4764 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4765 errmsg("cannot use subquery in transform expression")));
4766 if (pstate->p_hasAggs)
4768 (errcode(ERRCODE_GROUPING_ERROR),
4769 errmsg("cannot use aggregate function in transform expression")));
4773 transform = (Node *) makeVar(1, attnum,
4774 attTup->atttypid, attTup->atttypmod,
/* Coerce the chosen expression to the target type under assignment rules */
4778 transform = coerce_to_target_type(pstate,
4779 transform, exprType(transform),
4780 targettype, typename->typmod,
4781 COERCION_ASSIGNMENT,
4782 COERCE_IMPLICIT_CAST);
4783 if (transform == NULL)
4785 (errcode(ERRCODE_DATATYPE_MISMATCH),
4786 errmsg("column \"%s\" cannot be cast to type \"%s\"",
4787 colName, TypeNameToString(typename))));
4790 * Add a work queue item to make ATRewriteTable update the column
4793 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4794 newval->attnum = attnum;
4795 newval->expr = (Expr *) transform;
4797 tab->newvals = lappend(tab->newvals, newval);
4799 ReleaseSysCache(tuple);
4802 * The recursion case is handled by ATSimpleRecursion. However, if we are
4803 * told not to recurse, there had better not be any child tables; else the
4804 * alter would put them out of step.
4807 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4808 else if (!recursing &&
4809 find_inheritance_children(RelationGetRelid(rel)) != NIL)
4811 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4812 errmsg("type of inherited column \"%s\" must be changed in child tables too",
/*
 * ATExecAlterColumnType: catalog-level step that changes one column's type.
 *
 * As far as the visible lines show, it: looks up the column's pg_attribute
 * row by name; rejects a second type change of the same column; coerces any
 * existing default expression to the new type up front; records the
 * definitions of dependent indexes and constraints in "tab" so they can be
 * rebuilt later; replaces the column's pg_depend entry for its datatype;
 * overwrites the type-related pg_attribute fields; drops the column's
 * statistics; and finally removes and re-stores the default.
 *
 * tab: per-table work state; tab->oldDesc is read, and the changedIndex and
 *		changedConstraint lists are appended to.
 * rel: the table being altered.
 * colName: name of the column whose type changes.
 * typename: the new type; its typmod and arrayBounds are used directly.
 *
 * NOTE(review): the embedded line numbers in this listing have gaps (for
 * example 4837 is followed by 4839), so brace lines, several declarations
 * and the opening line of each error report are not shown here.
 */
4817 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4818 const char *colName, TypeName *typename)
4821 Form_pg_attribute attTup;
4823 HeapTuple typeTuple;
4827 Relation attrelation;
/* pg_attribute is opened for writing here and closed after the row update below */
4833 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4835 /* Look up the target column */
4836 heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4837 if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */
4839 (errcode(ERRCODE_UNDEFINED_COLUMN),
4840 errmsg("column \"%s\" of relation \"%s\" does not exist",
4841 colName, RelationGetRelationName(rel))));
4842 attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
4843 attnum = attTup->attnum;
4845 /* Check for multiple ALTER TYPE on same column --- can't cope */
/* The current catalog row is compared with the descriptor saved in tab->oldDesc */
4846 if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
4847 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
4849 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4850 errmsg("cannot alter type of column \"%s\" twice",
4853 /* Look up the target type (should not fail, since prep found it) */
/* typeTuple stays referenced until the ReleaseSysCache call further down */
4854 typeTuple = typenameType(typename);
4855 tform = (Form_pg_type) GETSTRUCT(typeTuple);
4856 targettype = HeapTupleGetOid(typeTuple);
4859 * If there is a default expression for the column, get it and ensure we
4860 * can coerce it to the new datatype. (We must do this before changing
4861 * the column type, because build_column_default itself will try to
4862 * coerce, and will not issue the error message we want if it fails.)
4864 * We remove any implicit coercion steps at the top level of the old
4865 * default expression; this has been agreed to satisfy the principle of
4866 * least surprise. (The conversion to the new column type should act like
4867 * it started from what the user sees as the stored expression, and the
4868 * implicit coercions aren't going to be shown.)
4870 if (attTup->atthasdef)
4872 defaultexpr = build_column_default(rel, attnum);
4873 Assert(defaultexpr);
4874 defaultexpr = strip_implicit_coercions(defaultexpr);
4875 defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */
4876 defaultexpr, exprType(defaultexpr),
4877 targettype, typename->typmod,
4878 COERCION_ASSIGNMENT,
4879 COERCE_IMPLICIT_CAST);
4880 if (defaultexpr == NULL)
4882 (errcode(ERRCODE_DATATYPE_MISMATCH),
4883 errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
4884 colName, TypeNameToString(typename))));
4890 * Find everything that depends on the column (constraints, indexes, etc),
4891 * and record enough information to let us recreate the objects.
4893 * The actual recreation does not happen here, but only after we have
4894 * performed all the individual ALTER TYPE operations. We have to save
4895 * the info before executing ALTER TYPE, though, else the deparser will
4898 * There could be multiple entries for the same object, so we must check
4899 * to ensure we process each one only once. Note: we assume that an index
4900 * that implements a constraint will not show a direct dependency on the
4903 depRel = heap_open(DependRelationId, RowExclusiveLock);
/*
 * First pg_depend scan: entries whose referenced side (refclassid, refobjid,
 * refobjsubid) is this table column.
 */
4905 ScanKeyInit(&key[0],
4906 Anum_pg_depend_refclassid,
4907 BTEqualStrategyNumber, F_OIDEQ,
4908 ObjectIdGetDatum(RelationRelationId));
4909 ScanKeyInit(&key[1],
4910 Anum_pg_depend_refobjid,
4911 BTEqualStrategyNumber, F_OIDEQ,
4912 ObjectIdGetDatum(RelationGetRelid(rel)));
4913 ScanKeyInit(&key[2],
4914 Anum_pg_depend_refobjsubid,
4915 BTEqualStrategyNumber, F_INT4EQ,
4916 Int32GetDatum((int32) attnum));
4918 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
4919 SnapshotNow, 3, key);
4921 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
4923 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
4924 ObjectAddress foundObject;
4926 /* We don't expect any PIN dependencies on columns */
4927 if (foundDep->deptype == DEPENDENCY_PIN)
4928 elog(ERROR, "cannot alter type of a pinned column");
4930 foundObject.classId = foundDep->classid;
4931 foundObject.objectId = foundDep->objid;
4932 foundObject.objectSubId = foundDep->objsubid;
/* Dispatch on the kind of dependent object; the first case label is not shown */
4934 switch (getObjectClass(&foundObject))
4938 char relKind = get_rel_relkind(foundObject.objectId);
4940 if (relKind == RELKIND_INDEX)
4942 Assert(foundObject.objectSubId == 0);
/* Save each index once: its OID plus the definition string used to rebuild it */
4943 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
4945 tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
4946 foundObject.objectId);
4947 tab->changedIndexDefs = lappend(tab->changedIndexDefs,
4948 pg_get_indexdef_string(foundObject.objectId));
4951 else if (relKind == RELKIND_SEQUENCE)
4954 * This must be a SERIAL column's sequence. We need
4955 * not do anything to it.
4957 Assert(foundObject.objectSubId == 0);
4961 /* Not expecting any other direct dependencies... */
4962 elog(ERROR, "unexpected object depending on column: %s",
4963 getObjectDescription(&foundObject));
4968 case OCLASS_CONSTRAINT:
4969 Assert(foundObject.objectSubId == 0);
/* Same bookkeeping as for indexes, kept in the constraint lists */
4970 if (!list_member_oid(tab->changedConstraintOids, foundObject.objectId))
4972 tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids,
4973 foundObject.objectId);
4974 tab->changedConstraintDefs = lappend(tab->changedConstraintDefs,
4975 pg_get_constraintdef_string(foundObject.objectId));
4979 case OCLASS_REWRITE:
4980 /* XXX someday see if we can cope with revising views */
4982 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4983 errmsg("cannot alter type of a column used by a view or rule"),
4984 errdetail("%s depends on column \"%s\"",
4985 getObjectDescription(&foundObject),
4989 case OCLASS_DEFAULT:
4992 * Ignore the column's default expression, since we will fix
4995 Assert(defaultexpr);
5001 case OCLASS_CONVERSION:
5002 case OCLASS_LANGUAGE:
5003 case OCLASS_OPERATOR:
5004 case OCLASS_OPCLASS:
5005 case OCLASS_TRIGGER:
5009 * We don't expect any of these sorts of objects to depend on
5012 elog(ERROR, "unexpected object depending on column: %s",
5013 getObjectDescription(&foundObject));
5017 elog(ERROR, "unrecognized object class: %u",
5018 foundObject.classId);
5022 systable_endscan(scan);
5025 * Now scan for dependencies of this column on other things. The only
5026 * thing we should find is the dependency on the column datatype, which we
/*
 * Second pg_depend scan: this time the column is on the depender side
 * (classid, objid, objsubid), and the other index is used.
 */
5029 ScanKeyInit(&key[0],
5030 Anum_pg_depend_classid,
5031 BTEqualStrategyNumber, F_OIDEQ,
5032 ObjectIdGetDatum(RelationRelationId));
5033 ScanKeyInit(&key[1],
5034 Anum_pg_depend_objid,
5035 BTEqualStrategyNumber, F_OIDEQ,
5036 ObjectIdGetDatum(RelationGetRelid(rel)));
5037 ScanKeyInit(&key[2],
5038 Anum_pg_depend_objsubid,
5039 BTEqualStrategyNumber, F_INT4EQ,
5040 Int32GetDatum((int32) attnum));
5042 scan = systable_beginscan(depRel, DependDependerIndexId, true,
5043 SnapshotNow, 3, key);
5045 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5047 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5049 if (foundDep->deptype != DEPENDENCY_NORMAL)
5050 elog(ERROR, "found unexpected dependency type '%c'",
5052 if (foundDep->refclassid != TypeRelationId ||
5053 foundDep->refobjid != attTup->atttypid)
5054 elog(ERROR, "found unexpected dependency for column");
/* Remove the old datatype dependency; the new one is installed after the row update */
5056 simple_heap_delete(depRel, &depTup->t_self);
5059 systable_endscan(scan);
5061 heap_close(depRel, RowExclusiveLock);
5064 * Here we go --- change the recorded column type. (Note heapTup is a
5065 * copy of the syscache entry, so okay to scribble on.)
/* Physical properties are taken from the new type's pg_type row (tform) */
5067 attTup->atttypid = targettype;
5068 attTup->atttypmod = typename->typmod;
5069 attTup->attndims = list_length(typename->arrayBounds);
5070 attTup->attlen = tform->typlen;
5071 attTup->attbyval = tform->typbyval;
5072 attTup->attalign = tform->typalign;
5073 attTup->attstorage = tform->typstorage;
/* tform points into typeTuple, so it is not used past this release */
5075 ReleaseSysCache(typeTuple);
5077 simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5079 /* keep system catalog indexes current */
5080 CatalogUpdateIndexes(attrelation, heapTup);
5082 heap_close(attrelation, RowExclusiveLock);
5084 /* Install dependency on new datatype */
5085 add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5088 * Drop any pg_statistic entry for the column, since it's now wrong type
5090 RemoveStatistics(RelationGetRelid(rel), attnum);
5093 * Update the default, if present, by brute force --- remove and re-add
5094 * the default. Probably unsafe to take shortcuts, since the new version
5095 * may well have additional dependencies. (It's okay to do this now,
5096 * rather than after other ALTER TYPE commands, since the default won't
5097 * depend on other column types.)
/*
 * NOTE(review): the condition guarding the next statements is not shown;
 * presumably they run only when a default expression was found above.
 */
5101 /* Must make new row visible since it will be updated again */
5102 CommandCounterIncrement();
5105 * We use RESTRICT here for safety, but at present we do not expect
5106 * anything to depend on the default.
5108 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
/* Re-store the default using the expression already coerced to the new type */
5110 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
/* Free the private copy obtained from SearchSysCacheCopyAttName */
5114 heap_freetuple(heapTup);
5118 * Cleanup after we've finished all the ALTER TYPE operations for a
5119 * particular relation. We have to drop and recreate all the indexes
5120 * and constraints that depend on the altered columns.
/*
 * ATPostAlterTypeCleanup: runs once per relation after its ALTER TYPE steps.
 *
 * Re-parses every saved index and constraint definition string held in
 * "tab" (queueing the resulting commands through ATPostAlterTypeParse), and
 * only then deletes the existing constraints and indexes by OID.
 *
 * wqueue: the ALTER TABLE work queue, passed through to ATPostAlterTypeParse.
 * tab: supplies changedIndexDefs, changedConstraintDefs, changedIndexOids
 *		and changedConstraintOids.
 *
 * NOTE(review): the declarations of "obj" and "l" and the brace lines are
 * not shown in this listing.
 */
5123 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5129 * Re-parse the index and constraint definitions, and attach them to the
5130 * appropriate work queue entries. We do this before dropping because in
5131 * the case of a FOREIGN KEY constraint, we might not yet have exclusive
5132 * lock on the table the constraint is attached to, and we need to get
5133 * that before dropping. It's safe because the parser won't actually look
5134 * at the catalogs to detect the existing entry.
5136 foreach(l, tab->changedIndexDefs)
5137 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5138 foreach(l, tab->changedConstraintDefs)
5139 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5142 * Now we can drop the existing constraints and indexes --- constraints
5143 * first, since some of them might depend on the indexes. It should be
5144 * okay to use DROP_RESTRICT here, since nothing else should be depending
/* Constraints are addressed as rows of pg_constraint */
5147 foreach(l, tab->changedConstraintOids)
5149 obj.classId = ConstraintRelationId;
5150 obj.objectId = lfirst_oid(l);
5151 obj.objectSubId = 0;
5152 performDeletion(&obj, DROP_RESTRICT);
/* Indexes are addressed as rows of pg_class */
5155 foreach(l, tab->changedIndexOids)
5157 obj.classId = RelationRelationId;
5158 obj.objectId = lfirst_oid(l);
5159 obj.objectSubId = 0;
5160 performDeletion(&obj, DROP_RESTRICT);
5164 * The objects will get recreated during subsequent passes over the work
/*
 * ATPostAlterTypeParse: turn one saved definition string back into queued work.
 *
 * The string is raw-parsed, each raw tree is run through parse analysis, and
 * every resulting utility statement is attached to the work-queue entry of
 * the relation it names: index re-creation goes into AT_PASS_OLD_INDEX and
 * constraint re-creation into AT_PASS_OLD_CONSTR.
 *
 * cmd: command text produced earlier from an index or constraint definition.
 * wqueue: the ALTER TABLE work queue; ATGetQueueEntry is called on it for
 *		   each target relation.
 *
 * NOTE(review): several case labels, break statements and declarations
 * (for example "rel" and "lcmd") are not shown in this listing.
 */
5170 ATPostAlterTypeParse(char *cmd, List **wqueue)
5172 List *raw_parsetree_list;
5173 List *querytree_list;
5174 ListCell *list_item;
5177 * We expect that we only have to do raw parsing and parse analysis, not
5178 * any rule rewriting, since these will all be utility statements.
5180 raw_parsetree_list = raw_parser(cmd);
5181 querytree_list = NIL;
5182 foreach(list_item, raw_parsetree_list)
5184 Node *parsetree = (Node *) lfirst(list_item);
5186 querytree_list = list_concat(querytree_list,
5187 parse_analyze(parsetree, NULL, 0));
5191 * Attach each generated command to the proper place in the work queue.
5192 * Note this could result in creation of entirely new work-queue entries.
5194 foreach(list_item, querytree_list)
5196 Query *query = (Query *) lfirst(list_item);
5198 AlteredTableInfo *tab;
5200 Assert(IsA(query, Query));
5201 Assert(query->commandType == CMD_UTILITY);
5202 switch (nodeTag(query->utilityStmt))
/*
 * Index statement branch (its case label is not shown): the statement is
 * wrapped in a new AlterTableCmd of subtype AT_ReAddIndex.
 */
5206 IndexStmt *stmt = (IndexStmt *) query->utilityStmt;
5207 AlterTableCmd *newcmd;
5209 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5210 tab = ATGetQueueEntry(wqueue, rel);
5211 newcmd = makeNode(AlterTableCmd);
5212 newcmd->subtype = AT_ReAddIndex;
5213 newcmd->def = (Node *) stmt;
5214 tab->subcmds[AT_PASS_OLD_INDEX] =
5215 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
/* Closing with NoLock leaves the AccessExclusiveLock taken above in place */
5216 relation_close(rel, NoLock);
5219 case T_AlterTableStmt:
5221 AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5224 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5225 tab = ATGetQueueEntry(wqueue, rel);
/* Each subcommand of the ALTER TABLE statement is routed individually */
5226 foreach(lcmd, stmt->cmds)
5228 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5230 switch (cmd->subtype)
/* First subtype branch (label not shown): retagged as AT_ReAddIndex */
5233 cmd->subtype = AT_ReAddIndex;
5234 tab->subcmds[AT_PASS_OLD_INDEX] =
5235 lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5237 case AT_AddConstraint:
5238 tab->subcmds[AT_PASS_OLD_CONSTR] =
5239 lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5242 elog(ERROR, "unexpected statement type: %d",
5243 (int) cmd->subtype);
5246 relation_close(rel, NoLock);
5250 elog(ERROR, "unexpected statement type: %d",
5251 (int) nodeTag(query->utilityStmt));
5260 * recursing is true if we are recursing from a table to its indexes or
5261 * toast table. We don't allow the ownership of those things to be
5262 * changed separately from the parent table. Also, we can skip permission
5263 * checks (this is necessary not just an optimization, else we'd fail to
5264 * handle toast tables properly).
/*
 * ATExecChangeOwner: change the owner of one relation and of what hangs off it.
 *
 * Visible behaviour: the relation is opened with AccessExclusiveLock and its
 * pg_class row fetched; the relkind decides whether the change is allowed;
 * if the owner actually differs, the pg_class row (owner and, when present,
 * ACL) is rewritten, the owner dependency is updated, the rowtype's owner is
 * changed for non-index relations, and the function calls itself for the
 * relation's indexes, toast table and dependent sequences.
 *
 * relationOid: relation whose owner changes.
 * newOwnerId: role that becomes the owner.
 * recursing: true when called for an index or toast table of a parent.
 *
 * NOTE(review): brace lines, several case labels, some declarations and the
 * conditions that select the permission-check branch are not shown in this
 * listing.
 */
5267 ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
5269 Relation target_rel;
5272 Form_pg_class tuple_class;
5275 * Get exclusive lock till end of transaction on the target table. Use
5276 * relation_open so that we can work on indexes and sequences.
5278 target_rel = relation_open(relationOid, AccessExclusiveLock);
5280 /* Get its pg_class tuple, too */
5281 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5283 tuple = SearchSysCache(RELOID,
5284 ObjectIdGetDatum(relationOid),
5286 if (!HeapTupleIsValid(tuple))
5287 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5288 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5290 /* Can we change the ownership of this tuple? */
5291 switch (tuple_class->relkind)
5293 case RELKIND_RELATION:
5295 case RELKIND_SEQUENCE:
5296 /* ok to change owner */
5302 * Because ALTER INDEX OWNER used to be allowed, and in fact
5303 * is generated by old versions of pg_dump, we give a warning
5304 * and do nothing rather than erroring out. Also, to avoid
5305 * unnecessary chatter while restoring those old dumps, say
5306 * nothing at all if the command would be a no-op anyway.
5308 if (tuple_class->relowner != newOwnerId)
5310 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5311 errmsg("cannot change owner of index \"%s\"",
5312 NameStr(tuple_class->relname)),
5313 errhint("Change the ownership of the index's table, instead.")));
5314 /* quick hack to exit via the no-op path */
/* Making the two owners equal causes the update branch below to be skipped */
5315 newOwnerId = tuple_class->relowner;
5318 case RELKIND_TOASTVALUE:
5324 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5325 errmsg("\"%s\" is not a table, view, or sequence",
5326 NameStr(tuple_class->relname))));
5330 * If the new owner is the same as the existing owner, consider the
5331 * command to have succeeded. This is for dump restoration purposes.
5333 if (tuple_class->relowner != newOwnerId)
/* One slot per pg_class column, as inputs to heap_modifytuple below */
5335 Datum repl_val[Natts_pg_class];
5336 char repl_null[Natts_pg_class];
5337 char repl_repl[Natts_pg_class];
5343 /* skip permission checks when recursing to index or toast table */
5346 /* Superusers can always do it */
5349 Oid namespaceOid = tuple_class->relnamespace;
5350 AclResult aclresult;
5352 /* Otherwise, must be owner of the existing object */
5353 if (!pg_class_ownercheck(relationOid, GetUserId()))
5354 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5355 RelationGetRelationName(target_rel));
5357 /* Must be able to become new owner */
5358 check_is_member_of_role(GetUserId(), newOwnerId);
5360 /* New owner must have CREATE privilege on namespace */
5361 aclresult = pg_namespace_aclcheck(namespaceOid, newOwnerId,
5363 if (aclresult != ACLCHECK_OK)
5364 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
5365 get_namespace_name(namespaceOid));
/*
 * Every slot starts as ' '; slots to be changed are then set to 'r'.
 * NOTE(review): presumably 'r' means "replace this column" and ' ' means
 * "keep it" -- confirm against heap_modifytuple.
 */
5369 memset(repl_null, ' ', sizeof(repl_null));
5370 memset(repl_repl, ' ', sizeof(repl_repl));
5372 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5373 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5376 * Determine the modified ACL for the new owner. This is only
5377 * necessary when the ACL is non-null.
5379 aclDatum = SysCacheGetAttr(RELOID, tuple,
5380 Anum_pg_class_relacl,
5384 newAcl = aclnewowner(DatumGetAclP(aclDatum),
5385 tuple_class->relowner, newOwnerId);
5386 repl_repl[Anum_pg_class_relacl - 1] = 'r';
5387 repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
/* Build the modified row, write it, then refresh the pg_class indexes */
5390 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5392 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5393 CatalogUpdateIndexes(class_rel, newtuple);
5395 heap_freetuple(newtuple);
5397 /* Update owner dependency reference */
5398 changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
5401 * Also change the ownership of the table's rowtype, if it has one
5403 if (tuple_class->relkind != RELKIND_INDEX)
5404 AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
5407 * If we are operating on a table, also change the ownership of any
5408 * indexes and sequences that belong to the table, as well as the
5409 * table's toast table (if it has one)
/* Indexes are visited for plain tables and for toast tables alike */
5411 if (tuple_class->relkind == RELKIND_RELATION ||
5412 tuple_class->relkind == RELKIND_TOASTVALUE)
5414 List *index_oid_list;
5417 /* Find all the indexes belonging to this relation */
5418 index_oid_list = RelationGetIndexList(target_rel);
5420 /* For each index, recursively change its ownership */
5421 foreach(i, index_oid_list)
5422 ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
5424 list_free(index_oid_list);
/* Toast table and sequences are handled only for plain tables */
5427 if (tuple_class->relkind == RELKIND_RELATION)
5429 /* If it has a toast table, recurse to change its ownership */
5430 if (tuple_class->reltoastrelid != InvalidOid)
5431 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
5434 /* If it has dependent sequences, recurse to change them too */
5435 change_owner_recurse_to_sequences(relationOid, newOwnerId);
/* Release the cache reference and pg_class; the target's lock is kept (NoLock) */
5439 ReleaseSysCache(tuple);
5440 heap_close(class_rel, RowExclusiveLock);
5441 relation_close(target_rel, NoLock);
5445 * change_owner_recurse_to_sequences
5447 * Helper function for ATExecChangeOwner. Examines pg_depend searching
5448 * for sequences that are dependent on serial columns, and changes their ownership.
/*
 * change_owner_recurse_to_sequences: pass an owner change on to the
 * sequences that belong to a table's columns.
 *
 * Scans pg_depend for rows that reference the given relation, keeps only
 * internal dependencies on a column whose dependent object is a whole
 * relation, and calls ATExecChangeOwner for each one that turns out to be a
 * sequence.
 *
 * relationOid: the table whose dependent sequences are wanted.
 * newOwnerId: role that becomes the owner of those sequences.
 *
 * NOTE(review): local declarations, brace lines and the "continue"
 * statements after the two skip tests are not shown in this listing.
 */
5452 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5460 * SERIAL sequences are those having an internal dependency on one of the
5461 * table's columns (we don't care *which* column, exactly).
5463 depRel = heap_open(DependRelationId, AccessShareLock);
5465 ScanKeyInit(&key[0],
5466 Anum_pg_depend_refclassid,
5467 BTEqualStrategyNumber, F_OIDEQ,
5468 ObjectIdGetDatum(RelationRelationId));
5469 ScanKeyInit(&key[1],
5470 Anum_pg_depend_refobjid,
5471 BTEqualStrategyNumber, F_OIDEQ,
5472 ObjectIdGetDatum(relationOid));
5473 /* we leave refobjsubid unspecified */
/* Only two keys are used, so rows for every column of the table are returned */
5475 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5476 SnapshotNow, 2, key);
5478 while (HeapTupleIsValid(tup = systable_getnext(scan)))
5480 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5483 /* skip dependencies other than internal dependencies on columns */
5484 if (depForm->refobjsubid == 0 ||
5485 depForm->classid != RelationRelationId ||
5486 depForm->objsubid != 0 ||
5487 depForm->deptype != DEPENDENCY_INTERNAL)
5490 /* Use relation_open just in case it's an index */
5491 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5493 /* skip non-sequence relations */
5494 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5496 /* No need to keep the lock */
5497 relation_close(seqRel, AccessExclusiveLock);
5501 /* We don't need to close the sequence while we alter it. */
/*
 * "recursing" is passed as false here.
 * NOTE(review): presumably this means the sequence gets the same permission
 * checks as a directly named relation -- confirm against ATExecChangeOwner.
 */
5502 ATExecChangeOwner(depForm->objid, newOwnerId, false);
5504 /* Now we can close it. Keep the lock till end of transaction. */
5505 relation_close(seqRel, NoLock);
5508 systable_endscan(scan);
5510 relation_close(depRel, AccessShareLock);
5514 * ALTER TABLE CLUSTER ON
5516 * The only thing we have to do is to change the indisclustered bits.
/*
 * ATExecClusterOn: record indexName as the index the table is clustered on.
 *
 * The index name is resolved in the table's own namespace; a missing index
 * is reported with ERRCODE_UNDEFINED_OBJECT. The index is then validated by
 * check_index_is_clusterable and marked through mark_index_clustered.
 *
 * rel: the table being altered.
 * indexName: unqualified name of the index to cluster on.
 *
 * NOTE(review): the declaration of indexOid and the opening line of the
 * error report are not shown in this listing.
 */
5519 ATExecClusterOn(Relation rel, const char *indexName)
5523 indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5525 if (!OidIsValid(indexOid))
5527 (errcode(ERRCODE_UNDEFINED_OBJECT),
5528 errmsg("index \"%s\" for table \"%s\" does not exist",
5529 indexName, RelationGetRelationName(rel))));
5531 /* Check index is valid to cluster on */
5532 check_index_is_clusterable(rel, indexOid, false);
5534 /* And do the work */
5535 mark_index_clustered(rel, indexOid);
5539 * ALTER TABLE SET WITHOUT CLUSTER
5541 * We have to find any indexes on the table that have indisclustered bit
5542 * set and turn it off.
/*
 * ATExecDropCluster: clear the table's clustered-index marking.
 *
 * Delegates to mark_index_clustered with InvalidOid in place of an index OID.
 *
 * rel: the table being altered.
 */
5545 ATExecDropCluster(Relation rel)
5547 mark_index_clustered(rel, InvalidOid);
5551 * ALTER TABLE SET TABLESPACE
/*
 * ATPrepSetTableSpace: validate a SET TABLESPACE request and remember the
 * target for the later execution phase.
 *
 * Checks, in order: the relation is a table or an index; the current user
 * owns it; it is not a system relation unless allowSystemTableMods is set;
 * the named tablespace exists; the user has ACL_CREATE on it; and no other
 * SET TABLESPACE subcommand has already stored a target in "tab".
 *
 * tab: per-table work state; tab->newTableSpace receives the tablespace OID.
 * rel: the relation being altered.
 * tablespacename: name of the destination tablespace.
 *
 * NOTE(review): the declaration of tablespaceId and the opening line of
 * each error report are not shown in this listing.
 */
5554 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5557 AclResult aclresult;
5560 * We do our own permission checking because we want to allow this on
5563 if (rel->rd_rel->relkind != RELKIND_RELATION &&
5564 rel->rd_rel->relkind != RELKIND_INDEX)
5566 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5567 errmsg("\"%s\" is not a table or index",
5568 RelationGetRelationName(rel))));
5570 /* Permissions checks */
5571 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
5572 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5573 RelationGetRelationName(rel));
5575 if (!allowSystemTableMods && IsSystemRelation(rel))
5577 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5578 errmsg("permission denied: \"%s\" is a system catalog",
5579 RelationGetRelationName(rel))));
5581 /* Check that the tablespace exists */
5582 tablespaceId = get_tablespace_oid(tablespacename);
5583 if (!OidIsValid(tablespaceId))
5585 (errcode(ERRCODE_UNDEFINED_OBJECT),
5586 errmsg("tablespace \"%s\" does not exist", tablespacename)));
5588 /* Check its permissions */
5589 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5590 if (aclresult != ACLCHECK_OK)
5591 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5593 /* Save info for Phase 3 to do the real work */
/* A valid OID already stored means an earlier subcommand set a tablespace */
5594 if (OidIsValid(tab->newTableSpace))
5596 (errcode(ERRCODE_SYNTAX_ERROR),
5597 errmsg("cannot have multiple SET TABLESPACE subcommands")));
5598 tab->newTableSpace = tablespaceId;
5602 * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5603 * rewriting to be done, so we just want to copy the data as fast as possible.
/*
 * ATExecSetTableSpace: physically move a relation to another tablespace.
 *
 * Visible steps: refuse shared or nailed relations and temp tables of other
 * sessions; return early when the tablespace would not change; create a new
 * storage file whose RelFileNode differs from the old one only in spcNode;
 * copy the data with copy_relation_data; schedule the old file for
 * unlinking; update reltablespace in pg_class; then repeat the whole
 * procedure for the relation's toast table and toast index, if any.
 *
 * tableOid: relation to move.
 * newTableSpace: OID of the destination tablespace.
 *
 * NOTE(review): the relation is opened with NoLock, so the caller is
 * presumably expected to hold a sufficient lock already -- confirm. Brace
 * lines, some declarations, the early "return" and the statement that
 * closes dstrel are not shown in this listing.
 */
5606 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5612 RelFileNode newrnode;
5613 SMgrRelation dstrel;
5616 Form_pg_class rd_rel;
5618 rel = relation_open(tableOid, NoLock);
5621 * We can never allow moving of shared or nailed-in-cache relations,
5622 * because we can't support changing their reltablespace values.
5624 if (rel->rd_rel->relisshared || rel->rd_isnailed)
5626 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5627 errmsg("cannot move system relation \"%s\"",
5628 RelationGetRelationName(rel))));
5631 * Don't allow moving temp tables of other backends ... their local buffer
5632 * manager is not going to cope.
5634 if (isOtherTempNamespace(RelationGetNamespace(rel)))
5636 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5637 errmsg("cannot move temporary tables of other sessions")));
5640 * No work if no change in tablespace.
/* A stored value of 0 is treated as equal to the database's default tablespace */
5642 oldTableSpace = rel->rd_rel->reltablespace;
5643 if (newTableSpace == oldTableSpace ||
5644 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5646 relation_close(rel, NoLock);
/* Saved now because they are needed after rel has been closed below */
5650 reltoastrelid = rel->rd_rel->reltoastrelid;
5651 reltoastidxid = rel->rd_rel->reltoastidxid;
5653 /* Get a modifiable copy of the relation's pg_class row */
5654 pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5656 tuple = SearchSysCacheCopy(RELOID,
5657 ObjectIdGetDatum(tableOid),
5659 if (!HeapTupleIsValid(tuple))
5660 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5661 rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5663 /* create another storage file. Is it a little ugly ? */
5664 /* NOTE: any conflict in relfilenode value will be caught here */
/* Same file node as before; only the tablespace component is replaced */
5665 newrnode = rel->rd_node;
5666 newrnode.spcNode = newTableSpace;
5668 dstrel = smgropen(newrnode);
5669 smgrcreate(dstrel, rel->rd_istemp, false);
5671 /* copy relation data to the new physical file */
5672 copy_relation_data(rel, dstrel);
5674 /* schedule unlinking old physical file */
5675 RelationOpenSmgr(rel);
5676 smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5679 * Now drop smgr references. The source was already dropped by
5680 * smgrscheduleunlink.
5684 /* update the pg_class row */
/* The database default tablespace is stored as InvalidOid rather than its real OID */
5685 rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5686 simple_heap_update(pg_class, &tuple->t_self, tuple);
5687 CatalogUpdateIndexes(pg_class, tuple);
5689 heap_freetuple(tuple);
5691 heap_close(pg_class, RowExclusiveLock);
5693 relation_close(rel, NoLock);
5695 /* Make sure the reltablespace change is visible */
5696 CommandCounterIncrement();
5698 /* Move associated toast relation and/or index, too */
5699 if (OidIsValid(reltoastrelid))
5700 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5701 if (OidIsValid(reltoastidxid))
5702 ATExecSetTableSpace(reltoastidxid, newTableSpace);
5706 * Copy data, block by block
/*
 * copy_relation_data: copy every block of "rel" into the storage file "dst".
 *
 * The source relation's shared buffers are flushed first. Each block is then
 * read with smgrread into a local buffer, optionally logged to WAL as a new
 * page, and written to "dst" with smgrwrite.
 *
 * rel: source relation; the copy covers RelationGetNumberOfBlocks(rel) blocks.
 * dst: already-created destination storage.
 *
 * NOTE(review): the declarations of buf, src, blkno, use_wal and recptr,
 * the assignment of src, the test that selects the WAL branch, the end of
 * the critical section and the final sync call are not shown in this
 * listing.
 */
5709 copy_relation_data(Relation rel, SMgrRelation dst)
5713 BlockNumber nblocks;
/* "page" is another view of the same memory as "buf" */
5716 Page page = (Page) buf;
5719 * Since we copy the file directly without looking at the shared buffers,
5720 * we'd better first flush out any pages of the source relation that are
5721 * in shared buffers. We assume no new changes will be made while we are
5722 * holding exclusive lock on the rel.
5724 FlushRelationBuffers(rel);
5727 * We need to log the copied data in WAL iff WAL archiving is enabled AND
5728 * it's not a temp rel.
5730 use_wal = XLogArchivingActive() && !rel->rd_istemp;
5732 nblocks = RelationGetNumberOfBlocks(rel);
5733 /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5736 for (blkno = 0; blkno < nblocks; blkno++)
5738 smgrread(src, blkno, buf);
/*
 * WAL record layout: rdata[0] carries the xl_heap_newpage header naming the
 * destination file node and block number, rdata[1] carries the BLCKSZ page
 * image itself.
 */
5743 xl_heap_newpage xlrec;
5745 XLogRecData rdata[2];
5747 /* NO ELOG(ERROR) from here till newpage op is logged */
5748 START_CRIT_SECTION();
5750 xlrec.node = dst->smgr_rnode;
5751 xlrec.blkno = blkno;
5753 rdata[0].data = (char *) &xlrec;
5754 rdata[0].len = SizeOfHeapNewpage;
5755 rdata[0].buffer = InvalidBuffer;
5756 rdata[0].next = &(rdata[1]);
5758 rdata[1].data = (char *) page;
5759 rdata[1].len = BLCKSZ;
5760 rdata[1].buffer = InvalidBuffer;
5761 rdata[1].next = NULL;
5763 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
/* The page is stamped with the record's position before being written out */
5765 PageSetLSN(page, recptr);
5766 PageSetTLI(page, ThisTimeLineID);
5772 * Now write the page. We say isTemp = true even if it's not a temp
5773 * rel, because there's no need for smgr to schedule an fsync for this
5774 * write; we'll do it ourselves below.
5776 smgrwrite(dst, blkno, buf, true);
5780 * If the rel isn't temp, we must fsync it down to disk before it's safe
5781 * to commit the transaction. (For a temp rel we don't care since the rel
5782 * will be uninteresting after a crash anyway.)
5784 * It's obvious that we must do this when not WAL-logging the copy. It's
5785 * less obvious that we have to do it even if we did WAL-log the copied
5786 * pages. The reason is that since we're copying outside shared buffers, a
5787 * CHECKPOINT occurring during the copy has no way to flush the previously
5788 * written data to disk (indeed it won't know the new rel even exists). A
5789 * crash later on would replay WAL from the checkpoint, therefore it
5790 * wouldn't replay our earlier WAL entries. If we do not fsync those pages
5791 * here, they might still not be on disk when the crash occurs.
5793 if (!rel->rd_istemp)
5798 * ALTER TABLE ENABLE/DISABLE TRIGGER
5800 * We just pass this off to trigger.c.
/*
 * ATExecEnableDisableTrigger: forward an ENABLE/DISABLE TRIGGER request.
 *
 * All four arguments are handed unchanged to EnableDisableTrigger; nothing
 * else happens here.
 *
 * rel: the table being altered.
 * trigname, enable, skip_system: passed through as-is.
 */
5803 ATExecEnableDisableTrigger(Relation rel, char *trigname,
5804 bool enable, bool skip_system)
5806 EnableDisableTrigger(rel, trigname, enable, skip_system);
5810 * ALTER TABLE CREATE TOAST TABLE
5812 * Note: this is also invoked from outside this module; in such cases we
5813 * expect the caller to have verified that the relation is a table and we
5814 * have all the right permissions. Callers expect this function
5815 * to end with CommandCounterIncrement if it makes any changes.
/*
 * AlterTableCreateToastTable: give relation relOid a TOAST table and index.
 *
 * Visible steps: lock the table exclusively; refuse shared relations when
 * running under the postmaster; stop if the table already has a toast table
 * or does not need one; create a three-column relation named
 * "pg_toast_<relOid>" with a unique two-column btree index named
 * "pg_toast_<relOid>_index"; store the new relation's OID in the parent's
 * pg_class row; and record an internal dependency of the toast table on the
 * parent.
 *
 * relOid: OID of the table to be given a toast table.
 * silent: NOTE(review): presumably selects between quietly returning and
 *		   raising an error in the "already toasted" and "not needed"
 *		   cases; the tests on it are not shown in this listing.
 *
 * NOTE(review): brace lines, several declarations, the remaining arguments
 * of TupleDescInitEntry, heap_create_with_catalog and index_create, and the
 * early "return" statements are not shown either.
 */
5818 AlterTableCreateToastTable(Oid relOid, bool silent)
5823 bool shared_relation;
/* Buffers for the generated relation and index names */
5827 char toast_relname[NAMEDATALEN];
5828 char toast_idxname[NAMEDATALEN];
5829 IndexInfo *indexInfo;
5830 Oid classObjectId[2];
5831 ObjectAddress baseobject,
5835 * Grab an exclusive lock on the target table, which we will NOT release
5836 * until end of transaction. (This is probably redundant in all present
5839 rel = heap_open(relOid, AccessExclusiveLock);
5842 * Toast table is shared if and only if its parent is.
5844 * We cannot allow toasting a shared relation after initdb (because
5845 * there's no way to mark it toasted in other databases' pg_class).
5846 * Unfortunately we can't distinguish initdb from a manually started
5847 * standalone backend (toasting happens after the bootstrap phase, so
5848 * checking IsBootstrapProcessingMode() won't work). However, we can at
5849 * least prevent this mistake under normal multi-user operation.
5851 shared_relation = rel->rd_rel->relisshared;
5852 if (shared_relation && IsUnderPostmaster)
5854 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5855 errmsg("shared tables cannot be toasted after initdb")));
5858 * Is it already toasted?
5860 if (rel->rd_rel->reltoastrelid != InvalidOid)
5864 heap_close(rel, NoLock);
5869 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5870 errmsg("table \"%s\" already has a TOAST table",
5871 RelationGetRelationName(rel))));
5875 * Check to see whether the table actually needs a TOAST table.
5877 if (!needs_toast_table(rel))
5881 heap_close(rel, NoLock);
5886 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5887 errmsg("table \"%s\" does not need a TOAST table",
5888 RelationGetRelationName(rel))));
5892 * Create the toast table and its index
/* Both names embed the parent's OID; snprintf bounds them to the buffer size */
5894 snprintf(toast_relname, sizeof(toast_relname),
5895 "pg_toast_%u", relOid);
5896 snprintf(toast_idxname, sizeof(toast_idxname),
5897 "pg_toast_%u_index", relOid);
5899 /* this is pretty painful... need a tuple descriptor */
5900 tupdesc = CreateTemplateTupleDesc(3, false);
5901 TupleDescInitEntry(tupdesc, (AttrNumber) 1,
5905 TupleDescInitEntry(tupdesc, (AttrNumber) 2,
5909 TupleDescInitEntry(tupdesc, (AttrNumber) 3,
5915 * Ensure that the toast table doesn't itself get toasted, or we'll be
5916 * toast :-(. This is essential for chunk_data because type bytea is
5917 * toastable; hit the other two just to be sure.
5919 tupdesc->attrs[0]->attstorage = 'p';
5920 tupdesc->attrs[1]->attstorage = 'p';
5921 tupdesc->attrs[2]->attstorage = 'p';
5924 * Note: the toast relation is placed in the regular pg_toast namespace
5925 * even if its master relation is a temp table. There cannot be any
5926 * naming collision, and the toast rel will be destroyed when its master
5927 * is, so there's no need to handle the toast rel as temp.
/* The new relation inherits the parent's tablespace and owner */
5929 toast_relid = heap_create_with_catalog(toast_relname,
5931 rel->rd_rel->reltablespace,
5933 rel->rd_rel->relowner,
5942 /* make the toast relation visible, else index creation will fail */
5943 CommandCounterIncrement();
5946 * Create unique index on chunk_id, chunk_seq.
5948 * NOTE: the normal TOAST access routines could actually function with a
5949 * single-column index on chunk_id only. However, the slice access
5950 * routines use both columns for faster access to an individual chunk. In
5951 * addition, we want it to be unique as a check against the possibility of
5952 * duplicate TOAST chunk OIDs. The index might also be a little more
5953 * efficient this way, since btree isn't all that happy with large numbers
/* Plain two-key index on attributes 1 and 2: no expressions, no predicate */
5957 indexInfo = makeNode(IndexInfo);
5958 indexInfo->ii_NumIndexAttrs = 2;
5959 indexInfo->ii_KeyAttrNumbers[0] = 1;
5960 indexInfo->ii_KeyAttrNumbers[1] = 2;
5961 indexInfo->ii_Expressions = NIL;
5962 indexInfo->ii_ExpressionsState = NIL;
5963 indexInfo->ii_Predicate = NIL;
5964 indexInfo->ii_PredicateState = NIL;
5965 indexInfo->ii_Unique = true;
/* One operator class per key column: OID ops for the first, int4 ops for the second */
5967 classObjectId[0] = OID_BTREE_OPS_OID;
5968 classObjectId[1] = INT4_BTREE_OPS_OID;
5970 toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
5973 rel->rd_rel->reltablespace,
5975 true, false, true, false);
5978 * Update toast rel's pg_class entry to show that it has an index. The
5979 * index OID is stored into the reltoastidxid field for easy access by the
5982 setRelhasindex(toast_relid, true, true, toast_idxid);
5985 * Store the toast table's OID in the parent relation's pg_class row
5987 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
/* A private copy of the row is fetched so that it can be modified in place */
5989 reltup = SearchSysCacheCopy(RELOID,
5990 ObjectIdGetDatum(relOid),
5992 if (!HeapTupleIsValid(reltup))
5993 elog(ERROR, "cache lookup failed for relation %u", relOid);
5995 ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
5997 simple_heap_update(class_rel, &reltup->t_self, reltup);
5999 /* Keep catalog indexes current */
6000 CatalogUpdateIndexes(class_rel, reltup);
6002 heap_freetuple(reltup);
6004 heap_close(class_rel, RowExclusiveLock);
6007 * Register dependency from the toast table to the master, so that the
6008 * toast table will be deleted if the master is.
6010 baseobject.classId = RelationRelationId;
6011 baseobject.objectId = relOid;
6012 baseobject.objectSubId = 0;
6013 toastobject.classId = RelationRelationId;
6014 toastobject.objectId = toast_relid;
6015 toastobject.objectSubId = 0;
6017 recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
6020 * Clean up and make changes visible
/* NoLock on close keeps the exclusive lock taken at the top of the function */
6022 heap_close(rel, NoLock);
6024 CommandCounterIncrement();
6028 * Check to see whether the table needs a TOAST table. It does only if
6029 * (1) there are any toastable attributes, and (2) the maximum length
6030 * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to
6031 * create a toast table for something like "f1 varchar(20)".)
/*
 * needs_toast_table
 *		Decide whether relation rel should get a TOAST table.
 *
 * rel: open relation whose tuple descriptor (rel->rd_att) is examined.
 *
 * Result, as established by the return statements at the bottom:
 *	- false if no variable-length attribute has attstorage other than p;
 *	- true if maxlength_unknown was set for some attribute;
 *	- otherwise, whether the estimated maximum tuple length exceeds
 *	  TOAST_TUPLE_THRESHOLD.
 *
 * NOTE(review): this listing does not show every line of the function
 * (the embedded line numbers skip).  In particular the statement guarded
 * by the attisdropped test and the condition that chooses between
 * setting maxlength_unknown and adding maxlen are not visible; confirm
 * against the full source before relying on the notes below.
 */
6034 needs_toast_table(Relation rel)
6036 int32 data_length = 0;
6037 bool maxlength_unknown = false;
6038 bool has_toastable_attrs = false;
6040 Form_pg_attribute *att;
6044 tupdesc = rel->rd_att;
6045 att = tupdesc->attrs;
/* Accumulate an upper bound on the data width, one attribute at a time */
6047 for (i = 0; i < tupdesc->natts; i++)
/* presumably dropped columns are skipped here -- guarded statement not visible */
6049 if (att[i]->attisdropped)
/* Pad the running width to the alignment of this attribute before adding it */
6051 data_length = att_align(data_length, att[i]->attalign);
6052 if (att[i]->attlen > 0)
6054 /* Fixed-length types are never toastable */
6055 data_length += att[i]->attlen;
/* Variable-length attribute: ask the type for its maximum size */
6059 int32 maxlen = type_maximum_size(att[i]->atttypid,
6063 maxlength_unknown = true;
6065 data_length += maxlen;
6066 if (att[i]->attstorage != 'p')
6067 has_toastable_attrs = true;
6070 if (!has_toastable_attrs)
6071 return false; /* nothing to toast? */
6072 if (maxlength_unknown)
6073 return true; /* any unlimited-length attrs? */
/*
 * Estimated tuple size: the header up to t_bits plus the null bitmap,
 * MAXALIGNed, followed by the MAXALIGNed data area.
 */
6074 tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
6075 BITMAPLEN(tupdesc->natts)) +
6076 MAXALIGN(data_length);
6077 return (tuple_length > TOAST_TUPLE_THRESHOLD);
6082 * Execute ALTER TABLE SET SCHEMA
6084 * Note: caller must have checked ownership of the relation already
/*
 * AlterTableNamespace
 *		Move the relation named by relation into the schema newschema.
 *
 * relation:  RangeVar identifying the relation to move.
 * newschema: name of the destination schema.
 *
 * Steps visible here:
 *	1. Open the relation with AccessExclusiveLock.
 *	2. Refuse TOAST relations, a destination equal to the current schema,
 *	   and any move into or out of a temporary schema or the TOAST schema.
 *	3. Rewrite the pg_class row (and schema dependency) through
 *	   AlterRelationNamespaceInternal, then move the rowtype through
 *	   AlterTypeNamespaceInternal.
 *	4. For plain tables (RELKIND_RELATION) also move indexes, column-owned
 *	   sequences and constraints.
 *	5. Close pg_class, and close the relation without releasing its lock.
 *
 * NOTE(review): the calls that wrap the errcode/errmsg pairs are not
 * visible in this listing, so the reporting level of each check cannot be
 * confirmed from here.
 */
6087 AlterTableNamespace(RangeVar *relation, const char *newschema)
6095 rel = heap_openrv(relation, AccessExclusiveLock);
6097 /* heap_openrv allows TOAST, but we don't want to */
6098 if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
6100 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6101 errmsg("\"%s\" is a TOAST relation",
6102 RelationGetRelationName(rel))));
6104 relid = RelationGetRelid(rel);
6105 oldNspOid = RelationGetNamespace(rel);
6107 /* get schema OID and check its permissions */
6108 nspOid = LookupCreationNamespace(newschema);
/* Moving a relation to the schema it already lives in is reported as a duplicate */
6110 if (oldNspOid == nspOid)
6112 (errcode(ERRCODE_DUPLICATE_TABLE),
6113 errmsg("relation \"%s\" is already in schema \"%s\"",
6114 RelationGetRelationName(rel),
6117 /* disallow renaming into or out of temp schemas */
6118 if (isAnyTempNamespace(nspOid) || isAnyTempNamespace(oldNspOid))
6120 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6121 errmsg("cannot move objects into or out of temporary schemas")));
6123 /* same for TOAST schema */
6124 if (nspOid == PG_TOAST_NAMESPACE || oldNspOid == PG_TOAST_NAMESPACE)
6126 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6127 errmsg("cannot move objects into or out of TOAST schema")));
6129 /* OK, modify the pg_class row and pg_depend entry */
6130 classRel = heap_open(RelationRelationId, RowExclusiveLock);
/* Final argument true: this relation has its own schema dependency to update */
6132 AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
6134 /* Fix the table's rowtype too */
6135 AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
6137 /* Fix other dependent stuff */
6138 if (rel->rd_rel->relkind == RELKIND_RELATION)
6140 AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
6141 AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
6142 AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
6145 heap_close(classRel, RowExclusiveLock);
6147 /* close rel, but keep lock until commit */
6148 relation_close(rel, NoLock);
6152 * The guts of relocating a relation to another namespace: fix the pg_class
6153 * entry, and the pg_depend entry if any. Caller must already have
6154 * opened and write-locked pg_class.
/*
 * AlterRelationNamespaceInternal
 *		Rewrite one pg_class row so that relation relOid belongs to
 *		namespace newNspOid instead of oldNspOid.
 *
 * classRel:       pg_class, already opened by the caller; used as the
 *                 target of simple_heap_update and CatalogUpdateIndexes.
 * relOid:         OID of the relation whose row is changed.
 * oldNspOid:      namespace the row is expected to carry now (asserted).
 * newNspOid:      namespace to store in the row.
 * hasDependEntry: when true, the schema dependency recorded for relOid is
 *                 switched from oldNspOid to newNspOid as well, and exactly
 *                 one dependency row must have been changed.
 *
 * Raises an error via elog(ERROR) if the syscache lookup fails or if the
 * dependency update does not report exactly one change.  A name collision
 * in the destination namespace is reported with ERRCODE_DUPLICATE_TABLE.
 */
6157 AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
6158 Oid oldNspOid, Oid newNspOid,
6159 bool hasDependEntry)
6162 Form_pg_class classForm;
/* Work on a private copy of the tuple so it can be modified in place */
6164 classTup = SearchSysCacheCopy(RELOID,
6165 ObjectIdGetDatum(relOid),
6167 if (!HeapTupleIsValid(classTup))
6168 elog(ERROR, "cache lookup failed for relation %u", relOid);
6169 classForm = (Form_pg_class) GETSTRUCT(classTup);
6171 Assert(classForm->relnamespace == oldNspOid);
6173 /* check for duplicate name (more friendly than unique-index failure) */
6174 if (get_relname_relid(NameStr(classForm->relname),
6175 newNspOid) != InvalidOid)
6177 (errcode(ERRCODE_DUPLICATE_TABLE),
6178 errmsg("relation \"%s\" already exists in schema \"%s\"",
6179 NameStr(classForm->relname),
6180 get_namespace_name(newNspOid))));
6182 /* classTup is a copy, so OK to scribble on */
6183 classForm->relnamespace = newNspOid;
/* Write the modified row back and keep the pg_class indexes in step */
6185 simple_heap_update(classRel, &classTup->t_self, classTup);
6186 CatalogUpdateIndexes(classRel, classTup);
6188 /* Update dependency on schema if caller said so */
6189 if (hasDependEntry &&
6190 changeDependencyFor(RelationRelationId, relOid,
6191 NamespaceRelationId, oldNspOid, newNspOid) != 1)
6192 elog(ERROR, "failed to change schema dependency for relation \"%s\"",
6193 NameStr(classForm->relname));
6195 heap_freetuple(classTup);
6199 * Move all indexes for the specified relation to another namespace.
6201 * Note: we assume adequate permission checking was done by the caller,
6202 * and that the caller has a suitable lock on the owning relation.
/*
 * AlterIndexNamespaces
 *		Apply AlterRelationNamespaceInternal to every index of rel.
 *
 * classRel:  pg_class, passed through to AlterRelationNamespaceInternal.
 * rel:       the owning relation; its index list is obtained with
 *            RelationGetIndexList.
 * oldNspOid: namespace the indexes are moved from.
 * newNspOid: namespace the indexes are moved to.
 *
 * The index OID list is released with list_free once the loop is done.
 *
 * NOTE(review): the last argument of the AlterRelationNamespaceInternal
 * call is not visible in this listing.  The in-line note says no
 * changeDependencyFor is needed for indexes, which suggests
 * hasDependEntry is passed as false -- confirm against the full source.
 */
6205 AlterIndexNamespaces(Relation classRel, Relation rel,
6206 Oid oldNspOid, Oid newNspOid)
6211 indexList = RelationGetIndexList(rel);
6213 foreach(l, indexList)
6215 Oid indexOid = lfirst_oid(l);
6218 * Note: currently, the index will not have its own dependency on the
6219 * namespace, so we don't need to do changeDependencyFor(). There's no
6220 * rowtype in pg_type, either.
6222 AlterRelationNamespaceInternal(classRel, indexOid,
6223 oldNspOid, newNspOid,
6227 list_free(indexList);
6231 * Move all SERIAL-column sequences of the specified relation to another namespace.
6234 * Note: we assume adequate permission checking was done by the caller,
6235 * and that the caller has a suitable lock on the owning relation.
/*
 * AlterSeqNamespaces
 *		Move the sequences that depend on columns of rel into newNspOid.
 *
 * classRel:   pg_class, passed through to AlterRelationNamespaceInternal.
 * rel:        the owning relation.
 * oldNspOid:  namespace the sequences are moved from.
 * newNspOid:  namespace the sequences are moved to.
 * newNspName: name of the destination namespace (its use is not visible
 *             in this listing).
 *
 * Method: scan pg_depend through DependReferenceIndexId for rows whose
 * referenced object is rel, keep only internal dependencies that point at
 * a column of rel and originate from a whole pg_class object, open each
 * such object with AccessExclusiveLock, and for those that are sequences
 * fix both the pg_class row and the pg_type row.  The lock on a moved
 * sequence is retained when it is closed; the lock on a non-sequence is
 * released at once.
 *
 * NOTE(review): the statements following the filter test and following
 * the early relation_close are not visible in this listing; presumably
 * each moves on to the next pg_depend row -- confirm.
 */
6238 AlterSeqNamespaces(Relation classRel, Relation rel,
6239 Oid oldNspOid, Oid newNspOid, const char *newNspName)
6247 * SERIAL sequences are those having an internal dependency on one of the
6248 * table's columns (we don't care *which* column, exactly).
6250 depRel = heap_open(DependRelationId, AccessShareLock);
/* Scan keys: refclassid = pg_class and refobjid = OID of rel */
6252 ScanKeyInit(&key[0],
6253 Anum_pg_depend_refclassid,
6254 BTEqualStrategyNumber, F_OIDEQ,
6255 ObjectIdGetDatum(RelationRelationId));
6256 ScanKeyInit(&key[1],
6257 Anum_pg_depend_refobjid,
6258 BTEqualStrategyNumber, F_OIDEQ,
6259 ObjectIdGetDatum(RelationGetRelid(rel)));
6260 /* we leave refobjsubid unspecified */
6262 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
6263 SnapshotNow, 2, key);
6265 while (HeapTupleIsValid(tup = systable_getnext(scan)))
6267 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
6270 /* skip dependencies other than internal dependencies on columns */
6271 if (depForm->refobjsubid == 0 ||
6272 depForm->classid != RelationRelationId ||
6273 depForm->objsubid != 0 ||
6274 depForm->deptype != DEPENDENCY_INTERNAL)
6277 /* Use relation_open just in case it's an index */
6278 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
6280 /* skip non-sequence relations */
6281 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
6283 /* No need to keep the lock */
6284 relation_close(seqRel, AccessExclusiveLock);
6288 /* Fix the pg_class and pg_depend entries */
6289 AlterRelationNamespaceInternal(classRel, depForm->objid,
6290 oldNspOid, newNspOid,
6294 * Sequences have entries in pg_type. We need to be careful to move
6295 * them to the new namespace, too.
6297 AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
6300 /* Now we can close it. Keep the lock till end of transaction. */
6301 relation_close(seqRel, NoLock);
6304 systable_endscan(scan);
6306 relation_close(depRel, AccessShareLock);
6311 * This code supports
6312 * CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
6314 * Because we only support this for TEMP tables, it's sufficient to remember
6315 * the state in a backend-local data structure.
6319 * Register a newly-created relation's ON COMMIT action.
/*
 * register_on_commit_action
 *		Remember the ON COMMIT action requested for a new relation.
 *
 * relid:  OID of the relation.
 * action: the requested OnCommitAction.
 *
 * ONCOMMIT_NOOP and ONCOMMIT_PRESERVE_ROWS need no work at commit, so
 * such relations are not registered.  For any other action an
 * OnCommitItem is allocated while CacheMemoryContext is the current
 * memory context, stamped with the current subtransaction id as its
 * creator and with no deleter, and pushed onto the front of the
 * on_commits list.  The previous memory context is restored afterwards.
 *
 * NOTE(review): the statement guarded by the action test and the
 * assignment of the relid field are not visible in this listing.
 */
6322 register_on_commit_action(Oid relid, OnCommitAction action)
6325 MemoryContext oldcxt;
6328 * We needn't bother registering the relation unless there is an ON COMMIT
6329 * action we need to take.
6331 if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6334 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6336 oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6338 oc->oncommit = action;
6339 oc->creating_subid = GetCurrentSubTransactionId();
6340 oc->deleting_subid = InvalidSubTransactionId;
6342 on_commits = lcons(oc, on_commits);
6344 MemoryContextSwitchTo(oldcxt);
6348 * Unregister any ON COMMIT action when a relation is deleted.
6350 * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
/*
 * remove_on_commit_action
 *		Mark the on_commits entry for relid as deleted.
 *
 * relid: OID of the relation being deleted.
 *
 * The entry is not unlinked here; its deleting_subid is set to the
 * current subtransaction id so that end-of-transaction processing can
 * decide what to do with it.
 *
 * NOTE(review): whether the loop stops after the first match is not
 * visible in this listing.
 */
6353 remove_on_commit_action(Oid relid)
6357 foreach(l, on_commits)
6359 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6361 if (oc->relid == relid)
6363 oc->deleting_subid = GetCurrentSubTransactionId();
6370 * Perform ON COMMIT actions.
6372 * This is invoked just before actually committing, since it's possible
6373 * to encounter errors.
/*
 * PreCommit_on_commit_actions
 *		Carry out the registered ON COMMIT actions.
 *
 * Walks on_commits and dispatches on the oncommit field of each entry:
 *	- ONCOMMIT_PRESERVE_ROWS: nothing to do;
 *	- ONCOMMIT_DELETE_ROWS: the relation OID is appended to a list that is
 *	  handed to heap_truncate in one call after the loop;
 *	- the remaining visible branch deletes the relation with
 *	  performDeletion(..., DROP_CASCADE) and then asserts that the entry
 *	  has been marked with a deleting_subid.
 * After a truncation the command counter is incremented.
 *
 * NOTE(review): some case labels and the statement guarded by the
 * deleting_subid test are not visible in this listing; per the in-line
 * note, entries already dropped in this transaction are ignored.
 */
6376 PreCommit_on_commit_actions(void)
6379 List *oids_to_truncate = NIL;
6381 foreach(l, on_commits)
6383 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6385 /* Ignore entry if already dropped in this xact */
6386 if (oc->deleting_subid != InvalidSubTransactionId)
6389 switch (oc->oncommit)
6392 case ONCOMMIT_PRESERVE_ROWS:
6393 /* Do nothing (there shouldn't be such entries, actually) */
6395 case ONCOMMIT_DELETE_ROWS:
/* Defer the truncation: collect the OID now, truncate all of them together below */
6396 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
6400 ObjectAddress object;
6402 object.classId = RelationRelationId;
6403 object.objectId = oc->relid;
6404 object.objectSubId = 0;
6405 performDeletion(&object, DROP_CASCADE);
6408 * Note that table deletion will call
6409 * remove_on_commit_action, so the entry should get marked
6412 Assert(oc->deleting_subid != InvalidSubTransactionId);
6417 if (oids_to_truncate != NIL)
6419 heap_truncate(oids_to_truncate);
6420 CommandCounterIncrement(); /* XXX needed? */
6425 * Post-commit or post-abort cleanup for ON COMMIT management.
6427 * All we do here is remove no-longer-needed OnCommitItem entries.
6429 * During commit, remove entries that were deleted during this transaction;
6430 * during abort, remove those created during this transaction.
/*
 * AtEOXact_on_commit_actions
 *		Prune the on_commits list at top-level transaction end.
 *
 * isCommit: true at commit, false at abort.
 *
 * An entry is removed when, at commit, its deleting_subid is set, or, at
 * abort, its creating_subid is set.  Every surviving entry has both
 * subtransaction ids reset to InvalidSubTransactionId.
 *
 * The list is walked by hand with cur_item and prev_item because
 * list_delete_cell needs the predecessor of the cell being removed.
 * After a removal the walk resumes either from the cell after prev_item
 * or from the new list head.
 *
 * NOTE(review): the initialisation of prev_item, the test that selects
 * between the two resume points, and any release of the removed item are
 * not visible in this listing.
 */
6433 AtEOXact_on_commit_actions(bool isCommit)
6436 ListCell *prev_item;
6439 cur_item = list_head(on_commits);
6441 while (cur_item != NULL)
6443 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6445 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
6446 oc->creating_subid != InvalidSubTransactionId)
6448 /* cur_item must be removed */
6449 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6452 cur_item = lnext(prev_item);
6454 cur_item = list_head(on_commits);
6458 /* cur_item must be preserved */
6459 oc->creating_subid = InvalidSubTransactionId;
6460 oc->deleting_subid = InvalidSubTransactionId;
6461 prev_item = cur_item;
6462 cur_item = lnext(prev_item);
6468 * Post-subcommit or post-subabort cleanup for ON COMMIT management.
6470 * During subabort, we can immediately remove entries created during this
6471 * subtransaction. During subcommit, just relabel entries marked during
6472 * this subtransaction as being the parent's responsibility.
/*
 * AtEOSubXact_on_commit_actions
 *		Adjust the on_commits list when a subtransaction ends.
 *
 * isCommit:    true at subcommit, false at subabort.
 * mySubid:     id of the subtransaction that is ending.
 * parentSubid: id of its parent subtransaction.
 *
 * At subabort, entries created by mySubid are removed from the list.
 * For every entry that stays:
 *	- a creating_subid equal to mySubid is changed to parentSubid;
 *	- a deleting_subid equal to mySubid is changed to parentSubid at
 *	  subcommit, or cleared to InvalidSubTransactionId at subabort.
 *
 * The list is walked by hand with cur_item and prev_item because
 * list_delete_cell needs the predecessor of the cell being removed.
 *
 * NOTE(review): the initialisation of prev_item, the test that selects
 * the resume point after a removal, and any release of the removed item
 * are not visible in this listing.
 */
6475 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
6476 SubTransactionId parentSubid)
6479 ListCell *prev_item;
6482 cur_item = list_head(on_commits);
6484 while (cur_item != NULL)
6486 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6488 if (!isCommit && oc->creating_subid == mySubid)
6490 /* cur_item must be removed */
6491 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6494 cur_item = lnext(prev_item);
6496 cur_item = list_head(on_commits);
6500 /* cur_item must be preserved */
6501 if (oc->creating_subid == mySubid)
6502 oc->creating_subid = parentSubid;
6503 if (oc->deleting_subid == mySubid)
6504 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
6505 prev_item = cur_item;
6506 cur_item = lnext(prev_item);