1 /*-------------------------------------------------------------------------
4 * Commands for creating and altering table structures and settings
6 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.178 2006/03/03 03:30:52 tgl Exp $
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/tuptoaster.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_constraint.h"
26 #include "catalog/pg_depend.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_opclass.h"
30 #include "catalog/pg_trigger.h"
31 #include "catalog/pg_type.h"
32 #include "commands/cluster.h"
33 #include "commands/defrem.h"
34 #include "commands/tablecmds.h"
35 #include "commands/tablespace.h"
36 #include "commands/trigger.h"
37 #include "commands/typecmds.h"
38 #include "executor/executor.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/clauses.h"
43 #include "optimizer/plancat.h"
44 #include "optimizer/prep.h"
45 #include "parser/analyze.h"
46 #include "parser/gramparse.h"
47 #include "parser/parser.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "rewrite/rewriteHandler.h"
55 #include "storage/smgr.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
67 * ON COMMIT action list
69 typedef struct OnCommitItem
71 Oid relid; /* relid of relation */
72 OnCommitAction oncommit; /* what to do at end of xact */
75 * If this entry was created during the current transaction,
76 * creating_subid is the ID of the creating subxact; if created in a prior
77 * transaction, creating_subid is zero. If deleted during the current
78 * transaction, deleting_subid is the ID of the deleting subxact; if no
79 * deletion request is pending, deleting_subid is zero.
81 SubTransactionId creating_subid;
82 SubTransactionId deleting_subid;
85 static List *on_commits = NIL;
89 * State information for ALTER TABLE
91 * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
92 * structs, one for each table modified by the operation (the named table
93 * plus any child tables that are affected). We save lists of subcommands
94 * to apply to this table (possibly modified by parse transformation steps);
95 * these lists will be executed in Phase 2. If a Phase 3 step is needed,
96 * necessary information is stored in the constraints and newvals lists.
98 * Phase 2 is divided into multiple passes; subcommands are executed in
99 * a pass determined by subcommand type.
102 #define AT_PASS_DROP 0 /* DROP (all flavors) */
103 #define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */
104 #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */
105 #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */
106 #define AT_PASS_COL_ATTRS 4 /* set other column attributes */
107 /* We could support a RENAME COLUMN pass here, but not currently used */
108 #define AT_PASS_ADD_COL 5 /* ADD COLUMN */
109 #define AT_PASS_ADD_INDEX 6 /* ADD indexes */
110 #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */
111 #define AT_PASS_MISC 8 /* other stuff */
112 #define AT_NUM_PASSES 9
114 typedef struct AlteredTableInfo
116 /* Information saved before any work commences: */
117 Oid relid; /* Relation to work on */
118 char relkind; /* Its relkind */
119 TupleDesc oldDesc; /* Pre-modification tuple descriptor */
120 /* Information saved by Phase 1 for Phase 2: */
121 List *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
122 /* Information saved by Phases 1/2 for Phase 3: */
123 List *constraints; /* List of NewConstraint */
124 List *newvals; /* List of NewColumnValue */
125 Oid newTableSpace; /* new tablespace; 0 means no change */
126 /* Objects to rebuild after completing ALTER TYPE operations */
127 List *changedConstraintOids; /* OIDs of constraints to rebuild */
128 List *changedConstraintDefs; /* string definitions of same */
129 List *changedIndexOids; /* OIDs of indexes to rebuild */
130 List *changedIndexDefs; /* string definitions of same */
133 /* Struct describing one new constraint to check in Phase 3 scan */
134 typedef struct NewConstraint
136 char *name; /* Constraint name, or NULL if none */
137 ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */
138 AttrNumber attnum; /* only relevant for NOT_NULL */
139 Oid refrelid; /* PK rel, if FOREIGN */
140 Node *qual; /* Check expr or FkConstraint struct */
141 List *qualstate; /* Execution state for CHECK */
145 * Struct describing one new column value that needs to be computed during
146 * Phase 3 copy (this could be either a new column with a non-null default, or
147 * a column that we're changing the type of). Columns without such an entry
148 * are just copied from the old table during ATRewriteTable. Note that the
149 * expr is an expression over *old* table values.
151 typedef struct NewColumnValue
153 AttrNumber attnum; /* which column */
154 Expr *expr; /* expression to compute */
155 ExprState *exprstate; /* execution state */
159 static List *MergeAttributes(List *schema, List *supers, bool istemp,
160 List **supOids, List **supconstr, int *supOidCount);
161 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
162 static void StoreCatalogInheritance(Oid relationId, List *supers);
163 static int findAttrByName(const char *attributeName, List *schema);
164 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
165 static bool needs_toast_table(Relation rel);
166 static void AlterIndexNamespaces(Relation classRel, Relation rel,
167 Oid oldNspOid, Oid newNspOid);
168 static void AlterSeqNamespaces(Relation classRel, Relation rel,
169 Oid oldNspOid, Oid newNspOid,
170 const char *newNspName);
171 static int transformColumnNameList(Oid relId, List *colList,
172 int16 *attnums, Oid *atttypids);
173 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
175 int16 *attnums, Oid *atttypids,
177 static Oid transformFkeyCheckAttrs(Relation pkrel,
178 int numattrs, int16 *attnums,
180 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
181 Relation rel, Relation pkrel);
182 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
184 static char *fkMatchTypeToString(char match_type);
185 static void ATController(Relation rel, List *cmds, bool recurse);
186 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
187 bool recurse, bool recursing);
188 static void ATRewriteCatalogs(List **wqueue);
189 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
190 static void ATRewriteTables(List **wqueue);
191 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
192 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
193 static void ATSimplePermissions(Relation rel, bool allowView);
194 static void ATSimpleRecursion(List **wqueue, Relation rel,
195 AlterTableCmd *cmd, bool recurse);
196 static void ATOneLevelRecursion(List **wqueue, Relation rel,
198 static void find_composite_type_dependencies(Oid typeOid,
199 const char *origTblName);
200 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
202 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
204 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
205 static void add_column_support_dependency(Oid relid, int32 attnum,
207 static void ATExecDropNotNull(Relation rel, const char *colName);
208 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
209 const char *colName);
210 static void ATExecColumnDefault(Relation rel, const char *colName,
212 static void ATPrepSetStatistics(Relation rel, const char *colName,
214 static void ATExecSetStatistics(Relation rel, const char *colName,
216 static void ATExecSetStorage(Relation rel, const char *colName,
218 static void ATExecDropColumn(Relation rel, const char *colName,
219 DropBehavior behavior,
220 bool recurse, bool recursing);
221 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
222 IndexStmt *stmt, bool is_rebuild);
223 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
224 Node *newConstraint);
225 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
226 FkConstraint *fkconstraint);
227 static void ATPrepDropConstraint(List **wqueue, Relation rel,
228 bool recurse, AlterTableCmd *cmd);
229 static void ATExecDropConstraint(Relation rel, const char *constrName,
230 DropBehavior behavior, bool quiet);
231 static void ATPrepAlterColumnType(List **wqueue,
232 AlteredTableInfo *tab, Relation rel,
233 bool recurse, bool recursing,
235 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
236 const char *colName, TypeName *typename);
237 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
238 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
239 static void change_owner_recurse_to_sequences(Oid relationOid,
241 static void ATExecClusterOn(Relation rel, const char *indexName);
242 static void ATExecDropCluster(Relation rel);
243 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
244 char *tablespacename);
245 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
246 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
247 bool enable, bool skip_system);
248 static void copy_relation_data(Relation rel, SMgrRelation dst);
249 static void update_ri_trigger_args(Oid relid,
253 bool update_relname);
256 /* ----------------------------------------------------------------
258 * Creates a new relation.
260 * If successful, returns the OID of the new relation.
261 * ----------------------------------------------------------------
264 DefineRelation(CreateStmt *stmt, char relkind)
266 char relname[NAMEDATALEN];
268 List *schema = stmt->tableElts;
272 TupleDesc descriptor;
274 List *old_constraints;
283 * Truncate relname to appropriate length (probably a waste of time, as
284 * parser should have done this already).
286 StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
289 * Check consistency of arguments
291 if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
293 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
294 errmsg("ON COMMIT can only be used on temporary tables")));
297 * Look up the namespace in which we are supposed to create the relation.
298 * Check we have permission to create there. Skip check if bootstrapping,
299 * since permissions machinery may not be working yet.
301 namespaceId = RangeVarGetCreationNamespace(stmt->relation);
303 if (!IsBootstrapProcessingMode())
307 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
309 if (aclresult != ACLCHECK_OK)
310 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
311 get_namespace_name(namespaceId));
315 * Select tablespace to use. If not specified, use default_tablespace
316 * (which may in turn default to database's default).
318 if (stmt->tablespacename)
320 tablespaceId = get_tablespace_oid(stmt->tablespacename);
321 if (!OidIsValid(tablespaceId))
323 (errcode(ERRCODE_UNDEFINED_OBJECT),
324 errmsg("tablespace \"%s\" does not exist",
325 stmt->tablespacename)));
329 tablespaceId = GetDefaultTablespace();
330 /* note InvalidOid is OK in this case */
333 /* Check permissions except when using database's default */
334 if (OidIsValid(tablespaceId))
338 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
340 if (aclresult != ACLCHECK_OK)
341 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
342 get_tablespace_name(tablespaceId));
346 * Look up inheritance ancestors and generate relation schema, including
347 * inherited attributes.
349 schema = MergeAttributes(schema, stmt->inhRelations,
350 stmt->relation->istemp,
351 &inheritOids, &old_constraints, &parentOidCount);
354 * Create a relation descriptor from the relation schema and create the
355 * relation. Note that in this stage only inherited (pre-cooked) defaults
356 * and constraints will be included into the new relation.
357 * (BuildDescForRelation takes care of the inherited defaults, but we have
358 * to copy inherited constraints here.)
360 descriptor = BuildDescForRelation(schema);
362 localHasOids = interpretOidsOption(stmt->hasoids);
363 descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
365 if (old_constraints != NIL)
367 ConstrCheck *check = (ConstrCheck *)
368 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
371 foreach(listptr, old_constraints)
373 Constraint *cdef = (Constraint *) lfirst(listptr);
376 if (cdef->contype != CONSTR_CHECK)
378 Assert(cdef->name != NULL);
379 Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
382 * In multiple-inheritance situations, it's possible to inherit
383 * the same grandparent constraint through multiple parents.
384 * Hence, discard inherited constraints that match as to both name
385 * and expression. Otherwise, gripe if the names conflict.
387 for (i = 0; i < ncheck; i++)
389 if (strcmp(check[i].ccname, cdef->name) != 0)
391 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
397 (errcode(ERRCODE_DUPLICATE_OBJECT),
398 errmsg("duplicate check constraint name \"%s\"",
403 check[ncheck].ccname = cdef->name;
404 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
410 if (descriptor->constr == NULL)
412 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
413 descriptor->constr->defval = NULL;
414 descriptor->constr->num_defval = 0;
415 descriptor->constr->has_not_null = false;
417 descriptor->constr->num_check = ncheck;
418 descriptor->constr->check = check;
422 relationId = heap_create_with_catalog(relname,
433 allowSystemTableMods);
435 StoreCatalogInheritance(relationId, inheritOids);
438 * We must bump the command counter to make the newly-created relation
439 * tuple visible for opening.
441 CommandCounterIncrement();
444 * Open the new relation and acquire exclusive lock on it. This isn't
445 * really necessary for locking out other backends (since they can't see
446 * the new rel anyway until we commit), but it keeps the lock manager from
447 * complaining about deadlock risks.
449 rel = relation_open(relationId, AccessExclusiveLock);
452 * Now add any newly specified column default values and CHECK constraints
453 * to the new relation. These are passed to us in the form of raw
454 * parsetrees; we need to transform them to executable expression trees
455 * before they can be added. The most convenient way to do that is to
456 * apply the parser's transformExpr routine, but transformExpr doesn't
457 * work unless we have a pre-existing relation. So, the transformation has
458 * to be postponed to this final step of CREATE TABLE.
460 * Another task that's conveniently done at this step is to add dependency
461 * links between columns and supporting relations (such as SERIAL sequences).
464 * First, scan schema to find new column defaults.
469 foreach(listptr, schema)
471 ColumnDef *colDef = lfirst(listptr);
475 if (colDef->raw_default != NULL)
477 RawColumnDefault *rawEnt;
479 Assert(colDef->cooked_default == NULL);
481 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
482 rawEnt->attnum = attnum;
483 rawEnt->raw_default = colDef->raw_default;
484 rawDefaults = lappend(rawDefaults, rawEnt);
487 /* Create dependency for supporting relation for this column */
488 if (colDef->support != NULL)
489 add_column_support_dependency(relationId, attnum, colDef->support);
493 * Parse and add the defaults/constraints, if any.
495 if (rawDefaults || stmt->constraints)
496 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
499 * Clean up. We keep lock on new relation (although it shouldn't be
500 * visible to anyone else anyway, until commit).
502 relation_close(rel, NoLock);
509 * Deletes a relation.
512 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
515 ObjectAddress object;
517 relOid = RangeVarGetRelid(relation, false);
519 object.classId = RelationRelationId;
520 object.objectId = relOid;
521 object.objectSubId = 0;
523 performDeletion(&object, behavior);
528 * Executes a TRUNCATE command.
530 * This is a multi-relation truncate. We first open and grab exclusive
531 * lock on all relations involved, checking permissions and otherwise
532 * verifying that the relation is OK for truncation. In CASCADE mode,
533 * relations having FK references to the targeted relations are automatically
534 * added to the group; in RESTRICT mode, we check that all FK references are
535 * internal to the group that's being truncated. Finally all the relations
536 * are truncated and reindexed.
539 ExecuteTruncate(TruncateStmt *stmt)
542 List *directRelids = NIL;
548 * Open and exclusive-lock all the explicitly-specified relations
550 foreach(cell, stmt->relations)
552 RangeVar *rv = lfirst(cell);
554 rel = heap_openrv(rv, AccessExclusiveLock);
555 rels = lappend(rels, rel);
556 directRelids = lappend_oid(directRelids, RelationGetRelid(rel));
560 * In CASCADE mode, suck in all referencing relations as well. This
561 * requires multiple iterations to find indirectly-dependent relations.
562 * At each phase, we need to exclusive-lock new rels before looking
563 * for their dependencies, else we might miss something.
565 if (stmt->behavior == DROP_CASCADE)
567 List *relids = list_copy(directRelids);
573 newrelids = heap_truncate_find_FKs(relids);
574 if (newrelids == NIL)
575 break; /* nothing else to add */
577 foreach(cell, newrelids)
579 relid = lfirst_oid(cell);
580 rel = heap_open(relid, AccessExclusiveLock);
581 rels = lappend(rels, rel);
582 relids = lappend_oid(relids, relid);
587 /* now check all involved relations */
590 rel = (Relation) lfirst(cell);
591 relid = RelationGetRelid(rel);
594 * If this table was added to the command by CASCADE, report it.
595 * We don't do this earlier because if we error out on one of the
596 * tables, it'd be confusing to list subsequently-added tables.
598 if (stmt->behavior == DROP_CASCADE &&
599 !list_member_oid(directRelids, relid))
601 (errmsg("truncate cascades to table \"%s\"",
602 RelationGetRelationName(rel))));
604 /* Only allow truncate on regular tables */
605 if (rel->rd_rel->relkind != RELKIND_RELATION)
607 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
608 errmsg("\"%s\" is not a table",
609 RelationGetRelationName(rel))));
611 /* Permissions checks */
612 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
613 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
614 RelationGetRelationName(rel));
616 if (!allowSystemTableMods && IsSystemRelation(rel))
618 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
619 errmsg("permission denied: \"%s\" is a system catalog",
620 RelationGetRelationName(rel))));
623 * We can never allow truncation of shared or nailed-in-cache
624 * relations, because we can't support changing their relfilenode
627 if (rel->rd_rel->relisshared || rel->rd_isnailed)
629 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
630 errmsg("cannot truncate system relation \"%s\"",
631 RelationGetRelationName(rel))));
634 * Don't allow truncate on temp tables of other backends ... their
635 * local buffer manager is not going to cope.
637 if (isOtherTempNamespace(RelationGetNamespace(rel)))
639 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
640 errmsg("cannot truncate temporary tables of other sessions")));
644 * Check foreign key references. In CASCADE mode, this should be
645 * unnecessary since we just pulled in all the references; but as
646 * a cross-check, do it anyway if in an Assert-enabled build.
648 #ifdef USE_ASSERT_CHECKING
649 heap_truncate_check_FKs(rels, false);
651 if (stmt->behavior == DROP_RESTRICT)
652 heap_truncate_check_FKs(rels, false);
656 * OK, truncate each table.
663 rel = (Relation) lfirst(cell);
666 * Create a new empty storage file for the relation, and assign it as
667 * the relfilenode value. The old storage file is scheduled for
668 * deletion at commit.
670 setNewRelfilenode(rel);
672 heap_relid = RelationGetRelid(rel);
673 toast_relid = rel->rd_rel->reltoastrelid;
675 heap_close(rel, NoLock);
678 * The same for the toast table, if any.
680 if (OidIsValid(toast_relid))
682 rel = relation_open(toast_relid, AccessExclusiveLock);
683 setNewRelfilenode(rel);
684 heap_close(rel, NoLock);
688 * Reconstruct the indexes to match, and we're done.
690 reindex_relation(heap_relid, true);
696 * Returns new schema given initial schema and superclasses.
699 * 'schema' is the column/attribute definition for the table. (It's a list
700 * of ColumnDef's.) It is destructively changed.
701 * 'supers' is a list of names (as RangeVar nodes) of parent relations.
702 * 'istemp' is TRUE if we are creating a temp relation.
705 * 'supOids' receives a list of the OIDs of the parent relations.
706 * 'supconstr' receives a list of constraints belonging to the parents,
707 * updated as necessary to be valid for the child.
708 * 'supOidCount' is set to the number of parents that have OID columns.
711 * Completed schema list.
714 * The order in which the attributes are inherited is very important.
715 * Intuitively, the inherited attributes should come first. If a table
716 * inherits from multiple parents, the order of those attributes is
717 * according to the order of the parents specified in CREATE TABLE.
721 * create table person (name text, age int4, location point);
722 * create table emp (salary int4, manager text) inherits(person);
723 * create table student (gpa float8) inherits (person);
724 * create table stud_emp (percent int4) inherits (emp, student);
726 * The order of the attributes of stud_emp is:
728 *               person {1:name, 2:age, 3:location}
729 *               /    \
730 *   {6:gpa} student  emp {4:salary, 5:manager}
731 *               \    /
732 *              stud_emp {7:percent}
734 * If the same attribute name appears multiple times, then it appears
735 * in the result table in the proper location for its first appearance.
737 * Constraints (including NOT NULL constraints) for the child table
738 * are the union of all relevant constraints, from both the child schema and the parent tables.
741 * The default value for a child column is defined as:
742 * (1) If the child schema specifies a default, that value is used.
743 * (2) If neither the child nor any parent specifies a default, then
744 * the column will not have a default.
745 * (3) If conflicting defaults are inherited from different parents
746 * (and not overridden by the child), an error is raised.
747 * (4) Otherwise the inherited default is used.
748 * Rule (3) is new in Postgres 7.1; in earlier releases you got a
749 * rather arbitrary choice of which parent default to use.
753 MergeAttributes(List *schema, List *supers, bool istemp,
754 List **supOids, List **supconstr, int *supOidCount)
757 List *inhSchema = NIL;
758 List *parentOids = NIL;
759 List *constraints = NIL;
760 int parentsWithOids = 0;
761 bool have_bogus_defaults = false;
762 char *bogus_marker = "Bogus!"; /* marks conflicting defaults */
766 * Check for and reject tables with too many columns. We perform this
767 * check relatively early for two reasons: (a) we don't run the risk of
768 * overflowing an AttrNumber in subsequent code (b) an O(n^2) algorithm is
769 * okay if we're processing <= 1600 columns, but could take minutes to
770 * execute if the user attempts to create a table with hundreds of
771 * thousands of columns.
773 * Note that we also need to check that we do not exceed this figure
774 * after including columns from inherited relations.
776 if (list_length(schema) > MaxHeapAttributeNumber)
778 (errcode(ERRCODE_TOO_MANY_COLUMNS),
779 errmsg("tables can have at most %d columns",
780 MaxHeapAttributeNumber)));
783 * Check for duplicate names in the explicit list of attributes.
785 * Although we might consider merging such entries in the same way that we
786 * handle name conflicts for inherited attributes, it seems to make more
787 * sense to assume such conflicts are errors.
789 foreach(entry, schema)
791 ColumnDef *coldef = lfirst(entry);
794 for_each_cell(rest, lnext(entry))
796 ColumnDef *restdef = lfirst(rest);
798 if (strcmp(coldef->colname, restdef->colname) == 0)
800 (errcode(ERRCODE_DUPLICATE_COLUMN),
801 errmsg("column \"%s\" duplicated",
807 * Scan the parents left-to-right, and merge their attributes to form a
808 * list of inherited attributes (inhSchema). Also check to see if we need
809 * to inherit an OID column.
812 foreach(entry, supers)
814 RangeVar *parent = (RangeVar *) lfirst(entry);
818 AttrNumber *newattno;
819 AttrNumber parent_attno;
821 relation = heap_openrv(parent, AccessShareLock);
823 if (relation->rd_rel->relkind != RELKIND_RELATION)
825 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
826 errmsg("inherited relation \"%s\" is not a table",
828 /* Permanent rels cannot inherit from temporary ones */
829 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
831 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
832 errmsg("cannot inherit from temporary relation \"%s\"",
836 * We should have an UNDER permission flag for this, but for now,
837 * demand that creator of a child table own the parent.
839 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
840 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
841 RelationGetRelationName(relation));
844 * Reject duplications in the list of parents.
846 if (list_member_oid(parentOids, RelationGetRelid(relation)))
848 (errcode(ERRCODE_DUPLICATE_TABLE),
849 errmsg("inherited relation \"%s\" duplicated",
852 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
854 if (relation->rd_rel->relhasoids)
857 tupleDesc = RelationGetDescr(relation);
858 constr = tupleDesc->constr;
861 * newattno[] will contain the child-table attribute numbers for the
862 * attributes of this parent table. (They are not the same for
863 * parents after the first one, nor if we have dropped columns.)
865 newattno = (AttrNumber *)
866 palloc(tupleDesc->natts * sizeof(AttrNumber));
868 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
871 Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
872 char *attributeName = NameStr(attribute->attname);
878 * Ignore dropped columns in the parent.
880 if (attribute->attisdropped)
883 * change_varattnos_of_a_node asserts that this is greater
884 * than zero, so if anything tries to use it, we should find out.
887 newattno[parent_attno - 1] = 0;
892 * Does it conflict with some previously inherited column?
894 exist_attno = findAttrByName(attributeName, inhSchema);
898 * Yes, try to merge the two column definitions. They must
899 * have the same type and typmod.
902 (errmsg("merging multiple inherited definitions of column \"%s\"",
904 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
905 if (typenameTypeId(def->typename) != attribute->atttypid ||
906 def->typename->typmod != attribute->atttypmod)
908 (errcode(ERRCODE_DATATYPE_MISMATCH),
909 errmsg("inherited column \"%s\" has a type conflict",
911 errdetail("%s versus %s",
912 TypeNameToString(def->typename),
913 format_type_be(attribute->atttypid))));
915 /* Merge of NOT NULL constraints = OR 'em together */
916 def->is_not_null |= attribute->attnotnull;
917 /* Default and other constraints are handled below */
918 newattno[parent_attno - 1] = exist_attno;
923 * No, create a new inherited column
925 def = makeNode(ColumnDef);
926 def->colname = pstrdup(attributeName);
927 typename = makeNode(TypeName);
928 typename->typeid = attribute->atttypid;
929 typename->typmod = attribute->atttypmod;
930 def->typename = typename;
932 def->is_local = false;
933 def->is_not_null = attribute->attnotnull;
934 def->raw_default = NULL;
935 def->cooked_default = NULL;
936 def->constraints = NIL;
938 inhSchema = lappend(inhSchema, def);
939 newattno[parent_attno - 1] = ++child_attno;
943 * Copy default if any
945 if (attribute->atthasdef)
947 char *this_default = NULL;
948 AttrDefault *attrdef;
951 /* Find default in constraint structure */
952 Assert(constr != NULL);
953 attrdef = constr->defval;
954 for (i = 0; i < constr->num_defval; i++)
956 if (attrdef[i].adnum == parent_attno)
958 this_default = attrdef[i].adbin;
962 Assert(this_default != NULL);
965 * If default expr could contain any vars, we'd need to fix
966 * 'em, but it can't; so default is ready to apply to child.
968 * If we already had a default from some prior parent, check
969 * to see if they are the same. If so, no problem; if not,
970 * mark the column as having a bogus default. Below, we will
971 * complain if the bogus default isn't overridden by the child
974 Assert(def->raw_default == NULL);
975 if (def->cooked_default == NULL)
976 def->cooked_default = pstrdup(this_default);
977 else if (strcmp(def->cooked_default, this_default) != 0)
979 def->cooked_default = bogus_marker;
980 have_bogus_defaults = true;
986 * Now copy the constraints of this parent, adjusting attnos using the
987 * completed newattno[] map
989 if (constr && constr->num_check > 0)
991 ConstrCheck *check = constr->check;
994 for (i = 0; i < constr->num_check; i++)
996 Constraint *cdef = makeNode(Constraint);
999 cdef->contype = CONSTR_CHECK;
1000 cdef->name = pstrdup(check[i].ccname);
1001 cdef->raw_expr = NULL;
1002 /* adjust varattnos of ccbin here */
1003 expr = stringToNode(check[i].ccbin);
1004 change_varattnos_of_a_node(expr, newattno);
1005 cdef->cooked_expr = nodeToString(expr);
1006 constraints = lappend(constraints, cdef);
1013 * Close the parent rel, but keep our AccessShareLock on it until xact
1014 * commit. That will prevent someone else from deleting or ALTERing
1015 * the parent before the child is committed.
1017 heap_close(relation, NoLock);
1021 * If we had no inherited attributes, the result schema is just the
1022 * explicitly declared columns. Otherwise, we need to merge the declared
1023 * columns into the inherited schema list.
1025 if (inhSchema != NIL)
1027 foreach(entry, schema)
1029 ColumnDef *newdef = lfirst(entry);
1030 char *attributeName = newdef->colname;
1034 * Does it conflict with some previously inherited column?
1036 exist_attno = findAttrByName(attributeName, inhSchema);
1037 if (exist_attno > 0)
1042 * Yes, try to merge the two column definitions. They must
1043 * have the same type and typmod.
1046 (errmsg("merging column \"%s\" with inherited definition",
1048 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
1049 if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
1050 def->typename->typmod != newdef->typename->typmod)
1052 (errcode(ERRCODE_DATATYPE_MISMATCH),
1053 errmsg("column \"%s\" has a type conflict",
1055 errdetail("%s versus %s",
1056 TypeNameToString(def->typename),
1057 TypeNameToString(newdef->typename))));
1058 /* Mark the column as locally defined */
1059 def->is_local = true;
1060 /* Merge of NOT NULL constraints = OR 'em together */
1061 def->is_not_null |= newdef->is_not_null;
1062 /* If new def has a default, override previous default */
1063 if (newdef->raw_default != NULL)
1065 def->raw_default = newdef->raw_default;
1066 def->cooked_default = newdef->cooked_default;
1072 * No, attach new column to result schema
1074 inhSchema = lappend(inhSchema, newdef);
1081 * Check that we haven't exceeded the legal # of columns after merging
1082 * in inherited columns.
1084 if (list_length(schema) > MaxHeapAttributeNumber)
1086 (errcode(ERRCODE_TOO_MANY_COLUMNS),
1087 errmsg("tables can have at most %d columns",
1088 MaxHeapAttributeNumber)));
1092 * If we found any conflicting parent default values, check to make sure
1093 * they were overridden by the child.
1095 if (have_bogus_defaults)
1097 foreach(entry, schema)
1099 ColumnDef *def = lfirst(entry);
1101 if (def->cooked_default == bogus_marker)
1103 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1104 errmsg("column \"%s\" inherits conflicting default values",
1106 errhint("To resolve the conflict, specify a default explicitly.")));
1110 *supOids = parentOids;
1111 *supconstr = constraints;
1112 *supOidCount = parentsWithOids;
1117 * complementary static functions for MergeAttributes().
1119 * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1120 * constraints from parent classes, since the inherited attributes could
1121 * be given different column numbers in multiple-inheritance cases.
1123 * Note that the passed node tree is modified in place!
1126 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1132 Var *var = (Var *) node;
1134 if (var->varlevelsup == 0 && var->varno == 1 &&
1138 * ??? the following may be a problem when the node is multiply
1139 * referenced though stringToNode() doesn't create such a node
1142 Assert(newattno[var->varattno - 1] > 0);
1143 var->varattno = newattno[var->varattno - 1];
1147 return expression_tree_walker(node, change_varattnos_walker,
1152 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1154 return change_varattnos_walker(node, newattno);
1158 * StoreCatalogInheritance
1159 * Updates the system catalogs with proper inheritance information.
1161 * supers is a list of the OIDs of the new relation's direct ancestors.
1164 StoreCatalogInheritance(Oid relationId, List *supers)
1175 AssertArg(OidIsValid(relationId));
1181 * Store INHERITS information in pg_inherits using direct ancestors only.
1182 * Also enter dependencies on the direct ancestors, and make sure they are
1183 * marked with relhassubclass = true.
1185 * (Once upon a time, both direct and indirect ancestors were found here
1186 * and then entered into pg_ipl. Since that catalog doesn't exist
1187 * anymore, there's no need to look for indirect ancestors.)
1189 relation = heap_open(InheritsRelationId, RowExclusiveLock);
1190 desc = RelationGetDescr(relation);
1193 foreach(entry, supers)
1195 Oid parentOid = lfirst_oid(entry);
1196 Datum datum[Natts_pg_inherits];
1197 char nullarr[Natts_pg_inherits];
1198 ObjectAddress childobject,
1201 datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
1202 datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
1203 datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
1209 tuple = heap_formtuple(desc, datum, nullarr);
1211 simple_heap_insert(relation, tuple);
1213 CatalogUpdateIndexes(relation, tuple);
1215 heap_freetuple(tuple);
1218 * Store a dependency too
1220 parentobject.classId = RelationRelationId;
1221 parentobject.objectId = parentOid;
1222 parentobject.objectSubId = 0;
1223 childobject.classId = RelationRelationId;
1224 childobject.objectId = relationId;
1225 childobject.objectSubId = 0;
1227 recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
1230 * Mark the parent as having subclasses.
1232 setRelhassubclassInRelation(parentOid, true);
1237 heap_close(relation, RowExclusiveLock);
1241 * Look for an existing schema entry with the given name.
1243 * Returns the index (starting with 1) if attribute already exists in schema,
1247 findAttrByName(const char *attributeName, List *schema)
1254 ColumnDef *def = lfirst(s);
1256 if (strcmp(attributeName, def->colname) == 0)
1265 * Update a relation's pg_class.relhassubclass entry to the given value
1268 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1270 Relation relationRelation;
1272 Form_pg_class classtuple;
1275 * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1277 * If the tuple already has the right relhassubclass setting, we don't
1278 * need to update it, but we still need to issue an SI inval message.
1280 relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1281 tuple = SearchSysCacheCopy(RELOID,
1282 ObjectIdGetDatum(relationId),
1284 if (!HeapTupleIsValid(tuple))
1285 elog(ERROR, "cache lookup failed for relation %u", relationId);
1286 classtuple = (Form_pg_class) GETSTRUCT(tuple);
1288 if (classtuple->relhassubclass != relhassubclass)
1290 classtuple->relhassubclass = relhassubclass;
1291 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1293 /* keep the catalog indexes up to date */
1294 CatalogUpdateIndexes(relationRelation, tuple);
1298 /* no need to change tuple, but force relcache rebuild anyway */
1299 CacheInvalidateRelcacheByTuple(tuple);
1302 heap_freetuple(tuple);
1303 heap_close(relationRelation, RowExclusiveLock);
1308 * renameatt - changes the name of a attribute in a relation
1310 * Attname attribute is changed in attribute catalog.
1311 * No record of the previous attname is kept (correct?).
1313 * get proper relrelation from relation catalog (if not arg)
1314 * scan attribute catalog
1315 * for name conflict (within rel)
1316 * for original attribute (if not arg)
1317 * modify attname in attribute tuple
1318 * insert modified attribute in attribute catalog
1319 * delete original attribute from attribute catalog
1322 renameatt(Oid myrelid,
1323 const char *oldattname,
1324 const char *newattname,
1328 Relation targetrelation;
1329 Relation attrelation;
1331 Form_pg_attribute attform;
1334 ListCell *indexoidscan;
1337 * Grab an exclusive lock on the target table, which we will NOT release
1338 * until end of transaction.
1340 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1343 * permissions checking. this would normally be done in utility.c, but
1344 * this particular routine is recursive.
1346 * normally, only the owner of a class can change its schema.
1348 if (!pg_class_ownercheck(myrelid, GetUserId()))
1349 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1350 RelationGetRelationName(targetrelation));
1351 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1353 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1354 errmsg("permission denied: \"%s\" is a system catalog",
1355 RelationGetRelationName(targetrelation))));
1358 * if the 'recurse' flag is set then we are supposed to rename this
1359 * attribute in all classes that inherit from 'relname' (as well as in
1362 * any permissions or problems with duplicate attributes will cause the
1363 * whole transaction to abort, which is what we want -- all or nothing.
1370 /* this routine is actually in the planner */
1371 children = find_all_inheritors(myrelid);
1374 * find_all_inheritors does the recursive search of the inheritance
1375 * hierarchy, so all we have to do is process all of the relids in the
1376 * list that it returns.
1378 foreach(child, children)
1380 Oid childrelid = lfirst_oid(child);
1382 if (childrelid == myrelid)
1384 /* note we need not recurse again */
1385 renameatt(childrelid, oldattname, newattname, false, true);
1391 * If we are told not to recurse, there had better not be any child
1392 * tables; else the rename would put them out of step.
1395 find_inheritance_children(myrelid) != NIL)
1397 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1398 errmsg("inherited column \"%s\" must be renamed in child tables too",
1402 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1404 atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
1405 if (!HeapTupleIsValid(atttup))
1407 (errcode(ERRCODE_UNDEFINED_COLUMN),
1408 errmsg("column \"%s\" does not exist",
1410 attform = (Form_pg_attribute) GETSTRUCT(atttup);
1412 attnum = attform->attnum;
1415 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1416 errmsg("cannot rename system column \"%s\"",
1420 * if the attribute is inherited, forbid the renaming, unless we are
1421 * already inside a recursive rename.
1423 if (attform->attinhcount > 0 && !recursing)
1425 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1426 errmsg("cannot rename inherited column \"%s\"",
1429 /* should not already exist */
1430 /* this test is deliberately not attisdropped-aware */
1431 if (SearchSysCacheExists(ATTNAME,
1432 ObjectIdGetDatum(myrelid),
1433 PointerGetDatum(newattname),
1436 (errcode(ERRCODE_DUPLICATE_COLUMN),
1437 errmsg("column \"%s\" of relation \"%s\" already exists",
1438 newattname, RelationGetRelationName(targetrelation))));
1440 namestrcpy(&(attform->attname), newattname);
1442 simple_heap_update(attrelation, &atttup->t_self, atttup);
1444 /* keep system catalog indexes current */
1445 CatalogUpdateIndexes(attrelation, atttup);
1447 heap_freetuple(atttup);
1450 * Update column names of indexes that refer to the column being renamed.
1452 indexoidlist = RelationGetIndexList(targetrelation);
1454 foreach(indexoidscan, indexoidlist)
1456 Oid indexoid = lfirst_oid(indexoidscan);
1458 Form_pg_index indexform;
1462 * Scan through index columns to see if there's any simple index
1463 * entries for this attribute. We ignore expressional entries.
1465 indextup = SearchSysCache(INDEXRELID,
1466 ObjectIdGetDatum(indexoid),
1468 if (!HeapTupleIsValid(indextup))
1469 elog(ERROR, "cache lookup failed for index %u", indexoid);
1470 indexform = (Form_pg_index) GETSTRUCT(indextup);
1472 for (i = 0; i < indexform->indnatts; i++)
1474 if (attnum != indexform->indkey.values[i])
1478 * Found one, rename it.
1480 atttup = SearchSysCacheCopy(ATTNUM,
1481 ObjectIdGetDatum(indexoid),
1482 Int16GetDatum(i + 1),
1484 if (!HeapTupleIsValid(atttup))
1485 continue; /* should we raise an error? */
1488 * Update the (copied) attribute tuple.
1490 namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1493 simple_heap_update(attrelation, &atttup->t_self, atttup);
1495 /* keep system catalog indexes current */
1496 CatalogUpdateIndexes(attrelation, atttup);
1498 heap_freetuple(atttup);
1501 ReleaseSysCache(indextup);
1504 list_free(indexoidlist);
1506 heap_close(attrelation, RowExclusiveLock);
1509 * Update att name in any RI triggers associated with the relation.
1511 if (targetrelation->rd_rel->reltriggers > 0)
1513 /* update tgargs column reference where att is primary key */
1514 update_ri_trigger_args(RelationGetRelid(targetrelation),
1515 oldattname, newattname,
1517 /* update tgargs column reference where att is foreign key */
1518 update_ri_trigger_args(RelationGetRelid(targetrelation),
1519 oldattname, newattname,
1523 relation_close(targetrelation, NoLock); /* close rel but keep lock */
1527 * renamerel - change the name of a relation
1529 * XXX - When renaming sequences, we don't bother to modify the
1530 * sequence name that is stored within the sequence itself
1531 * (this would cause problems with MVCC). In the future,
1532 * the sequence name should probably be removed from the
1533 * sequence, AFAIK there's no need for it to be there.
1536 renamerel(Oid myrelid, const char *newrelname)
1538 Relation targetrelation;
1539 Relation relrelation; /* for RELATION relation */
1544 bool relhastriggers;
1547 * Grab an exclusive lock on the target table or index, which we will NOT
1548 * release until end of transaction.
1550 targetrelation = relation_open(myrelid, AccessExclusiveLock);
1552 oldrelname = pstrdup(RelationGetRelationName(targetrelation));
1553 namespaceId = RelationGetNamespace(targetrelation);
1555 if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1557 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1558 errmsg("permission denied: \"%s\" is a system catalog",
1559 RelationGetRelationName(targetrelation))));
1561 relkind = targetrelation->rd_rel->relkind;
1562 relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1565 * Find relation's pg_class tuple, and make sure newrelname isn't in use.
1567 relrelation = heap_open(RelationRelationId, RowExclusiveLock);
1569 reltup = SearchSysCacheCopy(RELOID,
1570 PointerGetDatum(myrelid),
1572 if (!HeapTupleIsValid(reltup)) /* shouldn't happen */
1573 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1575 if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1577 (errcode(ERRCODE_DUPLICATE_TABLE),
1578 errmsg("relation \"%s\" already exists",
1582 * Update pg_class tuple with new relname. (Scribbling on reltup is OK
1583 * because it's a copy...)
1585 namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1587 simple_heap_update(relrelation, &reltup->t_self, reltup);
1589 /* keep the system catalog indexes current */
1590 CatalogUpdateIndexes(relrelation, reltup);
1592 heap_freetuple(reltup);
1593 heap_close(relrelation, RowExclusiveLock);
1596 * Also rename the associated type, if any.
1598 if (relkind != RELKIND_INDEX)
1599 TypeRename(oldrelname, namespaceId, newrelname);
1602 * Update rel name in any RI triggers associated with the relation.
1606 /* update tgargs where relname is primary key */
1607 update_ri_trigger_args(myrelid,
1611 /* update tgargs where relname is foreign key */
1612 update_ri_trigger_args(myrelid,
1619 * Close rel, but keep exclusive lock!
1621 relation_close(targetrelation, NoLock);
1625 * Scan pg_trigger for RI triggers that are on the specified relation
1626 * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1627 * is true). Update RI trigger args fields matching oldname to contain
1628 * newname instead. If update_relname is true, examine the relname
1629 * fields; otherwise examine the attname fields.
1632 update_ri_trigger_args(Oid relid,
1633 const char *oldname,
1634 const char *newname,
1636 bool update_relname)
1639 ScanKeyData skey[1];
1640 SysScanDesc trigscan;
1642 Datum values[Natts_pg_trigger];
1643 char nulls[Natts_pg_trigger];
1644 char replaces[Natts_pg_trigger];
1646 tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1649 ScanKeyInit(&skey[0],
1650 Anum_pg_trigger_tgconstrrelid,
1651 BTEqualStrategyNumber, F_OIDEQ,
1652 ObjectIdGetDatum(relid));
1653 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1659 ScanKeyInit(&skey[0],
1660 Anum_pg_trigger_tgrelid,
1661 BTEqualStrategyNumber, F_OIDEQ,
1662 ObjectIdGetDatum(relid));
1663 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1668 while ((tuple = systable_getnext(trigscan)) != NULL)
1670 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1680 const char *arga[RI_MAX_ARGUMENTS];
1683 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1684 if (tg_type == RI_TRIGGER_NONE)
1686 /* Not an RI trigger, forget it */
1691 * It is an RI trigger, so parse the tgargs bytea.
1693 * NB: we assume the field will never be compressed or moved out of
1694 * line; so does trigger.c ...
1696 tgnargs = pg_trigger->tgnargs;
1698 DatumGetPointer(fastgetattr(tuple,
1699 Anum_pg_trigger_tgargs,
1700 tgrel->rd_att, &isnull));
1701 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1702 tgnargs > RI_MAX_ARGUMENTS)
1704 /* This probably shouldn't happen, but ignore busted triggers */
1707 argp = (const char *) VARDATA(val);
1708 for (i = 0; i < tgnargs; i++)
1711 argp += strlen(argp) + 1;
1715 * Figure out which item(s) to look at. If the trigger is primary-key
1716 * type and attached to my rel, I should look at the PK fields; if it
1717 * is foreign-key type and attached to my rel, I should look at the FK
1718 * fields. But the opposite rule holds when examining triggers found
1719 * by tgconstrrel search.
1721 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1726 /* Change the relname if needed */
1727 i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1728 if (strcmp(arga[i], oldname) == 0)
1736 /* Change attname(s) if needed */
1737 i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1738 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1739 for (; i < tgnargs; i += 2)
1741 if (strcmp(arga[i], oldname) == 0)
1751 /* Don't need to update this tuple */
1756 * Construct modified tgargs bytea.
1759 for (i = 0; i < tgnargs; i++)
1760 newlen += strlen(arga[i]) + 1;
1761 newtgargs = (bytea *) palloc(newlen);
1762 VARATT_SIZEP(newtgargs) = newlen;
1764 for (i = 0; i < tgnargs; i++)
1766 strcpy(((char *) newtgargs) + newlen, arga[i]);
1767 newlen += strlen(arga[i]) + 1;
1771 * Build modified tuple.
1773 for (i = 0; i < Natts_pg_trigger; i++)
1775 values[i] = (Datum) 0;
1779 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1780 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1782 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1785 * Update pg_trigger and its indexes
1787 simple_heap_update(tgrel, &tuple->t_self, tuple);
1789 CatalogUpdateIndexes(tgrel, tuple);
1792 * Invalidate trigger's relation's relcache entry so that other
1793 * backends (and this one too!) are sent SI message to make them
1794 * rebuild relcache entries. (Ideally this should happen
1797 * We can skip this for triggers on relid itself, since that relcache
1798 * flush will happen anyway due to the table or column rename. We
1799 * just need to catch the far ends of RI relationships.
1801 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1802 if (pg_trigger->tgrelid != relid)
1803 CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1805 /* free up our scratch memory */
1807 heap_freetuple(tuple);
1810 systable_endscan(trigscan);
1812 heap_close(tgrel, RowExclusiveLock);
1815 * Increment cmd counter to make updates visible; this is needed in case
1816 * the same tuple has to be updated again by next pass (can happen in case
1817 * of a self-referential FK relationship).
1819 CommandCounterIncrement();
1824 * Execute ALTER TABLE, which can be a list of subcommands
1826 * ALTER TABLE is performed in three phases:
1827 * 1. Examine subcommands and perform pre-transformation checking.
1828 * 2. Update system catalogs.
1829 * 3. Scan table(s) to check new constraints, and optionally recopy
1830 * the data into new table(s).
1831 * Phase 3 is not performed unless one or more of the subcommands requires
1832 * it. The intention of this design is to allow multiple independent
1833 * updates of the table schema to be performed with only one pass over the
1836 * ATPrepCmd performs phase 1. A "work queue" entry is created for
1837 * each table to be affected (there may be multiple affected tables if the
1838 * commands traverse a table inheritance hierarchy). Also we do preliminary
1839 * validation of the subcommands, including parse transformation of those
1840 * expressions that need to be evaluated with respect to the old table
1843 * ATRewriteCatalogs performs phase 2 for each affected table (note that
1844 * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1845 * Certain subcommands need to be performed before others to avoid
1846 * unnecessary conflicts; for example, DROP COLUMN should come before
1847 * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
1848 * lists, one for each logical "pass" of phase 2.
1850 * ATRewriteTables performs phase 3 for those tables that need it.
1852 * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1853 * the whole operation; we don't have to do anything special to clean up.
1856 AlterTable(AlterTableStmt *stmt)
1858 ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1860 interpretInhOption(stmt->relation->inhOpt));
1864 * AlterTableInternal
1866 * ALTER TABLE with target specified by OID
1869 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1871 ATController(relation_open(relid, AccessExclusiveLock),
1877 ATController(Relation rel, List *cmds, bool recurse)
1882 /* Phase 1: preliminary examination of commands, create work queue */
1885 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1887 ATPrepCmd(&wqueue, rel, cmd, recurse, false);
1890 /* Close the relation, but keep lock until commit */
1891 relation_close(rel, NoLock);
1893 /* Phase 2: update system catalogs */
1894 ATRewriteCatalogs(&wqueue);
1896 /* Phase 3: scan/rewrite tables as needed */
1897 ATRewriteTables(&wqueue);
1903 * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1904 * recursion and permission checks.
1906 * Caller must have acquired AccessExclusiveLock on relation already.
1907 * This lock should be held until commit.
1910 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1911 bool recurse, bool recursing)
1913 AlteredTableInfo *tab;
1916 /* Find or create work queue entry for this table */
1917 tab = ATGetQueueEntry(wqueue, rel);
1920 * Copy the original subcommand for each table. This avoids conflicts
1921 * when different child tables need to make different parse
1922 * transformations (for example, the same column may have different column
1923 * numbers in different children).
1925 cmd = copyObject(cmd);
1928 * Do permissions checking, recursion to child tables if needed, and any
1929 * additional phase-1 processing needed.
1931 switch (cmd->subtype)
1933 case AT_AddColumn: /* ADD COLUMN */
1934 ATSimplePermissions(rel, false);
1935 /* Performs own recursion */
1936 ATPrepAddColumn(wqueue, rel, recurse, cmd);
1937 pass = AT_PASS_ADD_COL;
1939 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
1942 * We allow defaults on views so that INSERT into a view can have
1943 * default-ish behavior. This works because the rewriter
1944 * substitutes default values into INSERTs before it expands
1947 ATSimplePermissions(rel, true);
1948 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1949 /* No command-specific prep needed */
1950 pass = AT_PASS_ADD_CONSTR;
1952 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
1953 ATSimplePermissions(rel, false);
1954 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1955 /* No command-specific prep needed */
1956 pass = AT_PASS_DROP;
1958 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
1959 ATSimplePermissions(rel, false);
1960 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1961 /* No command-specific prep needed */
1962 pass = AT_PASS_ADD_CONSTR;
1964 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
1965 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1966 /* Performs own permission checks */
1967 ATPrepSetStatistics(rel, cmd->name, cmd->def);
1968 pass = AT_PASS_COL_ATTRS;
1970 case AT_SetStorage: /* ALTER COLUMN STORAGE */
1971 ATSimplePermissions(rel, false);
1972 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1973 /* No command-specific prep needed */
1974 pass = AT_PASS_COL_ATTRS;
1976 case AT_DropColumn: /* DROP COLUMN */
1977 ATSimplePermissions(rel, false);
1978 /* Recursion occurs during execution phase */
1979 /* No command-specific prep needed except saving recurse flag */
1981 cmd->subtype = AT_DropColumnRecurse;
1982 pass = AT_PASS_DROP;
1984 case AT_AddIndex: /* ADD INDEX */
1985 ATSimplePermissions(rel, false);
1986 /* This command never recurses */
1987 /* No command-specific prep needed */
1988 pass = AT_PASS_ADD_INDEX;
1990 case AT_AddConstraint: /* ADD CONSTRAINT */
1991 ATSimplePermissions(rel, false);
1994 * Currently we recurse only for CHECK constraints, never for
1995 * foreign-key constraints. UNIQUE/PKEY constraints won't be seen
1998 if (IsA(cmd->def, Constraint))
1999 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2000 /* No command-specific prep needed */
2001 pass = AT_PASS_ADD_CONSTR;
2003 case AT_DropConstraint: /* DROP CONSTRAINT */
2004 ATSimplePermissions(rel, false);
2005 /* Performs own recursion */
2006 ATPrepDropConstraint(wqueue, rel, recurse, cmd);
2007 pass = AT_PASS_DROP;
2009 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
2010 ATSimplePermissions(rel, false);
2011 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2012 /* No command-specific prep needed */
2013 pass = AT_PASS_DROP;
2015 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
2016 ATSimplePermissions(rel, false);
2017 /* Performs own recursion */
2018 ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
2019 pass = AT_PASS_ALTER_TYPE;
2021 case AT_ToastTable: /* CREATE TOAST TABLE */
2022 ATSimplePermissions(rel, false);
2023 /* This command never recurses */
2024 /* No command-specific prep needed */
2025 pass = AT_PASS_MISC;
2027 case AT_ChangeOwner: /* ALTER OWNER */
2028 /* This command never recurses */
2029 /* No command-specific prep needed */
2030 pass = AT_PASS_MISC;
2032 case AT_ClusterOn: /* CLUSTER ON */
2033 case AT_DropCluster: /* SET WITHOUT CLUSTER */
2034 ATSimplePermissions(rel, false);
2035 /* These commands never recurse */
2036 /* No command-specific prep needed */
2037 pass = AT_PASS_MISC;
2039 case AT_DropOids: /* SET WITHOUT OIDS */
2040 ATSimplePermissions(rel, false);
2041 /* Performs own recursion */
2042 if (rel->rd_rel->relhasoids)
2044 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
2046 dropCmd->subtype = AT_DropColumn;
2047 dropCmd->name = pstrdup("oid");
2048 dropCmd->behavior = cmd->behavior;
2049 ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
2051 pass = AT_PASS_DROP;
2053 case AT_SetTableSpace: /* SET TABLESPACE */
2054 /* This command never recurses */
2055 ATPrepSetTableSpace(tab, rel, cmd->name);
2056 pass = AT_PASS_MISC; /* doesn't actually matter */
2058 case AT_EnableTrig: /* ENABLE TRIGGER variants */
2059 case AT_EnableTrigAll:
2060 case AT_EnableTrigUser:
2061 case AT_DisableTrig: /* DISABLE TRIGGER variants */
2062 case AT_DisableTrigAll:
2063 case AT_DisableTrigUser:
2064 ATSimplePermissions(rel, false);
2065 /* These commands never recurse */
2066 /* No command-specific prep needed */
2067 pass = AT_PASS_MISC;
2070 elog(ERROR, "unrecognized alter table type: %d",
2071 (int) cmd->subtype);
2072 pass = 0; /* keep compiler quiet */
2076 /* Add the subcommand to the appropriate list for phase 2 */
2077 tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2083 * Traffic cop for ALTER TABLE Phase 2 operations. Subcommands are
2084 * dispatched in a "safe" execution order (designed to avoid unnecessary
2088 ATRewriteCatalogs(List **wqueue)
2094 * We process all the tables "in parallel", one pass at a time. This is
2095 * needed because we may have to propagate work from one table to another
2096 * (specifically, ALTER TYPE on a foreign key's PK has to dispatch the
2097 * re-adding of the foreign key constraint to the other table). Work can
2098 * only be propagated into later passes, however.
2100 for (pass = 0; pass < AT_NUM_PASSES; pass++)
2102 /* Go through each table that needs to be processed */
2103 foreach(ltab, *wqueue)
2105 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2106 List *subcmds = tab->subcmds[pass];
2114 * Exclusive lock was obtained by phase 1, needn't get it again
2116 rel = relation_open(tab->relid, NoLock);
2118 foreach(lcmd, subcmds)
2119 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2122 * After the ALTER TYPE pass, do cleanup work (this is not done in
2123 * ATExecAlterColumnType since it should be done only once if
2124 * multiple columns of a table are altered).
2126 if (pass == AT_PASS_ALTER_TYPE)
2127 ATPostAlterTypeCleanup(wqueue, tab);
2129 relation_close(rel, NoLock);
2134 * Do an implicit CREATE TOAST TABLE if we executed any subcommands that
2135 * might have added a column or changed column storage.
2137 foreach(ltab, *wqueue)
2139 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2141 if (tab->relkind == RELKIND_RELATION &&
2142 (tab->subcmds[AT_PASS_ADD_COL] ||
2143 tab->subcmds[AT_PASS_ALTER_TYPE] ||
2144 tab->subcmds[AT_PASS_COL_ATTRS]))
2145 AlterTableCreateToastTable(tab->relid, true);
2150 * ATExecCmd: dispatch a subcommand to appropriate execution routine
2153 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2155 switch (cmd->subtype)
2157 case AT_AddColumn: /* ADD COLUMN */
2158 ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2160 case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
2161 ATExecColumnDefault(rel, cmd->name, cmd->def);
2163 case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
2164 ATExecDropNotNull(rel, cmd->name);
2166 case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
2167 ATExecSetNotNull(tab, rel, cmd->name);
2169 case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
2170 ATExecSetStatistics(rel, cmd->name, cmd->def);
2172 case AT_SetStorage: /* ALTER COLUMN STORAGE */
2173 ATExecSetStorage(rel, cmd->name, cmd->def);
2175 case AT_DropColumn: /* DROP COLUMN */
2176 ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2178 case AT_DropColumnRecurse: /* DROP COLUMN with recursion */
2179 ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2181 case AT_AddIndex: /* ADD INDEX */
2182 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2184 case AT_ReAddIndex: /* ADD INDEX */
2185 ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2187 case AT_AddConstraint: /* ADD CONSTRAINT */
2188 ATExecAddConstraint(tab, rel, cmd->def);
2190 case AT_DropConstraint: /* DROP CONSTRAINT */
2191 ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2193 case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
2194 ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2196 case AT_AlterColumnType: /* ALTER COLUMN TYPE */
2197 ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2199 case AT_ToastTable: /* CREATE TOAST TABLE */
2200 AlterTableCreateToastTable(RelationGetRelid(rel), false);
2202 case AT_ChangeOwner: /* ALTER OWNER */
2203 ATExecChangeOwner(RelationGetRelid(rel),
2204 get_roleid_checked(cmd->name),
2207 case AT_ClusterOn: /* CLUSTER ON */
2208 ATExecClusterOn(rel, cmd->name);
2210 case AT_DropCluster: /* SET WITHOUT CLUSTER */
2211 ATExecDropCluster(rel);
2213 case AT_DropOids: /* SET WITHOUT OIDS */
2216 * Nothing to do here; we'll have generated a DropColumn
2217 * subcommand to do the real work
2220 case AT_SetTableSpace: /* SET TABLESPACE */
2223 * Nothing to do here; Phase 3 does the work
2226 case AT_EnableTrig: /* ENABLE TRIGGER name */
2227 ATExecEnableDisableTrigger(rel, cmd->name, true, false);
2229 case AT_DisableTrig: /* DISABLE TRIGGER name */
2230 ATExecEnableDisableTrigger(rel, cmd->name, false, false);
2232 case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
2233 ATExecEnableDisableTrigger(rel, NULL, true, false);
2235 case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
2236 ATExecEnableDisableTrigger(rel, NULL, false, false);
2238 case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
2239 ATExecEnableDisableTrigger(rel, NULL, true, true);
2241 case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
2242 ATExecEnableDisableTrigger(rel, NULL, false, true);
2245 elog(ERROR, "unrecognized alter table type: %d",
2246 (int) cmd->subtype);
2251 * Bump the command counter to ensure the next subcommand in the sequence
2252 * can see the changes so far
2254 CommandCounterIncrement();
/*
 * ATRewriteTables -- phase 3 of ALTER TABLE.
 *
 * NOTE(review): this listing carries a leading line number on every line
 * and appears to have lost short lines (braces, else, some declarations).
 * The notes below describe only the statements that are visible here.
 *
 * wqueue: pointer to the work queue, a List of AlteredTableInfo entries.
 *
 * First pass over *wqueue: an entry with a non-empty newvals list gets a
 * full rewrite.  The shared/nailed and other-session-temp checks come
 * first, then make_new_heap, ATRewriteTable, swap_relation_files, removal
 * of the transient heap through performDeletion, and reindex_relation.
 * The visible code also calls ATRewriteTable(tab, InvalidOid) when
 * tab->constraints is non-empty and ATExecSetTableSpace when
 * tab->newTableSpace is set; presumably these belong to the no-rewrite
 * branch -- confirm against the full file.
 *
 * Second pass over *wqueue: each CONSTR_FOREIGN entry in tab->constraints
 * is checked with validateForeignKeyConstraint, with the referenced
 * relation opened under RowShareLock.
 */
2258 * ATRewriteTables: ALTER TABLE phase 3
2261 ATRewriteTables(List **wqueue)
2265 /* Go through each table that needs to be checked or rewritten */
2266 foreach(ltab, *wqueue)
2268 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2271 * We only need to rewrite the table if at least one column needs to
2274 if (tab->newvals != NIL)
2276 /* Build a temporary relation and copy data */
2278 char NewHeapName[NAMEDATALEN];
2281 ObjectAddress object;
2283 OldHeap = heap_open(tab->relid, NoLock);
2286 * We can never allow rewriting of shared or nailed-in-cache
2287 * relations, because we can't support changing their relfilenode
2290 if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2292 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2293 errmsg("cannot rewrite system relation \"%s\"",
2294 RelationGetRelationName(OldHeap))));
2297 * Don't allow rewrite on temp tables of other backends ... their
2298 * local buffer manager is not going to cope.
2300 if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2302 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2303 errmsg("cannot rewrite temporary tables of other sessions")));
2306 * Select destination tablespace (same as original unless user
2307 * requested a change)
2309 if (tab->newTableSpace)
2310 NewTableSpace = tab->newTableSpace;
2312 NewTableSpace = OldHeap->rd_rel->reltablespace;
2314 heap_close(OldHeap, NoLock);
2317 * Create the new heap, using a temporary name in the same
2318 * namespace as the existing table. NOTE: there is some risk of
2319 * collision with user relnames. Working around this seems more
2320 * trouble than it's worth; in particular, we can't create the new
2321 * heap in a different namespace from the old, or we will have
2322 * problems with the TEMP status of temp tables.
2324 snprintf(NewHeapName, sizeof(NewHeapName),
2325 "pg_temp_%u", tab->relid);
2327 OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2330 * Copy the heap data into the new table with the desired
2331 * modifications, and test the current data within the table
2332 * against new constraints generated by ALTER TABLE commands.
2334 ATRewriteTable(tab, OIDNewHeap);
2336 /* Swap the physical files of the old and new heaps. */
2337 swap_relation_files(tab->relid, OIDNewHeap);
2339 CommandCounterIncrement();
2341 /* Destroy new heap with old filenode */
2342 object.classId = RelationRelationId;
2343 object.objectId = OIDNewHeap;
2344 object.objectSubId = 0;
2347 * The new relation is local to our transaction and we know
2348 * nothing depends on it, so DROP_RESTRICT should be OK.
2350 performDeletion(&object, DROP_RESTRICT);
2351 /* performDeletion does CommandCounterIncrement at end */
2354 * Rebuild each index on the relation (but not the toast table,
2355 * which is all-new anyway). We do not need
2356 * CommandCounterIncrement() because reindex_relation does it.
2358 reindex_relation(tab->relid, false);
2363 * Test the current data within the table against new constraints
2364 * generated by ALTER TABLE commands, but don't rebuild data.
2366 if (tab->constraints != NIL)
2367 ATRewriteTable(tab, InvalidOid);
2370 * If we had SET TABLESPACE but no reason to reconstruct tuples,
2371 * just do a block-by-block copy.
2373 if (tab->newTableSpace)
2374 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2379 * Foreign key constraints are checked in a final pass, since (a) it's
2380 * generally best to examine each one separately, and (b) it's at least
2381 * theoretically possible that we have changed both relations of the
2382 * foreign key, and we'd better have finished both rewrites before we try
2383 * to read the tables.
2385 foreach(ltab, *wqueue)
2387 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2388 Relation rel = NULL;
2391 foreach(lcon, tab->constraints)
2393 NewConstraint *con = lfirst(lcon);
2395 if (con->contype == CONSTR_FOREIGN)
2397 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2402 /* Long since locked, no need for another */
2403 rel = heap_open(tab->relid, NoLock);
2406 refrel = heap_open(con->refrelid, RowShareLock);
2408 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2410 heap_close(refrel, NoLock);
2415 heap_close(rel, NoLock);
/*
 * ATRewriteTable -- scan one table and, when asked, copy it into a new heap.
 *
 * tab:        work-queue entry; the visible code reads tab->relid,
 *             tab->oldDesc, tab->constraints and tab->newvals.
 * OIDNewHeap: OID of the heap to fill, or InvalidOid for a check-only scan.
 *
 * Visible steps: open the old relation with NoLock (and the new heap with
 * AccessExclusiveLock when OIDNewHeap is valid); call
 * find_composite_type_dependencies on the old rowtype; prepare expression
 * states for the constraints and for the new column values; then, for each
 * tuple returned by heap_getnext, deform it with the old descriptor, mark
 * attributes dropped in the new descriptor as null, evaluate the newvals
 * expressions against the old tuple, form the new tuple (carrying the OID
 * across when the descriptors have OIDs), test the constraints against the
 * new tuple, and insert it with simple_heap_insert.
 *
 * Constraint failures are reported with ERRCODE_CHECK_VIOLATION or
 * ERRCODE_NOT_NULL_VIOLATION.  The per-tuple expression context is reset
 * and CHECK_FOR_INTERRUPTS is called on every iteration.
 *
 * NOTE(review): the conditions guarding several of these steps (needscan,
 * newrel) are not visible in this listing -- confirm against the full file.
 */
2420 * ATRewriteTable: scan or rewrite one table
2422 * OIDNewHeap is InvalidOid if we don't need to rewrite
2425 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2429 TupleDesc oldTupDesc;
2430 TupleDesc newTupDesc;
2431 bool needscan = false;
2437 * Open the relation(s). We have surely already locked the existing
2440 oldrel = heap_open(tab->relid, NoLock);
2441 oldTupDesc = tab->oldDesc;
2442 newTupDesc = RelationGetDescr(oldrel); /* includes all mods */
2444 if (OidIsValid(OIDNewHeap))
2445 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2450 * If we need to rewrite the table, the operation has to be propagated to
2451 * tables that use this table's rowtype as a column type.
2453 * (Eventually this will probably become true for scans as well, but at
2454 * the moment a composite type does not enforce any constraints, so it's
2455 * not necessary/appropriate to enforce them just during ALTER.)
2458 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2459 RelationGetRelationName(oldrel));
2462 * Generate the constraint and default execution states
2465 estate = CreateExecutorState();
2467 /* Build the needed expression execution states */
2468 foreach(l, tab->constraints)
2470 NewConstraint *con = lfirst(l);
2472 switch (con->contype)
2476 con->qualstate = (List *)
2477 ExecPrepareExpr((Expr *) con->qual, estate);
2479 case CONSTR_FOREIGN:
2480 /* Nothing to do here */
2482 case CONSTR_NOTNULL:
2486 elog(ERROR, "unrecognized constraint type: %d",
2487 (int) con->contype);
2491 foreach(l, tab->newvals)
2493 NewColumnValue *ex = lfirst(l);
2497 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2502 ExprContext *econtext;
2505 TupleTableSlot *oldslot;
2506 TupleTableSlot *newslot;
2509 MemoryContext oldCxt;
2510 List *dropped_attrs = NIL;
2513 econtext = GetPerTupleExprContext(estate);
2516 * Make tuple slots for old and new tuples. Note that even when the
2517 * tuples are the same, the tupDescs might not be (consider ADD COLUMN
2518 * without a default).
2520 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2521 newslot = MakeSingleTupleTableSlot(newTupDesc);
2523 /* Preallocate values/isnull arrays */
2524 i = Max(newTupDesc->natts, oldTupDesc->natts);
2525 values = (Datum *) palloc(i * sizeof(Datum));
2526 isnull = (bool *) palloc(i * sizeof(bool));
2527 memset(values, 0, i * sizeof(Datum));
2528 memset(isnull, true, i * sizeof(bool));
2531 * Any attributes that are dropped according to the new tuple
2532 * descriptor can be set to NULL. We precompute the list of dropped
2533 * attributes to avoid needing to do so in the per-tuple loop.
2535 for (i = 0; i < newTupDesc->natts; i++)
2537 if (newTupDesc->attrs[i]->attisdropped)
2538 dropped_attrs = lappend_int(dropped_attrs, i);
2542 * Scan through the rows, generating a new row if needed and then
2543 * checking all the constraints.
2545 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2548 * Switch to per-tuple memory context and reset it for each tuple
2549 * produced, so we don't leak memory.
2551 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2553 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2557 Oid tupOid = InvalidOid;
2559 /* Extract data from old tuple */
2560 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2561 if (oldTupDesc->tdhasoid)
2562 tupOid = HeapTupleGetOid(tuple);
2564 /* Set dropped attributes to null in new tuple */
2565 foreach(lc, dropped_attrs)
2566 isnull[lfirst_int(lc)] = true;
2569 * Process supplied expressions to replace selected columns.
2570 * Expression inputs come from the old tuple.
2572 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2573 econtext->ecxt_scantuple = oldslot;
2575 foreach(l, tab->newvals)
2577 NewColumnValue *ex = lfirst(l);
2579 values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2581 &isnull[ex->attnum - 1],
2586 * Form the new tuple. Note that we don't explicitly pfree it,
2587 * since the per-tuple memory context will be reset shortly.
2589 tuple = heap_form_tuple(newTupDesc, values, isnull);
2591 /* Preserve OID, if any */
2592 if (newTupDesc->tdhasoid)
2593 HeapTupleSetOid(tuple, tupOid);
2596 /* Now check any constraints on the possibly-changed tuple */
2597 ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2598 econtext->ecxt_scantuple = newslot;
2600 foreach(l, tab->constraints)
2602 NewConstraint *con = lfirst(l);
2604 switch (con->contype)
2607 if (!ExecQual(con->qualstate, econtext, true))
2609 (errcode(ERRCODE_CHECK_VIOLATION),
2610 errmsg("check constraint \"%s\" is violated by some row",
2613 case CONSTR_NOTNULL:
2618 d = heap_getattr(tuple, con->attnum, newTupDesc,
2622 (errcode(ERRCODE_NOT_NULL_VIOLATION),
2623 errmsg("column \"%s\" contains null values",
2624 get_attname(tab->relid,
2628 case CONSTR_FOREIGN:
2629 /* Nothing to do here */
2632 elog(ERROR, "unrecognized constraint type: %d",
2633 (int) con->contype);
2637 /* Write the tuple out to the new relation */
2639 simple_heap_insert(newrel, tuple);
2641 ResetExprContext(econtext);
2643 CHECK_FOR_INTERRUPTS();
2646 MemoryContextSwitchTo(oldCxt);
2650 FreeExecutorState(estate);
2652 heap_close(oldrel, NoLock);
2654 heap_close(newrel, NoLock);
/*
 * ATGetQueueEntry -- look up, or create, the work-queue entry for rel.
 *
 * wqueue: pointer to the List of AlteredTableInfo entries; a new entry is
 *         appended to *wqueue with lappend.
 * rel:    relation whose OID (RelationGetRelid) identifies the entry.
 *
 * Existing entries are matched on tab->relid.  A new entry is zero-filled
 * by palloc0, then given the relkind of rel and a private copy of its
 * current tuple descriptor (CreateTupleDescCopy).
 *
 * NOTE(review): neither the return statements nor an assignment to
 * tab->relid are visible in this listing -- confirm against the full file.
 * The declared result type is AlteredTableInfo *.
 */
2658 * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2660 static AlteredTableInfo *
2661 ATGetQueueEntry(List **wqueue, Relation rel)
2663 Oid relid = RelationGetRelid(rel);
2664 AlteredTableInfo *tab;
2667 foreach(ltab, *wqueue)
2669 tab = (AlteredTableInfo *) lfirst(ltab);
2670 if (tab->relid == relid)
2675 * Not there, so add it. Note that we make a copy of the relation's
2676 * existing descriptor before anything interesting can happen to it.
2678 tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2680 tab->relkind = rel->rd_rel->relkind;
2681 tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2683 *wqueue = lappend(*wqueue, tab);
/*
 * ATSimplePermissions -- common permission checks for ALTER TABLE.
 *
 * rel:       relation being altered.
 * allowView: presumably selects the table-or-view variant of the relkind
 *            check; the branch on it is not visible in this listing --
 *            confirm against the full file.
 *
 * Visible checks, in order:
 *  - relkind other than RELKIND_RELATION (RELKIND_VIEW is also tested) is
 *    reported with ERRCODE_WRONG_OBJECT_TYPE;
 *  - the current user must pass pg_class_ownercheck, otherwise
 *    aclcheck_error is called with ACLCHECK_NOT_OWNER;
 *  - a relation for which IsSystemRelation is true is refused with
 *    ERRCODE_INSUFFICIENT_PRIVILEGE unless allowSystemTableMods is set.
 */
2689 * ATSimplePermissions
2691 * - Ensure that it is a relation (or possibly a view)
2692 * - Ensure this user is the owner
2693 * - Ensure that it is not a system table
2696 ATSimplePermissions(Relation rel, bool allowView)
2698 if (rel->rd_rel->relkind != RELKIND_RELATION)
2702 if (rel->rd_rel->relkind != RELKIND_VIEW)
2704 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2705 errmsg("\"%s\" is not a table or view",
2706 RelationGetRelationName(rel))));
2710 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2711 errmsg("\"%s\" is not a table",
2712 RelationGetRelationName(rel))));
2715 /* Permissions checks */
2716 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2717 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2718 RelationGetRelationName(rel));
2720 if (!allowSystemTableMods && IsSystemRelation(rel))
2722 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2723 errmsg("permission denied: \"%s\" is a system catalog",
2724 RelationGetRelationName(rel))));
/*
 * ATSimpleRecursion -- run the prep step of cmd on every inheritance
 * descendant of rel.
 *
 * wqueue:  work queue handed through to ATPrepCmd.
 * rel:     parent relation.
 * cmd:     subcommand to apply to each child.
 * recurse: when false, or when rel is not RELKIND_RELATION, nothing is done.
 *
 * Each OID returned by find_all_inheritors is opened with
 * AccessExclusiveLock, passed to ATPrepCmd(wqueue, childrel, cmd, false,
 * true), and then closed with NoLock.
 *
 * NOTE(review): the visible test childrelid == relid presumably skips the
 * parent itself; the statement under that test is missing from this
 * listing -- confirm against the full file.
 */
2730 * Simple table recursion sufficient for most ALTER TABLE operations.
2731 * All direct and indirect children are processed in an unspecified order.
2732 * Note that if a child inherits from the original table via multiple
2733 * inheritance paths, it will be visited just once.
2736 ATSimpleRecursion(List **wqueue, Relation rel,
2737 AlterTableCmd *cmd, bool recurse)
2740 * Propagate to children if desired. Non-table relations never have
2741 * children, so no need to search in that case.
2743 if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2745 Oid relid = RelationGetRelid(rel);
2749 /* this routine is actually in the planner */
2750 children = find_all_inheritors(relid);
2753 * find_all_inheritors does the recursive search of the inheritance
2754 * hierarchy, so all we have to do is process all of the relids in the
2755 * list that it returns.
2757 foreach(child, children)
2759 Oid childrelid = lfirst_oid(child);
2762 if (childrelid == relid)
2764 childrel = relation_open(childrelid, AccessExclusiveLock);
2765 ATPrepCmd(wqueue, childrel, cmd, false, true);
2766 relation_close(childrel, NoLock);
/*
 * ATOneLevelRecursion -- run the prep step of cmd on the direct children
 * of rel only.
 *
 * wqueue: work queue handed through to ATPrepCmd.
 * rel:    parent relation.
 * NOTE(review): cmd is used in the body but its parameter line is not
 * visible in this listing.
 *
 * Each OID from find_inheritance_children is opened with
 * AccessExclusiveLock and handed to ATPrepCmd(wqueue, childrel, cmd, true,
 * true); the relation is then closed with NoLock.  Unlike
 * ATSimpleRecursion, the fourth ATPrepCmd argument is true here.
 */
2772 * ATOneLevelRecursion
2774 * Here, we visit only direct inheritance children. It is expected that
2775 * the command's prep routine will recurse again to find indirect children.
2776 * When using this technique, a multiply-inheriting child will be visited
2780 ATOneLevelRecursion(List **wqueue, Relation rel,
2783 Oid relid = RelationGetRelid(rel);
2787 /* this routine is actually in the planner */
2788 children = find_inheritance_children(relid);
2790 foreach(child, children)
2792 Oid childrelid = lfirst_oid(child);
2795 childrel = relation_open(childrelid, AccessExclusiveLock);
2796 ATPrepCmd(wqueue, childrel, cmd, true, true);
2797 relation_close(childrel, NoLock);
/*
 * find_composite_type_dependencies -- reject the ALTER when some table has
 * a column whose type is the given rowtype.
 *
 * typeOid:     OID of the rowtype to look for.
 * origTblName: name of the table being altered; presumably supplied to the
 *              error message and to the recursive call -- those argument
 *              lines are not visible in this listing.
 *
 * Scans pg_depend through DependReferenceIndexId for rows whose refclassid
 * is TypeRelationId and whose refobjid is typeOid.  Rows with a classid
 * other than RelationRelationId, or with objsubid <= 0, are filtered by the
 * visible test (the statement under it is missing here).  Each remaining
 * dependent relation is opened with AccessShareLock:
 *  - RELKIND_RELATION: reported with ERRCODE_FEATURE_NOT_SUPPORTED;
 *  - any other relkind with a valid reltype: the function recurses on that
 *    reltype.
 * The dependent relation, the scan and pg_depend are all closed before
 * returning.
 */
2803 * find_composite_type_dependencies
2805 * Check to see if a table's rowtype is being used as a column in some
2806 * other table (possibly nested several levels deep in composite types!).
2807 * Eventually, we'd like to propagate the check or rewrite operation
2808 * into other such tables, but for now, just error out if we find any.
2810 * We assume that functions and views depending on the type are not reasons
2811 * to reject the ALTER. (How safe is this really?)
2814 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2818 SysScanDesc depScan;
2822 * We scan pg_depend to find those things that depend on the rowtype. (We
2823 * assume we can ignore refobjsubid for a rowtype.)
2825 depRel = heap_open(DependRelationId, AccessShareLock);
2827 ScanKeyInit(&key[0],
2828 Anum_pg_depend_refclassid,
2829 BTEqualStrategyNumber, F_OIDEQ,
2830 ObjectIdGetDatum(TypeRelationId));
2831 ScanKeyInit(&key[1],
2832 Anum_pg_depend_refobjid,
2833 BTEqualStrategyNumber, F_OIDEQ,
2834 ObjectIdGetDatum(typeOid));
2836 depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2837 SnapshotNow, 2, key);
2839 while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2841 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2843 Form_pg_attribute att;
2845 /* Ignore dependees that aren't user columns of relations */
2846 /* (we assume system columns are never of rowtypes) */
2847 if (pg_depend->classid != RelationRelationId ||
2848 pg_depend->objsubid <= 0)
2851 rel = relation_open(pg_depend->objid, AccessShareLock);
2852 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2854 if (rel->rd_rel->relkind == RELKIND_RELATION)
2857 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2858 errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2860 RelationGetRelationName(rel),
2861 NameStr(att->attname))));
2863 else if (OidIsValid(rel->rd_rel->reltype))
2866 * A view or composite type itself isn't a problem, but we must
2867 * recursively check for indirect dependencies via its rowtype.
2869 find_composite_type_dependencies(rel->rd_rel->reltype,
2873 relation_close(rel, AccessShareLock);
2876 systable_endscan(depScan);
2878 relation_close(depRel, AccessShareLock);
/*
 * ATPrepAddColumn -- prep step for ALTER TABLE ADD COLUMN.
 *
 * wqueue:  work queue handed through to ATOneLevelRecursion.
 * rel:     relation receiving the column.
 * recurse: whether child tables should receive the column too.
 * NOTE(review): cmd is used in the body but its parameter line is not
 * visible in this listing.
 *
 * Visible recursion path: copy cmd with copyObject, mark the copied
 * ColumnDef as inherited once (inhcount = 1, is_local = false), clear its
 * support field, and call ATOneLevelRecursion with the copy.
 * Visible non-recursion path: if find_inheritance_children returns a
 * non-empty list, report ERRCODE_INVALID_TABLE_DEFINITION.
 *
 * NOTE(review): the if/else on recurse that separates the two paths is not
 * visible in this listing -- confirm against the full file.
 */
2883 * ALTER TABLE ADD COLUMN
2885 * Adds an additional attribute to a relation making the assumption that
2886 * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2887 * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2891 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
2895 * Recurse to add the column to child classes, if requested.
2897 * We must recurse one level at a time, so that multiply-inheriting
2898 * children are visited the right number of times and end up with the
2899 * right attinhcount.
2903 AlterTableCmd *childCmd = copyObject(cmd);
2904 ColumnDef *colDefChild = (ColumnDef *) childCmd->def;
2906 /* Child should see column as singly inherited */
2907 colDefChild->inhcount = 1;
2908 colDefChild->is_local = false;
2909 /* and don't make a support dependency on the child */
2910 colDefChild->support = NULL;
2912 ATOneLevelRecursion(wqueue, rel, childCmd);
2917 * If we are told not to recurse, there had better not be any child
2918 * tables; else the addition would put them out of step.
2920 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
2922 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2923 errmsg("column must be added to child tables too")));
/*
 * ATExecAddColumn -- execution step for ALTER TABLE ADD COLUMN.
 *
 * tab: work-queue entry; a NewColumnValue is appended to tab->newvals when
 *      a default expression has to be filled in by phase 3.
 * rel: relation receiving the column.
 * NOTE(review): colDef is used throughout but its parameter line is not
 * visible in this listing; the same holds for the assignment of i, which
 * is stored as attnum and passed to the dependency helpers.
 *
 * Visible steps:
 *  - when colDef->inhcount > 0 and the child already has a column of that
 *    name, require matching type OID and typmod (else
 *    ERRCODE_DATATYPE_MISMATCH), bump attinhcount in pg_attribute, and emit
 *    the merge message;
 *  - refuse a duplicate column name (ERRCODE_DUPLICATE_COLUMN) and enforce
 *    MaxHeapAttributeNumber (ERRCODE_TOO_MANY_COLUMNS);
 *  - resolve the type, run CheckAttributeType, build the pg_attribute row
 *    and insert it, then store relnatts = maxatts in the pg_class row;
 *  - when colDef->raw_default is set, store it through
 *    AddRelationRawConstraints;
 *  - obtain the default via build_column_default, or, when there is none
 *    and GetDomainConstraints is non-empty, a NULL constant of the base
 *    type coerced to the column type;
 *  - record the datatype dependency and, when colDef->support is set, the
 *    support dependency.
 * CommandCounterIncrement is called after the catalog rows are written and
 * again after the default is stored.
 */
2928 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
2931 Oid myrelid = RelationGetRelid(rel);
2935 HeapTuple attributeTuple;
2936 Form_pg_attribute attribute;
2937 FormData_pg_attribute attributeD;
2941 HeapTuple typeTuple;
2946 attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
2949 * Are we adding the column to a recursion child? If so, check whether to
2950 * merge with an existing definition for the column.
2952 if (colDef->inhcount > 0)
2956 /* Does child already have a column by this name? */
2957 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
2958 if (HeapTupleIsValid(tuple))
2960 Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
2962 /* Okay if child matches by type */
2963 if (typenameTypeId(colDef->typename) != childatt->atttypid ||
2964 colDef->typename->typmod != childatt->atttypmod)
2966 (errcode(ERRCODE_DATATYPE_MISMATCH),
2967 errmsg("child table \"%s\" has different type for column \"%s\"",
2968 RelationGetRelationName(rel), colDef->colname)));
2970 /* Bump the existing child att's inhcount */
2971 childatt->attinhcount++;
2972 simple_heap_update(attrdesc, &tuple->t_self, tuple);
2973 CatalogUpdateIndexes(attrdesc, tuple);
2975 heap_freetuple(tuple);
2977 /* Inform the user about the merge */
2979 (errmsg("merging definition of column \"%s\" for child \"%s\"",
2980 colDef->colname, RelationGetRelationName(rel))));
2982 heap_close(attrdesc, RowExclusiveLock);
2987 pgclass = heap_open(RelationRelationId, RowExclusiveLock);
2989 reltup = SearchSysCacheCopy(RELOID,
2990 ObjectIdGetDatum(myrelid),
2992 if (!HeapTupleIsValid(reltup))
2993 elog(ERROR, "cache lookup failed for relation %u", myrelid);
2996 * this test is deliberately not attisdropped-aware, since if one tries to
2997 * add a column matching a dropped column name, it's gonna fail anyway.
2999 if (SearchSysCacheExists(ATTNAME,
3000 ObjectIdGetDatum(myrelid),
3001 PointerGetDatum(colDef->colname),
3004 (errcode(ERRCODE_DUPLICATE_COLUMN),
3005 errmsg("column \"%s\" of relation \"%s\" already exists",
3006 colDef->colname, RelationGetRelationName(rel))));
3008 minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
3009 maxatts = minattnum + 1;
3010 if (maxatts > MaxHeapAttributeNumber)
3012 (errcode(ERRCODE_TOO_MANY_COLUMNS),
3013 errmsg("tables can have at most %d columns",
3014 MaxHeapAttributeNumber)));
3017 typeTuple = typenameType(colDef->typename);
3018 tform = (Form_pg_type) GETSTRUCT(typeTuple);
3019 typeOid = HeapTupleGetOid(typeTuple);
3021 /* make sure datatype is legal for a column */
3022 CheckAttributeType(colDef->colname, typeOid);
3024 attributeTuple = heap_addheader(Natts_pg_attribute,
3026 ATTRIBUTE_TUPLE_SIZE,
3027 (void *) &attributeD);
3029 attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
3031 attribute->attrelid = myrelid;
3032 namestrcpy(&(attribute->attname), colDef->colname);
3033 attribute->atttypid = typeOid;
3034 attribute->attstattarget = -1;
3035 attribute->attlen = tform->typlen;
3036 attribute->attcacheoff = -1;
3037 attribute->atttypmod = colDef->typename->typmod;
3038 attribute->attnum = i;
3039 attribute->attbyval = tform->typbyval;
3040 attribute->attndims = list_length(colDef->typename->arrayBounds);
3041 attribute->attstorage = tform->typstorage;
3042 attribute->attalign = tform->typalign;
3043 attribute->attnotnull = colDef->is_not_null;
3044 attribute->atthasdef = false;
3045 attribute->attisdropped = false;
3046 attribute->attislocal = colDef->is_local;
3047 attribute->attinhcount = colDef->inhcount;
3049 ReleaseSysCache(typeTuple);
3051 simple_heap_insert(attrdesc, attributeTuple);
3053 /* Update indexes on pg_attribute */
3054 CatalogUpdateIndexes(attrdesc, attributeTuple);
3056 heap_close(attrdesc, RowExclusiveLock);
3059 * Update number of attributes in pg_class tuple
3061 ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
3063 simple_heap_update(pgclass, &reltup->t_self, reltup);
3065 /* keep catalog indexes current */
3066 CatalogUpdateIndexes(pgclass, reltup);
3068 heap_freetuple(reltup);
3070 heap_close(pgclass, RowExclusiveLock);
3072 /* Make the attribute's catalog entry visible */
3073 CommandCounterIncrement();
3076 * Store the DEFAULT, if any, in the catalogs
3078 if (colDef->raw_default)
3080 RawColumnDefault *rawEnt;
3082 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3083 rawEnt->attnum = attribute->attnum;
3084 rawEnt->raw_default = copyObject(colDef->raw_default);
3087 * This function is intended for CREATE TABLE, so it processes a
3088 * _list_ of defaults, but we just do one.
3090 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3092 /* Make the additional catalog changes visible */
3093 CommandCounterIncrement();
3097 * Tell Phase 3 to fill in the default expression, if there is one.
3099 * If there is no default, Phase 3 doesn't have to do anything, because
3100 * that effectively means that the default is NULL. The heap tuple access
3101 * routines always check for attnum > # of attributes in tuple, and return
3102 * NULL if so, so without any modification of the tuple data we will get
3103 * the effect of NULL values in the new column.
3105 * An exception occurs when the new column is of a domain type: the domain
3106 * might have a NOT NULL constraint, or a check constraint that indirectly
3107 * rejects nulls. If there are any domain constraints then we construct
3108 * an explicit NULL default value that will be passed through
3109 * CoerceToDomain processing. (This is a tad inefficient, since it causes
3110 * rewriting the table which we really don't have to do, but the present
3111 * design of domain processing doesn't offer any simple way of checking
3112 * the constraints more directly.)
3114 * Note: we use build_column_default, and not just the cooked default
3115 * returned by AddRelationRawConstraints, so that the right thing happens
3116 * when a datatype's default applies.
3118 defval = (Expr *) build_column_default(rel, attribute->attnum);
3120 if (!defval && GetDomainConstraints(typeOid) != NIL)
3122 Oid basetype = getBaseType(typeOid);
3124 defval = (Expr *) makeNullConst(basetype);
3125 defval = (Expr *) coerce_to_target_type(NULL,
3129 colDef->typename->typmod,
3130 COERCION_ASSIGNMENT,
3131 COERCE_IMPLICIT_CAST);
3132 if (defval == NULL) /* should not happen */
3133 elog(ERROR, "failed to coerce base type to domain");
3138 NewColumnValue *newval;
3140 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3141 newval->attnum = attribute->attnum;
3142 newval->expr = defval;
3144 tab->newvals = lappend(tab->newvals, newval);
3148 * Add needed dependency entries for the new column.
3150 add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3151 if (colDef->support != NULL)
3152 add_column_support_dependency(myrelid, i, colDef->support);
/*
 * add_column_datatype_dependency -- record that column attnum of relation
 * relid depends on type typid.
 *
 * relid:  OID of the relation owning the column.
 * attnum: attribute number of the column (stored as objectSubId).
 * typid:  OID of the column datatype.
 *
 * The dependent object is (RelationRelationId, relid, attnum), the
 * referenced object is (TypeRelationId, typid, 0), and the entry is made
 * with recordDependencyOn using DEPENDENCY_NORMAL.
 */
3156 * Install a column's dependency on its datatype.
3159 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3161 ObjectAddress myself,
3164 myself.classId = RelationRelationId;
3165 myself.objectId = relid;
3166 myself.objectSubId = attnum;
3167 referenced.classId = TypeRelationId;
3168 referenced.objectId = typid;
3169 referenced.objectSubId = 0;
3170 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
/*
 * add_column_support_dependency -- tie a supporting relation to a column.
 *
 * relid:   OID of the relation owning the column.
 * attnum:  attribute number of the column (stored as objectSubId).
 * support: RangeVar naming the supporting relation; resolved to an OID
 *          with RangeVarGetRelid(support, false).
 *
 * Note the direction: the supporting relation is passed to
 * recordDependencyOn as the dependent object and the column as the
 * referenced one, with DEPENDENCY_INTERNAL.
 */
3174 * Install a dependency for a column's supporting relation (serial sequence).
3177 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3179 ObjectAddress colobject,
3182 colobject.classId = RelationRelationId;
3183 colobject.objectId = relid;
3184 colobject.objectSubId = attnum;
3185 suppobject.classId = RelationRelationId;
3186 suppobject.objectId = RangeVarGetRelid(support, false);
3187 suppobject.objectSubId = 0;
3188 recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
/*
 * ATExecDropNotNull -- clear attnotnull for column colName of rel.
 *
 * rel:     relation owning the column.
 * colName: name of the column to alter.
 *
 * Visible steps: look the column up with SearchSysCacheCopyAttName (a
 * missing column is ERRCODE_UNDEFINED_COLUMN); walk
 * RelationGetIndexList(rel) and, for an index marked indisprimary, report
 * ERRCODE_INVALID_TABLE_DEFINITION when the column number appears in its
 * key; finally update pg_attribute and its indexes, but only if attnotnull
 * is currently set.
 *
 * NOTE(review): the condition guarding the system-column error
 * (ERRCODE_FEATURE_NOT_SUPPORTED) is not visible in this listing.
 */
3192 * ALTER TABLE ALTER COLUMN DROP NOT NULL
3195 ATExecDropNotNull(Relation rel, const char *colName)
3201 ListCell *indexoidscan;
3204 * lookup the attribute
3206 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3208 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3210 if (!HeapTupleIsValid(tuple))
3212 (errcode(ERRCODE_UNDEFINED_COLUMN),
3213 errmsg("column \"%s\" of relation \"%s\" does not exist",
3214 colName, RelationGetRelationName(rel))));
3216 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3218 /* Prevent them from altering a system attribute */
3221 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3222 errmsg("cannot alter system column \"%s\"",
3226 * Check that the attribute is not in a primary key
3229 /* Loop over all indexes on the relation */
3230 indexoidlist = RelationGetIndexList(rel);
3232 foreach(indexoidscan, indexoidlist)
3234 Oid indexoid = lfirst_oid(indexoidscan);
3235 HeapTuple indexTuple;
3236 Form_pg_index indexStruct;
3239 indexTuple = SearchSysCache(INDEXRELID,
3240 ObjectIdGetDatum(indexoid),
3242 if (!HeapTupleIsValid(indexTuple))
3243 elog(ERROR, "cache lookup failed for index %u", indexoid);
3244 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3246 /* If the index is not a primary key, skip the check */
3247 if (indexStruct->indisprimary)
3250 * Loop over each attribute in the primary key and see if it
3251 * matches the to-be-altered attribute
3253 for (i = 0; i < indexStruct->indnatts; i++)
3255 if (indexStruct->indkey.values[i] == attnum)
3257 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3258 errmsg("column \"%s\" is in a primary key",
3263 ReleaseSysCache(indexTuple);
3266 list_free(indexoidlist);
3269 * Okay, actually perform the catalog change ... if needed
3271 if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3273 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3275 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3277 /* keep the system catalog indexes current */
3278 CatalogUpdateIndexes(attr_rel, tuple);
3281 heap_close(attr_rel, RowExclusiveLock);
/*
 * ATExecSetNotNull -- set attnotnull for column colName of rel.
 *
 * tab:     work-queue entry; gains a CONSTR_NOTNULL NewConstraint so that
 *          phase 3 tests the existing rows.
 * rel:     relation owning the column.
 * colName: name of the column to alter.
 *
 * The column is looked up with SearchSysCacheCopyAttName (a missing column
 * is ERRCODE_UNDEFINED_COLUMN).  The pg_attribute row and its indexes are
 * updated only when attnotnull is not already set.
 *
 * NOTE(review): braces are missing from this listing, so it is not visible
 * whether the NewConstraint is queued only in that case, nor what condition
 * guards the system-column error -- confirm against the full file.
 */
3285 * ALTER TABLE ALTER COLUMN SET NOT NULL
3288 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3289 const char *colName)
3294 NewConstraint *newcon;
3297 * lookup the attribute
3299 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3301 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3303 if (!HeapTupleIsValid(tuple))
3305 (errcode(ERRCODE_UNDEFINED_COLUMN),
3306 errmsg("column \"%s\" of relation \"%s\" does not exist",
3307 colName, RelationGetRelationName(rel))));
3309 attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3311 /* Prevent them from altering a system attribute */
3314 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3315 errmsg("cannot alter system column \"%s\"",
3319 * Okay, actually perform the catalog change ... if needed
3321 if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3323 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3325 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3327 /* keep the system catalog indexes current */
3328 CatalogUpdateIndexes(attr_rel, tuple);
3330 /* Tell Phase 3 to test the constraint */
3331 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3332 newcon->contype = CONSTR_NOTNULL;
3333 newcon->attnum = attnum;
3334 newcon->name = "NOT NULL";
3336 tab->constraints = lappend(tab->constraints, newcon);
3339 heap_close(attr_rel, RowExclusiveLock);
/*
 * ATExecColumnDefault -- ALTER COLUMN SET DEFAULT / DROP DEFAULT.
 *
 * rel:     relation owning the column.
 * colName: name of the column; get_attnum must not return
 *          InvalidAttrNumber, else ERRCODE_UNDEFINED_COLUMN.
 * NOTE(review): newDefault is used below but its parameter line is not
 * visible in this listing.
 *
 * Any existing default is removed with RemoveAttrDefault using
 * DROP_RESTRICT.  The visible code then wraps newDefault in a
 * RawColumnDefault and passes it to AddRelationRawConstraints; presumably
 * only when newDefault is non-NULL -- the guarding test is not visible
 * here, and neither is the condition on the system-column error.
 */
3343 * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3346 ATExecColumnDefault(Relation rel, const char *colName,
3352 * get the number of the attribute
3354 attnum = get_attnum(RelationGetRelid(rel), colName);
3355 if (attnum == InvalidAttrNumber)
3357 (errcode(ERRCODE_UNDEFINED_COLUMN),
3358 errmsg("column \"%s\" of relation \"%s\" does not exist",
3359 colName, RelationGetRelationName(rel))));
3361 /* Prevent them from altering a system attribute */
3364 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3365 errmsg("cannot alter system column \"%s\"",
3369 * Remove any old default for the column. We use RESTRICT here for
3370 * safety, but at present we do not expect anything to depend on the
3373 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3378 RawColumnDefault *rawEnt;
3380 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3381 rawEnt->attnum = attnum;
3382 rawEnt->raw_default = newDefault;
3385 * This function is intended for CREATE TABLE, so it processes a
3386 * _list_ of defaults, but we just do one.
3388 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
/*
 * ATPrepSetStatistics -- permission checks for ALTER COLUMN SET STATISTICS.
 *
 * rel:       must be RELKIND_RELATION or RELKIND_INDEX, otherwise
 *            ERRCODE_WRONG_OBJECT_TYPE is reported; the current user must
 *            pass pg_class_ownercheck, otherwise aclcheck_error is called
 *            with ACLCHECK_NOT_OWNER.
 * colName:   not referenced by the visible code.
 * flagValue: not referenced by the visible code.
 *
 * No allowSystemTableMods test appears here, in contrast to
 * ATSimplePermissions.
 */
3393 * ALTER TABLE ALTER COLUMN SET STATISTICS
3396 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3399 * We do our own permission checking because (a) we want to allow SET
3400 * STATISTICS on indexes (for expressional index columns), and (b) we want
3401 * to allow SET STATISTICS on system catalogs without requiring
3402 * allowSystemTableMods to be turned on.
3404 if (rel->rd_rel->relkind != RELKIND_RELATION &&
3405 rel->rd_rel->relkind != RELKIND_INDEX)
3407 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3408 errmsg("\"%s\" is not a table or index",
3409 RelationGetRelationName(rel))));
3411 /* Permissions checks */
3412 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3413 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3414 RelationGetRelationName(rel));
/*
 * ATExecSetStatistics -- store a new attstattarget for column colName.
 *
 * rel:      relation owning the column.
 * colName:  name of the column to alter.
 * newValue: must be an Integer node (asserted); read with intVal.
 *
 * The visible range handling reports ERRCODE_INVALID_PARAMETER_VALUE for a
 * target that is too low, and emits a lowering message with the same
 * errcode when the target exceeds 1000.
 * NOTE(review): the lower-bound test and any clamping assignment are not
 * visible in this listing -- confirm against the full file.
 *
 * The column must exist (ERRCODE_UNDEFINED_COLUMN) and must not have
 * attnum <= 0 (ERRCODE_FEATURE_NOT_SUPPORTED) before the pg_attribute row
 * and its indexes are updated.
 */
3418 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3421 Relation attrelation;
3423 Form_pg_attribute attrtuple;
3425 Assert(IsA(newValue, Integer));
3426 newtarget = intVal(newValue);
3429 * Limit target to a sane range
3434 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3435 errmsg("statistics target %d is too low",
3438 else if (newtarget > 1000)
3442 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3443 errmsg("lowering statistics target to %d",
3447 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3449 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3451 if (!HeapTupleIsValid(tuple))
3453 (errcode(ERRCODE_UNDEFINED_COLUMN),
3454 errmsg("column \"%s\" of relation \"%s\" does not exist",
3455 colName, RelationGetRelationName(rel))));
3456 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3458 if (attrtuple->attnum <= 0)
3460 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3461 errmsg("cannot alter system column \"%s\"",
3464 attrtuple->attstattarget = newtarget;
3466 simple_heap_update(attrelation, &tuple->t_self, tuple);
3468 /* keep system catalog indexes current */
3469 CatalogUpdateIndexes(attrelation, tuple);
3471 heap_freetuple(tuple);
3473 heap_close(attrelation, RowExclusiveLock);
/*
 * ATExecSetStorage -- store a new attstorage for column colName.
 *
 * rel:      relation owning the column.
 * colName:  name of the column to alter.
 * newValue: must be a String node (asserted); compared with pg_strcasecmp
 *           against plain, external, extended and main.  Any other value
 *           is reported with ERRCODE_INVALID_PARAMETER_VALUE.
 * NOTE(review): the assignments to newstorage for each recognized mode are
 * not visible in this listing.
 *
 * The column must exist (ERRCODE_UNDEFINED_COLUMN) and must not have
 * attnum <= 0 (ERRCODE_FEATURE_NOT_SUPPORTED).  The new mode is stored when
 * newstorage is the character p or the column type passes TypeIsToastable;
 * otherwise ERRCODE_FEATURE_NOT_SUPPORTED is reported.  The pg_attribute
 * row and its indexes are then updated.
 */
3477 * ALTER TABLE ALTER COLUMN SET STORAGE
3480 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3484 Relation attrelation;
3486 Form_pg_attribute attrtuple;
3488 Assert(IsA(newValue, String));
3489 storagemode = strVal(newValue);
3491 if (pg_strcasecmp(storagemode, "plain") == 0)
3493 else if (pg_strcasecmp(storagemode, "external") == 0)
3495 else if (pg_strcasecmp(storagemode, "extended") == 0)
3497 else if (pg_strcasecmp(storagemode, "main") == 0)
3502 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3503 errmsg("invalid storage type \"%s\"",
3505 newstorage = 0; /* keep compiler quiet */
3508 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3510 tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3512 if (!HeapTupleIsValid(tuple))
3514 (errcode(ERRCODE_UNDEFINED_COLUMN),
3515 errmsg("column \"%s\" of relation \"%s\" does not exist",
3516 colName, RelationGetRelationName(rel))));
3517 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3519 if (attrtuple->attnum <= 0)
3521 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3522 errmsg("cannot alter system column \"%s\"",
3526 * safety check: do not allow toasted storage modes unless column datatype
3529 if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3530 attrtuple->attstorage = newstorage;
3533 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3534 errmsg("column data type %s can only have storage PLAIN",
3535 format_type_be(attrtuple->atttypid))));
3537 simple_heap_update(attrelation, &tuple->t_self, tuple);
3539 /* keep system catalog indexes current */
3540 CatalogUpdateIndexes(attrelation, tuple);
3542 heap_freetuple(tuple);
3544 heap_close(attrelation, RowExclusiveLock);
3549 * ALTER TABLE DROP COLUMN
3551 * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3552 * because we have to decide at runtime whether to recurse or not depending
3553 * on whether attinhcount goes to zero or not. (We can't check this in a
3554 * static pre-pass because it won't handle multiple inheritance situations
3555 * correctly.) Since DROP COLUMN doesn't need to create any work queue
3556 * entries for Phase 3, it's okay to recurse internally in this routine
3557 * without considering the work queue.
/*
 * ATExecDropColumn
 *
 * Execution step that removes column colName from rel.
 *
 * rel       - relation losing the column
 * colName   - name of the column; it is looked up through the attribute
 *             name syscache and an undefined-column report is built when
 *             the lookup fails
 * behavior  - handed unchanged to performDeletion() and to recursive calls
 * recurse   - NOTE(review): the statement that tests this flag is not
 *             visible in this view; the comments in the child loop indicate
 *             it chooses between propagating the drop to children and
 *             marking the child columns as locally defined
 * recursing - true when called for a child table; permits dropping a
 *             column whose attinhcount is greater than zero
 *
 * Outline of the visible logic:
 *   1. look up the attribute and remember its attnum
 *   2. reject system columns other than the OID column, and inherited
 *      columns unless recursing
 *   3. for each direct child from find_inheritance_children(), either
 *      recurse (child has attinhcount 1 and is not local) or decrement the
 *      child attinhcount and update its pg_attribute row
 *   4. delete the column itself through performDeletion()
 *   5. if the OID column was dropped, set pg_class.relhasoids to false
 *
 * Each child is opened with AccessExclusiveLock and closed with NoLock.
 */
3560 ATExecDropColumn(Relation rel, const char *colName,
3561 DropBehavior behavior,
3562 bool recurse, bool recursing)
3565 Form_pg_attribute targetatt;
3568 ObjectAddress object;
3570 /* At top level, permission check was done in ATPrepCmd, else do it */
3572 ATSimplePermissions(rel, false);
3575 * get the number of the attribute
3577 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3578 if (!HeapTupleIsValid(tuple))
3580 (errcode(ERRCODE_UNDEFINED_COLUMN),
3581 errmsg("column \"%s\" of relation \"%s\" does not exist",
3582 colName, RelationGetRelationName(rel))));
3583 targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
/* Copy attnum out now: it is still needed after the cache entry is released */
3585 attnum = targetatt->attnum;
3587 /* Can't drop a system attribute, except OID */
3588 if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3590 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3591 errmsg("cannot drop system column \"%s\"",
3594 /* Don't drop inherited columns */
3595 if (targetatt->attinhcount > 0 && !recursing)
3597 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3598 errmsg("cannot drop inherited column \"%s\"",
/* targetatt points into the cache entry and is not used past this release */
3601 ReleaseSysCache(tuple);
3604 * Propagate to children as appropriate. Unlike most other ALTER
3605 * routines, we have to do this one level of recursion at a time; we can't
3606 * use find_all_inheritors to do it in one pass.
3608 children = find_inheritance_children(RelationGetRelid(rel));
3615 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3616 foreach(child, children)
3618 Oid childrelid = lfirst_oid(child);
3620 Form_pg_attribute childatt;
3622 childrel = heap_open(childrelid, AccessExclusiveLock);
3624 tuple = SearchSysCacheCopyAttName(childrelid, colName);
3625 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
3626 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3627 colName, childrelid);
3628 childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3630 if (childatt->attinhcount <= 0) /* shouldn't happen */
3631 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3632 childrelid, colName);
3637 * If the child column has other definition sources, just
3638 * decrement its inheritance count; if not, recurse to delete
3641 if (childatt->attinhcount == 1 && !childatt->attislocal)
3643 /* Time to delete this child column, too */
3644 ATExecDropColumn(childrel, colName, behavior, true, true);
3648 /* Child column must survive my deletion */
3649 childatt->attinhcount--;
3651 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3653 /* keep the system catalog indexes current */
3654 CatalogUpdateIndexes(attr_rel, tuple);
3656 /* Make update visible */
3657 CommandCounterIncrement();
3663 * If we were told to drop ONLY in this table (no recursion),
3664 * we need to mark the inheritors' attribute as locally
3665 * defined rather than inherited.
3667 childatt->attinhcount--;
3668 childatt->attislocal = true;
3670 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3672 /* keep the system catalog indexes current */
3673 CatalogUpdateIndexes(attr_rel, tuple);
3675 /* Make update visible */
3676 CommandCounterIncrement();
3679 heap_freetuple(tuple);
3681 heap_close(childrel, NoLock);
3683 heap_close(attr_rel, RowExclusiveLock);
3687 * Perform the actual column deletion
3689 object.classId = RelationRelationId;
3690 object.objectId = RelationGetRelid(rel);
3691 object.objectSubId = attnum;
3693 performDeletion(&object, behavior);
3696 * If we dropped the OID column, must adjust pg_class.relhasoids
3698 if (attnum == ObjectIdAttributeNumber)
3701 Form_pg_class tuple_class;
3703 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
3705 tuple = SearchSysCacheCopy(RELOID,
3706 ObjectIdGetDatum(RelationGetRelid(rel)),
3708 if (!HeapTupleIsValid(tuple))
3709 elog(ERROR, "cache lookup failed for relation %u",
3710 RelationGetRelid(rel));
3711 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3713 tuple_class->relhasoids = false;
3714 simple_heap_update(class_rel, &tuple->t_self, tuple);
3716 /* Keep the catalog indexes up to date */
3717 CatalogUpdateIndexes(class_rel, tuple);
3719 heap_close(class_rel, RowExclusiveLock);
3724 * ALTER TABLE ADD INDEX
3726 * There is no such command in the grammar, but the parser converts UNIQUE
3727 * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us
3728 * schedule creation of the index at the appropriate time during ALTER.
/*
 * ATExecAddIndex
 *
 * Create the index described by stmt by passing its fields to DefineIndex().
 *
 * tab        - work-queue entry for the table; when tab->newvals is not
 *              NIL, skip_build is set so the index build is skipped here
 * rel        - relation being altered (not referenced in the visible lines)
 * stmt       - index definition; asserted to be an IndexStmt node
 * is_rebuild - true when an existing index is being rebuilt; check_rights
 *              is then set to false
 *
 * NOTE(review): only part of the DefineIndex() argument list is visible in
 * this view, so the remaining arguments are not described here.
 */
3731 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3732 IndexStmt *stmt, bool is_rebuild)
3738 Assert(IsA(stmt, IndexStmt));
3740 /* suppress schema rights check when rebuilding existing index */
3741 check_rights = !is_rebuild;
3742 /* skip index build if phase 3 will have to rewrite table anyway */
3743 skip_build = (tab->newvals != NIL);
3744 /* suppress notices when rebuilding existing index */
3747 DefineIndex(stmt->relation, /* relation */
3748 stmt->idxname, /* index name */
3749 InvalidOid, /* no predefined OID */
3750 stmt->accessMethod, /* am name */
3752 stmt->indexParams, /* parameters */
3753 (Expr *) stmt->whereClause,
3758 true, /* is_alter_table */
3765 * ALTER TABLE ADD CONSTRAINT
/*
 * ATExecAddConstraint
 *
 * Dispatch on nodeTag(newConstraint):
 *
 *   - a Constraint node is switched on its contype; the visible branch
 *     cooks it through AddRelationRawConstraints() and appends one
 *     NewConstraint per cooked result to tab->constraints, with the
 *     expression converted to implicit-AND form
 *   - a T_FkConstraint node gets its name validated against existing
 *     constraint names (or chosen when none was supplied) and is then
 *     passed to ATAddForeignKeyConstraint()
 *
 * Unknown constraint types and unknown node tags are reported through
 * elog(ERROR).
 *
 * tab           - work-queue entry that collects constraints to verify
 * rel           - relation receiving the constraint
 * newConstraint - parse node describing the constraint
 */
3768 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3770 switch (nodeTag(newConstraint))
3774 Constraint *constr = (Constraint *) newConstraint;
3777 * Currently, we only expect to see CONSTR_CHECK nodes
3778 * arriving here (see the preprocessing done in
3779 * parser/analyze.c). Use a switch anyway to make it easier
3780 * to add more code later.
3782 switch (constr->contype)
3790 * Call AddRelationRawConstraints to do the work.
3791 * It returns a list of cooked constraints.
3793 newcons = AddRelationRawConstraints(rel, NIL,
3794 list_make1(constr));
3795 /* Add each constraint to Phase 3's queue */
3796 foreach(lcon, newcons)
3798 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3799 NewConstraint *newcon;
3801 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3802 newcon->name = ccon->name;
3803 newcon->contype = ccon->contype;
3804 newcon->attnum = ccon->attnum;
3805 /* ExecQual wants implicit-AND format */
3806 newcon->qual = (Node *)
3807 make_ands_implicit((Expr *) ccon->expr);
3809 tab->constraints = lappend(tab->constraints,
3815 elog(ERROR, "unrecognized constraint type: %d",
3816 (int) constr->contype);
3820 case T_FkConstraint:
3822 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3825 * Assign or validate constraint name
3827 if (fkconstraint->constr_name)
3829 if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3830 RelationGetRelid(rel),
3831 RelationGetNamespace(rel),
3832 fkconstraint->constr_name))
3834 (errcode(ERRCODE_DUPLICATE_OBJECT),
3835 errmsg("constraint \"%s\" for relation \"%s\" already exists",
3836 fkconstraint->constr_name,
3837 RelationGetRelationName(rel))));
/*
 * NOTE(review): the else line is not visible in this view; this assignment
 * presumably runs only when no name was supplied. The generated name is
 * derived from the relation name and the first referencing column.
 */
3840 fkconstraint->constr_name =
3841 ChooseConstraintName(RelationGetRelationName(rel),
3842 strVal(linitial(fkconstraint->fk_attrs)),
3844 RelationGetNamespace(rel),
3847 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3852 elog(ERROR, "unrecognized node type: %d",
3853 (int) nodeTag(newConstraint));
3858 * Add a foreign-key constraint to a single table
3860 * Subroutine for ATExecAddConstraint. Must already hold exclusive
3861 * lock on the rel, and have done appropriate validity/permissions checks
/*
 * ATAddForeignKeyConstraint
 *
 * tab          - work-queue entry; receives a NewConstraint of type
 *                CONSTR_FOREIGN unless fkconstraint->skip_validation is set
 * rel          - referencing table
 * fkconstraint - parsed foreign key definition; its pktable, fk_attrs,
 *                pk_attrs, constr_name, action, match type and
 *                deferrability fields are all read here
 *
 * Steps in order of appearance:
 *   1. open the referenced table with AccessExclusiveLock
 *   2. check that it is a plain table, run ACL checks on both tables,
 *      refuse system catalogs unless allowSystemTableMods, and refuse
 *      mixing temporary and permanent tables
 *   3. resolve the referencing column list, then either fetch the primary
 *      key of the referenced table (no pk_attrs given) or resolve and
 *      match the supplied referenced column list against an index
 *   4. require equal column counts and, per column pair, an equality
 *      operator taking the PK type on the left and the FK type on the right
 *   5. report when that operator is not a member of the index opclass
 *   6. queue validation, create the pg_constraint entry, create the
 *      triggers, and close the referenced table while keeping its lock
 *
 * The five local arrays hold INDEX_MAX_KEYS entries each and are zeroed
 * with MemSet before use.
 */
3865 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3866 FkConstraint *fkconstraint)
3869 AclResult aclresult;
3870 int16 pkattnum[INDEX_MAX_KEYS];
3871 int16 fkattnum[INDEX_MAX_KEYS];
3872 Oid pktypoid[INDEX_MAX_KEYS];
3873 Oid fktypoid[INDEX_MAX_KEYS];
3874 Oid opclasses[INDEX_MAX_KEYS];
3882 * Grab an exclusive lock on the pk table, so that someone doesn't delete
3883 * rows out from under us. (Although a lesser lock would do for that
3884 * purpose, we'll need exclusive lock anyway to add triggers to the pk
3885 * table; trying to start with a lesser lock will just create a risk of
3888 pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3891 * Validity and permissions checks
3893 * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3894 * but we may as well error out sooner instead of later.
/* The referenced relation has to be an ordinary table */
3896 if (pkrel->rd_rel->relkind != RELKIND_RELATION)
3898 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3899 errmsg("referenced relation \"%s\" is not a table",
3900 RelationGetRelationName(pkrel))));
3902 aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
3904 if (aclresult != ACLCHECK_OK)
3905 aclcheck_error(aclresult, ACL_KIND_CLASS,
3906 RelationGetRelationName(pkrel));
/* A system catalog may be referenced only when allowSystemTableMods is set */
3908 if (!allowSystemTableMods && IsSystemRelation(pkrel))
3910 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
3911 errmsg("permission denied: \"%s\" is a system catalog",
3912 RelationGetRelationName(pkrel))));
3914 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
3916 if (aclresult != ACLCHECK_OK)
3917 aclcheck_error(aclresult, ACL_KIND_CLASS,
3918 RelationGetRelationName(rel));
3921 * Disallow reference from permanent table to temp table or vice versa.
3922 * (The ban on perm->temp is for fairly obvious reasons. The ban on
3923 * temp->perm is because other backends might need to run the RI triggers
3924 * on the perm table, but they can't reliably see tuples the owning
3925 * backend has created in the temp table, because non-shared buffers are
3926 * used for temp tables.)
3928 if (isTempNamespace(RelationGetNamespace(pkrel)))
3930 if (!isTempNamespace(RelationGetNamespace(rel)))
3932 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3933 errmsg("cannot reference temporary table from permanent table constraint")));
3937 if (isTempNamespace(RelationGetNamespace(rel)))
3939 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3940 errmsg("cannot reference permanent table from temporary table constraint")));
3944 * Look up the referencing attributes to make sure they exist, and record
3945 * their attnums and type OIDs.
3947 MemSet(pkattnum, 0, sizeof(pkattnum));
3948 MemSet(fkattnum, 0, sizeof(fkattnum));
3949 MemSet(pktypoid, 0, sizeof(pktypoid));
3950 MemSet(fktypoid, 0, sizeof(fktypoid));
3951 MemSet(opclasses, 0, sizeof(opclasses));
3953 numfks = transformColumnNameList(RelationGetRelid(rel),
3954 fkconstraint->fk_attrs,
3955 fkattnum, fktypoid);
3958 * If the attribute list for the referenced table was omitted, lookup the
3959 * definition of the primary key and use it. Otherwise, validate the
3960 * supplied attribute list. In either case, discover the index OID and
3961 * index opclasses, and the attnums and type OIDs of the attributes.
3963 if (fkconstraint->pk_attrs == NIL)
3965 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
3966 &fkconstraint->pk_attrs,
3972 numpks = transformColumnNameList(RelationGetRelid(pkrel),
3973 fkconstraint->pk_attrs,
3974 pkattnum, pktypoid);
3975 /* Look for an index matching the column list */
3976 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
3980 /* Be sure referencing and referenced column types are comparable */
3981 if (numfks != numpks)
3983 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
3984 errmsg("number of referencing and referenced columns for foreign key disagree")));
3986 for (i = 0; i < numpks; i++)
3989 * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
3990 * is the foreign key table's i'th key's type
3992 * Note that we look for an operator with the PK type on the left;
3993 * when the types are different this is critical because the PK index
3994 * will need operators with the indexkey on the left. (Ordinarily both
3995 * commutator operators will exist if either does, but we won't get
3996 * the right answer from the test below on opclass membership unless
3997 * we select the proper operator.)
3999 Operator o = oper(list_make1(makeString("=")),
4000 pktypoid[i], fktypoid[i], true);
4004 (errcode(ERRCODE_UNDEFINED_FUNCTION),
4005 errmsg("foreign key constraint \"%s\" "
4006 "cannot be implemented",
4007 fkconstraint->constr_name),
4008 errdetail("Key columns \"%s\" and \"%s\" "
4009 "are of incompatible types: %s and %s.",
4010 strVal(list_nth(fkconstraint->fk_attrs, i)),
4011 strVal(list_nth(fkconstraint->pk_attrs, i)),
4012 format_type_be(fktypoid[i]),
4013 format_type_be(pktypoid[i]))));
4016 * Check that the found operator is compatible with the PK index, and
4017 * generate a warning if not, since otherwise costly seqscans will be
4018 * incurred to check FK validity.
4020 if (!op_in_opclass(oprid(o), opclasses[i]))
4022 (errmsg("foreign key constraint \"%s\" "
4023 "will require costly sequential scans",
4024 fkconstraint->constr_name),
4025 errdetail("Key columns \"%s\" and \"%s\" "
4026 "are of different types: %s and %s.",
4027 strVal(list_nth(fkconstraint->fk_attrs, i)),
4028 strVal(list_nth(fkconstraint->pk_attrs, i)),
4029 format_type_be(fktypoid[i]),
4030 format_type_be(pktypoid[i]))));
4036 * Tell Phase 3 to check that the constraint is satisfied by existing rows
4037 * (we can skip this during table creation).
4039 if (!fkconstraint->skip_validation)
4041 NewConstraint *newcon;
4043 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
4044 newcon->name = fkconstraint->constr_name;
4045 newcon->contype = CONSTR_FOREIGN;
4046 newcon->refrelid = RelationGetRelid(pkrel);
4047 newcon->qual = (Node *) fkconstraint;
4049 tab->constraints = lappend(tab->constraints, newcon);
4053 * Record the FK constraint in pg_constraint.
4055 constrOid = CreateConstraintEntry(fkconstraint->constr_name,
4056 RelationGetNamespace(rel),
4058 fkconstraint->deferrable,
4059 fkconstraint->initdeferred,
4060 RelationGetRelid(rel),
4063 InvalidOid, /* not a domain
4065 RelationGetRelid(pkrel),
4068 fkconstraint->fk_upd_action,
4069 fkconstraint->fk_del_action,
4070 fkconstraint->fk_matchtype,
4072 NULL, /* no check constraint */
4077 * Create the triggers that will enforce the constraint.
4079 createForeignKeyTriggers(rel, fkconstraint, constrOid);
4082 * Close pk table, but keep lock until we've committed.
4084 heap_close(pkrel, NoLock);
4089 * transformColumnNameList - transform list of column names
4091 * Lookup each name and return its attnum and type OID
/*
 * transformColumnNameList
 *
 * relId     - OID of the relation whose columns are looked up
 * colList   - list of String nodes holding column names
 * attnums   - output array; receives the attnum of each named column
 * atttypids - output array; receives the type OID of each named column
 *
 * An unknown column name produces an ERRCODE_UNDEFINED_COLUMN report, and
 * reaching INDEX_MAX_KEYS entries produces an ERRCODE_TOO_MANY_COLUMNS
 * report. Each syscache entry is released as soon as its two fields have
 * been copied out.
 *
 * NOTE(review): the loop header, the counter update and the return
 * statement are not visible in this view. Within the visible lines the
 * local named attnum is used as the index into the output arrays, not as a
 * catalog attribute number; the caller uses the return value as the
 * number of columns.
 */
4094 transformColumnNameList(Oid relId, List *colList,
4095 int16 *attnums, Oid *atttypids)
4103 char *attname = strVal(lfirst(l));
4106 atttuple = SearchSysCacheAttName(relId, attname);
4107 if (!HeapTupleIsValid(atttuple))
4109 (errcode(ERRCODE_UNDEFINED_COLUMN),
4110 errmsg("column \"%s\" referenced in foreign key constraint does not exist",
/* Bound the index before storing into the output arrays */
4112 if (attnum >= INDEX_MAX_KEYS)
4114 (errcode(ERRCODE_TOO_MANY_COLUMNS),
4115 errmsg("cannot have more than %d keys in a foreign key",
4117 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4118 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4119 ReleaseSysCache(atttuple);
4127 * transformFkeyGetPrimaryKey -
4129 * Look up the names, attnums, and types of the primary key attributes
4130 * for the pkrel. Also return the index OID and index opclasses of the
4131 * index supporting the primary key.
4133 * All parameters except pkrel are output parameters. Also, the function
4134 * return value is the number of attributes in the primary key.
4136 * Used when the column list in the REFERENCES specification is omitted.
/*
 * transformFkeyGetPrimaryKey
 *
 * pkrel       - referenced relation (input)
 * indexOid    - output; set to InvalidOid first, then to the OID of the
 *               index whose pg_index row has indisprimary set
 * attnamelist - output; one String node is appended per key column, each
 *               holding a pstrdup copy of the column name
 * attnums     - output array of key column attnums, taken from indkey
 * atttypids   - output array of key column type OIDs
 * opclasses   - output array filled from the indclass vector of the index
 *
 * The index list comes from RelationGetIndexList() and is freed with
 * list_free() once the scan is over. When no primary key index is found an
 * ERRCODE_UNDEFINED_OBJECT report is built. The syscache entry of the
 * chosen index is released only after the output arrays have been filled.
 *
 * NOTE(review): the statement that leaves the scan loop once a primary key
 * is found, and the return statement, are not visible in this view.
 */
4139 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4141 int16 *attnums, Oid *atttypids,
4145 ListCell *indexoidscan;
4146 HeapTuple indexTuple = NULL;
4147 Form_pg_index indexStruct = NULL;
4148 Datum indclassDatum;
4150 oidvector *indclass;
4154 * Get the list of index OIDs for the table from the relcache, and look up
4155 * each one in the pg_index syscache until we find one marked primary key
4156 * (hopefully there isn't more than one such).
4158 *indexOid = InvalidOid;
4160 indexoidlist = RelationGetIndexList(pkrel);
4162 foreach(indexoidscan, indexoidlist)
4164 Oid indexoid = lfirst_oid(indexoidscan);
4166 indexTuple = SearchSysCache(INDEXRELID,
4167 ObjectIdGetDatum(indexoid),
4169 if (!HeapTupleIsValid(indexTuple))
4170 elog(ERROR, "cache lookup failed for index %u", indexoid);
4171 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4172 if (indexStruct->indisprimary)
4174 *indexOid = indexoid;
4177 ReleaseSysCache(indexTuple);
4180 list_free(indexoidlist);
4183 * Check that we found it
4185 if (!OidIsValid(*indexOid))
4187 (errcode(ERRCODE_UNDEFINED_OBJECT),
4188 errmsg("there is no primary key for referenced table \"%s\"",
4189 RelationGetRelationName(pkrel))));
4191 /* Must get indclass the hard way */
4192 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4193 Anum_pg_index_indclass, &isnull);
4195 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4198 * Now build the list of PK attributes from the indkey definition (we
4199 * assume a primary key cannot have expressional elements)
4202 for (i = 0; i < indexStruct->indnatts; i++)
4204 int pkattno = indexStruct->indkey.values[i];
4206 attnums[i] = pkattno;
4207 atttypids[i] = attnumTypeId(pkrel, pkattno);
4208 opclasses[i] = indclass->values[i];
4209 *attnamelist = lappend(*attnamelist,
4210 makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4213 ReleaseSysCache(indexTuple);
4219 * transformFkeyCheckAttrs -
4221 * Make sure that the attributes of a referenced table belong to a unique
4222 * (or primary key) constraint. Return the OID of the index supporting
4223 * the constraint, as well as the opclasses associated with the index
/*
 * transformFkeyCheckAttrs
 *
 * pkrel     - referenced relation
 * numattrs  - number of entries in attnums
 * attnums   - attnums of the referenced columns, in the order given by
 *             the user
 * opclasses - output array; opclasses[j] receives the opclass of the index
 *             column that matches attnums[j]
 *
 * Every index of pkrel is examined. A candidate must have exactly numattrs
 * columns, be unique, and have null indpred and indexprs attributes. The
 * column sets are then compared in both directions so that ordering does
 * not matter. When nothing matches, an ERRCODE_INVALID_FOREIGN_KEY report
 * is built.
 *
 * NOTE(review): the found flag handling, the loop exits and the return
 * statement are not visible in this view.
 */
4227 transformFkeyCheckAttrs(Relation pkrel,
4228 int numattrs, int16 *attnums,
4229 Oid *opclasses) /* output parameter */
4231 Oid indexoid = InvalidOid;
4234 ListCell *indexoidscan;
4237 * Get the list of index OIDs for the table from the relcache, and look up
4238 * each one in the pg_index syscache, and match unique indexes to the list
4239 * of attnums we are given.
4241 indexoidlist = RelationGetIndexList(pkrel);
4243 foreach(indexoidscan, indexoidlist)
4245 HeapTuple indexTuple;
4246 Form_pg_index indexStruct;
4250 indexoid = lfirst_oid(indexoidscan);
4251 indexTuple = SearchSysCache(INDEXRELID,
4252 ObjectIdGetDatum(indexoid),
4254 if (!HeapTupleIsValid(indexTuple))
4255 elog(ERROR, "cache lookup failed for index %u", indexoid);
4256 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4259 * Must have the right number of columns; must be unique and not a
4260 * partial index; forget it if there are any expressions, too
4262 if (indexStruct->indnatts == numattrs &&
4263 indexStruct->indisunique &&
4264 heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4265 heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4267 /* Must get indclass the hard way */
4268 Datum indclassDatum;
4270 oidvector *indclass;
4272 indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4273 Anum_pg_index_indclass, &isnull);
4275 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4278 * The given attnum list may match the index columns in any order.
4279 * Check that each list is a subset of the other.
/* First direction: each supplied attnum is searched for among the index columns */
4281 for (i = 0; i < numattrs; i++)
4284 for (j = 0; j < numattrs; j++)
4286 if (attnums[i] == indexStruct->indkey.values[j])
/* Second direction: each index column is searched for among the supplied attnums, and its opclass is stored at the position of the match */
4297 for (i = 0; i < numattrs; i++)
4300 for (j = 0; j < numattrs; j++)
4302 if (attnums[j] == indexStruct->indkey.values[i])
4304 opclasses[j] = indclass->values[i];
4314 ReleaseSysCache(indexTuple);
4321 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4322 errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4323 RelationGetRelationName(pkrel))));
4325 list_free(indexoidlist);
4331 * Scan the existing rows in a table to verify they meet a proposed FK
4334 * Caller must have opened and locked both relations.
/*
 * validateForeignKeyConstraint
 *
 * fkconstraint - the foreign key definition being verified
 *
 * NOTE(review): the remaining parameter lines are not visible in this
 * view; the body uses rel (the table being scanned) and pkrel (the
 * referenced table).
 *
 * RI_Initial_Check() is tried first. Otherwise a Trigger struct is built
 * by hand: tgargs holds the constraint name, both relation names, the match
 * type string, and then the column names, and the array is sized as 4 plus
 * the lengths of both column lists. The table is then scanned with
 * SnapshotNow and RI_FKey_check_ins() is called once per tuple with a
 * TriggerData describing a row-level INSERT event.
 *
 * NOTE(review): the statements that set and advance count, and the scan
 * shutdown, are not visible in this view.
 */
4337 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4348 * See if we can do it with a single LEFT JOIN query. A FALSE result
4349 * indicates we must proceed with the fire-the-trigger method.
4351 if (RI_Initial_Check(fkconstraint, rel, pkrel))
4355 * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
4356 * if that tuple had just been inserted. If any of those fail, it should
4357 * ereport(ERROR) and that's that.
4359 MemSet(&trig, 0, sizeof(trig));
4360 trig.tgoid = InvalidOid;
4361 trig.tgname = fkconstraint->constr_name;
4362 trig.tgenabled = TRUE;
4363 trig.tgisconstraint = TRUE;
4364 trig.tgconstrrelid = RelationGetRelid(pkrel);
4365 trig.tgdeferrable = FALSE;
4366 trig.tginitdeferred = FALSE;
4368 trig.tgargs = (char **) palloc(sizeof(char *) *
4369 (4 + list_length(fkconstraint->fk_attrs)
4370 + list_length(fkconstraint->pk_attrs)));
4372 trig.tgargs[0] = trig.tgname;
4373 trig.tgargs[1] = RelationGetRelationName(rel);
4374 trig.tgargs[2] = RelationGetRelationName(pkrel);
4375 trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4377 foreach(list, fkconstraint->fk_attrs)
4379 char *fk_at = strVal(lfirst(list));
4381 trig.tgargs[count] = fk_at;
4385 foreach(list, fkconstraint->pk_attrs)
4387 char *pk_at = strVal(lfirst(list));
4389 trig.tgargs[count] = pk_at;
4392 trig.tgnargs = count - 1;
4394 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4396 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4398 FunctionCallInfoData fcinfo;
4399 TriggerData trigdata;
4402 * Make a call to the trigger function
4404 * No parameters are passed, but we do set a context
4406 MemSet(&fcinfo, 0, sizeof(fcinfo));
4409 * We assume RI_FKey_check_ins won't look at flinfo...
4411 trigdata.type = T_TriggerData;
4412 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4413 trigdata.tg_relation = rel;
4414 trigdata.tg_trigtuple = tuple;
4415 trigdata.tg_newtuple = NULL;
4416 trigdata.tg_trigger = &trig;
4417 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4418 trigdata.tg_newtuplebuf = InvalidBuffer;
/* The trigger function receives the TriggerData through the call context */
4420 fcinfo.context = (Node *) &trigdata;
4422 RI_FKey_check_ins(&fcinfo);
/*
 * CreateFKCheckTrigger
 *
 * Build a CreateTrigStmt for a row-level AFTER constraint trigger on myRel
 * and create it with CreateTrigger().
 *
 * myRel        - RangeVar naming the referencing table
 * fkconstraint - foreign key definition; supplies the trigger name,
 *                deferrability, referenced table and column lists
 * constrobj    - address of the pg_constraint entry the trigger depends on
 * trigobj      - address struct whose objectId is overwritten with the OID
 *                of the new trigger
 *
 * NOTE(review): the final parameter line and the test that selects between
 * the RI_FKey_check_ins and RI_FKey_check_upd variants are not visible in
 * this view; callers in this file pass true and then false in that slot.
 *
 * The trigger arguments are: constraint name, referencing table name,
 * referenced table name, match type string, then alternating referencing
 * and referenced column name nodes. An internal dependency from the trigger
 * to the constraint is recorded, followed by CommandCounterIncrement().
 */
4431 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4432 ObjectAddress *constrobj, ObjectAddress *trigobj,
4435 CreateTrigStmt *fk_trigger;
4439 fk_trigger = makeNode(CreateTrigStmt);
4440 fk_trigger->trigname = fkconstraint->constr_name;
4441 fk_trigger->relation = myRel;
4442 fk_trigger->before = false;
4443 fk_trigger->row = true;
4445 /* Either ON INSERT or ON UPDATE */
4448 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4449 fk_trigger->actions[0] = 'i';
4453 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4454 fk_trigger->actions[0] = 'u';
4456 fk_trigger->actions[1] = '\0';
4458 fk_trigger->isconstraint = true;
4459 fk_trigger->deferrable = fkconstraint->deferrable;
4460 fk_trigger->initdeferred = fkconstraint->initdeferred;
4461 fk_trigger->constrrel = fkconstraint->pktable;
4463 fk_trigger->args = NIL;
4464 fk_trigger->args = lappend(fk_trigger->args,
4465 makeString(fkconstraint->constr_name));
4466 fk_trigger->args = lappend(fk_trigger->args,
4467 makeString(myRel->relname));
4468 fk_trigger->args = lappend(fk_trigger->args,
4469 makeString(fkconstraint->pktable->relname));
4470 fk_trigger->args = lappend(fk_trigger->args,
4471 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
/* The two column lists are walked in parallel below, so their lengths must agree */
4472 if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4474 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4475 errmsg("number of referencing and referenced columns for foreign key disagree")));
4477 forboth(fk_attr, fkconstraint->fk_attrs,
4478 pk_attr, fkconstraint->pk_attrs)
4480 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4481 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4484 trigobj->objectId = CreateTrigger(fk_trigger, true);
4486 /* Register dependency from trigger to constraint */
4487 recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4489 /* Make changes-so-far visible */
4490 CommandCounterIncrement();
4494 * Create the triggers that implement an FK constraint.
/*
 * createForeignKeyTriggers
 *
 * rel          - referencing table
 * fkconstraint - foreign key definition
 *
 * NOTE(review): the last parameter line is not visible in this view; the
 * body uses constrOid as the OID of the pg_constraint entry.
 *
 * Four triggers are created in this order:
 *   1. insert check on the referencing table (CreateFKCheckTrigger, true)
 *   2. update check on the referencing table (CreateFKCheckTrigger, false)
 *   3. ON DELETE action trigger on the referenced table
 *   4. ON UPDATE action trigger on the referenced table
 *
 * Triggers 3 and 4 pick their function from fk_del_action and
 * fk_upd_action. Each created trigger gets an internal dependency on the
 * constraint. All four share the same argument layout: constraint name,
 * referencing table name, referenced table name, match type string, then
 * alternating referencing and referenced column name nodes.
 */
4497 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4501 CreateTrigStmt *fk_trigger;
4504 ObjectAddress trigobj,
4508 * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
4510 myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4511 pstrdup(RelationGetRelationName(rel)));
4514 * Preset objectAddress fields
4516 constrobj.classId = ConstraintRelationId;
4517 constrobj.objectId = constrOid;
4518 constrobj.objectSubId = 0;
4519 trigobj.classId = TriggerRelationId;
4520 trigobj.objectSubId = 0;
4522 /* Make changes-so-far visible */
4523 CommandCounterIncrement();
4526 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
4527 * action for both INSERTs and UPDATEs on the referencing table.
4529 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4530 CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4533 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4534 * DELETE action on the referenced table.
4536 fk_trigger = makeNode(CreateTrigStmt);
4537 fk_trigger->trigname = fkconstraint->constr_name;
4538 fk_trigger->relation = fkconstraint->pktable;
4539 fk_trigger->before = false;
4540 fk_trigger->row = true;
4541 fk_trigger->actions[0] = 'd';
4542 fk_trigger->actions[1] = '\0';
4544 fk_trigger->isconstraint = true;
4545 fk_trigger->constrrel = myRel;
/* Only NO ACTION copies the constraint deferrability; every other delete action forces deferrable and initdeferred to false */
4546 switch (fkconstraint->fk_del_action)
4548 case FKCONSTR_ACTION_NOACTION:
4549 fk_trigger->deferrable = fkconstraint->deferrable;
4550 fk_trigger->initdeferred = fkconstraint->initdeferred;
4551 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4553 case FKCONSTR_ACTION_RESTRICT:
4554 fk_trigger->deferrable = false;
4555 fk_trigger->initdeferred = false;
4556 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4558 case FKCONSTR_ACTION_CASCADE:
4559 fk_trigger->deferrable = false;
4560 fk_trigger->initdeferred = false;
4561 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4563 case FKCONSTR_ACTION_SETNULL:
4564 fk_trigger->deferrable = false;
4565 fk_trigger->initdeferred = false;
4566 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4568 case FKCONSTR_ACTION_SETDEFAULT:
4569 fk_trigger->deferrable = false;
4570 fk_trigger->initdeferred = false;
4571 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4574 elog(ERROR, "unrecognized FK action type: %d",
4575 (int) fkconstraint->fk_del_action);
4579 fk_trigger->args = NIL;
4580 fk_trigger->args = lappend(fk_trigger->args,
4581 makeString(fkconstraint->constr_name));
4582 fk_trigger->args = lappend(fk_trigger->args,
4583 makeString(myRel->relname));
4584 fk_trigger->args = lappend(fk_trigger->args,
4585 makeString(fkconstraint->pktable->relname));
4586 fk_trigger->args = lappend(fk_trigger->args,
4587 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4588 forboth(fk_attr, fkconstraint->fk_attrs,
4589 pk_attr, fkconstraint->pk_attrs)
4591 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4592 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4595 trigobj.objectId = CreateTrigger(fk_trigger, true);
4597 /* Register dependency from trigger to constraint */
4598 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4600 /* Make changes-so-far visible */
4601 CommandCounterIncrement();
4604 * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4605 * UPDATE action on the referenced table.
4607 fk_trigger = makeNode(CreateTrigStmt);
4608 fk_trigger->trigname = fkconstraint->constr_name;
4609 fk_trigger->relation = fkconstraint->pktable;
4610 fk_trigger->before = false;
4611 fk_trigger->row = true;
4612 fk_trigger->actions[0] = 'u';
4613 fk_trigger->actions[1] = '\0';
4614 fk_trigger->isconstraint = true;
4615 fk_trigger->constrrel = myRel;
/* Same deferrability rule as for the delete trigger, keyed on the update action */
4616 switch (fkconstraint->fk_upd_action)
4618 case FKCONSTR_ACTION_NOACTION:
4619 fk_trigger->deferrable = fkconstraint->deferrable;
4620 fk_trigger->initdeferred = fkconstraint->initdeferred;
4621 fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4623 case FKCONSTR_ACTION_RESTRICT:
4624 fk_trigger->deferrable = false;
4625 fk_trigger->initdeferred = false;
4626 fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4628 case FKCONSTR_ACTION_CASCADE:
4629 fk_trigger->deferrable = false;
4630 fk_trigger->initdeferred = false;
4631 fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4633 case FKCONSTR_ACTION_SETNULL:
4634 fk_trigger->deferrable = false;
4635 fk_trigger->initdeferred = false;
4636 fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4638 case FKCONSTR_ACTION_SETDEFAULT:
4639 fk_trigger->deferrable = false;
4640 fk_trigger->initdeferred = false;
4641 fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4644 elog(ERROR, "unrecognized FK action type: %d",
4645 (int) fkconstraint->fk_upd_action);
4649 fk_trigger->args = NIL;
4650 fk_trigger->args = lappend(fk_trigger->args,
4651 makeString(fkconstraint->constr_name));
4652 fk_trigger->args = lappend(fk_trigger->args,
4653 makeString(myRel->relname));
4654 fk_trigger->args = lappend(fk_trigger->args,
4655 makeString(fkconstraint->pktable->relname));
4656 fk_trigger->args = lappend(fk_trigger->args,
4657 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4658 forboth(fk_attr, fkconstraint->fk_attrs,
4659 pk_attr, fkconstraint->pk_attrs)
4661 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4662 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4665 trigobj.objectId = CreateTrigger(fk_trigger, true);
4667 /* Register dependency from trigger to constraint */
4668 recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4672 * fkMatchTypeToString -
4673 * convert FKCONSTR_MATCH_xxx code to string to use in trigger args
/*
 * fkMatchTypeToString
 *
 * match_type - one of the FKCONSTR_MATCH_xxx codes
 *
 * Returns a freshly pstrdup-ed copy of FULL, PARTIAL or UNSPECIFIED for
 * the corresponding code. Any other value is reported through elog(ERROR)
 * with the message "unrecognized match type"; the trailing return of NULL
 * is marked as unreachable by its own comment.
 */
4676 fkMatchTypeToString(char match_type)
4680 case FKCONSTR_MATCH_FULL:
4681 return pstrdup("FULL");
4682 case FKCONSTR_MATCH_PARTIAL:
4683 return pstrdup("PARTIAL");
4684 case FKCONSTR_MATCH_UNSPECIFIED:
4685 return pstrdup("UNSPECIFIED");
4687 elog(ERROR, "unrecognized match type: %d",
4690 return NULL; /* can't get here */
4694 * ALTER TABLE DROP CONSTRAINT
/*
 * ATPrepDropConstraint
 *
 * wqueue  - ALTER TABLE work queue, passed on to ATSimpleRecursion()
 * rel     - relation named in the command
 * recurse - passed on to ATSimpleRecursion()
 * cmd     - the DROP CONSTRAINT subcommand; it is copied, never modified
 *
 * The copy has its subtype changed to AT_DropConstraintQuietly and is the
 * command handed to ATSimpleRecursion(), so the original cmd keeps its
 * subtype.
 *
 * NOTE(review): the condition guarding this block is not visible in this
 * view.
 */
4697 ATPrepDropConstraint(List **wqueue, Relation rel,
4698 bool recurse, AlterTableCmd *cmd)
4701 * We don't want errors or noise from child tables, so we have to pass
4702 * down a modified command.
4706 AlterTableCmd *childCmd = copyObject(cmd);
4708 childCmd->subtype = AT_DropConstraintQuietly;
4709 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
/*
 * ATExecDropConstraint
 *
 * rel        - relation losing the constraint
 * constrName - name of the constraint to remove
 * behavior   - passed on to RemoveRelConstraints()
 * quiet      - NOTE(review): the test on this flag is not visible in this
 *              view; presumably it suppresses the reports below
 *
 * RemoveRelConstraints() returns the number of constraints removed. A
 * count of zero leads to an ERRCODE_UNDEFINED_OBJECT report, and a count
 * above one leads to a report that multiple constraints were dropped.
 */
4714 ATExecDropConstraint(Relation rel, const char *constrName,
4715 DropBehavior behavior, bool quiet)
4719 deleted = RemoveRelConstraints(rel, constrName, behavior);
4723 /* If zero constraints deleted, complain */
4726 (errcode(ERRCODE_UNDEFINED_OBJECT),
4727 errmsg("constraint \"%s\" does not exist",
4729 /* Otherwise if more than one constraint deleted, notify */
4730 else if (deleted > 1)
4732 (errmsg("multiple constraints named \"%s\" were dropped",
/*
 * ATPrepAlterColumnType
 *
 * Preparation step for changing the data type of a column.
 *
 * wqueue    - ALTER TABLE work queue, passed on to ATSimpleRecursion()
 * tab       - work-queue entry of rel; gains a NewColumnValue carrying the
 *             conversion expression
 * rel       - relation owning the column
 * recurse   - NOTE(review): the test on this flag is not visible; the
 *             closing comment says recursion goes through
 *             ATSimpleRecursion()
 * recursing - true when called for a child table; permits altering a column
 *             whose attinhcount is greater than zero
 *
 * NOTE(review): the last parameter line is not visible in this view; the
 * body reads cmd->name, cmd->def and cmd->transform.
 *
 * Visible checks, in order: the column must exist, must not be a system
 * column, must not be inherited unless recursing, the target type must
 * exist and be legal for a column, a USING expression must not return a
 * set or contain subqueries or aggregates, and the expression must be
 * coercible to the target type. When not recursing into children, the
 * relation must have no inheritance children.
 */
4741 ATPrepAlterColumnType(List **wqueue,
4742 AlteredTableInfo *tab, Relation rel,
4743 bool recurse, bool recursing,
4746 char *colName = cmd->name;
4747 TypeName *typename = (TypeName *) cmd->def;
4749 Form_pg_attribute attTup;
4753 NewColumnValue *newval;
4754 ParseState *pstate = make_parsestate(NULL);
4756 /* lookup the attribute so we can check inheritance status */
4757 tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4758 if (!HeapTupleIsValid(tuple))
4760 (errcode(ERRCODE_UNDEFINED_COLUMN),
4761 errmsg("column \"%s\" of relation \"%s\" does not exist",
4762 colName, RelationGetRelationName(rel))));
4763 attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4764 attnum = attTup->attnum;
4766 /* Can't alter a system attribute */
4769 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4770 errmsg("cannot alter system column \"%s\"",
4773 /* Don't alter inherited columns */
4774 if (attTup->attinhcount > 0 && !recursing)
4776 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4777 errmsg("cannot alter inherited column \"%s\"",
4780 /* Look up the target type */
4781 targettype = LookupTypeName(typename);
4782 if (!OidIsValid(targettype))
4784 (errcode(ERRCODE_UNDEFINED_OBJECT),
4785 errmsg("type \"%s\" does not exist",
4786 TypeNameToString(typename))));
4788 /* make sure datatype is legal for a column */
4789 CheckAttributeType(colName, targettype);
4792 * Set up an expression to transform the old data value to the new type.
4793 * If a USING option was given, transform and use that expression, else
4794 * just take the old value and try to coerce it. We do this first so that
4795 * type incompatibility can be detected before we waste effort, and
4796 * because we need the expression to be parsed against the original table
4803 /* Expression must be able to access vars of old table */
4804 rte = addRangeTableEntryForRelation(pstate,
4809 addRTEtoQuery(pstate, rte, false, true, true);
4811 transform = transformExpr(pstate, cmd->transform);
4813 /* It can't return a set */
4814 if (expression_returns_set(transform))
4816 (errcode(ERRCODE_DATATYPE_MISMATCH),
4817 errmsg("transform expression must not return a set")));
4819 /* No subplans or aggregates, either... */
4820 if (pstate->p_hasSubLinks)
4822 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4823 errmsg("cannot use subquery in transform expression")));
4824 if (pstate->p_hasAggs)
4826 (errcode(ERRCODE_GROUPING_ERROR),
4827 errmsg("cannot use aggregate function in transform expression")));
/*
 * NOTE(review): the branch lines are not visible in this view; this
 * assignment presumably belongs to the case without a USING expression,
 * where the old column value itself (a Var with the current type and
 * typmod) is the starting expression.
 */
4831 transform = (Node *) makeVar(1, attnum,
4832 attTup->atttypid, attTup->atttypmod,
/* Convert whichever expression was chosen to the target type and typmod, using assignment coercion rules */
4836 transform = coerce_to_target_type(pstate,
4837 transform, exprType(transform),
4838 targettype, typename->typmod,
4839 COERCION_ASSIGNMENT,
4840 COERCE_IMPLICIT_CAST);
4841 if (transform == NULL)
4843 (errcode(ERRCODE_DATATYPE_MISMATCH),
4844 errmsg("column \"%s\" cannot be cast to type \"%s\"",
4845 colName, TypeNameToString(typename))));
4848 * Add a work queue item to make ATRewriteTable update the column
4851 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4852 newval->attnum = attnum;
4853 newval->expr = (Expr *) transform;
4855 tab->newvals = lappend(tab->newvals, newval);
4857 ReleaseSysCache(tuple);
4860 * The recursion case is handled by ATSimpleRecursion. However, if we are
4861 * told not to recurse, there had better not be any child tables; else the
4862 * alter would put them out of step.
4865 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4866 else if (!recursing &&
4867 find_inheritance_children(RelationGetRelid(rel)) != NIL)
4869 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4870 errmsg("type of inherited column \"%s\" must be changed in child tables too",
4875 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4876 const char *colName, TypeName *typename)
4879 Form_pg_attribute attTup;
4881 HeapTuple typeTuple;
4885 Relation attrelation;
4891 attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4893 /* Look up the target column */
4894 heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4895 if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */
4897 (errcode(ERRCODE_UNDEFINED_COLUMN),
4898 errmsg("column \"%s\" of relation \"%s\" does not exist",
4899 colName, RelationGetRelationName(rel))));
4900 attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
4901 attnum = attTup->attnum;
4903 /* Check for multiple ALTER TYPE on same column --- can't cope */
4904 if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
4905 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
4907 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4908 errmsg("cannot alter type of column \"%s\" twice",
4911 /* Look up the target type (should not fail, since prep found it) */
4912 typeTuple = typenameType(typename);
4913 tform = (Form_pg_type) GETSTRUCT(typeTuple);
4914 targettype = HeapTupleGetOid(typeTuple);
4917 * If there is a default expression for the column, get it and ensure we
4918 * can coerce it to the new datatype. (We must do this before changing
4919 * the column type, because build_column_default itself will try to
4920 * coerce, and will not issue the error message we want if it fails.)
4922 * We remove any implicit coercion steps at the top level of the old
4923 * default expression; this has been agreed to satisfy the principle of
4924 * least surprise. (The conversion to the new column type should act like
4925 * it started from what the user sees as the stored expression, and the
4926 * implicit coercions aren't going to be shown.)
4928 if (attTup->atthasdef)
4930 defaultexpr = build_column_default(rel, attnum);
4931 Assert(defaultexpr);
4932 defaultexpr = strip_implicit_coercions(defaultexpr);
4933 defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */
4934 defaultexpr, exprType(defaultexpr),
4935 targettype, typename->typmod,
4936 COERCION_ASSIGNMENT,
4937 COERCE_IMPLICIT_CAST);
4938 if (defaultexpr == NULL)
4940 (errcode(ERRCODE_DATATYPE_MISMATCH),
4941 errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
4942 colName, TypeNameToString(typename))));
4948 * Find everything that depends on the column (constraints, indexes, etc),
4949 * and record enough information to let us recreate the objects.
4951 * The actual recreation does not happen here, but only after we have
4952 * performed all the individual ALTER TYPE operations. We have to save
4953 * the info before executing ALTER TYPE, though, else the deparser will
4956 * There could be multiple entries for the same object, so we must check
4957 * to ensure we process each one only once. Note: we assume that an index
4958 * that implements a constraint will not show a direct dependency on the
4961 depRel = heap_open(DependRelationId, RowExclusiveLock);
4963 ScanKeyInit(&key[0],
4964 Anum_pg_depend_refclassid,
4965 BTEqualStrategyNumber, F_OIDEQ,
4966 ObjectIdGetDatum(RelationRelationId));
4967 ScanKeyInit(&key[1],
4968 Anum_pg_depend_refobjid,
4969 BTEqualStrategyNumber, F_OIDEQ,
4970 ObjectIdGetDatum(RelationGetRelid(rel)));
4971 ScanKeyInit(&key[2],
4972 Anum_pg_depend_refobjsubid,
4973 BTEqualStrategyNumber, F_INT4EQ,
4974 Int32GetDatum((int32) attnum));
4976 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
4977 SnapshotNow, 3, key);
4979 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
4981 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
4982 ObjectAddress foundObject;
4984 /* We don't expect any PIN dependencies on columns */
4985 if (foundDep->deptype == DEPENDENCY_PIN)
4986 elog(ERROR, "cannot alter type of a pinned column");
4988 foundObject.classId = foundDep->classid;
4989 foundObject.objectId = foundDep->objid;
4990 foundObject.objectSubId = foundDep->objsubid;
4992 switch (getObjectClass(&foundObject))
4996 char relKind = get_rel_relkind(foundObject.objectId);
4998 if (relKind == RELKIND_INDEX)
5000 Assert(foundObject.objectSubId == 0);
5001 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
5003 tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
5004 foundObject.objectId);
5005 tab->changedIndexDefs = lappend(tab->changedIndexDefs,
5006 pg_get_indexdef_string(foundObject.objectId));
5009 else if (relKind == RELKIND_SEQUENCE)
5012 * This must be a SERIAL column's sequence. We need
5013 * not do anything to it.
5015 Assert(foundObject.objectSubId == 0);
5019 /* Not expecting any other direct dependencies... */
5020 elog(ERROR, "unexpected object depending on column: %s",
5021 getObjectDescription(&foundObject));
5026 case OCLASS_CONSTRAINT:
5027 Assert(foundObject.objectSubId == 0);
5028 if (!list_member_oid(tab->changedConstraintOids,
5029 foundObject.objectId))
5031 char *defstring = pg_get_constraintdef_string(foundObject.objectId);
5034 * Put NORMAL dependencies at the front of the list and
5035 * AUTO dependencies at the back. This makes sure that
5036 * foreign-key constraints depending on this column will
5037 * be dropped before unique or primary-key constraints of
5038 * the column; which we must have because the FK
5039 * constraints depend on the indexes belonging to the
5040 * unique constraints.
5042 if (foundDep->deptype == DEPENDENCY_NORMAL)
5044 tab->changedConstraintOids =
5045 lcons_oid(foundObject.objectId,
5046 tab->changedConstraintOids);
5047 tab->changedConstraintDefs =
5049 tab->changedConstraintDefs);
5053 tab->changedConstraintOids =
5054 lappend_oid(tab->changedConstraintOids,
5055 foundObject.objectId);
5056 tab->changedConstraintDefs =
5057 lappend(tab->changedConstraintDefs,
5063 case OCLASS_REWRITE:
5064 /* XXX someday see if we can cope with revising views */
5066 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5067 errmsg("cannot alter type of a column used by a view or rule"),
5068 errdetail("%s depends on column \"%s\"",
5069 getObjectDescription(&foundObject),
5073 case OCLASS_DEFAULT:
5076 * Ignore the column's default expression, since we will fix
5079 Assert(defaultexpr);
5085 case OCLASS_CONVERSION:
5086 case OCLASS_LANGUAGE:
5087 case OCLASS_OPERATOR:
5088 case OCLASS_OPCLASS:
5089 case OCLASS_TRIGGER:
5093 * We don't expect any of these sorts of objects to depend on
5096 elog(ERROR, "unexpected object depending on column: %s",
5097 getObjectDescription(&foundObject));
5101 elog(ERROR, "unrecognized object class: %u",
5102 foundObject.classId);
5106 systable_endscan(scan);
5109 * Now scan for dependencies of this column on other things. The only
5110 * thing we should find is the dependency on the column datatype, which we
5113 ScanKeyInit(&key[0],
5114 Anum_pg_depend_classid,
5115 BTEqualStrategyNumber, F_OIDEQ,
5116 ObjectIdGetDatum(RelationRelationId));
5117 ScanKeyInit(&key[1],
5118 Anum_pg_depend_objid,
5119 BTEqualStrategyNumber, F_OIDEQ,
5120 ObjectIdGetDatum(RelationGetRelid(rel)));
5121 ScanKeyInit(&key[2],
5122 Anum_pg_depend_objsubid,
5123 BTEqualStrategyNumber, F_INT4EQ,
5124 Int32GetDatum((int32) attnum));
5126 scan = systable_beginscan(depRel, DependDependerIndexId, true,
5127 SnapshotNow, 3, key);
5129 while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5131 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5133 if (foundDep->deptype != DEPENDENCY_NORMAL)
5134 elog(ERROR, "found unexpected dependency type '%c'",
5136 if (foundDep->refclassid != TypeRelationId ||
5137 foundDep->refobjid != attTup->atttypid)
5138 elog(ERROR, "found unexpected dependency for column");
5140 simple_heap_delete(depRel, &depTup->t_self);
5143 systable_endscan(scan);
5145 heap_close(depRel, RowExclusiveLock);
5148 * Here we go --- change the recorded column type. (Note heapTup is a
5149 * copy of the syscache entry, so okay to scribble on.)
5151 attTup->atttypid = targettype;
5152 attTup->atttypmod = typename->typmod;
5153 attTup->attndims = list_length(typename->arrayBounds);
5154 attTup->attlen = tform->typlen;
5155 attTup->attbyval = tform->typbyval;
5156 attTup->attalign = tform->typalign;
5157 attTup->attstorage = tform->typstorage;
5159 ReleaseSysCache(typeTuple);
5161 simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5163 /* keep system catalog indexes current */
5164 CatalogUpdateIndexes(attrelation, heapTup);
5166 heap_close(attrelation, RowExclusiveLock);
5168 /* Install dependency on new datatype */
5169 add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5172 * Drop any pg_statistic entry for the column, since it's now wrong type
5174 RemoveStatistics(RelationGetRelid(rel), attnum);
5177 * Update the default, if present, by brute force --- remove and re-add
5178 * the default. Probably unsafe to take shortcuts, since the new version
5179 * may well have additional dependencies. (It's okay to do this now,
5180 * rather than after other ALTER TYPE commands, since the default won't
5181 * depend on other column types.)
5185 /* Must make new row visible since it will be updated again */
5186 CommandCounterIncrement();
5189 * We use RESTRICT here for safety, but at present we do not expect
5190 * anything to depend on the default.
5192 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
5194 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
5198 heap_freetuple(heapTup);
5202 * Cleanup after we've finished all the ALTER TYPE operations for a
5203 * particular relation. We have to drop and recreate all the indexes
5204 * and constraints that depend on the altered columns.
5207 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5213 * Re-parse the index and constraint definitions, and attach them to the
5214 * appropriate work queue entries. We do this before dropping because in
5215 * the case of a FOREIGN KEY constraint, we might not yet have exclusive
5216 * lock on the table the constraint is attached to, and we need to get
5217 * that before dropping. It's safe because the parser won't actually look
5218 * at the catalogs to detect the existing entry.
5220 foreach(l, tab->changedIndexDefs)
5221 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5222 foreach(l, tab->changedConstraintDefs)
5223 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5226 * Now we can drop the existing constraints and indexes --- constraints
5227 * first, since some of them might depend on the indexes. In fact, we
5228 * have to delete FOREIGN KEY constraints before UNIQUE constraints,
5229 * but we already ordered the constraint list to ensure that would happen.
5230 * It should be okay to use DROP_RESTRICT here, since nothing else should
5231 * be depending on these objects.
5233 foreach(l, tab->changedConstraintOids)
5235 obj.classId = ConstraintRelationId;
5236 obj.objectId = lfirst_oid(l);
5237 obj.objectSubId = 0;
5238 performDeletion(&obj, DROP_RESTRICT);
5241 foreach(l, tab->changedIndexOids)
5243 obj.classId = RelationRelationId;
5244 obj.objectId = lfirst_oid(l);
5245 obj.objectSubId = 0;
5246 performDeletion(&obj, DROP_RESTRICT);
5250 * The objects will get recreated during subsequent passes over the work
5256 ATPostAlterTypeParse(char *cmd, List **wqueue)
5258 List *raw_parsetree_list;
5259 List *querytree_list;
5260 ListCell *list_item;
5263 * We expect that we only have to do raw parsing and parse analysis, not
5264 * any rule rewriting, since these will all be utility statements.
5266 raw_parsetree_list = raw_parser(cmd);
5267 querytree_list = NIL;
5268 foreach(list_item, raw_parsetree_list)
5270 Node *parsetree = (Node *) lfirst(list_item);
5272 querytree_list = list_concat(querytree_list,
5273 parse_analyze(parsetree, NULL, 0));
5277 * Attach each generated command to the proper place in the work queue.
5278 * Note this could result in creation of entirely new work-queue entries.
5280 foreach(list_item, querytree_list)
5282 Query *query = (Query *) lfirst(list_item);
5284 AlteredTableInfo *tab;
5286 Assert(IsA(query, Query));
5287 Assert(query->commandType == CMD_UTILITY);
5288 switch (nodeTag(query->utilityStmt))
5292 IndexStmt *stmt = (IndexStmt *) query->utilityStmt;
5293 AlterTableCmd *newcmd;
5295 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5296 tab = ATGetQueueEntry(wqueue, rel);
5297 newcmd = makeNode(AlterTableCmd);
5298 newcmd->subtype = AT_ReAddIndex;
5299 newcmd->def = (Node *) stmt;
5300 tab->subcmds[AT_PASS_OLD_INDEX] =
5301 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
5302 relation_close(rel, NoLock);
5305 case T_AlterTableStmt:
5307 AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5310 rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5311 tab = ATGetQueueEntry(wqueue, rel);
5312 foreach(lcmd, stmt->cmds)
5314 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5316 switch (cmd->subtype)
5319 cmd->subtype = AT_ReAddIndex;
5320 tab->subcmds[AT_PASS_OLD_INDEX] =
5321 lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5323 case AT_AddConstraint:
5324 tab->subcmds[AT_PASS_OLD_CONSTR] =
5325 lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5328 elog(ERROR, "unexpected statement type: %d",
5329 (int) cmd->subtype);
5332 relation_close(rel, NoLock);
5336 elog(ERROR, "unexpected statement type: %d",
5337 (int) nodeTag(query->utilityStmt));
5346 * recursing is true if we are recursing from a table to its indexes or
5347 * toast table. We don't allow the ownership of those things to be
5348 * changed separately from the parent table. Also, we can skip permission
5349 * checks (this is necessary not just an optimization, else we'd fail to
5350 * handle toast tables properly).
5353 ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
5355 Relation target_rel;
5358 Form_pg_class tuple_class;
5361 * Get exclusive lock till end of transaction on the target table. Use
5362 * relation_open so that we can work on indexes and sequences.
5364 target_rel = relation_open(relationOid, AccessExclusiveLock);
5366 /* Get its pg_class tuple, too */
5367 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5369 tuple = SearchSysCache(RELOID,
5370 ObjectIdGetDatum(relationOid),
5372 if (!HeapTupleIsValid(tuple))
5373 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5374 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5376 /* Can we change the ownership of this tuple? */
5377 switch (tuple_class->relkind)
5379 case RELKIND_RELATION:
5381 case RELKIND_SEQUENCE:
5382 /* ok to change owner */
5388 * Because ALTER INDEX OWNER used to be allowed, and in fact
5389 * is generated by old versions of pg_dump, we give a warning
5390 * and do nothing rather than erroring out. Also, to avoid
5391 * unnecessary chatter while restoring those old dumps, say
5392 * nothing at all if the command would be a no-op anyway.
5394 if (tuple_class->relowner != newOwnerId)
5396 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5397 errmsg("cannot change owner of index \"%s\"",
5398 NameStr(tuple_class->relname)),
5399 errhint("Change the ownership of the index's table, instead.")));
5400 /* quick hack to exit via the no-op path */
5401 newOwnerId = tuple_class->relowner;
5404 case RELKIND_TOASTVALUE:
5410 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5411 errmsg("\"%s\" is not a table, view, or sequence",
5412 NameStr(tuple_class->relname))));
5416 * If the new owner is the same as the existing owner, consider the
5417 * command to have succeeded. This is for dump restoration purposes.
5419 if (tuple_class->relowner != newOwnerId)
5421 Datum repl_val[Natts_pg_class];
5422 char repl_null[Natts_pg_class];
5423 char repl_repl[Natts_pg_class];
5429 /* skip permission checks when recursing to index or toast table */
5432 /* Superusers can always do it */
5435 Oid namespaceOid = tuple_class->relnamespace;
5436 AclResult aclresult;
5438 /* Otherwise, must be owner of the existing object */
5439 if (!pg_class_ownercheck(relationOid, GetUserId()))
5440 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5441 RelationGetRelationName(target_rel));
5443 /* Must be able to become new owner */
5444 check_is_member_of_role(GetUserId(), newOwnerId);
5446 /* New owner must have CREATE privilege on namespace */
5447 aclresult = pg_namespace_aclcheck(namespaceOid, newOwnerId,
5449 if (aclresult != ACLCHECK_OK)
5450 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
5451 get_namespace_name(namespaceOid));
5455 memset(repl_null, ' ', sizeof(repl_null));
5456 memset(repl_repl, ' ', sizeof(repl_repl));
5458 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5459 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5462 * Determine the modified ACL for the new owner. This is only
5463 * necessary when the ACL is non-null.
5465 aclDatum = SysCacheGetAttr(RELOID, tuple,
5466 Anum_pg_class_relacl,
5470 newAcl = aclnewowner(DatumGetAclP(aclDatum),
5471 tuple_class->relowner, newOwnerId);
5472 repl_repl[Anum_pg_class_relacl - 1] = 'r';
5473 repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
5476 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5478 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5479 CatalogUpdateIndexes(class_rel, newtuple);
5481 heap_freetuple(newtuple);
5483 /* Update owner dependency reference */
5484 changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
5487 * Also change the ownership of the table's rowtype, if it has one
5489 if (tuple_class->relkind != RELKIND_INDEX)
5490 AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
5493 * If we are operating on a table, also change the ownership of any
5494 * indexes and sequences that belong to the table, as well as the
5495 * table's toast table (if it has one)
5497 if (tuple_class->relkind == RELKIND_RELATION ||
5498 tuple_class->relkind == RELKIND_TOASTVALUE)
5500 List *index_oid_list;
5503 /* Find all the indexes belonging to this relation */
5504 index_oid_list = RelationGetIndexList(target_rel);
5506 /* For each index, recursively change its ownership */
5507 foreach(i, index_oid_list)
5508 ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
5510 list_free(index_oid_list);
5513 if (tuple_class->relkind == RELKIND_RELATION)
5515 /* If it has a toast table, recurse to change its ownership */
5516 if (tuple_class->reltoastrelid != InvalidOid)
5517 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
5520 /* If it has dependent sequences, recurse to change them too */
5521 change_owner_recurse_to_sequences(relationOid, newOwnerId);
5525 ReleaseSysCache(tuple);
5526 heap_close(class_rel, RowExclusiveLock);
5527 relation_close(target_rel, NoLock);
5531 * change_owner_recurse_to_sequences
5533 * Helper function for ATExecChangeOwner. Examines pg_depend searching
5534 * for sequences that are dependent on serial columns, and changes their
5538 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5546 * SERIAL sequences are those having an internal dependency on one of the
5547 * table's columns (we don't care *which* column, exactly).
5549 depRel = heap_open(DependRelationId, AccessShareLock);
5551 ScanKeyInit(&key[0],
5552 Anum_pg_depend_refclassid,
5553 BTEqualStrategyNumber, F_OIDEQ,
5554 ObjectIdGetDatum(RelationRelationId));
5555 ScanKeyInit(&key[1],
5556 Anum_pg_depend_refobjid,
5557 BTEqualStrategyNumber, F_OIDEQ,
5558 ObjectIdGetDatum(relationOid));
5559 /* we leave refobjsubid unspecified */
5561 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5562 SnapshotNow, 2, key);
5564 while (HeapTupleIsValid(tup = systable_getnext(scan)))
5566 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5569 /* skip dependencies other than internal dependencies on columns */
5570 if (depForm->refobjsubid == 0 ||
5571 depForm->classid != RelationRelationId ||
5572 depForm->objsubid != 0 ||
5573 depForm->deptype != DEPENDENCY_INTERNAL)
5576 /* Use relation_open just in case it's an index */
5577 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5579 /* skip non-sequence relations */
5580 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5582 /* No need to keep the lock */
5583 relation_close(seqRel, AccessExclusiveLock);
5587 /* We don't need to close the sequence while we alter it. */
5588 ATExecChangeOwner(depForm->objid, newOwnerId, false);
5590 /* Now we can close it. Keep the lock till end of transaction. */
5591 relation_close(seqRel, NoLock);
5594 systable_endscan(scan);
5596 relation_close(depRel, AccessShareLock);
5600 * ALTER TABLE CLUSTER ON
5602 * The only thing we have to do is to change the indisclustered bits.
5605 ATExecClusterOn(Relation rel, const char *indexName)
5609 indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5611 if (!OidIsValid(indexOid))
5613 (errcode(ERRCODE_UNDEFINED_OBJECT),
5614 errmsg("index \"%s\" for table \"%s\" does not exist",
5615 indexName, RelationGetRelationName(rel))));
5617 /* Check index is valid to cluster on */
5618 check_index_is_clusterable(rel, indexOid, false);
5620 /* And do the work */
5621 mark_index_clustered(rel, indexOid);
5625 * ALTER TABLE SET WITHOUT CLUSTER
5627 * We have to find any indexes on the table that have indisclustered bit
5628 * set and turn it off.
5631 ATExecDropCluster(Relation rel)
5633 mark_index_clustered(rel, InvalidOid);
5637 * ALTER TABLE SET TABLESPACE
5640 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5643 AclResult aclresult;
5646 * We do our own permission checking because we want to allow this on
5649 if (rel->rd_rel->relkind != RELKIND_RELATION &&
5650 rel->rd_rel->relkind != RELKIND_INDEX)
5652 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5653 errmsg("\"%s\" is not a table or index",
5654 RelationGetRelationName(rel))));
5656 /* Permissions checks */
5657 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
5658 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5659 RelationGetRelationName(rel));
5661 if (!allowSystemTableMods && IsSystemRelation(rel))
5663 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5664 errmsg("permission denied: \"%s\" is a system catalog",
5665 RelationGetRelationName(rel))));
5667 /* Check that the tablespace exists */
5668 tablespaceId = get_tablespace_oid(tablespacename);
5669 if (!OidIsValid(tablespaceId))
5671 (errcode(ERRCODE_UNDEFINED_OBJECT),
5672 errmsg("tablespace \"%s\" does not exist", tablespacename)));
5674 /* Check its permissions */
5675 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5676 if (aclresult != ACLCHECK_OK)
5677 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5679 /* Save info for Phase 3 to do the real work */
5680 if (OidIsValid(tab->newTableSpace))
5682 (errcode(ERRCODE_SYNTAX_ERROR),
5683 errmsg("cannot have multiple SET TABLESPACE subcommands")));
5684 tab->newTableSpace = tablespaceId;
5688 * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5689 * rewriting to be done, so we just want to copy the data as fast as possible.
5692 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5698 RelFileNode newrnode;
5699 SMgrRelation dstrel;
5702 Form_pg_class rd_rel;
5704 rel = relation_open(tableOid, NoLock);
5707 * We can never allow moving of shared or nailed-in-cache relations,
5708 * because we can't support changing their reltablespace values.
5710 if (rel->rd_rel->relisshared || rel->rd_isnailed)
5712 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5713 errmsg("cannot move system relation \"%s\"",
5714 RelationGetRelationName(rel))));
5717 * Don't allow moving temp tables of other backends ... their local buffer
5718 * manager is not going to cope.
5720 if (isOtherTempNamespace(RelationGetNamespace(rel)))
5722 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5723 errmsg("cannot move temporary tables of other sessions")));
5726 * No work if no change in tablespace.
5728 oldTableSpace = rel->rd_rel->reltablespace;
5729 if (newTableSpace == oldTableSpace ||
5730 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5732 relation_close(rel, NoLock);
5736 reltoastrelid = rel->rd_rel->reltoastrelid;
5737 reltoastidxid = rel->rd_rel->reltoastidxid;
5739 /* Get a modifiable copy of the relation's pg_class row */
5740 pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5742 tuple = SearchSysCacheCopy(RELOID,
5743 ObjectIdGetDatum(tableOid),
5745 if (!HeapTupleIsValid(tuple))
5746 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5747 rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5749 /* create another storage file. Is it a little ugly ? */
5750 /* NOTE: any conflict in relfilenode value will be caught here */
5751 newrnode = rel->rd_node;
5752 newrnode.spcNode = newTableSpace;
5754 dstrel = smgropen(newrnode);
5755 smgrcreate(dstrel, rel->rd_istemp, false);
5757 /* copy relation data to the new physical file */
5758 copy_relation_data(rel, dstrel);
5760 /* schedule unlinking old physical file */
5761 RelationOpenSmgr(rel);
5762 smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5765 * Now drop smgr references. The source was already dropped by
5766 * smgrscheduleunlink.
5770 /* update the pg_class row */
5771 rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5772 simple_heap_update(pg_class, &tuple->t_self, tuple);
5773 CatalogUpdateIndexes(pg_class, tuple);
5775 heap_freetuple(tuple);
5777 heap_close(pg_class, RowExclusiveLock);
5779 relation_close(rel, NoLock);
5781 /* Make sure the reltablespace change is visible */
5782 CommandCounterIncrement();
5784 /* Move associated toast relation and/or index, too */
5785 if (OidIsValid(reltoastrelid))
5786 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5787 if (OidIsValid(reltoastidxid))
5788 ATExecSetTableSpace(reltoastidxid, newTableSpace);
5792 * Copy data, block by block
5795 copy_relation_data(Relation rel, SMgrRelation dst)
5799 BlockNumber nblocks;
5802 Page page = (Page) buf;
5805 * Since we copy the file directly without looking at the shared buffers,
5806 * we'd better first flush out any pages of the source relation that are
5807 * in shared buffers. We assume no new changes will be made while we are
5808 * holding exclusive lock on the rel.
5810 FlushRelationBuffers(rel);
5813 * We need to log the copied data in WAL iff WAL archiving is enabled AND
5814 * it's not a temp rel.
5816 use_wal = XLogArchivingActive() && !rel->rd_istemp;
5818 nblocks = RelationGetNumberOfBlocks(rel);
5819 /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5822 for (blkno = 0; blkno < nblocks; blkno++)
5824 smgrread(src, blkno, buf);
5829 xl_heap_newpage xlrec;
5831 XLogRecData rdata[2];
5833 /* NO ELOG(ERROR) from here till newpage op is logged */
5834 START_CRIT_SECTION();
5836 xlrec.node = dst->smgr_rnode;
5837 xlrec.blkno = blkno;
5839 rdata[0].data = (char *) &xlrec;
5840 rdata[0].len = SizeOfHeapNewpage;
5841 rdata[0].buffer = InvalidBuffer;
5842 rdata[0].next = &(rdata[1]);
5844 rdata[1].data = (char *) page;
5845 rdata[1].len = BLCKSZ;
5846 rdata[1].buffer = InvalidBuffer;
5847 rdata[1].next = NULL;
5849 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
5851 PageSetLSN(page, recptr);
5852 PageSetTLI(page, ThisTimeLineID);
5858 * Now write the page. We say isTemp = true even if it's not a temp
5859 * rel, because there's no need for smgr to schedule an fsync for this
5860 * write; we'll do it ourselves below.
5862 smgrwrite(dst, blkno, buf, true);
5866 * If the rel isn't temp, we must fsync it down to disk before it's safe
5867 * to commit the transaction. (For a temp rel we don't care since the rel
5868 * will be uninteresting after a crash anyway.)
5870 * It's obvious that we must do this when not WAL-logging the copy. It's
5871 * less obvious that we have to do it even if we did WAL-log the copied
5872 * pages. The reason is that since we're copying outside shared buffers, a
5873 * CHECKPOINT occurring during the copy has no way to flush the previously
5874 * written data to disk (indeed it won't know the new rel even exists). A
5875 * crash later on would replay WAL from the checkpoint, therefore it
5876 * wouldn't replay our earlier WAL entries. If we do not fsync those pages
5877 * here, they might still not be on disk when the crash occurs.
5879 if (!rel->rd_istemp)
5884 * ALTER TABLE ENABLE/DISABLE TRIGGER
5886 * We just pass this off to trigger.c.
5889 ATExecEnableDisableTrigger(Relation rel, char *trigname,
5890 bool enable, bool skip_system)
5892 EnableDisableTrigger(rel, trigname, enable, skip_system);
5896 * ALTER TABLE CREATE TOAST TABLE
5898 * Note: this is also invoked from outside this module; in such cases we
5899 * expect the caller to have verified that the relation is a table and we
5900 * have all the right permissions. Callers expect this function
5901 * to end with CommandCounterIncrement if it makes any changes.
5904 AlterTableCreateToastTable(Oid relOid, bool silent)
5909 bool shared_relation;
5913 char toast_relname[NAMEDATALEN];
5914 char toast_idxname[NAMEDATALEN];
5915 IndexInfo *indexInfo;
5916 Oid classObjectId[2];
5917 ObjectAddress baseobject,
5921 * Grab an exclusive lock on the target table, which we will NOT release
5922 * until end of transaction. (This is probably redundant in all present
5925 rel = heap_open(relOid, AccessExclusiveLock);
5928 * Toast table is shared if and only if its parent is.
5930 * We cannot allow toasting a shared relation after initdb (because
5931 * there's no way to mark it toasted in other databases' pg_class).
5932 * Unfortunately we can't distinguish initdb from a manually started
5933 * standalone backend (toasting happens after the bootstrap phase, so
5934 * checking IsBootstrapProcessingMode() won't work). However, we can at
5935 * least prevent this mistake under normal multi-user operation.
5937 shared_relation = rel->rd_rel->relisshared;
5938 if (shared_relation && IsUnderPostmaster)
5940 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5941 errmsg("shared tables cannot be toasted after initdb")));
5944 * Is it already toasted?
5946 if (rel->rd_rel->reltoastrelid != InvalidOid)
5950 heap_close(rel, NoLock);
5955 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5956 errmsg("table \"%s\" already has a TOAST table",
5957 RelationGetRelationName(rel))));
5961 * Check to see whether the table actually needs a TOAST table.
5963 if (!needs_toast_table(rel))
5967 heap_close(rel, NoLock);
5972 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5973 errmsg("table \"%s\" does not need a TOAST table",
5974 RelationGetRelationName(rel))));
5978 * Create the toast table and its index
5980 snprintf(toast_relname, sizeof(toast_relname),
5981 "pg_toast_%u", relOid);
5982 snprintf(toast_idxname, sizeof(toast_idxname),
5983 "pg_toast_%u_index", relOid);
5985 /* this is pretty painful... need a tuple descriptor */
5986 tupdesc = CreateTemplateTupleDesc(3, false);
5987 TupleDescInitEntry(tupdesc, (AttrNumber) 1,
5991 TupleDescInitEntry(tupdesc, (AttrNumber) 2,
5995 TupleDescInitEntry(tupdesc, (AttrNumber) 3,
6001 * Ensure that the toast table doesn't itself get toasted, or we'll be
6002 * toast :-(. This is essential for chunk_data because type bytea is
6003 * toastable; hit the other two just to be sure.
6005 tupdesc->attrs[0]->attstorage = 'p';
6006 tupdesc->attrs[1]->attstorage = 'p';
6007 tupdesc->attrs[2]->attstorage = 'p';
6010 * Note: the toast relation is placed in the regular pg_toast namespace
6011 * even if its master relation is a temp table. There cannot be any
6012 * naming collision, and the toast rel will be destroyed when its master
6013 * is, so there's no need to handle the toast rel as temp.
6015 toast_relid = heap_create_with_catalog(toast_relname,
6017 rel->rd_rel->reltablespace,
6019 rel->rd_rel->relowner,
6028 /* make the toast relation visible, else index creation will fail */
6029 CommandCounterIncrement();
6032 * Create unique index on chunk_id, chunk_seq.
6034 * NOTE: the normal TOAST access routines could actually function with a
6035 * single-column index on chunk_id only. However, the slice access
6036 * routines use both columns for faster access to an individual chunk. In
6037 * addition, we want it to be unique as a check against the possibility of
6038 * duplicate TOAST chunk OIDs. The index might also be a little more
6039 * efficient this way, since btree isn't all that happy with large numbers
6043 indexInfo = makeNode(IndexInfo);
6044 indexInfo->ii_NumIndexAttrs = 2;
6045 indexInfo->ii_KeyAttrNumbers[0] = 1;
6046 indexInfo->ii_KeyAttrNumbers[1] = 2;
6047 indexInfo->ii_Expressions = NIL;
6048 indexInfo->ii_ExpressionsState = NIL;
6049 indexInfo->ii_Predicate = NIL;
6050 indexInfo->ii_PredicateState = NIL;
6051 indexInfo->ii_Unique = true;
6053 classObjectId[0] = OID_BTREE_OPS_OID;
6054 classObjectId[1] = INT4_BTREE_OPS_OID;
6056 toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
6059 rel->rd_rel->reltablespace,
6061 true, false, true, false);
6064 * Update toast rel's pg_class entry to show that it has an index. The
6065 * index OID is stored into the reltoastidxid field for easy access by the
6068 setRelhasindex(toast_relid, true, true, toast_idxid);
6071 * Store the toast table's OID in the parent relation's pg_class row
6073 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
6075 reltup = SearchSysCacheCopy(RELOID,
6076 ObjectIdGetDatum(relOid),
6078 if (!HeapTupleIsValid(reltup))
6079 elog(ERROR, "cache lookup failed for relation %u", relOid);
6081 ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
6083 simple_heap_update(class_rel, &reltup->t_self, reltup);
6085 /* Keep catalog indexes current */
6086 CatalogUpdateIndexes(class_rel, reltup);
6088 heap_freetuple(reltup);
6090 heap_close(class_rel, RowExclusiveLock);
6093 * Register dependency from the toast table to the master, so that the
6094 * toast table will be deleted if the master is.
6096 baseobject.classId = RelationRelationId;
6097 baseobject.objectId = relOid;
6098 baseobject.objectSubId = 0;
6099 toastobject.classId = RelationRelationId;
6100 toastobject.objectId = toast_relid;
6101 toastobject.objectSubId = 0;
6103 recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
6106 * Clean up and make changes visible
6108 heap_close(rel, NoLock);
6110 CommandCounterIncrement();
6114 * Check to see whether the table needs a TOAST table. It does only if
6115 * (1) there are any toastable attributes, and (2) the maximum length
6116 * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to
6117 * create a toast table for something like "f1 varchar(20)".)
6120 needs_toast_table(Relation rel)
6122 int32 data_length = 0;
6123 bool maxlength_unknown = false;
6124 bool has_toastable_attrs = false;
6126 Form_pg_attribute *att;
6130 tupdesc = rel->rd_att;
6131 att = tupdesc->attrs;
6133 for (i = 0; i < tupdesc->natts; i++)
6135 if (att[i]->attisdropped)
6137 data_length = att_align(data_length, att[i]->attalign);
6138 if (att[i]->attlen > 0)
6140 /* Fixed-length types are never toastable */
6141 data_length += att[i]->attlen;
6145 int32 maxlen = type_maximum_size(att[i]->atttypid,
6149 maxlength_unknown = true;
6151 data_length += maxlen;
6152 if (att[i]->attstorage != 'p')
6153 has_toastable_attrs = true;
6156 if (!has_toastable_attrs)
6157 return false; /* nothing to toast? */
6158 if (maxlength_unknown)
6159 return true; /* any unlimited-length attrs? */
6160 tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
6161 BITMAPLEN(tupdesc->natts)) +
6162 MAXALIGN(data_length);
6163 return (tuple_length > TOAST_TUPLE_THRESHOLD);
6168 * Execute ALTER TABLE SET SCHEMA
6170 * Note: caller must have checked ownership of the relation already
6173 AlterTableNamespace(RangeVar *relation, const char *newschema)
6181 rel = heap_openrv(relation, AccessExclusiveLock);
6183 /* heap_openrv allows TOAST, but we don't want to */
6184 if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
6186 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6187 errmsg("\"%s\" is a TOAST relation",
6188 RelationGetRelationName(rel))));
6190 relid = RelationGetRelid(rel);
6191 oldNspOid = RelationGetNamespace(rel);
6193 /* get schema OID and check its permissions */
6194 nspOid = LookupCreationNamespace(newschema);
6196 if (oldNspOid == nspOid)
6198 (errcode(ERRCODE_DUPLICATE_TABLE),
6199 errmsg("relation \"%s\" is already in schema \"%s\"",
6200 RelationGetRelationName(rel),
6203 /* disallow renaming into or out of temp schemas */
6204 if (isAnyTempNamespace(nspOid) || isAnyTempNamespace(oldNspOid))
6206 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6207 errmsg("cannot move objects into or out of temporary schemas")));
6209 /* same for TOAST schema */
6210 if (nspOid == PG_TOAST_NAMESPACE || oldNspOid == PG_TOAST_NAMESPACE)
6212 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6213 errmsg("cannot move objects into or out of TOAST schema")));
6215 /* OK, modify the pg_class row and pg_depend entry */
6216 classRel = heap_open(RelationRelationId, RowExclusiveLock);
6218 AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
6220 /* Fix the table's rowtype too */
6221 AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
6223 /* Fix other dependent stuff */
6224 if (rel->rd_rel->relkind == RELKIND_RELATION)
6226 AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
6227 AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
6228 AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
6231 heap_close(classRel, RowExclusiveLock);
6233 /* close rel, but keep lock until commit */
6234 relation_close(rel, NoLock);
6238 * The guts of relocating a relation to another namespace: fix the pg_class
6239 * entry, and the pg_depend entry if any. Caller must already have
6240 * opened and write-locked pg_class.
6243 AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
6244 Oid oldNspOid, Oid newNspOid,
6245 bool hasDependEntry)
6248 Form_pg_class classForm;
6250 classTup = SearchSysCacheCopy(RELOID,
6251 ObjectIdGetDatum(relOid),
6253 if (!HeapTupleIsValid(classTup))
6254 elog(ERROR, "cache lookup failed for relation %u", relOid);
6255 classForm = (Form_pg_class) GETSTRUCT(classTup);
6257 Assert(classForm->relnamespace == oldNspOid);
6259 /* check for duplicate name (more friendly than unique-index failure) */
6260 if (get_relname_relid(NameStr(classForm->relname),
6261 newNspOid) != InvalidOid)
6263 (errcode(ERRCODE_DUPLICATE_TABLE),
6264 errmsg("relation \"%s\" already exists in schema \"%s\"",
6265 NameStr(classForm->relname),
6266 get_namespace_name(newNspOid))));
6268 /* classTup is a copy, so OK to scribble on */
6269 classForm->relnamespace = newNspOid;
6271 simple_heap_update(classRel, &classTup->t_self, classTup);
6272 CatalogUpdateIndexes(classRel, classTup);
6274 /* Update dependency on schema if caller said so */
6275 if (hasDependEntry &&
6276 changeDependencyFor(RelationRelationId, relOid,
6277 NamespaceRelationId, oldNspOid, newNspOid) != 1)
6278 elog(ERROR, "failed to change schema dependency for relation \"%s\"",
6279 NameStr(classForm->relname));
6281 heap_freetuple(classTup);
6285 * Move all indexes for the specified relation to another namespace.
6287 * Note: we assume adequate permission checking was done by the caller,
6288 * and that the caller has a suitable lock on the owning relation.
6291 AlterIndexNamespaces(Relation classRel, Relation rel,
6292 Oid oldNspOid, Oid newNspOid)
6297 indexList = RelationGetIndexList(rel);
6299 foreach(l, indexList)
6301 Oid indexOid = lfirst_oid(l);
6304 * Note: currently, the index will not have its own dependency on the
6305 * namespace, so we don't need to do changeDependencyFor(). There's no
6306 * rowtype in pg_type, either.
6308 AlterRelationNamespaceInternal(classRel, indexOid,
6309 oldNspOid, newNspOid,
6313 list_free(indexList);
6317 * Move all SERIAL-column sequences of the specified relation to another
6320 * Note: we assume adequate permission checking was done by the caller,
6321 * and that the caller has a suitable lock on the owning relation.
6324 AlterSeqNamespaces(Relation classRel, Relation rel,
6325 Oid oldNspOid, Oid newNspOid, const char *newNspName)
6333 * SERIAL sequences are those having an internal dependency on one of the
6334 * table's columns (we don't care *which* column, exactly).
6336 depRel = heap_open(DependRelationId, AccessShareLock);
6338 ScanKeyInit(&key[0],
6339 Anum_pg_depend_refclassid,
6340 BTEqualStrategyNumber, F_OIDEQ,
6341 ObjectIdGetDatum(RelationRelationId));
6342 ScanKeyInit(&key[1],
6343 Anum_pg_depend_refobjid,
6344 BTEqualStrategyNumber, F_OIDEQ,
6345 ObjectIdGetDatum(RelationGetRelid(rel)));
6346 /* we leave refobjsubid unspecified */
6348 scan = systable_beginscan(depRel, DependReferenceIndexId, true,
6349 SnapshotNow, 2, key);
6351 while (HeapTupleIsValid(tup = systable_getnext(scan)))
6353 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
6356 /* skip dependencies other than internal dependencies on columns */
6357 if (depForm->refobjsubid == 0 ||
6358 depForm->classid != RelationRelationId ||
6359 depForm->objsubid != 0 ||
6360 depForm->deptype != DEPENDENCY_INTERNAL)
6363 /* Use relation_open just in case it's an index */
6364 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
6366 /* skip non-sequence relations */
6367 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
6369 /* No need to keep the lock */
6370 relation_close(seqRel, AccessExclusiveLock);
6374 /* Fix the pg_class and pg_depend entries */
6375 AlterRelationNamespaceInternal(classRel, depForm->objid,
6376 oldNspOid, newNspOid,
6380 * Sequences have entries in pg_type. We need to be careful to move
6381 * them to the new namespace, too.
6383 AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
6386 /* Now we can close it. Keep the lock till end of transaction. */
6387 relation_close(seqRel, NoLock);
6390 systable_endscan(scan);
6392 relation_close(depRel, AccessShareLock);
6397 * This code supports
6398 * CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
6400 * Because we only support this for TEMP tables, it's sufficient to remember
6401 * the state in a backend-local data structure.
6405 * Register a newly-created relation's ON COMMIT action.
6408 register_on_commit_action(Oid relid, OnCommitAction action)
6411 MemoryContext oldcxt;
6414 * We needn't bother registering the relation unless there is an ON COMMIT
6415 * action we need to take.
6417 if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6420 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6422 oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6424 oc->oncommit = action;
6425 oc->creating_subid = GetCurrentSubTransactionId();
6426 oc->deleting_subid = InvalidSubTransactionId;
6428 on_commits = lcons(oc, on_commits);
6430 MemoryContextSwitchTo(oldcxt);
6434 * Unregister any ON COMMIT action when a relation is deleted.
6436 * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
6439 remove_on_commit_action(Oid relid)
6443 foreach(l, on_commits)
6445 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6447 if (oc->relid == relid)
6449 oc->deleting_subid = GetCurrentSubTransactionId();
6456 * Perform ON COMMIT actions.
6458 * This is invoked just before actually committing, since it's possible
6459 * to encounter errors.
6462 PreCommit_on_commit_actions(void)
6465 List *oids_to_truncate = NIL;
6467 foreach(l, on_commits)
6469 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6471 /* Ignore entry if already dropped in this xact */
6472 if (oc->deleting_subid != InvalidSubTransactionId)
6475 switch (oc->oncommit)
6478 case ONCOMMIT_PRESERVE_ROWS:
6479 /* Do nothing (there shouldn't be such entries, actually) */
6481 case ONCOMMIT_DELETE_ROWS:
6482 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
6486 ObjectAddress object;
6488 object.classId = RelationRelationId;
6489 object.objectId = oc->relid;
6490 object.objectSubId = 0;
6491 performDeletion(&object, DROP_CASCADE);
6494 * Note that table deletion will call
6495 * remove_on_commit_action, so the entry should get marked
6498 Assert(oc->deleting_subid != InvalidSubTransactionId);
6503 if (oids_to_truncate != NIL)
6505 heap_truncate(oids_to_truncate);
6506 CommandCounterIncrement(); /* XXX needed? */
6511 * Post-commit or post-abort cleanup for ON COMMIT management.
6513 * All we do here is remove no-longer-needed OnCommitItem entries.
6515 * During commit, remove entries that were deleted during this transaction;
6516 * during abort, remove those created during this transaction.
6519 AtEOXact_on_commit_actions(bool isCommit)
6522 ListCell *prev_item;
6525 cur_item = list_head(on_commits);
6527 while (cur_item != NULL)
6529 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6531 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
6532 oc->creating_subid != InvalidSubTransactionId)
6534 /* cur_item must be removed */
6535 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6538 cur_item = lnext(prev_item);
6540 cur_item = list_head(on_commits);
6544 /* cur_item must be preserved */
6545 oc->creating_subid = InvalidSubTransactionId;
6546 oc->deleting_subid = InvalidSubTransactionId;
6547 prev_item = cur_item;
6548 cur_item = lnext(prev_item);
6554 * Post-subcommit or post-subabort cleanup for ON COMMIT management.
6556 * During subabort, we can immediately remove entries created during this
6557 * subtransaction. During subcommit, just relabel entries marked during
6558 * this subtransaction as being the parent's responsibility.
6561 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
6562 SubTransactionId parentSubid)
6565 ListCell *prev_item;
6568 cur_item = list_head(on_commits);
6570 while (cur_item != NULL)
6572 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6574 if (!isCommit && oc->creating_subid == mySubid)
6576 /* cur_item must be removed */
6577 on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6580 cur_item = lnext(prev_item);
6582 cur_item = list_head(on_commits);
6586 /* cur_item must be preserved */
6587 if (oc->creating_subid == mySubid)
6588 oc->creating_subid = parentSubid;
6589 if (oc->deleting_subid == mySubid)
6590 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
6591 prev_item = cur_item;
6592 cur_item = lnext(prev_item);