OSDN Git Service

Cause ALTER TABLE to perform ALTER COLUMN DROP DEFAULT operations during
[pg-rex/syncrep.git] / src / backend / commands / tablecmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * tablecmds.c
4  *        Commands for creating and altering table structures and settings
5  *
6  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.199 2006/08/03 20:57:06 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/reloptions.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/dependency.h"
23 #include "catalog/heap.h"
24 #include "catalog/index.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_depend.h"
29 #include "catalog/pg_inherits.h"
30 #include "catalog/pg_namespace.h"
31 #include "catalog/pg_trigger.h"
32 #include "catalog/pg_type.h"
33 #include "catalog/toasting.h"
34 #include "commands/cluster.h"
35 #include "commands/defrem.h"
36 #include "commands/tablecmds.h"
37 #include "commands/tablespace.h"
38 #include "commands/trigger.h"
39 #include "commands/typecmds.h"
40 #include "executor/executor.h"
41 #include "miscadmin.h"
42 #include "nodes/makefuncs.h"
43 #include "optimizer/clauses.h"
44 #include "optimizer/plancat.h"
45 #include "optimizer/prep.h"
46 #include "parser/analyze.h"
47 #include "parser/gramparse.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "parser/parser.h"
55 #include "rewrite/rewriteHandler.h"
56 #include "storage/smgr.h"
57 #include "utils/acl.h"
58 #include "utils/builtins.h"
59 #include "utils/fmgroids.h"
60 #include "utils/inval.h"
61 #include "utils/lsyscache.h"
62 #include "utils/memutils.h"
63 #include "utils/relcache.h"
64 #include "utils/syscache.h"
65
66
67 /*
68  * ON COMMIT action list
69  */
70 typedef struct OnCommitItem
71 {
72         Oid                     relid;                  /* relid of relation */
73         OnCommitAction oncommit;        /* what to do at end of xact */
74
75         /*
76          * If this entry was created during the current transaction,
77          * creating_subid is the ID of the creating subxact; if created in a prior
78          * transaction, creating_subid is zero.  If deleted during the current
79          * transaction, deleting_subid is the ID of the deleting subxact; if no
80          * deletion request is pending, deleting_subid is zero.
81          */
82         SubTransactionId creating_subid;
83         SubTransactionId deleting_subid;
84 } OnCommitItem;
85
86 static List *on_commits = NIL;
87
88
/*
 * State information for ALTER TABLE
 *
 * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
 * structs, one for each table modified by the operation (the named table
 * plus any child tables that are affected).  We save lists of subcommands
 * to apply to this table (possibly modified by parse transformation steps);
 * these lists will be executed in Phase 2.  If a Phase 3 step is needed,
 * necessary information is stored in the constraints and newvals lists.
 *
 * Phase 2 is divided into multiple passes; subcommands are executed in
 * a pass determined by subcommand type.
 *
 * The pass numbers below are consecutive, starting at zero, and are used to
 * index the per-pass subcommand lists (AlteredTableInfo.subcmds);
 * AT_NUM_PASSES is the size of that array and must stay one greater than
 * the highest pass number.
 */

#define AT_PASS_DROP            0       /* DROP (all flavors) */
#define AT_PASS_ALTER_TYPE      1       /* ALTER COLUMN TYPE */
#define AT_PASS_OLD_INDEX       2       /* re-add existing indexes */
#define AT_PASS_OLD_CONSTR      3       /* re-add existing constraints */
#define AT_PASS_COL_ATTRS       4       /* set other column attributes */
/* We could support a RENAME COLUMN pass here, but not currently used */
#define AT_PASS_ADD_COL         5       /* ADD COLUMN */
#define AT_PASS_ADD_INDEX       6       /* ADD indexes */
#define AT_PASS_ADD_CONSTR      7       /* ADD constraints, defaults */
#define AT_PASS_MISC            8       /* other stuff */
#define AT_NUM_PASSES           9
114
115 typedef struct AlteredTableInfo
116 {
117         /* Information saved before any work commences: */
118         Oid                     relid;                  /* Relation to work on */
119         char            relkind;                /* Its relkind */
120         TupleDesc       oldDesc;                /* Pre-modification tuple descriptor */
121         /* Information saved by Phase 1 for Phase 2: */
122         List       *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
123         /* Information saved by Phases 1/2 for Phase 3: */
124         List       *constraints;        /* List of NewConstraint */
125         List       *newvals;            /* List of NewColumnValue */
126         bool            new_notnull;    /* T if we added new NOT NULL constraints */
127         Oid                     newTableSpace;  /* new tablespace; 0 means no change */
128         /* Objects to rebuild after completing ALTER TYPE operations */
129         List       *changedConstraintOids;      /* OIDs of constraints to rebuild */
130         List       *changedConstraintDefs;      /* string definitions of same */
131         List       *changedIndexOids;           /* OIDs of indexes to rebuild */
132         List       *changedIndexDefs;           /* string definitions of same */
133 } AlteredTableInfo;
134
135 /* Struct describing one new constraint to check in Phase 3 scan */
136 /* Note: new NOT NULL constraints are handled elsewhere */
137 typedef struct NewConstraint
138 {
139         char       *name;                       /* Constraint name, or NULL if none */
140         ConstrType      contype;                /* CHECK or FOREIGN */
141         Oid                     refrelid;               /* PK rel, if FOREIGN */
142         Node       *qual;                       /* Check expr or FkConstraint struct */
143         List       *qualstate;          /* Execution state for CHECK */
144 } NewConstraint;
145
146 /*
147  * Struct describing one new column value that needs to be computed during
148  * Phase 3 copy (this could be either a new column with a non-null default, or
149  * a column that we're changing the type of).  Columns without such an entry
150  * are just copied from the old table during ATRewriteTable.  Note that the
151  * expr is an expression over *old* table values.
152  */
153 typedef struct NewColumnValue
154 {
155         AttrNumber      attnum;                 /* which column */
156         Expr       *expr;                       /* expression to compute */
157         ExprState  *exprstate;          /* execution state */
158 } NewColumnValue;
159
160
161 static void truncate_check_rel(Relation rel);
162 static List *MergeAttributes(List *schema, List *supers, bool istemp,
163                                 List **supOids, List **supconstr, int *supOidCount);
164 static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
165 static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
166 static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
167 static void StoreCatalogInheritance(Oid relationId, List *supers);
168 static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
169                                         int16 seqNumber, Relation catalogRelation);
170 static int      findAttrByName(const char *attributeName, List *schema);
171 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
172 static void AlterIndexNamespaces(Relation classRel, Relation rel,
173                                          Oid oldNspOid, Oid newNspOid);
174 static void AlterSeqNamespaces(Relation classRel, Relation rel,
175                                    Oid oldNspOid, Oid newNspOid,
176                                    const char *newNspName);
177 static int transformColumnNameList(Oid relId, List *colList,
178                                                 int16 *attnums, Oid *atttypids);
179 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
180                                                    List **attnamelist,
181                                                    int16 *attnums, Oid *atttypids,
182                                                    Oid *opclasses);
183 static Oid transformFkeyCheckAttrs(Relation pkrel,
184                                                 int numattrs, int16 *attnums,
185                                                 Oid *opclasses);
186 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
187                                                          Relation rel, Relation pkrel);
188 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
189                                                  Oid constrOid);
190 static char *fkMatchTypeToString(char match_type);
191 static void ATController(Relation rel, List *cmds, bool recurse);
192 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
193                   bool recurse, bool recursing);
194 static void ATRewriteCatalogs(List **wqueue);
195 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
196 static void ATRewriteTables(List **wqueue);
197 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
198 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
199 static void ATSimplePermissions(Relation rel, bool allowView);
200 static void ATSimplePermissionsRelationOrIndex(Relation rel);
201 static void ATSimpleRecursion(List **wqueue, Relation rel,
202                                   AlterTableCmd *cmd, bool recurse);
203 static void ATOneLevelRecursion(List **wqueue, Relation rel,
204                                         AlterTableCmd *cmd);
205 static void find_composite_type_dependencies(Oid typeOid,
206                                                                  const char *origTblName);
207 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
208                                 AlterTableCmd *cmd);
209 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
210                                 ColumnDef *colDef);
211 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
212 static void add_column_support_dependency(Oid relid, int32 attnum,
213                                                           RangeVar *support);
214 static void ATExecDropNotNull(Relation rel, const char *colName);
215 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
216                                  const char *colName);
217 static void ATExecColumnDefault(Relation rel, const char *colName,
218                                         Node *newDefault);
219 static void ATPrepSetStatistics(Relation rel, const char *colName,
220                                         Node *flagValue);
221 static void ATExecSetStatistics(Relation rel, const char *colName,
222                                         Node *newValue);
223 static void ATExecSetStorage(Relation rel, const char *colName,
224                                  Node *newValue);
225 static void ATExecDropColumn(Relation rel, const char *colName,
226                                  DropBehavior behavior,
227                                  bool recurse, bool recursing);
228 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
229                            IndexStmt *stmt, bool is_rebuild);
230 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
231                                         Node *newConstraint);
232 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
233                                                   FkConstraint *fkconstraint);
234 static void ATPrepDropConstraint(List **wqueue, Relation rel,
235                                          bool recurse, AlterTableCmd *cmd);
236 static void ATExecDropConstraint(Relation rel, const char *constrName,
237                                          DropBehavior behavior, bool quiet);
238 static void ATPrepAlterColumnType(List **wqueue,
239                                           AlteredTableInfo *tab, Relation rel,
240                                           bool recurse, bool recursing,
241                                           AlterTableCmd *cmd);
242 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
243                                           const char *colName, TypeName *typename);
244 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
245 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
246 static void change_owner_recurse_to_sequences(Oid relationOid,
247                                                                   Oid newOwnerId);
248 static void ATExecClusterOn(Relation rel, const char *indexName);
249 static void ATExecDropCluster(Relation rel);
250 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
251                                         char *tablespacename);
252 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
253 static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset);
254 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
255                                                    bool enable, bool skip_system);
256 static void ATExecAddInherits(Relation rel, RangeVar *parent);
257 static void ATExecDropInherits(Relation rel, RangeVar *parent);
258 static void copy_relation_data(Relation rel, SMgrRelation dst);
259 static void update_ri_trigger_args(Oid relid,
260                                            const char *oldname,
261                                            const char *newname,
262                                            bool fk_scan,
263                                            bool update_relname);
264
265
266 /* ----------------------------------------------------------------
267  *              DefineRelation
268  *                              Creates a new relation.
269  *
270  * If successful, returns the OID of the new relation.
271  * ----------------------------------------------------------------
272  */
273 Oid
274 DefineRelation(CreateStmt *stmt, char relkind)
275 {
276         char            relname[NAMEDATALEN];
277         Oid                     namespaceId;
278         List       *schema = stmt->tableElts;
279         Oid                     relationId;
280         Oid                     tablespaceId;
281         Relation        rel;
282         TupleDesc       descriptor;
283         List       *inheritOids;
284         List       *old_constraints;
285         bool            localHasOids;
286         int                     parentOidCount;
287         List       *rawDefaults;
288         Datum           reloptions;
289         ListCell   *listptr;
290         int                     i;
291         AttrNumber      attnum;
292
293         /*
294          * Truncate relname to appropriate length (probably a waste of time, as
295          * parser should have done this already).
296          */
297         StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
298
299         /*
300          * Check consistency of arguments
301          */
302         if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
303                 ereport(ERROR,
304                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
305                                  errmsg("ON COMMIT can only be used on temporary tables")));
306
307         /*
308          * Look up the namespace in which we are supposed to create the relation.
309          * Check we have permission to create there. Skip check if bootstrapping,
310          * since permissions machinery may not be working yet.
311          */
312         namespaceId = RangeVarGetCreationNamespace(stmt->relation);
313
314         if (!IsBootstrapProcessingMode())
315         {
316                 AclResult       aclresult;
317
318                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
319                                                                                   ACL_CREATE);
320                 if (aclresult != ACLCHECK_OK)
321                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
322                                                    get_namespace_name(namespaceId));
323         }
324
325         /*
326          * Select tablespace to use.  If not specified, use default_tablespace
327          * (which may in turn default to database's default).
328          */
329         if (stmt->tablespacename)
330         {
331                 tablespaceId = get_tablespace_oid(stmt->tablespacename);
332                 if (!OidIsValid(tablespaceId))
333                         ereport(ERROR,
334                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
335                                          errmsg("tablespace \"%s\" does not exist",
336                                                         stmt->tablespacename)));
337         }
338         else
339         {
340                 tablespaceId = GetDefaultTablespace();
341                 /* note InvalidOid is OK in this case */
342         }
343
344         /*
345          * Parse and validate reloptions, if any.
346          */
347         reloptions = transformRelOptions((Datum) 0, stmt->options, true, false);
348
349         (void) heap_reloptions(relkind, reloptions, true);
350
351         /* Check permissions except when using database's default */
352         if (OidIsValid(tablespaceId))
353         {
354                 AclResult       aclresult;
355
356                 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
357                                                                                    ACL_CREATE);
358                 if (aclresult != ACLCHECK_OK)
359                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
360                                                    get_tablespace_name(tablespaceId));
361         }
362
363         /*
364          * Look up inheritance ancestors and generate relation schema, including
365          * inherited attributes.
366          */
367         schema = MergeAttributes(schema, stmt->inhRelations,
368                                                          stmt->relation->istemp,
369                                                          &inheritOids, &old_constraints, &parentOidCount);
370
371         /*
372          * Create a relation descriptor from the relation schema and create the
373          * relation.  Note that in this stage only inherited (pre-cooked) defaults
374          * and constraints will be included into the new relation.
375          * (BuildDescForRelation takes care of the inherited defaults, but we have
376          * to copy inherited constraints here.)
377          */
378         descriptor = BuildDescForRelation(schema);
379
380         localHasOids = interpretOidsOption(stmt->options);
381         descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
382
383         if (old_constraints != NIL)
384         {
385                 ConstrCheck *check = (ConstrCheck *)
386                 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
387                 int                     ncheck = 0;
388
389                 foreach(listptr, old_constraints)
390                 {
391                         Constraint *cdef = (Constraint *) lfirst(listptr);
392                         bool            dup = false;
393
394                         if (cdef->contype != CONSTR_CHECK)
395                                 continue;
396                         Assert(cdef->name != NULL);
397                         Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
398
399                         /*
400                          * In multiple-inheritance situations, it's possible to inherit
401                          * the same grandparent constraint through multiple parents.
402                          * Hence, discard inherited constraints that match as to both name
403                          * and expression.      Otherwise, gripe if the names conflict.
404                          */
405                         for (i = 0; i < ncheck; i++)
406                         {
407                                 if (strcmp(check[i].ccname, cdef->name) != 0)
408                                         continue;
409                                 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
410                                 {
411                                         dup = true;
412                                         break;
413                                 }
414                                 ereport(ERROR,
415                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
416                                                  errmsg("duplicate check constraint name \"%s\"",
417                                                                 cdef->name)));
418                         }
419                         if (!dup)
420                         {
421                                 check[ncheck].ccname = cdef->name;
422                                 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
423                                 ncheck++;
424                         }
425                 }
426                 if (ncheck > 0)
427                 {
428                         if (descriptor->constr == NULL)
429                         {
430                                 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
431                                 descriptor->constr->defval = NULL;
432                                 descriptor->constr->num_defval = 0;
433                                 descriptor->constr->has_not_null = false;
434                         }
435                         descriptor->constr->num_check = ncheck;
436                         descriptor->constr->check = check;
437                 }
438         }
439
440         relationId = heap_create_with_catalog(relname,
441                                                                                   namespaceId,
442                                                                                   tablespaceId,
443                                                                                   InvalidOid,
444                                                                                   GetUserId(),
445                                                                                   descriptor,
446                                                                                   relkind,
447                                                                                   false,
448                                                                                   localHasOids,
449                                                                                   parentOidCount,
450                                                                                   stmt->oncommit,
451                                                                                   reloptions,
452                                                                                   allowSystemTableMods);
453
454         StoreCatalogInheritance(relationId, inheritOids);
455
456         /*
457          * We must bump the command counter to make the newly-created relation
458          * tuple visible for opening.
459          */
460         CommandCounterIncrement();
461
462         /*
463          * Open the new relation and acquire exclusive lock on it.      This isn't
464          * really necessary for locking out other backends (since they can't see
465          * the new rel anyway until we commit), but it keeps the lock manager from
466          * complaining about deadlock risks.
467          */
468         rel = relation_open(relationId, AccessExclusiveLock);
469
470         /*
471          * Now add any newly specified column default values and CHECK constraints
472          * to the new relation.  These are passed to us in the form of raw
473          * parsetrees; we need to transform them to executable expression trees
474          * before they can be added. The most convenient way to do that is to
475          * apply the parser's transformExpr routine, but transformExpr doesn't
476          * work unless we have a pre-existing relation. So, the transformation has
477          * to be postponed to this final step of CREATE TABLE.
478          *
479          * Another task that's conveniently done at this step is to add dependency
480          * links between columns and supporting relations (such as SERIAL
481          * sequences).
482          *
483          * First, scan schema to find new column defaults.
484          */
485         rawDefaults = NIL;
486         attnum = 0;
487
488         foreach(listptr, schema)
489         {
490                 ColumnDef  *colDef = lfirst(listptr);
491
492                 attnum++;
493
494                 if (colDef->raw_default != NULL)
495                 {
496                         RawColumnDefault *rawEnt;
497
498                         Assert(colDef->cooked_default == NULL);
499
500                         rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
501                         rawEnt->attnum = attnum;
502                         rawEnt->raw_default = colDef->raw_default;
503                         rawDefaults = lappend(rawDefaults, rawEnt);
504                 }
505
506                 /* Create dependency for supporting relation for this column */
507                 if (colDef->support != NULL)
508                         add_column_support_dependency(relationId, attnum, colDef->support);
509         }
510
511         /*
512          * Parse and add the defaults/constraints, if any.
513          */
514         if (rawDefaults || stmt->constraints)
515                 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
516
517         /*
518          * Clean up.  We keep lock on new relation (although it shouldn't be
519          * visible to anyone else anyway, until commit).
520          */
521         relation_close(rel, NoLock);
522
523         return relationId;
524 }
525
526 /*
527  * RemoveRelation
528  *              Deletes a relation.
529  */
530 void
531 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
532 {
533         Oid                     relOid;
534         ObjectAddress object;
535
536         relOid = RangeVarGetRelid(relation, false);
537
538         object.classId = RelationRelationId;
539         object.objectId = relOid;
540         object.objectSubId = 0;
541
542         performDeletion(&object, behavior);
543 }
544
545 /*
546  * ExecuteTruncate
547  *              Executes a TRUNCATE command.
548  *
549  * This is a multi-relation truncate.  We first open and grab exclusive
550  * lock on all relations involved, checking permissions and otherwise
551  * verifying that the relation is OK for truncation.  In CASCADE mode,
552  * relations having FK references to the targeted relations are automatically
553  * added to the group; in RESTRICT mode, we check that all FK references are
554  * internal to the group that's being truncated.  Finally all the relations
555  * are truncated and reindexed.
556  */
557 void
558 ExecuteTruncate(TruncateStmt *stmt)
559 {
560         List       *rels = NIL;
561         List       *relids = NIL;
562         ListCell   *cell;
563
564         /*
565          * Open, exclusive-lock, and check all the explicitly-specified relations
566          */
567         foreach(cell, stmt->relations)
568         {
569                 RangeVar   *rv = lfirst(cell);
570                 Relation        rel;
571
572                 rel = heap_openrv(rv, AccessExclusiveLock);
573                 truncate_check_rel(rel);
574                 rels = lappend(rels, rel);
575                 relids = lappend_oid(relids, RelationGetRelid(rel));
576         }
577
578         /*
579          * In CASCADE mode, suck in all referencing relations as well.  This
580          * requires multiple iterations to find indirectly-dependent relations.
581          * At each phase, we need to exclusive-lock new rels before looking
582          * for their dependencies, else we might miss something.  Also, we
583          * check each rel as soon as we open it, to avoid a faux pas such as
584          * holding lock for a long time on a rel we have no permissions for.
585          */
586         if (stmt->behavior == DROP_CASCADE)
587         {
588                 for (;;)
589                 {
590                         List   *newrelids;
591
592                         newrelids = heap_truncate_find_FKs(relids);
593                         if (newrelids == NIL)
594                                 break;                  /* nothing else to add */
595
596                         foreach(cell, newrelids)
597                         {
598                                 Oid             relid = lfirst_oid(cell);
599                                 Relation        rel;
600
601                                 rel = heap_open(relid, AccessExclusiveLock);
602                                 ereport(NOTICE,
603                                                 (errmsg("truncate cascades to table \"%s\"",
604                                                                 RelationGetRelationName(rel))));
605                                 truncate_check_rel(rel);
606                                 rels = lappend(rels, rel);
607                                 relids = lappend_oid(relids, relid);
608                         }
609                 }
610         }
611
612         /*
613          * Check foreign key references.  In CASCADE mode, this should be
614          * unnecessary since we just pulled in all the references; but as
615          * a cross-check, do it anyway if in an Assert-enabled build.
616          */
617 #ifdef USE_ASSERT_CHECKING
618         heap_truncate_check_FKs(rels, false);
619 #else
620         if (stmt->behavior == DROP_RESTRICT)
621                 heap_truncate_check_FKs(rels, false);
622 #endif
623
624         /*
625          * OK, truncate each table.
626          */
627         foreach(cell, rels)
628         {
629                 Relation        rel = (Relation) lfirst(cell);
630                 Oid                     heap_relid;
631                 Oid                     toast_relid;
632
633                 /*
634                  * Create a new empty storage file for the relation, and assign it as
635                  * the relfilenode value.       The old storage file is scheduled for
636                  * deletion at commit.
637                  */
638                 setNewRelfilenode(rel);
639
640                 heap_relid = RelationGetRelid(rel);
641                 toast_relid = rel->rd_rel->reltoastrelid;
642
643                 heap_close(rel, NoLock);
644
645                 /*
646                  * The same for the toast table, if any.
647                  */
648                 if (OidIsValid(toast_relid))
649                 {
650                         rel = relation_open(toast_relid, AccessExclusiveLock);
651                         setNewRelfilenode(rel);
652                         heap_close(rel, NoLock);
653                 }
654
655                 /*
656                  * Reconstruct the indexes to match, and we're done.
657                  */
658                 reindex_relation(heap_relid, true);
659         }
660 }
661
662 /*
663  * Check that a given rel is safe to truncate.  Subroutine for ExecuteTruncate
664  */
665 static void
666 truncate_check_rel(Relation rel)
667 {
668         /* Only allow truncate on regular tables */
669         if (rel->rd_rel->relkind != RELKIND_RELATION)
670                 ereport(ERROR,
671                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
672                                  errmsg("\"%s\" is not a table",
673                                                 RelationGetRelationName(rel))));
674
675         /* Permissions checks */
676         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
677                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
678                                            RelationGetRelationName(rel));
679
680         if (!allowSystemTableMods && IsSystemRelation(rel))
681                 ereport(ERROR,
682                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
683                                  errmsg("permission denied: \"%s\" is a system catalog",
684                                                 RelationGetRelationName(rel))));
685
686         /*
687          * We can never allow truncation of shared or nailed-in-cache
688          * relations, because we can't support changing their relfilenode
689          * values.
690          */
691         if (rel->rd_rel->relisshared || rel->rd_isnailed)
692                 ereport(ERROR,
693                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
694                                  errmsg("cannot truncate system relation \"%s\"",
695                                                 RelationGetRelationName(rel))));
696
697         /*
698          * Don't allow truncate on temp tables of other backends ... their
699          * local buffer manager is not going to cope.
700          */
701         if (isOtherTempNamespace(RelationGetNamespace(rel)))
702                 ereport(ERROR,
703                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
704                                  errmsg("cannot truncate temporary tables of other sessions")));
705 }
706
707 /*----------
708  * MergeAttributes
709  *              Returns new schema given initial schema and superclasses.
710  *
711  * Input arguments:
712  * 'schema' is the column/attribute definition for the table. (It's a list
713  *              of ColumnDef's.) It is destructively changed.
714  * 'supers' is a list of names (as RangeVar nodes) of parent relations.
715  * 'istemp' is TRUE if we are creating a temp relation.
716  *
717  * Output arguments:
718  * 'supOids' receives a list of the OIDs of the parent relations.
719  * 'supconstr' receives a list of the parents' CHECK constraints, with column
720  *              numbers updated as necessary to be valid for the child.
721  * 'supOidCount' is set to the number of parents that have OID columns.
722  *
723  * Return value:
724  * Completed schema list.
725  *
726  * Notes:
727  *        The order in which the attributes are inherited is very important.
728  *        Intuitively, the inherited attributes should come first. If a table
729  *        inherits from multiple parents, the order of those attributes
730  *        follows the order of the parents specified in CREATE TABLE.
731  *
732  *        Here's an example:
733  *
734  *              create table person (name text, age int4, location point);
735  *              create table emp (salary int4, manager text) inherits(person);
736  *              create table student (gpa float8) inherits (person);
737  *              create table stud_emp (percent int4) inherits (emp, student);
738  *
739  *        The order of the attributes of stud_emp is:
740  *
741  *                                                      person {1:name, 2:age, 3:location}
742  *                                                      /        \
743  *                         {6:gpa}      student   emp {4:salary, 5:manager}
744  *                                                      \        /
745  *                                                 stud_emp {7:percent}
746  *
747  *         If the same attribute name appears multiple times, then it appears
748  *         in the result table in the proper location for its first appearance.
749  *
750  *         Constraints (including NOT NULL constraints) for the child table
751  *         are the union of all relevant constraints, from both the child schema
752  *         and parent tables.
753  *
754  *         The default value for a child column is defined as:
755  *              (1) If the child schema specifies a default, that value is used.
756  *              (2) If neither the child nor any parent specifies a default, then
757  *                      the column will not have a default.
758  *              (3) If conflicting defaults are inherited from different parents
759  *                      (and not overridden by the child), an error is raised.
760  *              (4) Otherwise the inherited default is used.
761  *              Rule (3) is new in Postgres 7.1; in earlier releases you got a
762  *              rather arbitrary choice of which parent default to use.
763  *----------
764  */
765 static List *
766 MergeAttributes(List *schema, List *supers, bool istemp,
767                                 List **supOids, List **supconstr, int *supOidCount)
768 {
769         ListCell   *entry;
770         List       *inhSchema = NIL;            /* merged inherited columns (ColumnDefs) */
771         List       *parentOids = NIL;           /* OIDs of the parents seen so far */
772         List       *constraints = NIL;          /* CHECK constraints copied from parents */
773         int                     parentsWithOids = 0;    /* # of parents with relhasoids set */
774         bool            have_bogus_defaults = false;    /* any conflicting defaults? */
775         char       *bogus_marker = "Bogus!";            /* marks conflicting defaults */
776         int                     child_attno;    /* # of inherited columns in inhSchema */
777
778         /*
779          * Check for and reject tables with too many columns. We perform this
780          * check relatively early for two reasons: (a) we don't run the risk of
781          * overflowing an AttrNumber in subsequent code (b) an O(n^2) algorithm is
782          * okay if we're processing <= 1600 columns, but could take minutes to
783          * execute if the user attempts to create a table with hundreds of
784          * thousands of columns.
785          *
786          * Note that we also need to check that we do not exceed this figure
787          * after including columns from inherited relations.
788          */
789         if (list_length(schema) > MaxHeapAttributeNumber)
790                 ereport(ERROR,
791                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
792                                  errmsg("tables can have at most %d columns",
793                                                 MaxHeapAttributeNumber)));
794
795         /*
796          * Check for duplicate names in the explicit list of attributes.
797          *
798          * Although we might consider merging such entries in the same way that we
799          * handle name conflicts for inherited attributes, it seems to make more
800          * sense to assume such conflicts are errors.
801          */
802         foreach(entry, schema)
803         {
804                 ColumnDef  *coldef = lfirst(entry);
805                 ListCell   *rest;
806
807                 for_each_cell(rest, lnext(entry))
808                 {
809                         ColumnDef  *restdef = lfirst(rest);
810
811                         if (strcmp(coldef->colname, restdef->colname) == 0)
812                                 ereport(ERROR,
813                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
814                                                  errmsg("column \"%s\" duplicated",
815                                                                 coldef->colname)));
816                 }
817         }
818
819         /*
820          * Scan the parents left-to-right, and merge their attributes to form a
821          * list of inherited attributes (inhSchema).  Also check to see if we need
822          * to inherit an OID column.
823          */
824         child_attno = 0;
825         foreach(entry, supers)
826         {
827                 RangeVar   *parent = (RangeVar *) lfirst(entry);
828                 Relation        relation;
829                 TupleDesc       tupleDesc;
830                 TupleConstr *constr;
831                 AttrNumber *newattno;   /* parent attno -> child attno map */
832                 AttrNumber      parent_attno;
833
834                 relation = heap_openrv(parent, AccessShareLock);
835
836                 if (relation->rd_rel->relkind != RELKIND_RELATION)
837                         ereport(ERROR,
838                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
839                                          errmsg("inherited relation \"%s\" is not a table",
840                                                         parent->relname)));
841                 /* Permanent rels cannot inherit from temporary ones */
842                 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
843                         ereport(ERROR,
844                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
845                                          errmsg("cannot inherit from temporary relation \"%s\"",
846                                                         parent->relname)));
847
848                 /*
849                  * We should have an UNDER permission flag for this, but for now,
850                  * demand that creator of a child table own the parent.
851                  */
852                 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
853                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
854                                                    RelationGetRelationName(relation));
855
856                 /*
857                  * Reject duplications in the list of parents.
858                  */
859                 if (list_member_oid(parentOids, RelationGetRelid(relation)))
860                         ereport(ERROR,
861                                         (errcode(ERRCODE_DUPLICATE_TABLE),
862                                          errmsg("inherited relation \"%s\" duplicated",
863                                                         parent->relname)));
864
865                 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
866
867                 if (relation->rd_rel->relhasoids)
868                         parentsWithOids++;
869
870                 tupleDesc = RelationGetDescr(relation);
871                 constr = tupleDesc->constr;
872
873                 /*
874                  * newattno[] will contain the child-table attribute numbers for the
875                  * attributes of this parent table.  (They are not the same for
876                  * parents after the first one, nor if we have dropped columns.)
877                  */
878                 newattno = (AttrNumber *)
879                         palloc(tupleDesc->natts * sizeof(AttrNumber));
880
881                 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
882                          parent_attno++)
883                 {
884                         Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
885                         char       *attributeName = NameStr(attribute->attname);
886                         int                     exist_attno;
887                         ColumnDef  *def;
888
889                         /*
890                          * Ignore dropped columns in the parent.
891                          */
892                         if (attribute->attisdropped)
893                         {
894                                 /*
895                                  * change_varattnos_of_a_node asserts that this is greater
896                                  * than zero, so if anything tries to use it, we should find
897                                  * out.
898                                  */
899                                 newattno[parent_attno - 1] = 0;
900                                 continue;
901                         }
902
903                         /*
904                          * Does it conflict with some previously inherited column?
905                          */
906                         exist_attno = findAttrByName(attributeName, inhSchema);
907                         if (exist_attno > 0)
908                         {
909                                 /*
910                                  * Yes, try to merge the two column definitions. They must
911                                  * have the same type and typmod.
912                                  */
913                                 ereport(NOTICE,
914                                                 (errmsg("merging multiple inherited definitions of column \"%s\"",
915                                                                 attributeName)));
916                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
917                                 if (typenameTypeId(NULL, def->typename) != attribute->atttypid ||
918                                         def->typename->typmod != attribute->atttypmod)
919                                         ereport(ERROR,
920                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
921                                                 errmsg("inherited column \"%s\" has a type conflict",
922                                                            attributeName),
923                                                          errdetail("%s versus %s",
924                                                                            TypeNameToString(def->typename),
925                                                                            format_type_be(attribute->atttypid))));
926                                 def->inhcount++;
927                                 /* Merge of NOT NULL constraints = OR 'em together */
928                                 def->is_not_null |= attribute->attnotnull;
929                                 /* Default and other constraints are handled below */
930                                 newattno[parent_attno - 1] = exist_attno;
931                         }
932                         else
933                         {
934                                 /*
935                                  * No, create a new inherited column
936                                  */
937                                 def = makeNode(ColumnDef);
938                                 def->colname = pstrdup(attributeName);
939                                 def->typename = makeTypeNameFromOid(attribute->atttypid,
940                                                                                                         attribute->atttypmod);
941                                 def->inhcount = 1;
942                                 def->is_local = false;
943                                 def->is_not_null = attribute->attnotnull;
944                                 def->raw_default = NULL;
945                                 def->cooked_default = NULL;
946                                 def->constraints = NIL;
947                                 def->support = NULL;
948                                 inhSchema = lappend(inhSchema, def);
949                                 newattno[parent_attno - 1] = ++child_attno;
950                         }
951
952                         /*
953                          * Copy default if any
954                          */
955                         if (attribute->atthasdef)
956                         {
957                                 char       *this_default = NULL;
958                                 AttrDefault *attrdef;
959                                 int                     i;
960
961                                 /* Find default in constraint structure */
962                                 Assert(constr != NULL);
963                                 attrdef = constr->defval;
964                                 for (i = 0; i < constr->num_defval; i++)
965                                 {
966                                         if (attrdef[i].adnum == parent_attno)
967                                         {
968                                                 this_default = attrdef[i].adbin;
969                                                 break;
970                                         }
971                                 }
972                                 Assert(this_default != NULL);
973
974                                 /*
975                                  * If default expr could contain any vars, we'd need to fix
976                                  * 'em, but it can't; so default is ready to apply to child.
977                                  *
978                                  * If we already had a default from some prior parent, check
979                                  * to see if they are the same.  If so, no problem; if not,
980                                  * mark the column as having a bogus default. Below, we will
981                                  * complain if the bogus default isn't overridden by the child
982                                  * schema.
983                                  */
984                                 Assert(def->raw_default == NULL);
985                                 if (def->cooked_default == NULL)
986                                         def->cooked_default = pstrdup(this_default);
987                                 else if (strcmp(def->cooked_default, this_default) != 0)
988                                 {
989                                         def->cooked_default = bogus_marker;
990                                         have_bogus_defaults = true;
991                                 }
992                         }
993                 }
994
995                 /*
996                  * Now copy the constraints of this parent, adjusting attnos using the
997                  * completed newattno[] map
998                  */
999                 if (constr && constr->num_check > 0)
1000                 {
1001                         ConstrCheck *check = constr->check;
1002                         int                     i;
1003
1004                         for (i = 0; i < constr->num_check; i++)
1005                         {
1006                                 Constraint *cdef = makeNode(Constraint);
1007                                 Node       *expr;
1008
1009                                 cdef->contype = CONSTR_CHECK;
1010                                 cdef->name = pstrdup(check[i].ccname);
1011                                 cdef->raw_expr = NULL;
1012                                 /* adjust varattnos of ccbin here */
1013                                 expr = stringToNode(check[i].ccbin);
1014                                 change_varattnos_of_a_node(expr, newattno);
1015                                 cdef->cooked_expr = nodeToString(expr);
1016                                 constraints = lappend(constraints, cdef);
1017                         }
1018                 }
1019
1020                 pfree(newattno);
1021
1022                 /*
1023                  * Close the parent rel, but keep our AccessShareLock on it until xact
1024                  * commit.      That will prevent someone else from deleting or ALTERing
1025                  * the parent before the child is committed.
1026                  */
1027                 heap_close(relation, NoLock);
1028         }
1029
1030         /*
1031          * If we had no inherited attributes, the result schema is just the
1032          * explicitly declared columns.  Otherwise, we need to merge the declared
1033          * columns into the inherited schema list.
1034          */
1035         if (inhSchema != NIL)
1036         {
1037                 foreach(entry, schema)
1038                 {
1039                         ColumnDef  *newdef = lfirst(entry);
1040                         char       *attributeName = newdef->colname;
1041                         int                     exist_attno;
1042
1043                         /*
1044                          * Does it conflict with some previously inherited column?
1045                          */
1046                         exist_attno = findAttrByName(attributeName, inhSchema);
1047                         if (exist_attno > 0)
1048                         {
1049                                 ColumnDef  *def;
1050
1051                                 /*
1052                                  * Yes, try to merge the two column definitions. They must
1053                                  * have the same type and typmod.
1054                                  */
1055                                 ereport(NOTICE,
1056                                    (errmsg("merging column \"%s\" with inherited definition",
1057                                                    attributeName)));
1058                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
1059                                 if (typenameTypeId(NULL, def->typename) != typenameTypeId(NULL, newdef->typename) ||
1060                                         def->typename->typmod != newdef->typename->typmod)
1061                                         ereport(ERROR,
1062                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1063                                                          errmsg("column \"%s\" has a type conflict",
1064                                                                         attributeName),
1065                                                          errdetail("%s versus %s",
1066                                                                            TypeNameToString(def->typename),
1067                                                                            TypeNameToString(newdef->typename))));
1068                                 /* Mark the column as locally defined */
1069                                 def->is_local = true;
1070                                 /* Merge of NOT NULL constraints = OR 'em together */
1071                                 def->is_not_null |= newdef->is_not_null;
1072                                 /* If new def has a default, override previous default */
1073                                 if (newdef->raw_default != NULL)
1074                                 {
1075                                         def->raw_default = newdef->raw_default;
1076                                         def->cooked_default = newdef->cooked_default;
1077                                 }
1078                         }
1079                         else
1080                         {
1081                                 /*
1082                                  * No, attach new column to result schema
1083                                  */
1084                                 inhSchema = lappend(inhSchema, newdef);
1085                         }
1086                 }
1087
1088                 schema = inhSchema;     /* the merged list becomes the result */
1089
1090                 /*
1091                  * Check that we haven't exceeded the legal # of columns after merging
1092                  * in inherited columns.
1093                  */
1094                 if (list_length(schema) > MaxHeapAttributeNumber)
1095                         ereport(ERROR,
1096                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
1097                                          errmsg("tables can have at most %d columns",
1098                                                         MaxHeapAttributeNumber)));
1099         }
1100
1101         /*
1102          * If we found any conflicting parent default values, check to make sure
1103          * they were overridden by the child.  (The test below compares pointers,
1104          * not string contents: only bogus_marker itself flags a conflict.)
1105          */
1105         if (have_bogus_defaults)
1106         {
1107                 foreach(entry, schema)
1108                 {
1109                         ColumnDef  *def = lfirst(entry);
1110
1111                         if (def->cooked_default == bogus_marker)
1112                                 ereport(ERROR,
1113                                                 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1114                                   errmsg("column \"%s\" inherits conflicting default values",
1115                                                  def->colname),
1116                                                  errhint("To resolve the conflict, specify a default explicitly.")));
1117                 }
1118         }
1119
1120         *supOids = parentOids;
1121         *supconstr = constraints;
1122         *supOidCount = parentsWithOids;
1123         return schema;
1124 }
1125
1126 /*
1127  * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1128  * constraints from parent classes, since the inherited attributes could
1129  * be given different column numbers in multiple-inheritance cases.
1130  *
1131  * Note that the passed node tree is modified in place!
1132  *
1133  * This function is used elsewhere such as in analyze.c
1134  *
1135  */
1136
1137 void
1138 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1139 {
1140         change_varattnos_walker(node, newattno);
1141 }
1142
1143 /* Generate a map for change_varattnos_of_a_node from two tupledesc's. */
1144
1145 AttrNumber *
1146 varattnos_map(TupleDesc old, TupleDesc new)
1147 {
1148         int i,j;
1149         AttrNumber *attmap = palloc0(sizeof(AttrNumber)*old->natts);
1150         for (i=1; i <= old->natts; i++) {
1151                 if (old->attrs[i-1]->attisdropped) {
1152                         attmap[i-1] = 0;
1153                         continue;
1154                 }
1155                 for (j=1; j<= new->natts; j++)
1156                         if (!strcmp(NameStr(old->attrs[i-1]->attname), NameStr(new->attrs[j-1]->attname)))
1157                                 attmap[i-1] = j;
1158         }
1159         return attmap;
1160 }
1161
1162 /* Generate a map for change_varattnos_of_a_node from a tupledesc and a list of
1163  * ColumnDefs */
1164
1165 AttrNumber *
1166 varattnos_map_schema(TupleDesc old, List *schema) 
1167 {
1168         int i;
1169         AttrNumber *attmap = palloc0(sizeof(AttrNumber)*old->natts);
1170         for (i=1; i <= old->natts; i++) {
1171                 if (old->attrs[i-1]->attisdropped) {
1172                         attmap[i-1] = 0;
1173                         continue;
1174                 }
1175                 attmap[i-1] = findAttrByName(NameStr(old->attrs[i-1]->attname), schema);
1176         }
1177         return attmap;
1178 }
1179
1180 static bool
1181 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1182 {
1183         if (node == NULL)
1184                 return false;
1185         if (IsA(node, Var))
1186         {
1187                 Var                *var = (Var *) node;
1188
1189                 if (var->varlevelsup == 0 && var->varno == 1 &&
1190                         var->varattno > 0)
1191                 {
1192                         /*
1193                          * ??? the following may be a problem when the node is multiply
1194                          * referenced though stringToNode() doesn't create such a node
1195                          * currently.
1196                          */
1197                         Assert(newattno[var->varattno - 1] > 0);
1198                         var->varattno = newattno[var->varattno - 1];
1199                 }
1200                 return false;
1201         }
1202         return expression_tree_walker(node, change_varattnos_walker,
1203                                                                   (void *) newattno);
1204 }
1205
1206 /*
1207  * StoreCatalogInheritance
1208  *              Updates the system catalogs with proper inheritance information.
1209  *
1210  * supers is a list of the OIDs of the new relation's direct ancestors.
1211  */
1212 static void
1213 StoreCatalogInheritance(Oid relationId, List *supers)
1214 {
1215         Relation        relation;
1216         int16           seqNumber;
1217         ListCell   *entry;
1218
1219         /*
1220          * sanity checks
1221          */
1222         AssertArg(OidIsValid(relationId));
1223
1224         if (supers == NIL)
1225                 return;
1226
1227         /*
1228          * Store INHERITS information in pg_inherits using direct ancestors only.
1229          * Also enter dependencies on the direct ancestors, and make sure they are
1230          * marked with relhassubclass = true.
1231          *
1232          * (Once upon a time, both direct and indirect ancestors were found here
1233          * and then entered into pg_ipl.  Since that catalog doesn't exist
1234          * anymore, there's no need to look for indirect ancestors.)
1235          */
1236         relation = heap_open(InheritsRelationId, RowExclusiveLock);
1237
1238         seqNumber = 1;
1239         foreach(entry, supers)
1240         {
1241                 StoreCatalogInheritance1(relationId, lfirst_oid(entry), seqNumber, relation);
1242                 seqNumber++;
1243         }
1244
1245         heap_close(relation, RowExclusiveLock);
1246 }
1247
1248 static void
1249 StoreCatalogInheritance1(Oid relationId, Oid parentOid,
1250                                                  int16 seqNumber, Relation relation) 
1251 {
1252         Datum                   datum[Natts_pg_inherits];
1253         char                    nullarr[Natts_pg_inherits];
1254         ObjectAddress   childobject,
1255                                         parentobject;
1256         HeapTuple               tuple;
1257         TupleDesc               desc = RelationGetDescr(relation);
1258
1259         datum[0] = ObjectIdGetDatum(relationId);        /* inhrel */
1260         datum[1] = ObjectIdGetDatum(parentOid);         /* inhparent */
1261         datum[2] = Int16GetDatum(seqNumber);            /* inhseqno */
1262
1263         nullarr[0] = ' ';
1264         nullarr[1] = ' ';
1265         nullarr[2] = ' ';
1266
1267         tuple = heap_formtuple(desc, datum, nullarr);
1268
1269         simple_heap_insert(relation, tuple);
1270
1271         CatalogUpdateIndexes(relation, tuple);
1272
1273         heap_freetuple(tuple);
1274
1275         /*
1276          * Store a dependency too
1277          */
1278         parentobject.classId = RelationRelationId;
1279         parentobject.objectId = parentOid;
1280         parentobject.objectSubId = 0;
1281         childobject.classId = RelationRelationId;
1282         childobject.objectId = relationId;
1283         childobject.objectSubId = 0;
1284
1285         recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
1286
1287         /*
1288          * Mark the parent as having subclasses.
1289          */
1290         setRelhassubclassInRelation(parentOid, true);
1291 }
1292
1293 /*
1294  * Look for an existing schema entry with the given name.
1295  *
1296  * Returns the index (starting with 1) if attribute already exists in schema,
1297  * 0 if it doesn't.
1298  */
1299 static int
1300 findAttrByName(const char *attributeName, List *schema)
1301 {
1302         ListCell   *s;
1303         int                     i = 1;
1304
1305         foreach(s, schema)
1306         {
1307                 ColumnDef  *def = lfirst(s);
1308
1309                 if (strcmp(attributeName, def->colname) == 0)
1310                         return i;
1311
1312                 i++;
1313         }
1314         return 0;
1315 }
1316
1317 /*
1318  * Update a relation's pg_class.relhassubclass entry to the given value
1319  */
1320 static void
1321 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1322 {
1323         Relation        relationRelation;
1324         HeapTuple       tuple;
1325         Form_pg_class classtuple;
1326
1327         /*
1328          * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1329          *
1330          * If the tuple already has the right relhassubclass setting, we don't
1331          * need to update it, but we still need to issue an SI inval message.
1332          */
1333         relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1334         tuple = SearchSysCacheCopy(RELOID,
1335                                                            ObjectIdGetDatum(relationId),
1336                                                            0, 0, 0);
1337         if (!HeapTupleIsValid(tuple))
1338                 elog(ERROR, "cache lookup failed for relation %u", relationId);
1339         classtuple = (Form_pg_class) GETSTRUCT(tuple);
1340
1341         if (classtuple->relhassubclass != relhassubclass)
1342         {
1343                 classtuple->relhassubclass = relhassubclass;
1344                 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1345
1346                 /* keep the catalog indexes up to date */
1347                 CatalogUpdateIndexes(relationRelation, tuple);
1348         }
1349         else
1350         {
1351                 /* no need to change tuple, but force relcache rebuild anyway */
1352                 CacheInvalidateRelcacheByTuple(tuple);
1353         }
1354
1355         heap_freetuple(tuple);
1356         heap_close(relationRelation, RowExclusiveLock);
1357 }
1358
1359
1360 /*
1361  *              renameatt               - changes the name of a attribute in a relation
1362  *
1363  *              Attname attribute is changed in attribute catalog.
1364  *              No record of the previous attname is kept (correct?).
1365  *
1366  *              get proper relrelation from relation catalog (if not arg)
1367  *              scan attribute catalog
1368  *                              for name conflict (within rel)
1369  *                              for original attribute (if not arg)
1370  *              modify attname in attribute tuple
1371  *              insert modified attribute in attribute catalog
1372  *              delete original attribute from attribute catalog
1373  */
1374 void
1375 renameatt(Oid myrelid,
1376                   const char *oldattname,
1377                   const char *newattname,
1378                   bool recurse,
1379                   bool recursing)
1380 {
1381         Relation        targetrelation;
1382         Relation        attrelation;
1383         HeapTuple       atttup;
1384         Form_pg_attribute attform;
1385         int                     attnum;
1386         List       *indexoidlist;
1387         ListCell   *indexoidscan;
1388
1389         /*
1390          * Grab an exclusive lock on the target table, which we will NOT release
1391          * until end of transaction.
1392          */
1393         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1394
1395         /*
1396          * permissions checking.  this would normally be done in utility.c, but
1397          * this particular routine is recursive.
1398          *
1399          * normally, only the owner of a class can change its schema.
1400          */
1401         if (!pg_class_ownercheck(myrelid, GetUserId()))
1402                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1403                                            RelationGetRelationName(targetrelation));
1404         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1405                 ereport(ERROR,
1406                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1407                                  errmsg("permission denied: \"%s\" is a system catalog",
1408                                                 RelationGetRelationName(targetrelation))));
1409
1410         /*
1411          * if the 'recurse' flag is set then we are supposed to rename this
1412          * attribute in all classes that inherit from 'relname' (as well as in
1413          * 'relname').
1414          *
1415          * any permissions or problems with duplicate attributes will cause the
1416          * whole transaction to abort, which is what we want -- all or nothing.
1417          */
1418         if (recurse)
1419         {
1420                 ListCell   *child;
1421                 List       *children;
1422
1423                 /* this routine is actually in the planner */
1424                 children = find_all_inheritors(myrelid);
1425
1426                 /*
1427                  * find_all_inheritors does the recursive search of the inheritance
1428                  * hierarchy, so all we have to do is process all of the relids in the
1429                  * list that it returns.
1430                  */
1431                 foreach(child, children)
1432                 {
1433                         Oid                     childrelid = lfirst_oid(child);
1434
1435                         if (childrelid == myrelid)
1436                                 continue;
1437                         /* note we need not recurse again */
1438                         renameatt(childrelid, oldattname, newattname, false, true);
1439                 }
1440         }
1441         else
1442         {
1443                 /*
1444                  * If we are told not to recurse, there had better not be any child
1445                  * tables; else the rename would put them out of step.
1446                  */
1447                 if (!recursing &&
1448                         find_inheritance_children(myrelid) != NIL)
1449                         ereport(ERROR,
1450                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1451                                          errmsg("inherited column \"%s\" must be renamed in child tables too",
1452                                                         oldattname)));
1453         }
1454
1455         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1456
1457         atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
1458         if (!HeapTupleIsValid(atttup))
1459                 ereport(ERROR,
1460                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
1461                                  errmsg("column \"%s\" does not exist",
1462                                                 oldattname)));
1463         attform = (Form_pg_attribute) GETSTRUCT(atttup);
1464
1465         attnum = attform->attnum;
1466         if (attnum <= 0)
1467                 ereport(ERROR,
1468                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1469                                  errmsg("cannot rename system column \"%s\"",
1470                                                 oldattname)));
1471
1472         /*
1473          * if the attribute is inherited, forbid the renaming, unless we are
1474          * already inside a recursive rename.
1475          */
1476         if (attform->attinhcount > 0 && !recursing)
1477                 ereport(ERROR,
1478                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1479                                  errmsg("cannot rename inherited column \"%s\"",
1480                                                 oldattname)));
1481
1482         /* should not already exist */
1483         /* this test is deliberately not attisdropped-aware */
1484         if (SearchSysCacheExists(ATTNAME,
1485                                                          ObjectIdGetDatum(myrelid),
1486                                                          PointerGetDatum(newattname),
1487                                                          0, 0))
1488                 ereport(ERROR,
1489                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
1490                                  errmsg("column \"%s\" of relation \"%s\" already exists",
1491                                           newattname, RelationGetRelationName(targetrelation))));
1492
1493         namestrcpy(&(attform->attname), newattname);
1494
1495         simple_heap_update(attrelation, &atttup->t_self, atttup);
1496
1497         /* keep system catalog indexes current */
1498         CatalogUpdateIndexes(attrelation, atttup);
1499
1500         heap_freetuple(atttup);
1501
1502         /*
1503          * Update column names of indexes that refer to the column being renamed.
1504          */
1505         indexoidlist = RelationGetIndexList(targetrelation);
1506
1507         foreach(indexoidscan, indexoidlist)
1508         {
1509                 Oid                     indexoid = lfirst_oid(indexoidscan);
1510                 HeapTuple       indextup;
1511                 Form_pg_index indexform;
1512                 int                     i;
1513
1514                 /*
1515                  * Scan through index columns to see if there's any simple index
1516                  * entries for this attribute.  We ignore expressional entries.
1517                  */
1518                 indextup = SearchSysCache(INDEXRELID,
1519                                                                   ObjectIdGetDatum(indexoid),
1520                                                                   0, 0, 0);
1521                 if (!HeapTupleIsValid(indextup))
1522                         elog(ERROR, "cache lookup failed for index %u", indexoid);
1523                 indexform = (Form_pg_index) GETSTRUCT(indextup);
1524
1525                 for (i = 0; i < indexform->indnatts; i++)
1526                 {
1527                         if (attnum != indexform->indkey.values[i])
1528                                 continue;
1529
1530                         /*
1531                          * Found one, rename it.
1532                          */
1533                         atttup = SearchSysCacheCopy(ATTNUM,
1534                                                                                 ObjectIdGetDatum(indexoid),
1535                                                                                 Int16GetDatum(i + 1),
1536                                                                                 0, 0);
1537                         if (!HeapTupleIsValid(atttup))
1538                                 continue;               /* should we raise an error? */
1539
1540                         /*
1541                          * Update the (copied) attribute tuple.
1542                          */
1543                         namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1544                                            newattname);
1545
1546                         simple_heap_update(attrelation, &atttup->t_self, atttup);
1547
1548                         /* keep system catalog indexes current */
1549                         CatalogUpdateIndexes(attrelation, atttup);
1550
1551                         heap_freetuple(atttup);
1552                 }
1553
1554                 ReleaseSysCache(indextup);
1555         }
1556
1557         list_free(indexoidlist);
1558
1559         heap_close(attrelation, RowExclusiveLock);
1560
1561         /*
1562          * Update att name in any RI triggers associated with the relation.
1563          */
1564         if (targetrelation->rd_rel->reltriggers > 0)
1565         {
1566                 /* update tgargs column reference where att is primary key */
1567                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1568                                                            oldattname, newattname,
1569                                                            false, false);
1570                 /* update tgargs column reference where att is foreign key */
1571                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1572                                                            oldattname, newattname,
1573                                                            true, false);
1574         }
1575
1576         relation_close(targetrelation, NoLock);         /* close rel but keep lock */
1577 }
1578
1579 /*
1580  *              renamerel               - change the name of a relation
1581  *
1582  *              XXX - When renaming sequences, we don't bother to modify the
1583  *                        sequence name that is stored within the sequence itself
1584  *                        (this would cause problems with MVCC). In the future,
1585  *                        the sequence name should probably be removed from the
1586  *                        sequence, AFAIK there's no need for it to be there.
1587  */
1588 void
1589 renamerel(Oid myrelid, const char *newrelname)
1590 {
1591         Relation        targetrelation;
1592         Relation        relrelation;    /* for RELATION relation */
1593         HeapTuple       reltup;
1594         Oid                     namespaceId;
1595         char       *oldrelname;
1596         char            relkind;
1597         bool            relhastriggers;
1598
1599         /*
1600          * Grab an exclusive lock on the target table or index, which we will NOT
1601          * release until end of transaction.
1602          */
1603         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1604
1605         oldrelname = pstrdup(RelationGetRelationName(targetrelation));
1606         namespaceId = RelationGetNamespace(targetrelation);
1607
1608         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1609                 ereport(ERROR,
1610                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1611                                  errmsg("permission denied: \"%s\" is a system catalog",
1612                                                 RelationGetRelationName(targetrelation))));
1613
1614         relkind = targetrelation->rd_rel->relkind;
1615         relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1616
1617         /*
1618          * Find relation's pg_class tuple, and make sure newrelname isn't in use.
1619          */
1620         relrelation = heap_open(RelationRelationId, RowExclusiveLock);
1621
1622         reltup = SearchSysCacheCopy(RELOID,
1623                                                                 PointerGetDatum(myrelid),
1624                                                                 0, 0, 0);
1625         if (!HeapTupleIsValid(reltup))          /* shouldn't happen */
1626                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1627
1628         if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1629                 ereport(ERROR,
1630                                 (errcode(ERRCODE_DUPLICATE_TABLE),
1631                                  errmsg("relation \"%s\" already exists",
1632                                                 newrelname)));
1633
1634         /*
1635          * Update pg_class tuple with new relname.      (Scribbling on reltup is OK
1636          * because it's a copy...)
1637          */
1638         namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1639
1640         simple_heap_update(relrelation, &reltup->t_self, reltup);
1641
1642         /* keep the system catalog indexes current */
1643         CatalogUpdateIndexes(relrelation, reltup);
1644
1645         heap_freetuple(reltup);
1646         heap_close(relrelation, RowExclusiveLock);
1647
1648         /*
1649          * Also rename the associated type, if any.
1650          */
1651         if (relkind != RELKIND_INDEX)
1652                 TypeRename(oldrelname, namespaceId, newrelname);
1653
1654         /*
1655          * Update rel name in any RI triggers associated with the relation.
1656          */
1657         if (relhastriggers)
1658         {
1659                 /* update tgargs where relname is primary key */
1660                 update_ri_trigger_args(myrelid,
1661                                                            oldrelname,
1662                                                            newrelname,
1663                                                            false, true);
1664                 /* update tgargs where relname is foreign key */
1665                 update_ri_trigger_args(myrelid,
1666                                                            oldrelname,
1667                                                            newrelname,
1668                                                            true, true);
1669         }
1670
1671         /*
1672          * Close rel, but keep exclusive lock!
1673          */
1674         relation_close(targetrelation, NoLock);
1675 }
1676
1677 /*
1678  * Scan pg_trigger for RI triggers that are on the specified relation
1679  * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1680  * is true).  Update RI trigger args fields matching oldname to contain
1681  * newname instead.  If update_relname is true, examine the relname
1682  * fields; otherwise examine the attname fields.
1683  */
1684 static void
1685 update_ri_trigger_args(Oid relid,
1686                                            const char *oldname,
1687                                            const char *newname,
1688                                            bool fk_scan,
1689                                            bool update_relname)
1690 {
1691         Relation        tgrel;
1692         ScanKeyData skey[1];
1693         SysScanDesc trigscan;
1694         HeapTuple       tuple;
1695         Datum           values[Natts_pg_trigger];
1696         char            nulls[Natts_pg_trigger];
1697         char            replaces[Natts_pg_trigger];
1698
1699         tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1700         if (fk_scan)
1701         {
1702                 ScanKeyInit(&skey[0],
1703                                         Anum_pg_trigger_tgconstrrelid,
1704                                         BTEqualStrategyNumber, F_OIDEQ,
1705                                         ObjectIdGetDatum(relid));
1706                 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1707                                                                           true, SnapshotNow,
1708                                                                           1, skey);
1709         }
1710         else
1711         {
1712                 ScanKeyInit(&skey[0],
1713                                         Anum_pg_trigger_tgrelid,
1714                                         BTEqualStrategyNumber, F_OIDEQ,
1715                                         ObjectIdGetDatum(relid));
1716                 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1717                                                                           true, SnapshotNow,
1718                                                                           1, skey);
1719         }
1720
1721         while ((tuple = systable_getnext(trigscan)) != NULL)
1722         {
1723                 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1724                 bytea      *val;
1725                 bytea      *newtgargs;
1726                 bool            isnull;
1727                 int                     tg_type;
1728                 bool            examine_pk;
1729                 bool            changed;
1730                 int                     tgnargs;
1731                 int                     i;
1732                 int                     newlen;
1733                 const char *arga[RI_MAX_ARGUMENTS];
1734                 const char *argp;
1735
1736                 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1737                 if (tg_type == RI_TRIGGER_NONE)
1738                 {
1739                         /* Not an RI trigger, forget it */
1740                         continue;
1741                 }
1742
1743                 /*
1744                  * It is an RI trigger, so parse the tgargs bytea.
1745                  *
1746                  * NB: we assume the field will never be compressed or moved out of
1747                  * line; so does trigger.c ...
1748                  */
1749                 tgnargs = pg_trigger->tgnargs;
1750                 val = (bytea *)
1751                         DatumGetPointer(fastgetattr(tuple,
1752                                                                                 Anum_pg_trigger_tgargs,
1753                                                                                 tgrel->rd_att, &isnull));
1754                 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1755                         tgnargs > RI_MAX_ARGUMENTS)
1756                 {
1757                         /* This probably shouldn't happen, but ignore busted triggers */
1758                         continue;
1759                 }
1760                 argp = (const char *) VARDATA(val);
1761                 for (i = 0; i < tgnargs; i++)
1762                 {
1763                         arga[i] = argp;
1764                         argp += strlen(argp) + 1;
1765                 }
1766
1767                 /*
1768                  * Figure out which item(s) to look at.  If the trigger is primary-key
1769                  * type and attached to my rel, I should look at the PK fields; if it
1770                  * is foreign-key type and attached to my rel, I should look at the FK
1771                  * fields.      But the opposite rule holds when examining triggers found
1772                  * by tgconstrrel search.
1773                  */
1774                 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1775
1776                 changed = false;
1777                 if (update_relname)
1778                 {
1779                         /* Change the relname if needed */
1780                         i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1781                         if (strcmp(arga[i], oldname) == 0)
1782                         {
1783                                 arga[i] = newname;
1784                                 changed = true;
1785                         }
1786                 }
1787                 else
1788                 {
1789                         /* Change attname(s) if needed */
1790                         i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1791                                 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1792                         for (; i < tgnargs; i += 2)
1793                         {
1794                                 if (strcmp(arga[i], oldname) == 0)
1795                                 {
1796                                         arga[i] = newname;
1797                                         changed = true;
1798                                 }
1799                         }
1800                 }
1801
1802                 if (!changed)
1803                 {
1804                         /* Don't need to update this tuple */
1805                         continue;
1806                 }
1807
1808                 /*
1809                  * Construct modified tgargs bytea.
1810                  */
1811                 newlen = VARHDRSZ;
1812                 for (i = 0; i < tgnargs; i++)
1813                         newlen += strlen(arga[i]) + 1;
1814                 newtgargs = (bytea *) palloc(newlen);
1815                 VARATT_SIZEP(newtgargs) = newlen;
1816                 newlen = VARHDRSZ;
1817                 for (i = 0; i < tgnargs; i++)
1818                 {
1819                         strcpy(((char *) newtgargs) + newlen, arga[i]);
1820                         newlen += strlen(arga[i]) + 1;
1821                 }
1822
1823                 /*
1824                  * Build modified tuple.
1825                  */
1826                 for (i = 0; i < Natts_pg_trigger; i++)
1827                 {
1828                         values[i] = (Datum) 0;
1829                         replaces[i] = ' ';
1830                         nulls[i] = ' ';
1831                 }
1832                 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1833                 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1834
1835                 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1836
1837                 /*
1838                  * Update pg_trigger and its indexes
1839                  */
1840                 simple_heap_update(tgrel, &tuple->t_self, tuple);
1841
1842                 CatalogUpdateIndexes(tgrel, tuple);
1843
1844                 /*
1845                  * Invalidate trigger's relation's relcache entry so that other
1846                  * backends (and this one too!) are sent SI message to make them
1847                  * rebuild relcache entries.  (Ideally this should happen
1848                  * automatically...)
1849                  *
1850                  * We can skip this for triggers on relid itself, since that relcache
1851                  * flush will happen anyway due to the table or column rename.  We
1852                  * just need to catch the far ends of RI relationships.
1853                  */
1854                 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1855                 if (pg_trigger->tgrelid != relid)
1856                         CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1857
1858                 /* free up our scratch memory */
1859                 pfree(newtgargs);
1860                 heap_freetuple(tuple);
1861         }
1862
1863         systable_endscan(trigscan);
1864
1865         heap_close(tgrel, RowExclusiveLock);
1866
1867         /*
1868          * Increment cmd counter to make updates visible; this is needed in case
1869          * the same tuple has to be updated again by next pass (can happen in case
1870          * of a self-referential FK relationship).
1871          */
1872         CommandCounterIncrement();
1873 }
1874
1875 /*
1876  * AlterTable
1877  *              Execute ALTER TABLE, which can be a list of subcommands
1878  *
1879  * ALTER TABLE is performed in three phases:
1880  *              1. Examine subcommands and perform pre-transformation checking.
1881  *              2. Update system catalogs.
1882  *              3. Scan table(s) to check new constraints, and optionally recopy
1883  *                 the data into new table(s).
1884  * Phase 3 is not performed unless one or more of the subcommands requires
1885  * it.  The intention of this design is to allow multiple independent
1886  * updates of the table schema to be performed with only one pass over the
1887  * data.
1888  *
1889  * ATPrepCmd performs phase 1.  A "work queue" entry is created for
1890  * each table to be affected (there may be multiple affected tables if the
1891  * commands traverse a table inheritance hierarchy).  Also we do preliminary
1892  * validation of the subcommands, including parse transformation of those
1893  * expressions that need to be evaluated with respect to the old table
1894  * schema.
1895  *
1896  * ATRewriteCatalogs performs phase 2 for each affected table (note that
1897  * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1898  * Certain subcommands need to be performed before others to avoid
1899  * unnecessary conflicts; for example, DROP COLUMN should come before
1900  * ADD COLUMN.  Therefore phase 1 divides the subcommands into multiple
1901  * lists, one for each logical "pass" of phase 2.
1902  *
1903  * ATRewriteTables performs phase 3 for those tables that need it.
1904  *
1905  * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1906  * the whole operation; we don't have to do anything special to clean up.
1907  */
1908 void
1909 AlterTable(AlterTableStmt *stmt)
1910 {
1911         ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1912                                  stmt->cmds,
1913                                  interpretInhOption(stmt->relation->inhOpt));
1914 }
1915
1916 /*
1917  * AlterTableInternal
1918  *
1919  * ALTER TABLE with target specified by OID
1920  */
1921 void
1922 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1923 {
1924         ATController(relation_open(relid, AccessExclusiveLock),
1925                                  cmds,
1926                                  recurse);
1927 }
1928
1929 static void
1930 ATController(Relation rel, List *cmds, bool recurse)
1931 {
1932         List       *wqueue = NIL;
1933         ListCell   *lcmd;
1934
1935         /* Phase 1: preliminary examination of commands, create work queue */
1936         foreach(lcmd, cmds)
1937         {
1938                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1939
1940                 ATPrepCmd(&wqueue, rel, cmd, recurse, false);
1941         }
1942
1943         /* Close the relation, but keep lock until commit */
1944         relation_close(rel, NoLock);
1945
1946         /* Phase 2: update system catalogs */
1947         ATRewriteCatalogs(&wqueue);
1948
1949         /* Phase 3: scan/rewrite tables as needed */
1950         ATRewriteTables(&wqueue);
1951 }
1952
1953 /*
1954  * ATPrepCmd
1955  *
1956  * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1957  * recursion and permission checks.
1958  *
1959  * Caller must have acquired AccessExclusiveLock on relation already.
1960  * This lock should be held until commit.
1961  */
1962 static void
1963 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1964                   bool recurse, bool recursing)
1965 {
1966         AlteredTableInfo *tab;
1967         int                     pass;
1968
1969         /* Find or create work queue entry for this table */
1970         tab = ATGetQueueEntry(wqueue, rel);
1971
1972         /*
1973          * Copy the original subcommand for each table.  This avoids conflicts
1974          * when different child tables need to make different parse
1975          * transformations (for example, the same column may have different column
1976          * numbers in different children).
1977          */
1978         cmd = copyObject(cmd);
1979
1980         /*
1981          * Do permissions checking, recursion to child tables if needed, and any
1982          * additional phase-1 processing needed.
1983          */
1984         switch (cmd->subtype)
1985         {
1986                 case AT_AddColumn:              /* ADD COLUMN */
1987                         ATSimplePermissions(rel, false);
1988                         /* Performs own recursion */
1989                         ATPrepAddColumn(wqueue, rel, recurse, cmd);
1990                         pass = AT_PASS_ADD_COL;
1991                         break;
1992                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
1993
1994                         /*
1995                          * We allow defaults on views so that INSERT into a view can have
1996                          * default-ish behavior.  This works because the rewriter
1997                          * substitutes default values into INSERTs before it expands
1998                          * rules.
1999                          */
2000                         ATSimplePermissions(rel, true);
2001                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
2002                         /* No command-specific prep needed */
2003                         pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
2004                         break;
2005                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
2006                         ATSimplePermissions(rel, false);
2007                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
2008                         /* No command-specific prep needed */
2009                         pass = AT_PASS_DROP;
2010                         break;
2011                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
2012                         ATSimplePermissions(rel, false);
2013                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
2014                         /* No command-specific prep needed */
2015                         pass = AT_PASS_ADD_CONSTR;
2016                         break;
2017                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
2018                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
2019                         /* Performs own permission checks */
2020                         ATPrepSetStatistics(rel, cmd->name, cmd->def);
2021                         pass = AT_PASS_COL_ATTRS;
2022                         break;
2023                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
2024                         ATSimplePermissions(rel, false);
2025                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
2026                         /* No command-specific prep needed */
2027                         pass = AT_PASS_COL_ATTRS;
2028                         break;
2029                 case AT_DropColumn:             /* DROP COLUMN */
2030                         ATSimplePermissions(rel, false);
2031                         /* Recursion occurs during execution phase */
2032                         /* No command-specific prep needed except saving recurse flag */
2033                         if (recurse)
2034                                 cmd->subtype = AT_DropColumnRecurse;
2035                         pass = AT_PASS_DROP;
2036                         break;
2037                 case AT_AddIndex:               /* ADD INDEX */
2038                         ATSimplePermissions(rel, false);
2039                         /* This command never recurses */
2040                         /* No command-specific prep needed */
2041                         pass = AT_PASS_ADD_INDEX;
2042                         break;
2043                 case AT_AddConstraint:  /* ADD CONSTRAINT */
2044                         ATSimplePermissions(rel, false);
2045
2046                         /*
2047                          * Currently we recurse only for CHECK constraints, never for
2048                          * foreign-key constraints.  UNIQUE/PKEY constraints won't be seen
2049                          * here.
2050                          */
2051                         if (IsA(cmd->def, Constraint))
2052                                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
2053                         /* No command-specific prep needed */
2054                         pass = AT_PASS_ADD_CONSTR;
2055                         break;
2056                 case AT_DropConstraint: /* DROP CONSTRAINT */
2057                         ATSimplePermissions(rel, false);
2058                         /* Performs own recursion */
2059                         ATPrepDropConstraint(wqueue, rel, recurse, cmd);
2060                         pass = AT_PASS_DROP;
2061                         break;
2062                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
2063                         ATSimplePermissions(rel, false);
2064                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
2065                         /* No command-specific prep needed */
2066                         pass = AT_PASS_DROP;
2067                         break;
2068                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
2069                         ATSimplePermissions(rel, false);
2070                         /* Performs own recursion */
2071                         ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
2072                         pass = AT_PASS_ALTER_TYPE;
2073                         break;
2074                 case AT_ChangeOwner:    /* ALTER OWNER */
2075                         /* This command never recurses */
2076                         /* No command-specific prep needed */
2077                         pass = AT_PASS_MISC;
2078                         break;
2079                 case AT_ClusterOn:              /* CLUSTER ON */
2080                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
2081                         ATSimplePermissions(rel, false);
2082                         /* These commands never recurse */
2083                         /* No command-specific prep needed */
2084                         pass = AT_PASS_MISC;
2085                         break;
2086                 case AT_DropOids:               /* SET WITHOUT OIDS */
2087                         ATSimplePermissions(rel, false);
2088                         /* Performs own recursion */
2089                         if (rel->rd_rel->relhasoids)
2090                         {
2091                                 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
2092
2093                                 dropCmd->subtype = AT_DropColumn;
2094                                 dropCmd->name = pstrdup("oid");
2095                                 dropCmd->behavior = cmd->behavior;
2096                                 ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
2097                         }
2098                         pass = AT_PASS_DROP;
2099                         break;
2100                 case AT_SetTableSpace:  /* SET TABLESPACE */
2101                         ATSimplePermissionsRelationOrIndex(rel);
2102                         /* This command never recurses */
2103                         ATPrepSetTableSpace(tab, rel, cmd->name);
2104                         pass = AT_PASS_MISC;    /* doesn't actually matter */
2105                         break;
2106                 case AT_SetRelOptions:          /* SET (...) */
2107                 case AT_ResetRelOptions:        /* RESET (...) */
2108                         ATSimplePermissionsRelationOrIndex(rel);
2109                         /* This command never recurses */
2110                         /* No command-specific prep needed */
2111                         pass = AT_PASS_MISC;
2112                         break;
2113                 case AT_EnableTrig:             /* ENABLE TRIGGER variants */
2114                 case AT_EnableTrigAll:
2115                 case AT_EnableTrigUser:
2116                 case AT_DisableTrig:    /* DISABLE TRIGGER variants */
2117                 case AT_DisableTrigAll:
2118                 case AT_DisableTrigUser:
2119                 case AT_AddInherits:
2120                 case AT_DropInherits:
2121                         ATSimplePermissions(rel, false);
2122                         /* These commands never recurse */
2123                         /* No command-specific prep needed */
2124                         pass = AT_PASS_MISC;
2125                         break;
2126                 default:                                /* oops */
2127                         elog(ERROR, "unrecognized alter table type: %d",
2128                                  (int) cmd->subtype);
2129                         pass = 0;                       /* keep compiler quiet */
2130                         break;
2131         }
2132
2133         /* Add the subcommand to the appropriate list for phase 2 */
2134         tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2135 }
2136
2137 /*
2138  * ATRewriteCatalogs
2139  *
2140  * Traffic cop for ALTER TABLE Phase 2 operations.      Subcommands are
2141  * dispatched in a "safe" execution order (designed to avoid unnecessary
2142  * conflicts).
2143  */
2144 static void
2145 ATRewriteCatalogs(List **wqueue)
2146 {
2147         int                     pass;
2148         ListCell   *ltab;
2149
2150         /*
2151          * We process all the tables "in parallel", one pass at a time.  This is
2152          * needed because we may have to propagate work from one table to another
2153          * (specifically, ALTER TYPE on a foreign key's PK has to dispatch the
2154          * re-adding of the foreign key constraint to the other table).  Work can
2155          * only be propagated into later passes, however.
2156          */
2157         for (pass = 0; pass < AT_NUM_PASSES; pass++)
2158         {
2159                 /* Go through each table that needs to be processed */
2160                 foreach(ltab, *wqueue)
2161                 {
2162                         AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2163                         List       *subcmds = tab->subcmds[pass];
2164                         Relation        rel;
2165                         ListCell   *lcmd;
2166
2167                         if (subcmds == NIL)
2168                                 continue;
2169
2170                         /*
2171                          * Exclusive lock was obtained by phase 1, needn't get it again
2172                          */
2173                         rel = relation_open(tab->relid, NoLock);
2174
2175                         foreach(lcmd, subcmds)
2176                                 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2177
2178                         /*
2179                          * After the ALTER TYPE pass, do cleanup work (this is not done in
2180                          * ATExecAlterColumnType since it should be done only once if
2181                          * multiple columns of a table are altered).
2182                          */
2183                         if (pass == AT_PASS_ALTER_TYPE)
2184                                 ATPostAlterTypeCleanup(wqueue, tab);
2185
2186                         relation_close(rel, NoLock);
2187                 }
2188         }
2189
2190         /*
2191          * Check to see if a toast table must be added, if we executed any
2192          * subcommands that might have added a column or changed column storage.
2193          */
2194         foreach(ltab, *wqueue)
2195         {
2196                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2197
2198                 if (tab->relkind == RELKIND_RELATION &&
2199                         (tab->subcmds[AT_PASS_ADD_COL] ||
2200                          tab->subcmds[AT_PASS_ALTER_TYPE] ||
2201                          tab->subcmds[AT_PASS_COL_ATTRS]))
2202                         AlterTableCreateToastTable(tab->relid);
2203         }
2204 }
2205
2206 /*
2207  * ATExecCmd: dispatch a subcommand to appropriate execution routine
2208  */
2209 static void
2210 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2211 {
2212         switch (cmd->subtype)
2213         {
2214                 case AT_AddColumn:              /* ADD COLUMN */
2215                         ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2216                         break;
2217                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
2218                         ATExecColumnDefault(rel, cmd->name, cmd->def);
2219                         break;
2220                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
2221                         ATExecDropNotNull(rel, cmd->name);
2222                         break;
2223                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
2224                         ATExecSetNotNull(tab, rel, cmd->name);
2225                         break;
2226                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
2227                         ATExecSetStatistics(rel, cmd->name, cmd->def);
2228                         break;
2229                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
2230                         ATExecSetStorage(rel, cmd->name, cmd->def);
2231                         break;
2232                 case AT_DropColumn:             /* DROP COLUMN */
2233                         ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2234                         break;
2235                 case AT_DropColumnRecurse:              /* DROP COLUMN with recursion */
2236                         ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2237                         break;
2238                 case AT_AddIndex:               /* ADD INDEX */
2239                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2240                         break;
2241                 case AT_ReAddIndex:             /* ADD INDEX */
2242                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2243                         break;
2244                 case AT_AddConstraint:  /* ADD CONSTRAINT */
2245                         ATExecAddConstraint(tab, rel, cmd->def);
2246                         break;
2247                 case AT_DropConstraint: /* DROP CONSTRAINT */
2248                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2249                         break;
2250                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
2251                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2252                         break;
2253                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
2254                         ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2255                         break;
2256                 case AT_ChangeOwner:    /* ALTER OWNER */
2257                         ATExecChangeOwner(RelationGetRelid(rel),
2258                                                           get_roleid_checked(cmd->name),
2259                                                           false);
2260                         break;
2261                 case AT_ClusterOn:              /* CLUSTER ON */
2262                         ATExecClusterOn(rel, cmd->name);
2263                         break;
2264                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
2265                         ATExecDropCluster(rel);
2266                         break;
2267                 case AT_DropOids:               /* SET WITHOUT OIDS */
2268
2269                         /*
2270                          * Nothing to do here; we'll have generated a DropColumn
2271                          * subcommand to do the real work
2272                          */
2273                         break;
2274                 case AT_SetTableSpace:  /* SET TABLESPACE */
2275
2276                         /*
2277                          * Nothing to do here; Phase 3 does the work
2278                          */
2279                         break;
2280                 case AT_SetRelOptions:          /* SET (...) */
2281                         ATExecSetRelOptions(rel, (List *) cmd->def, false);
2282                         break;
2283                 case AT_ResetRelOptions:        /* RESET (...) */
2284                         ATExecSetRelOptions(rel, (List *) cmd->def, true);
2285                         break;
2286                 case AT_EnableTrig:             /* ENABLE TRIGGER name */
2287                         ATExecEnableDisableTrigger(rel, cmd->name, true, false);
2288                         break;
2289                 case AT_DisableTrig:    /* DISABLE TRIGGER name */
2290                         ATExecEnableDisableTrigger(rel, cmd->name, false, false);
2291                         break;
2292                 case AT_EnableTrigAll:  /* ENABLE TRIGGER ALL */
2293                         ATExecEnableDisableTrigger(rel, NULL, true, false);
2294                         break;
2295                 case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
2296                         ATExecEnableDisableTrigger(rel, NULL, false, false);
2297                         break;
2298                 case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
2299                         ATExecEnableDisableTrigger(rel, NULL, true, true);
2300                         break;
2301                 case AT_DisableTrigUser:                /* DISABLE TRIGGER USER */
2302                         ATExecEnableDisableTrigger(rel, NULL, false, true);
2303                         break;
2304                 case AT_DropInherits:
2305                         ATExecDropInherits(rel, cmd->parent);
2306                         break;
2307                 case AT_AddInherits:
2308                         ATExecAddInherits(rel, cmd->parent);
2309                         break;
2310                 default:                                /* oops */
2311                         elog(ERROR, "unrecognized alter table type: %d",
2312                                  (int) cmd->subtype);
2313                         break;
2314         }
2315
2316         /*
2317          * Bump the command counter to ensure the next subcommand in the sequence
2318          * can see the changes so far
2319          */
2320         CommandCounterIncrement();
2321 }
2322
2323 /*
2324  * ATRewriteTables: ALTER TABLE phase 3
2325  */
2326 static void
2327 ATRewriteTables(List **wqueue)
2328 {
2329         ListCell   *ltab;
2330
2331         /* Go through each table that needs to be checked or rewritten */
2332         foreach(ltab, *wqueue)
2333         {
2334                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2335
2336                 /*
2337                  * We only need to rewrite the table if at least one column needs to
2338                  * be recomputed.
2339                  */
2340                 if (tab->newvals != NIL)
2341                 {
2342                         /* Build a temporary relation and copy data */
2343                         Oid                     OIDNewHeap;
2344                         char            NewHeapName[NAMEDATALEN];
2345                         Oid                     NewTableSpace;
2346                         Relation        OldHeap;
2347                         ObjectAddress object;
2348
2349                         OldHeap = heap_open(tab->relid, NoLock);
2350
2351                         /*
2352                          * We can never allow rewriting of shared or nailed-in-cache
2353                          * relations, because we can't support changing their relfilenode
2354                          * values.
2355                          */
2356                         if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2357                                 ereport(ERROR,
2358                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2359                                                  errmsg("cannot rewrite system relation \"%s\"",
2360                                                                 RelationGetRelationName(OldHeap))));
2361
2362                         /*
2363                          * Don't allow rewrite on temp tables of other backends ... their
2364                          * local buffer manager is not going to cope.
2365                          */
2366                         if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2367                                 ereport(ERROR,
2368                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2369                                 errmsg("cannot rewrite temporary tables of other sessions")));
2370
2371                         /*
2372                          * Select destination tablespace (same as original unless user
2373                          * requested a change)
2374                          */
2375                         if (tab->newTableSpace)
2376                                 NewTableSpace = tab->newTableSpace;
2377                         else
2378                                 NewTableSpace = OldHeap->rd_rel->reltablespace;
2379
2380                         heap_close(OldHeap, NoLock);
2381
2382                         /*
2383                          * Create the new heap, using a temporary name in the same
2384                          * namespace as the existing table.  NOTE: there is some risk of
2385                          * collision with user relnames.  Working around this seems more
2386                          * trouble than it's worth; in particular, we can't create the new
2387                          * heap in a different namespace from the old, or we will have
2388                          * problems with the TEMP status of temp tables.
2389                          */
2390                         snprintf(NewHeapName, sizeof(NewHeapName),
2391                                          "pg_temp_%u", tab->relid);
2392
2393                         OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2394
2395                         /*
2396                          * Copy the heap data into the new table with the desired
2397                          * modifications, and test the current data within the table
2398                          * against new constraints generated by ALTER TABLE commands.
2399                          */
2400                         ATRewriteTable(tab, OIDNewHeap);
2401
2402                         /* Swap the physical files of the old and new heaps. */
2403                         swap_relation_files(tab->relid, OIDNewHeap);
2404
2405                         CommandCounterIncrement();
2406
2407                         /* Destroy new heap with old filenode */
2408                         object.classId = RelationRelationId;
2409                         object.objectId = OIDNewHeap;
2410                         object.objectSubId = 0;
2411
2412                         /*
2413                          * The new relation is local to our transaction and we know
2414                          * nothing depends on it, so DROP_RESTRICT should be OK.
2415                          */
2416                         performDeletion(&object, DROP_RESTRICT);
2417                         /* performDeletion does CommandCounterIncrement at end */
2418
2419                         /*
2420                          * Rebuild each index on the relation (but not the toast table,
2421                          * which is all-new anyway).  We do not need
2422                          * CommandCounterIncrement() because reindex_relation does it.
2423                          */
2424                         reindex_relation(tab->relid, false);
2425                 }
2426                 else
2427                 {
2428                         /*
2429                          * Test the current data within the table against new constraints
2430                          * generated by ALTER TABLE commands, but don't rebuild data.
2431                          */
2432                         if (tab->constraints != NIL || tab->new_notnull)
2433                                 ATRewriteTable(tab, InvalidOid);
2434
2435                         /*
2436                          * If we had SET TABLESPACE but no reason to reconstruct tuples,
2437                          * just do a block-by-block copy.
2438                          */
2439                         if (tab->newTableSpace)
2440                                 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2441                 }
2442         }
2443
2444         /*
2445          * Foreign key constraints are checked in a final pass, since (a) it's
2446          * generally best to examine each one separately, and (b) it's at least
2447          * theoretically possible that we have changed both relations of the
2448          * foreign key, and we'd better have finished both rewrites before we try
2449          * to read the tables.
2450          */
2451         foreach(ltab, *wqueue)
2452         {
2453                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2454                 Relation        rel = NULL;
2455                 ListCell   *lcon;
2456
2457                 foreach(lcon, tab->constraints)
2458                 {
2459                         NewConstraint *con = lfirst(lcon);
2460
2461                         if (con->contype == CONSTR_FOREIGN)
2462                         {
2463                                 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2464                                 Relation        refrel;
2465
2466                                 if (rel == NULL)
2467                                 {
2468                                         /* Long since locked, no need for another */
2469                                         rel = heap_open(tab->relid, NoLock);
2470                                 }
2471
2472                                 refrel = heap_open(con->refrelid, RowShareLock);
2473
2474                                 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2475
2476                                 heap_close(refrel, NoLock);
2477                         }
2478                 }
2479
2480                 if (rel)
2481                         heap_close(rel, NoLock);
2482         }
2483 }
2484
2485 /*
2486  * ATRewriteTable: scan or rewrite one table
2487  *
2488  * OIDNewHeap is InvalidOid if we don't need to rewrite
2489  */
2490 static void
2491 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2492 {
2493         Relation        oldrel;
2494         Relation        newrel;
2495         TupleDesc       oldTupDesc;
2496         TupleDesc       newTupDesc;
2497         bool            needscan = false;
2498         List       *notnull_attrs;
2499         int                     i;
2500         ListCell   *l;
2501         EState     *estate;
2502
2503         /*
2504          * Open the relation(s).  We have surely already locked the existing
2505          * table.
2506          */
2507         oldrel = heap_open(tab->relid, NoLock);
2508         oldTupDesc = tab->oldDesc;
2509         newTupDesc = RelationGetDescr(oldrel);          /* includes all mods */
2510
2511         if (OidIsValid(OIDNewHeap))
2512                 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2513         else
2514                 newrel = NULL;
2515
2516         /*
2517          * If we need to rewrite the table, the operation has to be propagated to
2518          * tables that use this table's rowtype as a column type.
2519          *
2520          * (Eventually this will probably become true for scans as well, but at
2521          * the moment a composite type does not enforce any constraints, so it's
2522          * not necessary/appropriate to enforce them just during ALTER.)
2523          */
2524         if (newrel)
2525                 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2526                                                                                  RelationGetRelationName(oldrel));
2527
2528         /*
2529          * Generate the constraint and default execution states
2530          */
2531
2532         estate = CreateExecutorState();
2533
2534         /* Build the needed expression execution states */
2535         foreach(l, tab->constraints)
2536         {
2537                 NewConstraint *con = lfirst(l);
2538
2539                 switch (con->contype)
2540                 {
2541                         case CONSTR_CHECK:
2542                                 needscan = true;
2543                                 con->qualstate = (List *)
2544                                         ExecPrepareExpr((Expr *) con->qual, estate);
2545                                 break;
2546                         case CONSTR_FOREIGN:
2547                                 /* Nothing to do here */
2548                                 break;
2549                         default:
2550                                 elog(ERROR, "unrecognized constraint type: %d",
2551                                          (int) con->contype);
2552                 }
2553         }
2554
2555         foreach(l, tab->newvals)
2556         {
2557                 NewColumnValue *ex = lfirst(l);
2558
2559                 needscan = true;
2560
2561                 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2562         }
2563
2564         notnull_attrs = NIL;
2565         if (newrel || tab->new_notnull)
2566         {
2567                 /*
2568                  * If we are rebuilding the tuples OR if we added any new NOT NULL
2569                  * constraints, check all not-null constraints.  This is a bit of
2570                  * overkill but it minimizes risk of bugs, and heap_attisnull is
2571                  * a pretty cheap test anyway.
2572                  */
2573                 for (i = 0; i < newTupDesc->natts; i++)
2574                 {
2575                         if (newTupDesc->attrs[i]->attnotnull &&
2576                                 !newTupDesc->attrs[i]->attisdropped)
2577                                 notnull_attrs = lappend_int(notnull_attrs, i);
2578                 }
2579                 if (notnull_attrs)
2580                         needscan = true;
2581         }
2582
2583         if (needscan)
2584         {
2585                 ExprContext *econtext;
2586                 Datum      *values;
2587                 bool       *isnull;
2588                 TupleTableSlot *oldslot;
2589                 TupleTableSlot *newslot;
2590                 HeapScanDesc scan;
2591                 HeapTuple       tuple;
2592                 MemoryContext oldCxt;
2593                 List       *dropped_attrs = NIL;
2594                 ListCell   *lc;
2595
2596                 econtext = GetPerTupleExprContext(estate);
2597
2598                 /*
2599                  * Make tuple slots for old and new tuples.  Note that even when the
2600                  * tuples are the same, the tupDescs might not be (consider ADD COLUMN
2601                  * without a default).
2602                  */
2603                 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2604                 newslot = MakeSingleTupleTableSlot(newTupDesc);
2605
2606                 /* Preallocate values/isnull arrays */
2607                 i = Max(newTupDesc->natts, oldTupDesc->natts);
2608                 values = (Datum *) palloc(i * sizeof(Datum));
2609                 isnull = (bool *) palloc(i * sizeof(bool));
2610                 memset(values, 0, i * sizeof(Datum));
2611                 memset(isnull, true, i * sizeof(bool));
2612
2613                 /*
2614                  * Any attributes that are dropped according to the new tuple
2615                  * descriptor can be set to NULL. We precompute the list of dropped
2616                  * attributes to avoid needing to do so in the per-tuple loop.
2617                  */
2618                 for (i = 0; i < newTupDesc->natts; i++)
2619                 {
2620                         if (newTupDesc->attrs[i]->attisdropped)
2621                                 dropped_attrs = lappend_int(dropped_attrs, i);
2622                 }
2623
2624                 /*
2625                  * Scan through the rows, generating a new row if needed and then
2626                  * checking all the constraints.
2627                  */
2628                 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2629
2630                 /*
2631                  * Switch to per-tuple memory context and reset it for each tuple
2632                  * produced, so we don't leak memory.
2633                  */
2634                 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2635
2636                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2637                 {
2638                         if (newrel)
2639                         {
2640                                 Oid                     tupOid = InvalidOid;
2641
2642                                 /* Extract data from old tuple */
2643                                 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2644                                 if (oldTupDesc->tdhasoid)
2645                                         tupOid = HeapTupleGetOid(tuple);
2646
2647                                 /* Set dropped attributes to null in new tuple */
2648                                 foreach(lc, dropped_attrs)
2649                                         isnull[lfirst_int(lc)] = true;
2650
2651                                 /*
2652                                  * Process supplied expressions to replace selected columns.
2653                                  * Expression inputs come from the old tuple.
2654                                  */
2655                                 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2656                                 econtext->ecxt_scantuple = oldslot;
2657
2658                                 foreach(l, tab->newvals)
2659                                 {
2660                                         NewColumnValue *ex = lfirst(l);
2661
2662                                         values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2663                                                                                                                   econtext,
2664                                                                                                          &isnull[ex->attnum - 1],
2665                                                                                                                   NULL);
2666                                 }
2667
2668                                 /*
2669                                  * Form the new tuple. Note that we don't explicitly pfree it,
2670                                  * since the per-tuple memory context will be reset shortly.
2671                                  */
2672                                 tuple = heap_form_tuple(newTupDesc, values, isnull);
2673
2674                                 /* Preserve OID, if any */
2675                                 if (newTupDesc->tdhasoid)
2676                                         HeapTupleSetOid(tuple, tupOid);
2677                         }
2678
2679                         /* Now check any constraints on the possibly-changed tuple */
2680                         ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2681                         econtext->ecxt_scantuple = newslot;
2682
2683                         foreach(l, notnull_attrs)
2684                         {
2685                                 int             attn = lfirst_int(l);
2686
2687                                 if (heap_attisnull(tuple, attn+1))
2688                                         ereport(ERROR,
2689                                                         (errcode(ERRCODE_NOT_NULL_VIOLATION),
2690                                                          errmsg("column \"%s\" contains null values",
2691                                                                         NameStr(newTupDesc->attrs[attn]->attname))));
2692                         }
2693
2694                         foreach(l, tab->constraints)
2695                         {
2696                                 NewConstraint *con = lfirst(l);
2697
2698                                 switch (con->contype)
2699                                 {
2700                                         case CONSTR_CHECK:
2701                                                 if (!ExecQual(con->qualstate, econtext, true))
2702                                                         ereport(ERROR,
2703                                                                         (errcode(ERRCODE_CHECK_VIOLATION),
2704                                                                          errmsg("check constraint \"%s\" is violated by some row",
2705                                                                                         con->name)));
2706                                                 break;
2707                                         case CONSTR_FOREIGN:
2708                                                 /* Nothing to do here */
2709                                                 break;
2710                                         default:
2711                                                 elog(ERROR, "unrecognized constraint type: %d",
2712                                                          (int) con->contype);
2713                                 }
2714                         }
2715
2716                         /* Write the tuple out to the new relation */
2717                         if (newrel)
2718                                 simple_heap_insert(newrel, tuple);
2719
2720                         ResetExprContext(econtext);
2721
2722                         CHECK_FOR_INTERRUPTS();
2723                 }
2724
2725                 MemoryContextSwitchTo(oldCxt);
2726                 heap_endscan(scan);
2727
2728                 ExecDropSingleTupleTableSlot(oldslot);
2729                 ExecDropSingleTupleTableSlot(newslot);
2730         }
2731
2732         FreeExecutorState(estate);
2733
2734         heap_close(oldrel, NoLock);
2735         if (newrel)
2736                 heap_close(newrel, NoLock);
2737 }
2738
2739 /*
2740  * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2741  */
2742 static AlteredTableInfo *
2743 ATGetQueueEntry(List **wqueue, Relation rel)
2744 {
2745         Oid                     relid = RelationGetRelid(rel);
2746         AlteredTableInfo *tab;
2747         ListCell   *ltab;
2748
2749         foreach(ltab, *wqueue)
2750         {
2751                 tab = (AlteredTableInfo *) lfirst(ltab);
2752                 if (tab->relid == relid)
2753                         return tab;
2754         }
2755
2756         /*
2757          * Not there, so add it.  Note that we make a copy of the relation's
2758          * existing descriptor before anything interesting can happen to it.
2759          */
2760         tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2761         tab->relid = relid;
2762         tab->relkind = rel->rd_rel->relkind;
2763         tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2764
2765         *wqueue = lappend(*wqueue, tab);
2766
2767         return tab;
2768 }
2769
2770 /*
2771  * ATSimplePermissions
2772  *
2773  * - Ensure that it is a relation (or possibly a view)
2774  * - Ensure this user is the owner
2775  * - Ensure that it is not a system table
2776  */
2777 static void
2778 ATSimplePermissions(Relation rel, bool allowView)
2779 {
2780         if (rel->rd_rel->relkind != RELKIND_RELATION)
2781         {
2782                 if (allowView)
2783                 {
2784                         if (rel->rd_rel->relkind != RELKIND_VIEW)
2785                                 ereport(ERROR,
2786                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2787                                                  errmsg("\"%s\" is not a table or view",
2788                                                                 RelationGetRelationName(rel))));
2789                 }
2790                 else
2791                         ereport(ERROR,
2792                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2793                                          errmsg("\"%s\" is not a table",
2794                                                         RelationGetRelationName(rel))));
2795         }
2796
2797         /* Permissions checks */
2798         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2799                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2800                                            RelationGetRelationName(rel));
2801
2802         if (!allowSystemTableMods && IsSystemRelation(rel))
2803                 ereport(ERROR,
2804                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2805                                  errmsg("permission denied: \"%s\" is a system catalog",
2806                                                 RelationGetRelationName(rel))));
2807 }
2808
2809 /*
2810  * ATSimplePermissionsRelationOrIndex
2811  *
2812  * - Ensure that it is a relation or an index
2813  * - Ensure this user is the owner
2814  * - Ensure that it is not a system table
2815  */
2816 static void
2817 ATSimplePermissionsRelationOrIndex(Relation rel)
2818 {
2819         if (rel->rd_rel->relkind != RELKIND_RELATION &&
2820                 rel->rd_rel->relkind != RELKIND_INDEX)
2821                 ereport(ERROR,
2822                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2823                                  errmsg("\"%s\" is not a table or index",
2824                                                 RelationGetRelationName(rel))));
2825
2826         /* Permissions checks */
2827         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2828                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2829                                            RelationGetRelationName(rel));
2830
2831         if (!allowSystemTableMods && IsSystemRelation(rel))
2832                 ereport(ERROR,
2833                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2834                                  errmsg("permission denied: \"%s\" is a system catalog",
2835                                                 RelationGetRelationName(rel))));
2836 }
2837
2838 /*
2839  * ATSimpleRecursion
2840  *
2841  * Simple table recursion sufficient for most ALTER TABLE operations.
2842  * All direct and indirect children are processed in an unspecified order.
2843  * Note that if a child inherits from the original table via multiple
2844  * inheritance paths, it will be visited just once.
2845  */
2846 static void
2847 ATSimpleRecursion(List **wqueue, Relation rel,
2848                                   AlterTableCmd *cmd, bool recurse)
2849 {
2850         /*
2851          * Propagate to children if desired.  Non-table relations never have
2852          * children, so no need to search in that case.
2853          */
2854         if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2855         {
2856                 Oid                     relid = RelationGetRelid(rel);
2857                 ListCell   *child;
2858                 List       *children;
2859
2860                 /* this routine is actually in the planner */
2861                 children = find_all_inheritors(relid);
2862
2863                 /*
2864                  * find_all_inheritors does the recursive search of the inheritance
2865                  * hierarchy, so all we have to do is process all of the relids in the
2866                  * list that it returns.
2867                  */
2868                 foreach(child, children)
2869                 {
2870                         Oid                     childrelid = lfirst_oid(child);
2871                         Relation        childrel;
2872
2873                         if (childrelid == relid)
2874                                 continue;
2875                         childrel = relation_open(childrelid, AccessExclusiveLock);
2876                         ATPrepCmd(wqueue, childrel, cmd, false, true);
2877                         relation_close(childrel, NoLock);
2878                 }
2879         }
2880 }
2881
2882 /*
2883  * ATOneLevelRecursion
2884  *
2885  * Here, we visit only direct inheritance children.  It is expected that
2886  * the command's prep routine will recurse again to find indirect children.
2887  * When using this technique, a multiply-inheriting child will be visited
2888  * multiple times.
2889  */
2890 static void
2891 ATOneLevelRecursion(List **wqueue, Relation rel,
2892                                         AlterTableCmd *cmd)
2893 {
2894         Oid                     relid = RelationGetRelid(rel);
2895         ListCell   *child;
2896         List       *children;
2897
2898         /* this routine is actually in the planner */
2899         children = find_inheritance_children(relid);
2900
2901         foreach(child, children)
2902         {
2903                 Oid                     childrelid = lfirst_oid(child);
2904                 Relation        childrel;
2905
2906                 childrel = relation_open(childrelid, AccessExclusiveLock);
2907                 ATPrepCmd(wqueue, childrel, cmd, true, true);
2908                 relation_close(childrel, NoLock);
2909         }
2910 }
2911
2912
2913 /*
2914  * find_composite_type_dependencies
2915  *
2916  * Check to see if a table's rowtype is being used as a column in some
2917  * other table (possibly nested several levels deep in composite types!).
2918  * Eventually, we'd like to propagate the check or rewrite operation
2919  * into other such tables, but for now, just error out if we find any.
2920  *
2921  * We assume that functions and views depending on the type are not reasons
2922  * to reject the ALTER.  (How safe is this really?)
2923  */
2924 static void
2925 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2926 {
2927         Relation        depRel;
2928         ScanKeyData key[2];
2929         SysScanDesc depScan;
2930         HeapTuple       depTup;
2931
2932         /*
2933          * We scan pg_depend to find those things that depend on the rowtype. (We
2934          * assume we can ignore refobjsubid for a rowtype.)
2935          */
2936         depRel = heap_open(DependRelationId, AccessShareLock);
2937
2938         ScanKeyInit(&key[0],
2939                                 Anum_pg_depend_refclassid,
2940                                 BTEqualStrategyNumber, F_OIDEQ,
2941                                 ObjectIdGetDatum(TypeRelationId));
2942         ScanKeyInit(&key[1],
2943                                 Anum_pg_depend_refobjid,
2944                                 BTEqualStrategyNumber, F_OIDEQ,
2945                                 ObjectIdGetDatum(typeOid));
2946
2947         depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2948                                                                  SnapshotNow, 2, key);
2949
2950         while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2951         {
2952                 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2953                 Relation        rel;
2954                 Form_pg_attribute att;
2955
2956                 /* Ignore dependees that aren't user columns of relations */
2957                 /* (we assume system columns are never of rowtypes) */
2958                 if (pg_depend->classid != RelationRelationId ||
2959                         pg_depend->objsubid <= 0)
2960                         continue;
2961
2962                 rel = relation_open(pg_depend->objid, AccessShareLock);
2963                 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2964
2965                 if (rel->rd_rel->relkind == RELKIND_RELATION)
2966                 {
2967                         ereport(ERROR,
2968                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2969                                          errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2970                                                         origTblName,
2971                                                         RelationGetRelationName(rel),
2972                                                         NameStr(att->attname))));
2973                 }
2974                 else if (OidIsValid(rel->rd_rel->reltype))
2975                 {
2976                         /*
2977                          * A view or composite type itself isn't a problem, but we must
2978                          * recursively check for indirect dependencies via its rowtype.
2979                          */
2980                         find_composite_type_dependencies(rel->rd_rel->reltype,
2981                                                                                          origTblName);
2982                 }
2983
2984                 relation_close(rel, AccessShareLock);
2985         }
2986
2987         systable_endscan(depScan);
2988
2989         relation_close(depRel, AccessShareLock);
2990 }
2991
2992
2993 /*
2994  * ALTER TABLE ADD COLUMN
2995  *
2996  * Adds an additional attribute to a relation making the assumption that
2997  * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2998  * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2999  * AlterTableCmd's.
3000  */
3001 static void
3002 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
3003                                 AlterTableCmd *cmd)
3004 {
3005         /*
3006          * Recurse to add the column to child classes, if requested.
3007          *
3008          * We must recurse one level at a time, so that multiply-inheriting
3009          * children are visited the right number of times and end up with the
3010          * right attinhcount.
3011          */
3012         if (recurse)
3013         {
3014                 AlterTableCmd *childCmd = copyObject(cmd);
3015                 ColumnDef  *colDefChild = (ColumnDef *) childCmd->def;
3016
3017                 /* Child should see column as singly inherited */
3018                 colDefChild->inhcount = 1;
3019                 colDefChild->is_local = false;
3020                 /* and don't make a support dependency on the child */
3021                 colDefChild->support = NULL;
3022
3023                 ATOneLevelRecursion(wqueue, rel, childCmd);
3024         }
3025         else
3026         {
3027                 /*
3028                  * If we are told not to recurse, there had better not be any child
3029                  * tables; else the addition would put them out of step.
3030                  */
3031                 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
3032                         ereport(ERROR,
3033                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3034                                          errmsg("column must be added to child tables too")));
3035         }
3036 }
3037
3038 static void
3039 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
3040                                 ColumnDef *colDef)
3041 {
3042         Oid                     myrelid = RelationGetRelid(rel);
3043         Relation        pgclass,
3044                                 attrdesc;
3045         HeapTuple       reltup;
3046         HeapTuple       attributeTuple;
3047         Form_pg_attribute attribute;
3048         FormData_pg_attribute attributeD;
3049         int                     i;
3050         int                     minattnum,
3051                                 maxatts;
3052         HeapTuple       typeTuple;
3053         Oid                     typeOid;
3054         Form_pg_type tform;
3055         Expr       *defval;
3056
3057         attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
3058
3059         /*
3060          * Are we adding the column to a recursion child?  If so, check whether to
3061          * merge with an existing definition for the column.
3062          */
3063         if (colDef->inhcount > 0)
3064         {
3065                 HeapTuple       tuple;
3066
3067                 /* Does child already have a column by this name? */
3068                 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
3069                 if (HeapTupleIsValid(tuple))
3070                 {
3071                         Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3072
3073                         /* Okay if child matches by type */
3074                         if (typenameTypeId(NULL, colDef->typename) != childatt->atttypid ||
3075                                 colDef->typename->typmod != childatt->atttypmod)
3076                                 ereport(ERROR,
3077                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
3078                                                  errmsg("child table \"%s\" has different type for column \"%s\"",
3079                                                         RelationGetRelationName(rel), colDef->colname)));
3080
3081                         /* Bump the existing child att's inhcount */
3082                         childatt->attinhcount++;
3083                         simple_heap_update(attrdesc, &tuple->t_self, tuple);
3084                         CatalogUpdateIndexes(attrdesc, tuple);
3085
3086                         heap_freetuple(tuple);
3087
3088                         /* Inform the user about the merge */
3089                         ereport(NOTICE,
3090                           (errmsg("merging definition of column \"%s\" for child \"%s\"",
3091                                           colDef->colname, RelationGetRelationName(rel))));
3092
3093                         heap_close(attrdesc, RowExclusiveLock);
3094                         return;
3095                 }
3096         }
3097
3098         pgclass = heap_open(RelationRelationId, RowExclusiveLock);
3099
3100         reltup = SearchSysCacheCopy(RELOID,
3101                                                                 ObjectIdGetDatum(myrelid),
3102                                                                 0, 0, 0);
3103         if (!HeapTupleIsValid(reltup))
3104                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
3105
3106         /*
3107          * this test is deliberately not attisdropped-aware, since if one tries to
3108          * add a column matching a dropped column name, it's gonna fail anyway.
3109          */
3110         if (SearchSysCacheExists(ATTNAME,
3111                                                          ObjectIdGetDatum(myrelid),
3112                                                          PointerGetDatum(colDef->colname),
3113                                                          0, 0))
3114                 ereport(ERROR,
3115                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
3116                                  errmsg("column \"%s\" of relation \"%s\" already exists",
3117                                                 colDef->colname, RelationGetRelationName(rel))));
3118
3119         minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
3120         maxatts = minattnum + 1;
3121         if (maxatts > MaxHeapAttributeNumber)
3122                 ereport(ERROR,
3123                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
3124                                  errmsg("tables can have at most %d columns",
3125                                                 MaxHeapAttributeNumber)));
3126         i = minattnum + 1;
3127
3128         typeTuple = typenameType(NULL, colDef->typename);
3129         tform = (Form_pg_type) GETSTRUCT(typeTuple);
3130         typeOid = HeapTupleGetOid(typeTuple);
3131
3132         /* make sure datatype is legal for a column */
3133         CheckAttributeType(colDef->colname, typeOid);
3134
3135         attributeTuple = heap_addheader(Natts_pg_attribute,
3136                                                                         false,
3137                                                                         ATTRIBUTE_TUPLE_SIZE,
3138                                                                         (void *) &attributeD);
3139
3140         attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
3141
3142         attribute->attrelid = myrelid;
3143         namestrcpy(&(attribute->attname), colDef->colname);
3144         attribute->atttypid = typeOid;
3145         attribute->attstattarget = -1;
3146         attribute->attlen = tform->typlen;
3147         attribute->attcacheoff = -1;
3148         attribute->atttypmod = colDef->typename->typmod;
3149         attribute->attnum = i;
3150         attribute->attbyval = tform->typbyval;
3151         attribute->attndims = list_length(colDef->typename->arrayBounds);
3152         attribute->attstorage = tform->typstorage;
3153         attribute->attalign = tform->typalign;
3154         attribute->attnotnull = colDef->is_not_null;
3155         attribute->atthasdef = false;
3156         attribute->attisdropped = false;
3157         attribute->attislocal = colDef->is_local;
3158         attribute->attinhcount = colDef->inhcount;
3159
3160         ReleaseSysCache(typeTuple);
3161
3162         simple_heap_insert(attrdesc, attributeTuple);
3163
3164         /* Update indexes on pg_attribute */
3165         CatalogUpdateIndexes(attrdesc, attributeTuple);
3166
3167         heap_close(attrdesc, RowExclusiveLock);
3168
3169         /*
3170          * Update number of attributes in pg_class tuple
3171          */
3172         ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
3173
3174         simple_heap_update(pgclass, &reltup->t_self, reltup);
3175
3176         /* keep catalog indexes current */
3177         CatalogUpdateIndexes(pgclass, reltup);
3178
3179         heap_freetuple(reltup);
3180
3181         heap_close(pgclass, RowExclusiveLock);
3182
3183         /* Make the attribute's catalog entry visible */
3184         CommandCounterIncrement();
3185
3186         /*
3187          * Store the DEFAULT, if any, in the catalogs
3188          */
3189         if (colDef->raw_default)
3190         {
3191                 RawColumnDefault *rawEnt;
3192
3193                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3194                 rawEnt->attnum = attribute->attnum;
3195                 rawEnt->raw_default = copyObject(colDef->raw_default);
3196
3197                 /*
3198                  * This function is intended for CREATE TABLE, so it processes a
3199                  * _list_ of defaults, but we just do one.
3200                  */
3201                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3202
3203                 /* Make the additional catalog changes visible */
3204                 CommandCounterIncrement();
3205         }
3206
3207         /*
3208          * Tell Phase 3 to fill in the default expression, if there is one.
3209          *
3210          * If there is no default, Phase 3 doesn't have to do anything, because
3211          * that effectively means that the default is NULL.  The heap tuple access
3212          * routines always check for attnum > # of attributes in tuple, and return
3213          * NULL if so, so without any modification of the tuple data we will get
3214          * the effect of NULL values in the new column.
3215          *
3216          * An exception occurs when the new column is of a domain type: the domain
3217          * might have a NOT NULL constraint, or a check constraint that indirectly
3218          * rejects nulls.  If there are any domain constraints then we construct
3219          * an explicit NULL default value that will be passed through
3220          * CoerceToDomain processing.  (This is a tad inefficient, since it causes
3221          * rewriting the table which we really don't have to do, but the present
3222          * design of domain processing doesn't offer any simple way of checking
3223          * the constraints more directly.)
3224          *
3225          * Note: we use build_column_default, and not just the cooked default
3226          * returned by AddRelationRawConstraints, so that the right thing happens
3227          * when a datatype's default applies.
3228          */
3229         defval = (Expr *) build_column_default(rel, attribute->attnum);
3230
3231         if (!defval && GetDomainConstraints(typeOid) != NIL)
3232         {
3233                 Oid                     basetype = getBaseType(typeOid);
3234
3235                 defval = (Expr *) makeNullConst(basetype);
3236                 defval = (Expr *) coerce_to_target_type(NULL,
3237                                                                                                 (Node *) defval,
3238                                                                                                 basetype,
3239                                                                                                 typeOid,
3240                                                                                                 colDef->typename->typmod,
3241                                                                                                 COERCION_ASSIGNMENT,
3242                                                                                                 COERCE_IMPLICIT_CAST);
3243                 if (defval == NULL)             /* should not happen */
3244                         elog(ERROR, "failed to coerce base type to domain");
3245         }
3246
3247         if (defval)
3248         {
3249                 NewColumnValue *newval;
3250
3251                 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3252                 newval->attnum = attribute->attnum;
3253                 newval->expr = defval;
3254
3255                 tab->newvals = lappend(tab->newvals, newval);
3256         }
3257
3258         /*
3259          * Add needed dependency entries for the new column.
3260          */
3261         add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3262         if (colDef->support != NULL)
3263                 add_column_support_dependency(myrelid, i, colDef->support);
3264 }
3265
3266 /*
3267  * Install a column's dependency on its datatype.
3268  */
3269 static void
3270 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3271 {
3272         ObjectAddress myself,
3273                                 referenced;
3274
3275         myself.classId = RelationRelationId;
3276         myself.objectId = relid;
3277         myself.objectSubId = attnum;
3278         referenced.classId = TypeRelationId;
3279         referenced.objectId = typid;
3280         referenced.objectSubId = 0;
3281         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3282 }
3283
3284 /*
3285  * Install a dependency for a column's supporting relation (serial sequence).
3286  */
3287 static void
3288 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3289 {
3290         ObjectAddress colobject,
3291                                 suppobject;
3292
3293         colobject.classId = RelationRelationId;
3294         colobject.objectId = relid;
3295         colobject.objectSubId = attnum;
3296         suppobject.classId = RelationRelationId;
3297         suppobject.objectId = RangeVarGetRelid(support, false);
3298         suppobject.objectSubId = 0;
3299         recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
3300 }
3301
3302 /*
3303  * ALTER TABLE ALTER COLUMN DROP NOT NULL
3304  */
3305 static void
3306 ATExecDropNotNull(Relation rel, const char *colName)
3307 {
3308         HeapTuple       tuple;
3309         AttrNumber      attnum;
3310         Relation        attr_rel;
3311         List       *indexoidlist;
3312         ListCell   *indexoidscan;
3313
3314         /*
3315          * lookup the attribute
3316          */
3317         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3318
3319         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3320
3321         if (!HeapTupleIsValid(tuple))
3322                 ereport(ERROR,
3323                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3324                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3325                                                 colName, RelationGetRelationName(rel))));
3326
3327         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3328
3329         /* Prevent them from altering a system attribute */
3330         if (attnum <= 0)
3331                 ereport(ERROR,
3332                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3333                                  errmsg("cannot alter system column \"%s\"",
3334                                                 colName)));
3335
3336         /*
3337          * Check that the attribute is not in a primary key
3338          */
3339
3340         /* Loop over all indexes on the relation */
3341         indexoidlist = RelationGetIndexList(rel);
3342
3343         foreach(indexoidscan, indexoidlist)
3344         {
3345                 Oid                     indexoid = lfirst_oid(indexoidscan);
3346                 HeapTuple       indexTuple;
3347                 Form_pg_index indexStruct;
3348                 int                     i;
3349
3350                 indexTuple = SearchSysCache(INDEXRELID,
3351                                                                         ObjectIdGetDatum(indexoid),
3352                                                                         0, 0, 0);
3353                 if (!HeapTupleIsValid(indexTuple))
3354                         elog(ERROR, "cache lookup failed for index %u", indexoid);
3355                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3356
3357                 /* If the index is not a primary key, skip the check */
3358                 if (indexStruct->indisprimary)
3359                 {
3360                         /*
3361                          * Loop over each attribute in the primary key and see if it
3362                          * matches the to-be-altered attribute
3363                          */
3364                         for (i = 0; i < indexStruct->indnatts; i++)
3365                         {
3366                                 if (indexStruct->indkey.values[i] == attnum)
3367                                         ereport(ERROR,
3368                                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3369                                                          errmsg("column \"%s\" is in a primary key",
3370                                                                         colName)));
3371                         }
3372                 }
3373
3374                 ReleaseSysCache(indexTuple);
3375         }
3376
3377         list_free(indexoidlist);
3378
3379         /*
3380          * Okay, actually perform the catalog change ... if needed
3381          */
3382         if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3383         {
3384                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3385
3386                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3387
3388                 /* keep the system catalog indexes current */
3389                 CatalogUpdateIndexes(attr_rel, tuple);
3390         }
3391
3392         heap_close(attr_rel, RowExclusiveLock);
3393 }
3394
3395 /*
3396  * ALTER TABLE ALTER COLUMN SET NOT NULL
3397  */
3398 static void
3399 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3400                                  const char *colName)
3401 {
3402         HeapTuple       tuple;
3403         AttrNumber      attnum;
3404         Relation        attr_rel;
3405
3406         /*
3407          * lookup the attribute
3408          */
3409         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3410
3411         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3412
3413         if (!HeapTupleIsValid(tuple))
3414                 ereport(ERROR,
3415                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3416                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3417                                                 colName, RelationGetRelationName(rel))));
3418
3419         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3420
3421         /* Prevent them from altering a system attribute */
3422         if (attnum <= 0)
3423                 ereport(ERROR,
3424                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3425                                  errmsg("cannot alter system column \"%s\"",
3426                                                 colName)));
3427
3428         /*
3429          * Okay, actually perform the catalog change ... if needed
3430          */
3431         if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3432         {
3433                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3434
3435                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3436
3437                 /* keep the system catalog indexes current */
3438                 CatalogUpdateIndexes(attr_rel, tuple);
3439
3440                 /* Tell Phase 3 it needs to test the constraint */
3441                 tab->new_notnull = true;
3442         }
3443
3444         heap_close(attr_rel, RowExclusiveLock);
3445 }
3446
3447 /*
3448  * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3449  */
3450 static void
3451 ATExecColumnDefault(Relation rel, const char *colName,
3452                                         Node *newDefault)
3453 {
3454         AttrNumber      attnum;
3455
3456         /*
3457          * get the number of the attribute
3458          */
3459         attnum = get_attnum(RelationGetRelid(rel), colName);
3460         if (attnum == InvalidAttrNumber)
3461                 ereport(ERROR,
3462                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3463                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3464                                                 colName, RelationGetRelationName(rel))));
3465
3466         /* Prevent them from altering a system attribute */
3467         if (attnum <= 0)
3468                 ereport(ERROR,
3469                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3470                                  errmsg("cannot alter system column \"%s\"",
3471                                                 colName)));
3472
3473         /*
3474          * Remove any old default for the column.  We use RESTRICT here for
3475          * safety, but at present we do not expect anything to depend on the
3476          * default.
3477          */
3478         RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3479
3480         if (newDefault)
3481         {
3482                 /* SET DEFAULT */
3483                 RawColumnDefault *rawEnt;
3484
3485                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3486                 rawEnt->attnum = attnum;
3487                 rawEnt->raw_default = newDefault;
3488
3489                 /*
3490                  * This function is intended for CREATE TABLE, so it processes a
3491                  * _list_ of defaults, but we just do one.
3492                  */
3493                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3494         }
3495 }
3496
3497 /*
3498  * ALTER TABLE ALTER COLUMN SET STATISTICS
3499  */
3500 static void
3501 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3502 {
3503         /*
3504          * We do our own permission checking because (a) we want to allow SET
3505          * STATISTICS on indexes (for expressional index columns), and (b) we want
3506          * to allow SET STATISTICS on system catalogs without requiring
3507          * allowSystemTableMods to be turned on.
3508          */
3509         if (rel->rd_rel->relkind != RELKIND_RELATION &&
3510                 rel->rd_rel->relkind != RELKIND_INDEX)
3511                 ereport(ERROR,
3512                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3513                                  errmsg("\"%s\" is not a table or index",
3514                                                 RelationGetRelationName(rel))));
3515
3516         /* Permissions checks */
3517         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3518                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3519                                            RelationGetRelationName(rel));
3520 }
3521
3522 static void
3523 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3524 {
3525         int                     newtarget;
3526         Relation        attrelation;
3527         HeapTuple       tuple;
3528         Form_pg_attribute attrtuple;
3529
3530         Assert(IsA(newValue, Integer));
3531         newtarget = intVal(newValue);
3532
3533         /*
3534          * Limit target to a sane range
3535          */
3536         if (newtarget < -1)
3537         {
3538                 ereport(ERROR,
3539                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3540                                  errmsg("statistics target %d is too low",
3541                                                 newtarget)));
3542         }
3543         else if (newtarget > 1000)
3544         {
3545                 newtarget = 1000;
3546                 ereport(WARNING,
3547                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3548                                  errmsg("lowering statistics target to %d",
3549                                                 newtarget)));
3550         }
3551
3552         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3553
3554         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3555
3556         if (!HeapTupleIsValid(tuple))
3557                 ereport(ERROR,
3558                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3559                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3560                                                 colName, RelationGetRelationName(rel))));
3561         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3562
3563         if (attrtuple->attnum <= 0)
3564                 ereport(ERROR,
3565                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3566                                  errmsg("cannot alter system column \"%s\"",
3567                                                 colName)));
3568
3569         attrtuple->attstattarget = newtarget;
3570
3571         simple_heap_update(attrelation, &tuple->t_self, tuple);
3572
3573         /* keep system catalog indexes current */
3574         CatalogUpdateIndexes(attrelation, tuple);
3575
3576         heap_freetuple(tuple);
3577
3578         heap_close(attrelation, RowExclusiveLock);
3579 }
3580
3581 /*
3582  * ALTER TABLE ALTER COLUMN SET STORAGE
3583  */
3584 static void
3585 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3586 {
3587         char       *storagemode;
3588         char            newstorage;
3589         Relation        attrelation;
3590         HeapTuple       tuple;
3591         Form_pg_attribute attrtuple;
3592
3593         Assert(IsA(newValue, String));
3594         storagemode = strVal(newValue);
3595
3596         if (pg_strcasecmp(storagemode, "plain") == 0)
3597                 newstorage = 'p';
3598         else if (pg_strcasecmp(storagemode, "external") == 0)
3599                 newstorage = 'e';
3600         else if (pg_strcasecmp(storagemode, "extended") == 0)
3601                 newstorage = 'x';
3602         else if (pg_strcasecmp(storagemode, "main") == 0)
3603                 newstorage = 'm';
3604         else
3605         {
3606                 ereport(ERROR,
3607                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3608                                  errmsg("invalid storage type \"%s\"",
3609                                                 storagemode)));
3610                 newstorage = 0;                 /* keep compiler quiet */
3611         }
3612
3613         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3614
3615         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3616
3617         if (!HeapTupleIsValid(tuple))
3618                 ereport(ERROR,
3619                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3620                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3621                                                 colName, RelationGetRelationName(rel))));
3622         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3623
3624         if (attrtuple->attnum <= 0)
3625                 ereport(ERROR,
3626                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3627                                  errmsg("cannot alter system column \"%s\"",
3628                                                 colName)));
3629
3630         /*
3631          * safety check: do not allow toasted storage modes unless column datatype
3632          * is TOAST-aware.
3633          */
3634         if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3635                 attrtuple->attstorage = newstorage;
3636         else
3637                 ereport(ERROR,
3638                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3639                                  errmsg("column data type %s can only have storage PLAIN",
3640                                                 format_type_be(attrtuple->atttypid))));
3641
3642         simple_heap_update(attrelation, &tuple->t_self, tuple);
3643
3644         /* keep system catalog indexes current */
3645         CatalogUpdateIndexes(attrelation, tuple);
3646
3647         heap_freetuple(tuple);
3648
3649         heap_close(attrelation, RowExclusiveLock);
3650 }
3651
3652
3653 /*
3654  * ALTER TABLE DROP COLUMN
3655  *
3656  * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3657  * because we have to decide at runtime whether to recurse or not depending
3658  * on whether attinhcount goes to zero or not.  (We can't check this in a
3659  * static pre-pass because it won't handle multiple inheritance situations
3660  * correctly.)  Since DROP COLUMN doesn't need to create any work queue
3661  * entries for Phase 3, it's okay to recurse internally in this routine
3662  * without considering the work queue.
3663  */
3664 static void
3665 ATExecDropColumn(Relation rel, const char *colName,
3666                                  DropBehavior behavior,
3667                                  bool recurse, bool recursing)
3668 {
3669         HeapTuple       tuple;
3670         Form_pg_attribute targetatt;
3671         AttrNumber      attnum;
3672         List       *children;
3673         ObjectAddress object;
3674
3675         /* At top level, permission check was done in ATPrepCmd, else do it */
3676         if (recursing)
3677                 ATSimplePermissions(rel, false);
3678
3679         /*
3680          * get the number of the attribute
3681          */
3682         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3683         if (!HeapTupleIsValid(tuple))
3684                 ereport(ERROR,
3685                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3686                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3687                                                 colName, RelationGetRelationName(rel))));
3688         targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
3689
3690         attnum = targetatt->attnum;
3691
3692         /* Can't drop a system attribute, except OID */
3693         if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3694                 ereport(ERROR,
3695                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3696                                  errmsg("cannot drop system column \"%s\"",
3697                                                 colName)));
3698
3699         /* Don't drop inherited columns */
3700         if (targetatt->attinhcount > 0 && !recursing)
3701                 ereport(ERROR,
3702                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3703                                  errmsg("cannot drop inherited column \"%s\"",
3704                                                 colName)));
3705
3706         ReleaseSysCache(tuple);
3707
3708         /*
3709          * Propagate to children as appropriate.  Unlike most other ALTER
3710          * routines, we have to do this one level of recursion at a time; we can't
3711          * use find_all_inheritors to do it in one pass.
3712          */
3713         children = find_inheritance_children(RelationGetRelid(rel));
3714
3715         if (children)
3716         {
3717                 Relation        attr_rel;
3718                 ListCell   *child;
3719
3720                 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3721                 foreach(child, children)
3722                 {
3723                         Oid                     childrelid = lfirst_oid(child);
3724                         Relation        childrel;
3725                         Form_pg_attribute childatt;
3726
3727                         childrel = heap_open(childrelid, AccessExclusiveLock);
3728
3729                         tuple = SearchSysCacheCopyAttName(childrelid, colName);
3730                         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
3731                                 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3732                                          colName, childrelid);
3733                         childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3734
3735                         if (childatt->attinhcount <= 0)         /* shouldn't happen */
3736                                 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3737                                          childrelid, colName);
3738
3739                         if (recurse)
3740                         {
3741                                 /*
3742                                  * If the child column has other definition sources, just
3743                                  * decrement its inheritance count; if not, recurse to delete
3744                                  * it.
3745                                  */
3746                                 if (childatt->attinhcount == 1 && !childatt->attislocal)
3747                                 {
3748                                         /* Time to delete this child column, too */
3749                                         ATExecDropColumn(childrel, colName, behavior, true, true);
3750                                 }
3751                                 else
3752                                 {
3753                                         /* Child column must survive my deletion */
3754                                         childatt->attinhcount--;
3755
3756                                         simple_heap_update(attr_rel, &tuple->t_self, tuple);
3757
3758                                         /* keep the system catalog indexes current */
3759                                         CatalogUpdateIndexes(attr_rel, tuple);
3760
3761                                         /* Make update visible */
3762                                         CommandCounterIncrement();
3763                                 }
3764                         }
3765                         else
3766                         {
3767                                 /*
3768                                  * If we were told to drop ONLY in this table (no recursion),
3769                                  * we need to mark the inheritors' attribute as locally
3770                                  * defined rather than inherited.
3771                                  */
3772                                 childatt->attinhcount--;
3773                                 childatt->attislocal = true;
3774
3775                                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3776
3777                                 /* keep the system catalog indexes current */
3778                                 CatalogUpdateIndexes(attr_rel, tuple);
3779
3780                                 /* Make update visible */
3781                                 CommandCounterIncrement();
3782                         }
3783
3784                         heap_freetuple(tuple);
3785
3786                         heap_close(childrel, NoLock);
3787                 }
3788                 heap_close(attr_rel, RowExclusiveLock);
3789         }
3790
3791         /*
3792          * Perform the actual column deletion
3793          */
3794         object.classId = RelationRelationId;
3795         object.objectId = RelationGetRelid(rel);
3796         object.objectSubId = attnum;
3797
3798         performDeletion(&object, behavior);
3799
3800         /*
3801          * If we dropped the OID column, must adjust pg_class.relhasoids
3802          */
3803         if (attnum == ObjectIdAttributeNumber)
3804         {
3805                 Relation        class_rel;
3806                 Form_pg_class tuple_class;
3807
3808                 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
3809
3810                 tuple = SearchSysCacheCopy(RELOID,
3811                                                                    ObjectIdGetDatum(RelationGetRelid(rel)),
3812                                                                    0, 0, 0);
3813                 if (!HeapTupleIsValid(tuple))
3814                         elog(ERROR, "cache lookup failed for relation %u",
3815                                  RelationGetRelid(rel));
3816                 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3817
3818                 tuple_class->relhasoids = false;
3819                 simple_heap_update(class_rel, &tuple->t_self, tuple);
3820
3821                 /* Keep the catalog indexes up to date */
3822                 CatalogUpdateIndexes(class_rel, tuple);
3823
3824                 heap_close(class_rel, RowExclusiveLock);
3825         }
3826 }
3827
3828 /*
3829  * ALTER TABLE ADD INDEX
3830  *
3831  * There is no such command in the grammar, but the parser converts UNIQUE
3832  * and PRIMARY KEY constraints into AT_AddIndex subcommands.  This lets us
3833  * schedule creation of the index at the appropriate time during ALTER.
3834  */
3835 static void
3836 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3837                            IndexStmt *stmt, bool is_rebuild)
3838 {
3839         bool            check_rights;
3840         bool            skip_build;
3841         bool            quiet;
3842
3843         Assert(IsA(stmt, IndexStmt));
3844
3845         /* suppress schema rights check when rebuilding existing index */
3846         check_rights = !is_rebuild;
3847         /* skip index build if phase 3 will have to rewrite table anyway */
3848         skip_build = (tab->newvals != NIL);
3849         /* suppress notices when rebuilding existing index */
3850         quiet = is_rebuild;
3851
3852         DefineIndex(stmt->relation, /* relation */
3853                                 stmt->idxname,  /* index name */
3854                                 InvalidOid,             /* no predefined OID */
3855                                 stmt->accessMethod,             /* am name */
3856                                 stmt->tableSpace,
3857                                 stmt->indexParams,              /* parameters */
3858                                 (Expr *) stmt->whereClause,
3859                                 stmt->rangetable,
3860                                 stmt->options,
3861                                 stmt->unique,
3862                                 stmt->primary,
3863                                 stmt->isconstraint,
3864                                 true,                   /* is_alter_table */
3865                                 check_rights,
3866                                 skip_build,
3867                                 quiet);
3868 }
3869
3870 /*
3871  * ALTER TABLE ADD CONSTRAINT
3872  */
3873 static void
3874 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3875 {
3876         switch (nodeTag(newConstraint))
3877         {
3878                 case T_Constraint:
3879                         {
3880                                 Constraint *constr = (Constraint *) newConstraint;
3881
3882                                 /*
3883                                  * Currently, we only expect to see CONSTR_CHECK nodes
3884                                  * arriving here (see the preprocessing done in
3885                                  * parser/analyze.c).  Use a switch anyway to make it easier
3886                                  * to add more code later.
3887                                  */
3888                                 switch (constr->contype)
3889                                 {
3890                                         case CONSTR_CHECK:
3891                                                 {
3892                                                         List       *newcons;
3893                                                         ListCell   *lcon;
3894
3895                                                         /*
3896                                                          * Call AddRelationRawConstraints to do the work.
3897                                                          * It returns a list of cooked constraints.
3898                                                          */
3899                                                         newcons = AddRelationRawConstraints(rel, NIL,
3900                                                                                                                  list_make1(constr));
3901                                                         /* Add each constraint to Phase 3's queue */
3902                                                         foreach(lcon, newcons)
3903                                                         {
3904                                                                 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3905                                                                 NewConstraint *newcon;
3906
3907                                                                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3908                                                                 newcon->name = ccon->name;
3909                                                                 newcon->contype = ccon->contype;
3910                                                                 /* ExecQual wants implicit-AND format */
3911                                                                 newcon->qual = (Node *)
3912                                                                         make_ands_implicit((Expr *) ccon->expr);
3913
3914                                                                 tab->constraints = lappend(tab->constraints,
3915                                                                                                                    newcon);
3916                                                         }
3917                                                         break;
3918                                                 }
3919                                         default:
3920                                                 elog(ERROR, "unrecognized constraint type: %d",
3921                                                          (int) constr->contype);
3922                                 }
3923                                 break;
3924                         }
3925                 case T_FkConstraint:
3926                         {
3927                                 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3928
3929                                 /*
3930                                  * Assign or validate constraint name
3931                                  */
3932                                 if (fkconstraint->constr_name)
3933                                 {
3934                                         if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3935                                                                                          RelationGetRelid(rel),
3936                                                                                          RelationGetNamespace(rel),
3937                                                                                          fkconstraint->constr_name))
3938                                                 ereport(ERROR,
3939                                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
3940                                                                  errmsg("constraint \"%s\" for relation \"%s\" already exists",
3941                                                                                 fkconstraint->constr_name,
3942                                                                                 RelationGetRelationName(rel))));
3943                                 }
3944                                 else
3945                                         fkconstraint->constr_name =
3946                                                 ChooseConstraintName(RelationGetRelationName(rel),
3947                                                                         strVal(linitial(fkconstraint->fk_attrs)),
3948                                                                                          "fkey",
3949                                                                                          RelationGetNamespace(rel),
3950                                                                                          NIL);
3951
3952                                 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3953
3954                                 break;
3955                         }
3956                 default:
3957                         elog(ERROR, "unrecognized node type: %d",
3958                                  (int) nodeTag(newConstraint));
3959         }
3960 }
3961
3962 /*
3963  * Add a foreign-key constraint to a single table
3964  *
3965  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
3966  * lock on the rel, and have done appropriate validity/permissions checks
3967  * for it.
3968  */
3969 static void
3970 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3971                                                   FkConstraint *fkconstraint)
3972 {
3973         Relation        pkrel;
3974         AclResult       aclresult;
3975         int16           pkattnum[INDEX_MAX_KEYS];
3976         int16           fkattnum[INDEX_MAX_KEYS];
3977         Oid                     pktypoid[INDEX_MAX_KEYS];
3978         Oid                     fktypoid[INDEX_MAX_KEYS];
3979         Oid                     opclasses[INDEX_MAX_KEYS];
3980         int                     i;
3981         int                     numfks,
3982                                 numpks;
3983         Oid                     indexOid;
3984         Oid                     constrOid;
3985
3986         /*
3987          * Grab an exclusive lock on the pk table, so that someone doesn't delete
3988          * rows out from under us. (Although a lesser lock would do for that
3989          * purpose, we'll need exclusive lock anyway to add triggers to the pk
3990          * table; trying to start with a lesser lock will just create a risk of
3991          * deadlock.)
3992          */
3993         pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3994
3995         /*
3996          * Validity and permissions checks
3997          *
3998          * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3999          * but we may as well error out sooner instead of later.
4000          */
4001         if (pkrel->rd_rel->relkind != RELKIND_RELATION)
4002                 ereport(ERROR,
4003                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
4004                                  errmsg("referenced relation \"%s\" is not a table",
4005                                                 RelationGetRelationName(pkrel))));
4006
4007         aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
4008                                                                   ACL_REFERENCES);
4009         if (aclresult != ACLCHECK_OK)
4010                 aclcheck_error(aclresult, ACL_KIND_CLASS,
4011                                            RelationGetRelationName(pkrel));
4012
4013         if (!allowSystemTableMods && IsSystemRelation(pkrel))
4014                 ereport(ERROR,
4015                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
4016                                  errmsg("permission denied: \"%s\" is a system catalog",
4017                                                 RelationGetRelationName(pkrel))));
4018
4019         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
4020                                                                   ACL_REFERENCES);
4021         if (aclresult != ACLCHECK_OK)
4022                 aclcheck_error(aclresult, ACL_KIND_CLASS,
4023                                            RelationGetRelationName(rel));
4024
4025         /*
4026          * Disallow reference from permanent table to temp table or vice versa.
4027          * (The ban on perm->temp is for fairly obvious reasons.  The ban on
4028          * temp->perm is because other backends might need to run the RI triggers
4029          * on the perm table, but they can't reliably see tuples the owning
4030          * backend has created in the temp table, because non-shared buffers are
4031          * used for temp tables.)
4032          */
4033         if (isTempNamespace(RelationGetNamespace(pkrel)))
4034         {
4035                 if (!isTempNamespace(RelationGetNamespace(rel)))
4036                         ereport(ERROR,
4037                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4038                                          errmsg("cannot reference temporary table from permanent table constraint")));
4039         }
4040         else
4041         {
4042                 if (isTempNamespace(RelationGetNamespace(rel)))
4043                         ereport(ERROR,
4044                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4045                                          errmsg("cannot reference permanent table from temporary table constraint")));
4046         }
4047
4048         /*
4049          * Look up the referencing attributes to make sure they exist, and record
4050          * their attnums and type OIDs.
4051          */
4052         MemSet(pkattnum, 0, sizeof(pkattnum));
4053         MemSet(fkattnum, 0, sizeof(fkattnum));
4054         MemSet(pktypoid, 0, sizeof(pktypoid));
4055         MemSet(fktypoid, 0, sizeof(fktypoid));
4056         MemSet(opclasses, 0, sizeof(opclasses));
4057
4058         numfks = transformColumnNameList(RelationGetRelid(rel),
4059                                                                          fkconstraint->fk_attrs,
4060                                                                          fkattnum, fktypoid);
4061
4062         /*
4063          * If the attribute list for the referenced table was omitted, lookup the
4064          * definition of the primary key and use it.  Otherwise, validate the
4065          * supplied attribute list.  In either case, discover the index OID and
4066          * index opclasses, and the attnums and type OIDs of the attributes.
4067          */
4068         if (fkconstraint->pk_attrs == NIL)
4069         {
4070                 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
4071                                                                                         &fkconstraint->pk_attrs,
4072                                                                                         pkattnum, pktypoid,
4073                                                                                         opclasses);
4074         }
4075         else
4076         {
4077                 numpks = transformColumnNameList(RelationGetRelid(pkrel),
4078                                                                                  fkconstraint->pk_attrs,
4079                                                                                  pkattnum, pktypoid);
4080                 /* Look for an index matching the column list */
4081                 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
4082                                                                                    opclasses);
4083         }
4084
4085         /* Be sure referencing and referenced column types are comparable */
4086         if (numfks != numpks)
4087                 ereport(ERROR,
4088                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4089                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
4090
4091         for (i = 0; i < numpks; i++)
4092         {
4093                 /*
4094                  * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
4095                  * is the foreign key table's i'th key's type
4096                  *
4097                  * Note that we look for an operator with the PK type on the left;
4098                  * when the types are different this is critical because the PK index
4099                  * will need operators with the indexkey on the left. (Ordinarily both
4100                  * commutator operators will exist if either does, but we won't get
4101                  * the right answer from the test below on opclass membership unless
4102                  * we select the proper operator.)
4103                  */
4104                 Operator        o = oper(NULL, list_make1(makeString("=")),
4105                                                          pktypoid[i], fktypoid[i],
4106                                                          true, -1);
4107
4108                 if (o == NULL)
4109                         ereport(ERROR,
4110                                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
4111                                          errmsg("foreign key constraint \"%s\" "
4112                                                         "cannot be implemented",
4113                                                         fkconstraint->constr_name),
4114                                          errdetail("Key columns \"%s\" and \"%s\" "
4115                                                            "are of incompatible types: %s and %s.",
4116                                                            strVal(list_nth(fkconstraint->fk_attrs, i)),
4117                                                            strVal(list_nth(fkconstraint->pk_attrs, i)),
4118                                                            format_type_be(fktypoid[i]),
4119                                                            format_type_be(pktypoid[i]))));
4120
4121                 /*
4122                  * Check that the found operator is compatible with the PK index, and
4123                  * generate a warning if not, since otherwise costly seqscans will be
4124                  * incurred to check FK validity.
4125                  */
4126                 if (!op_in_opclass(oprid(o), opclasses[i]))
4127                         ereport(WARNING,
4128                                         (errmsg("foreign key constraint \"%s\" "
4129                                                         "will require costly sequential scans",
4130                                                         fkconstraint->constr_name),
4131                                          errdetail("Key columns \"%s\" and \"%s\" "
4132                                                            "are of different types: %s and %s.",
4133                                                            strVal(list_nth(fkconstraint->fk_attrs, i)),
4134                                                            strVal(list_nth(fkconstraint->pk_attrs, i)),
4135                                                            format_type_be(fktypoid[i]),
4136                                                            format_type_be(pktypoid[i]))));
4137
4138                 ReleaseSysCache(o);
4139         }
4140
4141         /*
4142          * Tell Phase 3 to check that the constraint is satisfied by existing rows
4143          * (we can skip this during table creation).
4144          */
4145         if (!fkconstraint->skip_validation)
4146         {
4147                 NewConstraint *newcon;
4148
4149                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
4150                 newcon->name = fkconstraint->constr_name;
4151                 newcon->contype = CONSTR_FOREIGN;
4152                 newcon->refrelid = RelationGetRelid(pkrel);
4153                 newcon->qual = (Node *) fkconstraint;
4154
4155                 tab->constraints = lappend(tab->constraints, newcon);
4156         }
4157
4158         /*
4159          * Record the FK constraint in pg_constraint.
4160          */
4161         constrOid = CreateConstraintEntry(fkconstraint->constr_name,
4162                                                                           RelationGetNamespace(rel),
4163                                                                           CONSTRAINT_FOREIGN,
4164                                                                           fkconstraint->deferrable,
4165                                                                           fkconstraint->initdeferred,
4166                                                                           RelationGetRelid(rel),
4167                                                                           fkattnum,
4168                                                                           numfks,
4169                                                                           InvalidOid,           /* not a domain
4170                                                                                                                  * constraint */
4171                                                                           RelationGetRelid(pkrel),
4172                                                                           pkattnum,
4173                                                                           numpks,
4174                                                                           fkconstraint->fk_upd_action,
4175                                                                           fkconstraint->fk_del_action,
4176                                                                           fkconstraint->fk_matchtype,
4177                                                                           indexOid,
4178                                                                           NULL,         /* no check constraint */
4179                                                                           NULL,
4180                                                                           NULL);
4181
4182         /*
4183          * Create the triggers that will enforce the constraint.
4184          */
4185         createForeignKeyTriggers(rel, fkconstraint, constrOid);
4186
4187         /*
4188          * Close pk table, but keep lock until we've committed.
4189          */
4190         heap_close(pkrel, NoLock);
4191 }
4192
4193
4194 /*
4195  * transformColumnNameList - transform list of column names
4196  *
4197  * Lookup each name and return its attnum and type OID
4198  */
4199 static int
4200 transformColumnNameList(Oid relId, List *colList,
4201                                                 int16 *attnums, Oid *atttypids)
4202 {
4203         ListCell   *l;
4204         int                     attnum;
4205
4206         attnum = 0;
4207         foreach(l, colList)
4208         {
4209                 char       *attname = strVal(lfirst(l));
4210                 HeapTuple       atttuple;
4211
4212                 atttuple = SearchSysCacheAttName(relId, attname);
4213                 if (!HeapTupleIsValid(atttuple))
4214                         ereport(ERROR,
4215                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
4216                                          errmsg("column \"%s\" referenced in foreign key constraint does not exist",
4217                                                         attname)));
4218                 if (attnum >= INDEX_MAX_KEYS)
4219                         ereport(ERROR,
4220                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
4221                                          errmsg("cannot have more than %d keys in a foreign key",
4222                                                         INDEX_MAX_KEYS)));
4223                 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4224                 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4225                 ReleaseSysCache(atttuple);
4226                 attnum++;
4227         }
4228
4229         return attnum;
4230 }
4231
4232 /*
4233  * transformFkeyGetPrimaryKey -
4234  *
4235  *      Look up the names, attnums, and types of the primary key attributes
4236  *      for the pkrel.  Also return the index OID and index opclasses of the
4237  *      index supporting the primary key.
4238  *
4239  *      All parameters except pkrel are output parameters.      Also, the function
4240  *      return value is the number of attributes in the primary key.
4241  *
4242  *      Used when the column list in the REFERENCES specification is omitted.
4243  */
4244 static int
4245 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4246                                                    List **attnamelist,
4247                                                    int16 *attnums, Oid *atttypids,
4248                                                    Oid *opclasses)
4249 {
4250         List       *indexoidlist;
4251         ListCell   *indexoidscan;
4252         HeapTuple       indexTuple = NULL;
4253         Form_pg_index indexStruct = NULL;
4254         Datum           indclassDatum;
4255         bool            isnull;
4256         oidvector  *indclass;
4257         int                     i;
4258
4259         /*
4260          * Get the list of index OIDs for the table from the relcache, and look up
4261          * each one in the pg_index syscache until we find one marked primary key
4262          * (hopefully there isn't more than one such).
4263          */
4264         *indexOid = InvalidOid;
4265
4266         indexoidlist = RelationGetIndexList(pkrel);
4267
4268         foreach(indexoidscan, indexoidlist)
4269         {
4270                 Oid                     indexoid = lfirst_oid(indexoidscan);
4271
4272                 indexTuple = SearchSysCache(INDEXRELID,
4273                                                                         ObjectIdGetDatum(indexoid),
4274                                                                         0, 0, 0);
4275                 if (!HeapTupleIsValid(indexTuple))
4276                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4277                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4278                 if (indexStruct->indisprimary)
4279                 {
4280                         *indexOid = indexoid;
4281                         break;
4282                 }
4283                 ReleaseSysCache(indexTuple);
4284         }
4285
4286         list_free(indexoidlist);
4287
4288         /*
4289          * Check that we found it
4290          */
4291         if (!OidIsValid(*indexOid))
4292                 ereport(ERROR,
4293                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
4294                                  errmsg("there is no primary key for referenced table \"%s\"",
4295                                                 RelationGetRelationName(pkrel))));
4296
4297         /* Must get indclass the hard way */
4298         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4299                                                                         Anum_pg_index_indclass, &isnull);
4300         Assert(!isnull);
4301         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4302
4303         /*
4304          * Now build the list of PK attributes from the indkey definition (we
4305          * assume a primary key cannot have expressional elements)
4306          */
4307         *attnamelist = NIL;
4308         for (i = 0; i < indexStruct->indnatts; i++)
4309         {
4310                 int                     pkattno = indexStruct->indkey.values[i];
4311
4312                 attnums[i] = pkattno;
4313                 atttypids[i] = attnumTypeId(pkrel, pkattno);
4314                 opclasses[i] = indclass->values[i];
4315                 *attnamelist = lappend(*attnamelist,
4316                            makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4317         }
4318
4319         ReleaseSysCache(indexTuple);
4320
4321         return i;
4322 }
4323
4324 /*
4325  * transformFkeyCheckAttrs -
4326  *
4327  *      Make sure that the attributes of a referenced table belong to a unique
4328  *      (or primary key) constraint.  Return the OID of the index supporting
4329  *      the constraint, as well as the opclasses associated with the index
4330  *      columns.
4331  */
4332 static Oid
4333 transformFkeyCheckAttrs(Relation pkrel,
4334                                                 int numattrs, int16 *attnums,
4335                                                 Oid *opclasses) /* output parameter */
4336 {
4337         Oid                     indexoid = InvalidOid;
4338         bool            found = false;
4339         List       *indexoidlist;
4340         ListCell   *indexoidscan;
4341
4342         /*
4343          * Get the list of index OIDs for the table from the relcache, and look up
4344          * each one in the pg_index syscache, and match unique indexes to the list
4345          * of attnums we are given.
4346          */
4347         indexoidlist = RelationGetIndexList(pkrel);
4348
4349         foreach(indexoidscan, indexoidlist)
4350         {
4351                 HeapTuple       indexTuple;
4352                 Form_pg_index indexStruct;
4353                 int                     i,
4354                                         j;
4355
4356                 indexoid = lfirst_oid(indexoidscan);
4357                 indexTuple = SearchSysCache(INDEXRELID,
4358                                                                         ObjectIdGetDatum(indexoid),
4359                                                                         0, 0, 0);
4360                 if (!HeapTupleIsValid(indexTuple))
4361                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4362                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4363
4364                 /*
4365                  * Must have the right number of columns; must be unique and not a
4366                  * partial index; forget it if there are any expressions, too
4367                  */
4368                 if (indexStruct->indnatts == numattrs &&
4369                         indexStruct->indisunique &&
4370                         heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4371                         heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4372                 {
4373                         /* Must get indclass the hard way */
4374                         Datum           indclassDatum;
4375                         bool            isnull;
4376                         oidvector  *indclass;
4377
4378                         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4379                                                                                         Anum_pg_index_indclass, &isnull);
4380                         Assert(!isnull);
4381                         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4382
4383                         /*
4384                          * The given attnum list may match the index columns in any order.
4385                          * Check that each list is a subset of the other.
4386                          */
4387                         for (i = 0; i < numattrs; i++)
4388                         {
4389                                 found = false;
4390                                 for (j = 0; j < numattrs; j++)
4391                                 {
4392                                         if (attnums[i] == indexStruct->indkey.values[j])
4393                                         {
4394                                                 found = true;
4395                                                 break;
4396                                         }
4397                                 }
4398                                 if (!found)
4399                                         break;
4400                         }
4401                         if (found)
4402                         {
4403                                 for (i = 0; i < numattrs; i++)
4404                                 {
4405                                         found = false;
4406                                         for (j = 0; j < numattrs; j++)
4407                                         {
4408                                                 if (attnums[j] == indexStruct->indkey.values[i])
4409                                                 {
4410                                                         opclasses[j] = indclass->values[i];
4411                                                         found = true;
4412                                                         break;
4413                                                 }
4414                                         }
4415                                         if (!found)
4416                                                 break;
4417                                 }
4418                         }
4419                 }
4420                 ReleaseSysCache(indexTuple);
4421                 if (found)
4422                         break;
4423         }
4424
4425         if (!found)
4426                 ereport(ERROR,
4427                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4428                                  errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4429                                                 RelationGetRelationName(pkrel))));
4430
4431         list_free(indexoidlist);
4432
4433         return indexoid;
4434 }
4435
4436 /*
4437  * Scan the existing rows in a table to verify they meet a proposed FK
4438  * constraint.
4439  *
4440  * Caller must have opened and locked both relations.
4441  */
4442 static void
4443 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4444                                                          Relation rel,
4445                                                          Relation pkrel)
4446 {
4447         HeapScanDesc scan;
4448         HeapTuple       tuple;
4449         Trigger         trig;
4450         ListCell   *list;
4451         int                     count;
4452
4453         /*
4454          * See if we can do it with a single LEFT JOIN query.  A FALSE result
4455          * indicates we must proceed with the fire-the-trigger method.
4456          */
4457         if (RI_Initial_Check(fkconstraint, rel, pkrel))
4458                 return;
4459
4460         /*
4461          * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
4462          * if that tuple had just been inserted.  If any of those fail, it should
4463          * ereport(ERROR) and that's that.
4464          */
4465         MemSet(&trig, 0, sizeof(trig));
4466         trig.tgoid = InvalidOid;
4467         trig.tgname = fkconstraint->constr_name;
4468         trig.tgenabled = TRUE;
4469         trig.tgisconstraint = TRUE;
4470         trig.tgconstrrelid = RelationGetRelid(pkrel);
4471         trig.tgdeferrable = FALSE;
4472         trig.tginitdeferred = FALSE;
4473
4474         trig.tgargs = (char **) palloc(sizeof(char *) *
4475                                                                    (4 + list_length(fkconstraint->fk_attrs)
4476                                                                         + list_length(fkconstraint->pk_attrs)));
4477
4478         trig.tgargs[0] = trig.tgname;
4479         trig.tgargs[1] = RelationGetRelationName(rel);
4480         trig.tgargs[2] = RelationGetRelationName(pkrel);
4481         trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4482         count = 4;
4483         foreach(list, fkconstraint->fk_attrs)
4484         {
4485                 char       *fk_at = strVal(lfirst(list));
4486
4487                 trig.tgargs[count] = fk_at;
4488                 count += 2;
4489         }
4490         count = 5;
4491         foreach(list, fkconstraint->pk_attrs)
4492         {
4493                 char       *pk_at = strVal(lfirst(list));
4494
4495                 trig.tgargs[count] = pk_at;
4496                 count += 2;
4497         }
4498         trig.tgnargs = count - 1;
4499
4500         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4501
4502         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4503         {
4504                 FunctionCallInfoData fcinfo;
4505                 TriggerData trigdata;
4506
4507                 /*
4508                  * Make a call to the trigger function
4509                  *
4510                  * No parameters are passed, but we do set a context
4511                  */
4512                 MemSet(&fcinfo, 0, sizeof(fcinfo));
4513
4514                 /*
4515                  * We assume RI_FKey_check_ins won't look at flinfo...
4516                  */
4517                 trigdata.type = T_TriggerData;
4518                 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4519                 trigdata.tg_relation = rel;
4520                 trigdata.tg_trigtuple = tuple;
4521                 trigdata.tg_newtuple = NULL;
4522                 trigdata.tg_trigger = &trig;
4523                 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4524                 trigdata.tg_newtuplebuf = InvalidBuffer;
4525
4526                 fcinfo.context = (Node *) &trigdata;
4527
4528                 RI_FKey_check_ins(&fcinfo);
4529         }
4530
4531         heap_endscan(scan);
4532
4533         pfree(trig.tgargs);
4534 }
4535
4536 static void
4537 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4538                                          ObjectAddress *constrobj, ObjectAddress *trigobj,
4539                                          bool on_insert)
4540 {
4541         CreateTrigStmt *fk_trigger;
4542         ListCell   *fk_attr;
4543         ListCell   *pk_attr;
4544
4545         fk_trigger = makeNode(CreateTrigStmt);
4546         fk_trigger->trigname = fkconstraint->constr_name;
4547         fk_trigger->relation = myRel;
4548         fk_trigger->before = false;
4549         fk_trigger->row = true;
4550
4551         /* Either ON INSERT or ON UPDATE */
4552         if (on_insert)
4553         {
4554                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4555                 fk_trigger->actions[0] = 'i';
4556         }
4557         else
4558         {
4559                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4560                 fk_trigger->actions[0] = 'u';
4561         }
4562         fk_trigger->actions[1] = '\0';
4563
4564         fk_trigger->isconstraint = true;
4565         fk_trigger->deferrable = fkconstraint->deferrable;
4566         fk_trigger->initdeferred = fkconstraint->initdeferred;
4567         fk_trigger->constrrel = fkconstraint->pktable;
4568
4569         fk_trigger->args = NIL;
4570         fk_trigger->args = lappend(fk_trigger->args,
4571                                                            makeString(fkconstraint->constr_name));
4572         fk_trigger->args = lappend(fk_trigger->args,
4573                                                            makeString(myRel->relname));
4574         fk_trigger->args = lappend(fk_trigger->args,
4575                                                            makeString(fkconstraint->pktable->relname));
4576         fk_trigger->args = lappend(fk_trigger->args,
4577                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4578         if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4579                 ereport(ERROR,
4580                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4581                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
4582
4583         forboth(fk_attr, fkconstraint->fk_attrs,
4584                         pk_attr, fkconstraint->pk_attrs)
4585         {
4586                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4587                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4588         }
4589
4590         trigobj->objectId = CreateTrigger(fk_trigger, true);
4591
4592         /* Register dependency from trigger to constraint */
4593         recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4594
4595         /* Make changes-so-far visible */
4596         CommandCounterIncrement();
4597 }
4598
4599 /*
4600  * Create the triggers that implement an FK constraint.
4601  */
4602 static void
4603 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4604                                                  Oid constrOid)
4605 {
4606         RangeVar   *myRel;
4607         CreateTrigStmt *fk_trigger;
4608         ListCell   *fk_attr;
4609         ListCell   *pk_attr;
4610         ObjectAddress trigobj,
4611                                 constrobj;
4612
4613         /*
4614          * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
4615          */
4616         myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4617                                                  pstrdup(RelationGetRelationName(rel)));
4618
4619         /*
4620          * Preset objectAddress fields
4621          */
4622         constrobj.classId = ConstraintRelationId;
4623         constrobj.objectId = constrOid;
4624         constrobj.objectSubId = 0;
4625         trigobj.classId = TriggerRelationId;
4626         trigobj.objectSubId = 0;
4627
4628         /* Make changes-so-far visible */
4629         CommandCounterIncrement();
4630
4631         /*
4632          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
4633          * action for both INSERTs and UPDATEs on the referencing table.
4634          */
4635         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4636         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4637
4638         /*
4639          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4640          * DELETE action on the referenced table.
4641          */
4642         fk_trigger = makeNode(CreateTrigStmt);
4643         fk_trigger->trigname = fkconstraint->constr_name;
4644         fk_trigger->relation = fkconstraint->pktable;
4645         fk_trigger->before = false;
4646         fk_trigger->row = true;
4647         fk_trigger->actions[0] = 'd';
4648         fk_trigger->actions[1] = '\0';
4649
4650         fk_trigger->isconstraint = true;
4651         fk_trigger->constrrel = myRel;
4652         switch (fkconstraint->fk_del_action)
4653         {
4654                 case FKCONSTR_ACTION_NOACTION:
4655                         fk_trigger->deferrable = fkconstraint->deferrable;
4656                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4657                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4658                         break;
4659                 case FKCONSTR_ACTION_RESTRICT:
4660                         fk_trigger->deferrable = false;
4661                         fk_trigger->initdeferred = false;
4662                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4663                         break;
4664                 case FKCONSTR_ACTION_CASCADE:
4665                         fk_trigger->deferrable = false;
4666                         fk_trigger->initdeferred = false;
4667                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4668                         break;
4669                 case FKCONSTR_ACTION_SETNULL:
4670                         fk_trigger->deferrable = false;
4671                         fk_trigger->initdeferred = false;
4672                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4673                         break;
4674                 case FKCONSTR_ACTION_SETDEFAULT:
4675                         fk_trigger->deferrable = false;
4676                         fk_trigger->initdeferred = false;
4677                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4678                         break;
4679                 default:
4680                         elog(ERROR, "unrecognized FK action type: %d",
4681                                  (int) fkconstraint->fk_del_action);
4682                         break;
4683         }
4684
4685         fk_trigger->args = NIL;
4686         fk_trigger->args = lappend(fk_trigger->args,
4687                                                            makeString(fkconstraint->constr_name));
4688         fk_trigger->args = lappend(fk_trigger->args,
4689                                                            makeString(myRel->relname));
4690         fk_trigger->args = lappend(fk_trigger->args,
4691                                                            makeString(fkconstraint->pktable->relname));
4692         fk_trigger->args = lappend(fk_trigger->args,
4693                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4694         forboth(fk_attr, fkconstraint->fk_attrs,
4695                         pk_attr, fkconstraint->pk_attrs)
4696         {
4697                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4698                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4699         }
4700
4701         trigobj.objectId = CreateTrigger(fk_trigger, true);
4702
4703         /* Register dependency from trigger to constraint */
4704         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4705
4706         /* Make changes-so-far visible */
4707         CommandCounterIncrement();
4708
4709         /*
4710          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4711          * UPDATE action on the referenced table.
4712          */
4713         fk_trigger = makeNode(CreateTrigStmt);
4714         fk_trigger->trigname = fkconstraint->constr_name;
4715         fk_trigger->relation = fkconstraint->pktable;
4716         fk_trigger->before = false;
4717         fk_trigger->row = true;
4718         fk_trigger->actions[0] = 'u';
4719         fk_trigger->actions[1] = '\0';
4720         fk_trigger->isconstraint = true;
4721         fk_trigger->constrrel = myRel;
4722         switch (fkconstraint->fk_upd_action)
4723         {
4724                 case FKCONSTR_ACTION_NOACTION:
4725                         fk_trigger->deferrable = fkconstraint->deferrable;
4726                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4727                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4728                         break;
4729                 case FKCONSTR_ACTION_RESTRICT:
4730                         fk_trigger->deferrable = false;
4731                         fk_trigger->initdeferred = false;
4732                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4733                         break;
4734                 case FKCONSTR_ACTION_CASCADE:
4735                         fk_trigger->deferrable = false;
4736                         fk_trigger->initdeferred = false;
4737                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4738                         break;
4739                 case FKCONSTR_ACTION_SETNULL:
4740                         fk_trigger->deferrable = false;
4741                         fk_trigger->initdeferred = false;
4742                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4743                         break;
4744                 case FKCONSTR_ACTION_SETDEFAULT:
4745                         fk_trigger->deferrable = false;
4746                         fk_trigger->initdeferred = false;
4747                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4748                         break;
4749                 default:
4750                         elog(ERROR, "unrecognized FK action type: %d",
4751                                  (int) fkconstraint->fk_upd_action);
4752                         break;
4753         }
4754
4755         fk_trigger->args = NIL;
4756         fk_trigger->args = lappend(fk_trigger->args,
4757                                                            makeString(fkconstraint->constr_name));
4758         fk_trigger->args = lappend(fk_trigger->args,
4759                                                            makeString(myRel->relname));
4760         fk_trigger->args = lappend(fk_trigger->args,
4761                                                            makeString(fkconstraint->pktable->relname));
4762         fk_trigger->args = lappend(fk_trigger->args,
4763                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4764         forboth(fk_attr, fkconstraint->fk_attrs,
4765                         pk_attr, fkconstraint->pk_attrs)
4766         {
4767                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4768                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4769         }
4770
4771         trigobj.objectId = CreateTrigger(fk_trigger, true);
4772
4773         /* Register dependency from trigger to constraint */
4774         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4775 }
4776
4777 /*
4778  * fkMatchTypeToString -
4779  *        convert FKCONSTR_MATCH_xxx code to string to use in trigger args
4780  */
4781 static char *
4782 fkMatchTypeToString(char match_type)
4783 {
4784         switch (match_type)
4785         {
4786                 case FKCONSTR_MATCH_FULL:
4787                         return pstrdup("FULL");
4788                 case FKCONSTR_MATCH_PARTIAL:
4789                         return pstrdup("PARTIAL");
4790                 case FKCONSTR_MATCH_UNSPECIFIED:
4791                         return pstrdup("UNSPECIFIED");
4792                 default:
4793                         elog(ERROR, "unrecognized match type: %d",
4794                                  (int) match_type);
4795         }
4796         return NULL;                            /* can't get here */
4797 }
4798
4799 /*
4800  * ALTER TABLE DROP CONSTRAINT
4801  */
4802 static void
4803 ATPrepDropConstraint(List **wqueue, Relation rel,
4804                                          bool recurse, AlterTableCmd *cmd)
4805 {
4806         /*
4807          * We don't want errors or noise from child tables, so we have to pass
4808          * down a modified command.
4809          */
4810         if (recurse)
4811         {
4812                 AlterTableCmd *childCmd = copyObject(cmd);
4813
4814                 childCmd->subtype = AT_DropConstraintQuietly;
4815                 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
4816         }
4817 }
4818
4819 static void
4820 ATExecDropConstraint(Relation rel, const char *constrName,
4821                                          DropBehavior behavior, bool quiet)
4822 {
4823         int                     deleted;
4824
4825         deleted = RemoveRelConstraints(rel, constrName, behavior);
4826
4827         if (!quiet)
4828         {
4829                 /* If zero constraints deleted, complain */
4830                 if (deleted == 0)
4831                         ereport(ERROR,
4832                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
4833                                          errmsg("constraint \"%s\" does not exist",
4834                                                         constrName)));
4835                 /* Otherwise if more than one constraint deleted, notify */
4836                 else if (deleted > 1)
4837                         ereport(NOTICE,
4838                                         (errmsg("multiple constraints named \"%s\" were dropped",
4839                                                         constrName)));
4840         }
4841 }
4842
4843 /*
4844  * ALTER COLUMN TYPE
4845  */
4846 static void
4847 ATPrepAlterColumnType(List **wqueue,
4848                                           AlteredTableInfo *tab, Relation rel,
4849                                           bool recurse, bool recursing,
4850                                           AlterTableCmd *cmd)
4851 {
4852         char       *colName = cmd->name;
4853         TypeName   *typename = (TypeName *) cmd->def;
4854         HeapTuple       tuple;
4855         Form_pg_attribute attTup;
4856         AttrNumber      attnum;
4857         Oid                     targettype;
4858         Node       *transform;
4859         NewColumnValue *newval;
4860         ParseState *pstate = make_parsestate(NULL);
4861
4862         /* lookup the attribute so we can check inheritance status */
4863         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4864         if (!HeapTupleIsValid(tuple))
4865                 ereport(ERROR,
4866                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4867                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
4868                                                 colName, RelationGetRelationName(rel))));
4869         attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4870         attnum = attTup->attnum;
4871
4872         /* Can't alter a system attribute */
4873         if (attnum <= 0)
4874                 ereport(ERROR,
4875                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4876                                  errmsg("cannot alter system column \"%s\"",
4877                                                 colName)));
4878
4879         /* Don't alter inherited columns */
4880         if (attTup->attinhcount > 0 && !recursing)
4881                 ereport(ERROR,
4882                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4883                                  errmsg("cannot alter inherited column \"%s\"",
4884                                                 colName)));
4885
4886         /* Look up the target type */
4887         targettype = typenameTypeId(NULL, typename);
4888
4889         /* make sure datatype is legal for a column */
4890         CheckAttributeType(colName, targettype);
4891
4892         /*
4893          * Set up an expression to transform the old data value to the new type.
4894          * If a USING option was given, transform and use that expression, else
4895          * just take the old value and try to coerce it.  We do this first so that
4896          * type incompatibility can be detected before we waste effort, and
4897          * because we need the expression to be parsed against the original table
4898          * rowtype.
4899          */
4900         if (cmd->transform)
4901         {
4902                 RangeTblEntry *rte;
4903
4904                 /* Expression must be able to access vars of old table */
4905                 rte = addRangeTableEntryForRelation(pstate,
4906                                                                                         rel,
4907                                                                                         NULL,
4908                                                                                         false,
4909                                                                                         true);
4910                 addRTEtoQuery(pstate, rte, false, true, true);
4911
4912                 transform = transformExpr(pstate, cmd->transform);
4913
4914                 /* It can't return a set */
4915                 if (expression_returns_set(transform))
4916                         ereport(ERROR,
4917                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
4918                                          errmsg("transform expression must not return a set")));
4919
4920                 /* No subplans or aggregates, either... */
4921                 if (pstate->p_hasSubLinks)
4922                         ereport(ERROR,
4923                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4924                                          errmsg("cannot use subquery in transform expression")));
4925                 if (pstate->p_hasAggs)
4926                         ereport(ERROR,
4927                                         (errcode(ERRCODE_GROUPING_ERROR),
4928                         errmsg("cannot use aggregate function in transform expression")));
4929         }
4930         else
4931         {
4932                 transform = (Node *) makeVar(1, attnum,
4933                                                                          attTup->atttypid, attTup->atttypmod,
4934                                                                          0);
4935         }
4936
4937         transform = coerce_to_target_type(pstate,
4938                                                                           transform, exprType(transform),
4939                                                                           targettype, typename->typmod,
4940                                                                           COERCION_ASSIGNMENT,
4941                                                                           COERCE_IMPLICIT_CAST);
4942         if (transform == NULL)
4943                 ereport(ERROR,
4944                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
4945                                  errmsg("column \"%s\" cannot be cast to type \"%s\"",
4946                                                 colName, TypeNameToString(typename))));
4947
4948         /*
4949          * Add a work queue item to make ATRewriteTable update the column
4950          * contents.
4951          */
4952         newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4953         newval->attnum = attnum;
4954         newval->expr = (Expr *) transform;
4955
4956         tab->newvals = lappend(tab->newvals, newval);
4957
4958         ReleaseSysCache(tuple);
4959
4960         /*
4961          * The recursion case is handled by ATSimpleRecursion.  However, if we are
4962          * told not to recurse, there had better not be any child tables; else the
4963          * alter would put them out of step.
4964          */
4965         if (recurse)
4966                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4967         else if (!recursing &&
4968                          find_inheritance_children(RelationGetRelid(rel)) != NIL)
4969                 ereport(ERROR,
4970                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4971                                  errmsg("type of inherited column \"%s\" must be changed in child tables too",
4972                                                 colName)));
4973 }
4974
4975 static void
4976 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4977                                           const char *colName, TypeName *typename)
4978 {
4979         HeapTuple       heapTup;
4980         Form_pg_attribute attTup;
4981         AttrNumber      attnum;
4982         HeapTuple       typeTuple;
4983         Form_pg_type tform;
4984         Oid                     targettype;
4985         Node       *defaultexpr;
4986         Relation        attrelation;
4987         Relation        depRel;
4988         ScanKeyData key[3];
4989         SysScanDesc scan;
4990         HeapTuple       depTup;
4991
4992         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4993
4994         /* Look up the target column */
4995         heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4996         if (!HeapTupleIsValid(heapTup))         /* shouldn't happen */
4997                 ereport(ERROR,
4998                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4999                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
5000                                                 colName, RelationGetRelationName(rel))));
5001         attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
5002         attnum = attTup->attnum;
5003
5004         /* Check for multiple ALTER TYPE on same column --- can't cope */
5005         if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
5006                 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
5007                 ereport(ERROR,
5008                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5009                                  errmsg("cannot alter type of column \"%s\" twice",
5010                                                 colName)));
5011
5012         /* Look up the target type (should not fail, since prep found it) */
5013         typeTuple = typenameType(NULL, typename);
5014         tform = (Form_pg_type) GETSTRUCT(typeTuple);
5015         targettype = HeapTupleGetOid(typeTuple);
5016
5017         /*
5018          * If there is a default expression for the column, get it and ensure we
5019          * can coerce it to the new datatype.  (We must do this before changing
5020          * the column type, because build_column_default itself will try to
5021          * coerce, and will not issue the error message we want if it fails.)
5022          *
5023          * We remove any implicit coercion steps at the top level of the old
5024          * default expression; this has been agreed to satisfy the principle of
5025          * least surprise.      (The conversion to the new column type should act like
5026          * it started from what the user sees as the stored expression, and the
5027          * implicit coercions aren't going to be shown.)
5028          */
5029         if (attTup->atthasdef)
5030         {
5031                 defaultexpr = build_column_default(rel, attnum);
5032                 Assert(defaultexpr);
5033                 defaultexpr = strip_implicit_coercions(defaultexpr);
5034                 defaultexpr = coerce_to_target_type(NULL,               /* no UNKNOWN params */
5035                                                                                   defaultexpr, exprType(defaultexpr),
5036                                                                                         targettype, typename->typmod,
5037                                                                                         COERCION_ASSIGNMENT,
5038                                                                                         COERCE_IMPLICIT_CAST);
5039                 if (defaultexpr == NULL)
5040                         ereport(ERROR,
5041                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
5042                         errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
5043                                    colName, TypeNameToString(typename))));
5044         }
5045         else
5046                 defaultexpr = NULL;
5047
5048         /*
5049          * Find everything that depends on the column (constraints, indexes, etc),
5050          * and record enough information to let us recreate the objects.
5051          *
5052          * The actual recreation does not happen here, but only after we have
5053          * performed all the individual ALTER TYPE operations.  We have to save
5054          * the info before executing ALTER TYPE, though, else the deparser will
5055          * get confused.
5056          *
5057          * There could be multiple entries for the same object, so we must check
5058          * to ensure we process each one only once.  Note: we assume that an index
5059          * that implements a constraint will not show a direct dependency on the
5060          * column.
5061          */
5062         depRel = heap_open(DependRelationId, RowExclusiveLock);
5063
5064         ScanKeyInit(&key[0],
5065                                 Anum_pg_depend_refclassid,
5066                                 BTEqualStrategyNumber, F_OIDEQ,
5067                                 ObjectIdGetDatum(RelationRelationId));
5068         ScanKeyInit(&key[1],
5069                                 Anum_pg_depend_refobjid,
5070                                 BTEqualStrategyNumber, F_OIDEQ,
5071                                 ObjectIdGetDatum(RelationGetRelid(rel)));
5072         ScanKeyInit(&key[2],
5073                                 Anum_pg_depend_refobjsubid,
5074                                 BTEqualStrategyNumber, F_INT4EQ,
5075                                 Int32GetDatum((int32) attnum));
5076
5077         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5078                                                           SnapshotNow, 3, key);
5079
5080         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5081         {
5082                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5083                 ObjectAddress foundObject;
5084
5085                 /* We don't expect any PIN dependencies on columns */
5086                 if (foundDep->deptype == DEPENDENCY_PIN)
5087                         elog(ERROR, "cannot alter type of a pinned column");
5088
5089                 foundObject.classId = foundDep->classid;
5090                 foundObject.objectId = foundDep->objid;
5091                 foundObject.objectSubId = foundDep->objsubid;
5092
5093                 switch (getObjectClass(&foundObject))
5094                 {
5095                         case OCLASS_CLASS:
5096                                 {
5097                                         char            relKind = get_rel_relkind(foundObject.objectId);
5098
5099                                         if (relKind == RELKIND_INDEX)
5100                                         {
5101                                                 Assert(foundObject.objectSubId == 0);
5102                                                 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
5103                                                 {
5104                                                         tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
5105                                                                                                            foundObject.objectId);
5106                                                         tab->changedIndexDefs = lappend(tab->changedIndexDefs,
5107                                                            pg_get_indexdef_string(foundObject.objectId));
5108                                                 }
5109                                         }
5110                                         else if (relKind == RELKIND_SEQUENCE)
5111                                         {
5112                                                 /*
5113                                                  * This must be a SERIAL column's sequence.  We need
5114                                                  * not do anything to it.
5115                                                  */
5116                                                 Assert(foundObject.objectSubId == 0);
5117                                         }
5118                                         else
5119                                         {
5120                                                 /* Not expecting any other direct dependencies... */
5121                                                 elog(ERROR, "unexpected object depending on column: %s",
5122                                                          getObjectDescription(&foundObject));
5123                                         }
5124                                         break;
5125                                 }
5126
5127                         case OCLASS_CONSTRAINT:
5128                                 Assert(foundObject.objectSubId == 0);
5129                                 if (!list_member_oid(tab->changedConstraintOids,
5130                                                                          foundObject.objectId))
5131                                 {
5132                                         char *defstring = pg_get_constraintdef_string(foundObject.objectId);
5133
5134                                         /*
5135                                          * Put NORMAL dependencies at the front of the list and
5136                                          * AUTO dependencies at the back.  This makes sure that
5137                                          * foreign-key constraints depending on this column will
5138                                          * be dropped before unique or primary-key constraints of
5139                                          * the column; which we must have because the FK
5140                                          * constraints depend on the indexes belonging to the
5141                                          * unique constraints.
5142                                          */
5143                                         if (foundDep->deptype == DEPENDENCY_NORMAL)
5144                                         {
5145                                                 tab->changedConstraintOids =
5146                                                         lcons_oid(foundObject.objectId,
5147                                                                           tab->changedConstraintOids);
5148                                                 tab->changedConstraintDefs =
5149                                                         lcons(defstring,
5150                                                                   tab->changedConstraintDefs);
5151                                         }
5152                                         else
5153                                         {
5154                                                 tab->changedConstraintOids =
5155                                                         lappend_oid(tab->changedConstraintOids,
5156                                                                                 foundObject.objectId);
5157                                                 tab->changedConstraintDefs =
5158                                                         lappend(tab->changedConstraintDefs,
5159                                                                         defstring);
5160                                         }
5161                                 }
5162                                 break;
5163
5164                         case OCLASS_REWRITE:
5165                                 /* XXX someday see if we can cope with revising views */
5166                                 ereport(ERROR,
5167                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5168                                                  errmsg("cannot alter type of a column used by a view or rule"),
5169                                                  errdetail("%s depends on column \"%s\"",
5170                                                                    getObjectDescription(&foundObject),
5171                                                                    colName)));
5172                                 break;
5173
5174                         case OCLASS_DEFAULT:
5175
5176                                 /*
5177                                  * Ignore the column's default expression, since we will fix
5178                                  * it below.
5179                                  */
5180                                 Assert(defaultexpr);
5181                                 break;
5182
5183                         case OCLASS_PROC:
5184                         case OCLASS_TYPE:
5185                         case OCLASS_CAST:
5186                         case OCLASS_CONVERSION:
5187                         case OCLASS_LANGUAGE:
5188                         case OCLASS_OPERATOR:
5189                         case OCLASS_OPCLASS:
5190                         case OCLASS_TRIGGER:
5191                         case OCLASS_SCHEMA:
5192
5193                                 /*
5194                                  * We don't expect any of these sorts of objects to depend on
5195                                  * a column.
5196                                  */
5197                                 elog(ERROR, "unexpected object depending on column: %s",
5198                                          getObjectDescription(&foundObject));
5199                                 break;
5200
5201                         default:
5202                                 elog(ERROR, "unrecognized object class: %u",
5203                                          foundObject.classId);
5204                 }
5205         }
5206
5207         systable_endscan(scan);
5208
5209         /*
5210          * Now scan for dependencies of this column on other things.  The only
5211          * thing we should find is the dependency on the column datatype, which we
5212          * want to remove.
5213          */
5214         ScanKeyInit(&key[0],
5215                                 Anum_pg_depend_classid,
5216                                 BTEqualStrategyNumber, F_OIDEQ,
5217                                 ObjectIdGetDatum(RelationRelationId));
5218         ScanKeyInit(&key[1],
5219                                 Anum_pg_depend_objid,
5220                                 BTEqualStrategyNumber, F_OIDEQ,
5221                                 ObjectIdGetDatum(RelationGetRelid(rel)));
5222         ScanKeyInit(&key[2],
5223                                 Anum_pg_depend_objsubid,
5224                                 BTEqualStrategyNumber, F_INT4EQ,
5225                                 Int32GetDatum((int32) attnum));
5226
5227         scan = systable_beginscan(depRel, DependDependerIndexId, true,
5228                                                           SnapshotNow, 3, key);
5229
5230         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5231         {
5232                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5233
5234                 if (foundDep->deptype != DEPENDENCY_NORMAL)
5235                         elog(ERROR, "found unexpected dependency type '%c'",
5236                                  foundDep->deptype);
5237                 if (foundDep->refclassid != TypeRelationId ||
5238                         foundDep->refobjid != attTup->atttypid)
5239                         elog(ERROR, "found unexpected dependency for column");
5240
5241                 simple_heap_delete(depRel, &depTup->t_self);
5242         }
5243
5244         systable_endscan(scan);
5245
5246         heap_close(depRel, RowExclusiveLock);
5247
5248         /*
5249          * Here we go --- change the recorded column type.      (Note heapTup is a
5250          * copy of the syscache entry, so okay to scribble on.)
5251          */
5252         attTup->atttypid = targettype;
5253         attTup->atttypmod = typename->typmod;
5254         attTup->attndims = list_length(typename->arrayBounds);
5255         attTup->attlen = tform->typlen;
5256         attTup->attbyval = tform->typbyval;
5257         attTup->attalign = tform->typalign;
5258         attTup->attstorage = tform->typstorage;
5259
5260         ReleaseSysCache(typeTuple);
5261
5262         simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5263
5264         /* keep system catalog indexes current */
5265         CatalogUpdateIndexes(attrelation, heapTup);
5266
5267         heap_close(attrelation, RowExclusiveLock);
5268
5269         /* Install dependency on new datatype */
5270         add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5271
5272         /*
5273          * Drop any pg_statistic entry for the column, since it's now wrong type
5274          */
5275         RemoveStatistics(RelationGetRelid(rel), attnum);
5276
5277         /*
5278          * Update the default, if present, by brute force --- remove and re-add
5279          * the default.  Probably unsafe to take shortcuts, since the new version
5280          * may well have additional dependencies.  (It's okay to do this now,
5281          * rather than after other ALTER TYPE commands, since the default won't
5282          * depend on other column types.)
5283          */
5284         if (defaultexpr)
5285         {
5286                 /* Must make new row visible since it will be updated again */
5287                 CommandCounterIncrement();
5288
5289                 /*
5290                  * We use RESTRICT here for safety, but at present we do not expect
5291                  * anything to depend on the default.
5292                  */
5293                 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
5294
5295                 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
5296         }
5297
5298         /* Cleanup */
5299         heap_freetuple(heapTup);
5300 }
5301
5302 /*
5303  * Cleanup after we've finished all the ALTER TYPE operations for a
5304  * particular relation.  We have to drop and recreate all the indexes
5305  * and constraints that depend on the altered columns.
      *
      * wqueue is the ALTER TABLE work queue; it is only handed on to
      * ATPostAlterTypeParse, which attaches the re-parsed commands to it.
      * tab supplies four lists: changedIndexDefs and changedConstraintDefs hold
      * command strings to re-parse, while changedConstraintOids and
      * changedIndexOids name the existing catalog objects to drop.
5306  */
5307 static void
5308 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5309 {
5310         ObjectAddress obj;
5311         ListCell   *l;
5312
5313         /*
5314          * Re-parse the index and constraint definitions, and attach them to the
5315          * appropriate work queue entries.  We do this before dropping because in
5316          * the case of a FOREIGN KEY constraint, we might not yet have exclusive
5317          * lock on the table the constraint is attached to, and we need to get
5318          * that before dropping.  It's safe because the parser won't actually look
5319          * at the catalogs to detect the existing entry.
5320          */
5321         foreach(l, tab->changedIndexDefs)
5322                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5323         foreach(l, tab->changedConstraintDefs)
5324                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5325
5326         /*
5327          * Now we can drop the existing constraints and indexes --- constraints
5328          * first, since some of them might depend on the indexes.  In fact, we
5329          * have to delete FOREIGN KEY constraints before UNIQUE constraints,
5330          * but we already ordered the constraint list to ensure that would happen.
5331          * It should be okay to use DROP_RESTRICT here, since nothing else should
5332          * be depending on these objects.
5333          */
5334         foreach(l, tab->changedConstraintOids)
5335         {
             /* constraints are addressed as whole objects (sub-id 0) */
5336                 obj.classId = ConstraintRelationId;
5337                 obj.objectId = lfirst_oid(l);
5338                 obj.objectSubId = 0;
5339                 performDeletion(&obj, DROP_RESTRICT);
5340         }
5341
5342         foreach(l, tab->changedIndexOids)
5343         {
             /* indexes are addressed through RelationRelationId, sub-id 0 */
5344                 obj.classId = RelationRelationId;
5345                 obj.objectId = lfirst_oid(l);
5346                 obj.objectSubId = 0;
5347                 performDeletion(&obj, DROP_RESTRICT);
5348         }
5349
5350         /*
5351          * The objects will get recreated during subsequent passes over the work
5352          * queue.
5353          */
5354 }
5355
     /*
      * ATPostAlterTypeParse
      *
      * Parse one saved index or constraint definition (cmd, as SQL text) and
      * attach the resulting utility statement(s) to the ALTER TABLE work queue
      * (wqueue) so that later passes over the queue execute them.
      *
      * An IndexStmt is wrapped in a new AT_ReAddIndex subcommand queued in
      * AT_PASS_OLD_INDEX.  For an AlterTableStmt, each AT_AddIndex subcommand is
      * relabeled AT_ReAddIndex and queued in AT_PASS_OLD_INDEX, and each
      * AT_AddConstraint subcommand is queued in AT_PASS_OLD_CONSTR.  Any other
      * statement or subcommand type raises an error.
      *
      * The target relation of each statement is opened with AccessExclusiveLock
      * and closed with NoLock.
      */
5356 static void
5357 ATPostAlterTypeParse(char *cmd, List **wqueue)
5358 {
5359         List       *raw_parsetree_list;
5360         List       *querytree_list;
5361         ListCell   *list_item;
5362
5363         /*
5364          * We expect that we only have to do raw parsing and parse analysis, not
5365          * any rule rewriting, since these will all be utility statements.
5366          */
5367         raw_parsetree_list = raw_parser(cmd);
5368         querytree_list = NIL;
5369         foreach(list_item, raw_parsetree_list)
5370         {
5371                 Node       *parsetree = (Node *) lfirst(list_item);
5372
5373                 querytree_list = list_concat(querytree_list,
5374                                                                          parse_analyze(parsetree, cmd, NULL, 0));
5375         }
5376
5377         /*
5378          * Attach each generated command to the proper place in the work queue.
5379          * Note this could result in creation of entirely new work-queue entries.
5380          */
5381         foreach(list_item, querytree_list)
5382         {
5383                 Query      *query = (Query *) lfirst(list_item);
5384                 Relation        rel;
5385                 AlteredTableInfo *tab;
5386
5387                 Assert(IsA(query, Query));
5388                 Assert(query->commandType == CMD_UTILITY);
5389                 switch (nodeTag(query->utilityStmt))
5390                 {
5391                         case T_IndexStmt:
5392                                 {
5393                                         IndexStmt  *stmt = (IndexStmt *) query->utilityStmt;
5394                                         AlterTableCmd *newcmd;
5395
5396                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5397                                         tab = ATGetQueueEntry(wqueue, rel);
                                             /* wrap the IndexStmt in a subcommand node of its own */
5398                                         newcmd = makeNode(AlterTableCmd);
5399                                         newcmd->subtype = AT_ReAddIndex;
5400                                         newcmd->def = (Node *) stmt;
5401                                         tab->subcmds[AT_PASS_OLD_INDEX] =
5402                                                 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
5403                                         relation_close(rel, NoLock);
5404                                         break;
5405                                 }
5406                         case T_AlterTableStmt:
5407                                 {
5408                                         AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5409                                         ListCell   *lcmd;
5410
5411                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5412                                         tab = ATGetQueueEntry(wqueue, rel);
5413                                         foreach(lcmd, stmt->cmds)
5414                                         {
                                                     /* NB: this local shadows the function's cmd parameter */
5415                                                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5416
5417                                                 switch (cmd->subtype)
5418                                                 {
5419                                                         case AT_AddIndex:
5420                                                                 cmd->subtype = AT_ReAddIndex;
5421                                                                 tab->subcmds[AT_PASS_OLD_INDEX] =
5422                                                                         lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5423                                                                 break;
5424                                                         case AT_AddConstraint:
5425                                                                 tab->subcmds[AT_PASS_OLD_CONSTR] =
5426                                                                         lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5427                                                                 break;
5428                                                         default:
5429                                                                 elog(ERROR, "unexpected statement type: %d",
5430                                                                          (int) cmd->subtype);
5431                                                 }
5432                                         }
5433                                         relation_close(rel, NoLock);
5434                                         break;
5435                                 }
5436                         default:
5437                                 elog(ERROR, "unexpected statement type: %d",
5438                                          (int) nodeTag(query->utilityStmt));
5439                 }
5440         }
5441 }
5442
5443
5444 /*
5445  * ALTER TABLE OWNER
5446  *
5447  * recursing is true if we are recursing from a table to its indexes or
5448  * toast table.  We don't allow the ownership of those things to be
5449  * changed separately from the parent table.  Also, we can skip permission
5450  * checks (this is necessary not just an optimization, else we'd fail to
5451  * handle toast tables properly).
      *
      * relationOid identifies the relation whose pg_class row is updated, and
      * newOwnerId is the role to store as its relowner.  If the relation is
      * already owned by newOwnerId, nothing is changed.  Otherwise the pg_class
      * row (and its ACL, when non-null), the owner dependency, and, for
      * non-index relations, the rowtype's owner are updated; for tables the
      * change is propagated to indexes, the toast table and dependent sequences.
5452  */
5453 void
5454 ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
5455 {
5456         Relation        target_rel;
5457         Relation        class_rel;
5458         HeapTuple       tuple;
5459         Form_pg_class tuple_class;
5460
5461         /*
5462          * Get exclusive lock till end of transaction on the target table. Use
5463          * relation_open so that we can work on indexes and sequences.
5464          */
5465         target_rel = relation_open(relationOid, AccessExclusiveLock);
5466
5467         /* Get its pg_class tuple, too */
5468         class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5469
5470         tuple = SearchSysCache(RELOID,
5471                                                    ObjectIdGetDatum(relationOid),
5472                                                    0, 0, 0);
5473         if (!HeapTupleIsValid(tuple))
5474                 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5475         tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5476
5477         /* Can we change the ownership of this tuple? */
5478         switch (tuple_class->relkind)
5479         {
5480                 case RELKIND_RELATION:
5481                 case RELKIND_VIEW:
5482                 case RELKIND_SEQUENCE:
5483                         /* ok to change owner */
5484                         break;
5485                 case RELKIND_INDEX:
5486                         if (!recursing)
5487                         {
5488                                 /*
5489                                  * Because ALTER INDEX OWNER used to be allowed, and in fact
5490                                  * is generated by old versions of pg_dump, we give a warning
5491                                  * and do nothing rather than erroring out.  Also, to avoid
5492                                  * unnecessary chatter while restoring those old dumps, say
5493                                  * nothing at all if the command would be a no-op anyway.
5494                                  */
5495                                 if (tuple_class->relowner != newOwnerId)
5496                                         ereport(WARNING,
5497                                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5498                                                          errmsg("cannot change owner of index \"%s\"",
5499                                                                         NameStr(tuple_class->relname)),
5500                                                          errhint("Change the ownership of the index's table, instead.")));
5501                                 /* quick hack to exit via the no-op path */
5502                                 newOwnerId = tuple_class->relowner;
5503                         }
5504                         break;
5505                 case RELKIND_TOASTVALUE:
                             /* toast tables may only be reached by recursion from their table */
5506                         if (recursing)
5507                                 break;
5508                         /* FALL THRU */
5509                 default:
5510                         ereport(ERROR,
5511                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5512                                          errmsg("\"%s\" is not a table, view, or sequence",
5513                                                         NameStr(tuple_class->relname))));
5514         }
5515
5516         /*
5517          * If the new owner is the same as the existing owner, consider the
5518          * command to have succeeded.  This is for dump restoration purposes.
5519          */
5520         if (tuple_class->relowner != newOwnerId)
5521         {
5522                 Datum           repl_val[Natts_pg_class];
5523                 char            repl_null[Natts_pg_class];
5524                 char            repl_repl[Natts_pg_class];
5525                 Acl                *newAcl;
5526                 Datum           aclDatum;
5527                 bool            isNull;
5528                 HeapTuple       newtuple;
5529
5530                 /* skip permission checks when recursing to index or toast table */
5531                 if (!recursing)
5532                 {
5533                         /* Superusers can always do it */
5534                         if (!superuser())
5535                         {
5536                                 Oid                     namespaceOid = tuple_class->relnamespace;
5537                                 AclResult       aclresult;
5538
5539                                 /* Otherwise, must be owner of the existing object */
5540                                 if (!pg_class_ownercheck(relationOid, GetUserId()))
5541                                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5542                                                                    RelationGetRelationName(target_rel));
5543
5544                                 /* Must be able to become new owner */
5545                                 check_is_member_of_role(GetUserId(), newOwnerId);
5546
5547                                 /* New owner must have CREATE privilege on namespace */
5548                                 aclresult = pg_namespace_aclcheck(namespaceOid, newOwnerId,
5549                                                                                                   ACL_CREATE);
5550                                 if (aclresult != ACLCHECK_OK)
5551                                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
5552                                                                    get_namespace_name(namespaceOid));
5553                         }
5554                 }
5555
                     /*
                      * Start with every column flagged ' ' in both arrays; only the
                      * columns set to 'r' below get a new value.  repl_val is not
                      * cleared, so only its entries for those columns are assigned.
                      */
5556                 memset(repl_null, ' ', sizeof(repl_null));
5557                 memset(repl_repl, ' ', sizeof(repl_repl));
5558
5559                 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5560                 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5561
5562                 /*
5563                  * Determine the modified ACL for the new owner.  This is only
5564                  * necessary when the ACL is non-null.
5565                  */
5566                 aclDatum = SysCacheGetAttr(RELOID, tuple,
5567                                                                    Anum_pg_class_relacl,
5568                                                                    &isNull);
5569                 if (!isNull)
5570                 {
5571                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
5572                                                                  tuple_class->relowner, newOwnerId);
5573                         repl_repl[Anum_pg_class_relacl - 1] = 'r';
5574                         repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
5575                 }
5576
                     /* Build the modified row and write it back, indexes included */
5577                 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5578
5579                 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5580                 CatalogUpdateIndexes(class_rel, newtuple);
5581
5582                 heap_freetuple(newtuple);
5583
5584                 /* Update owner dependency reference */
5585                 changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
5586
5587                 /*
5588                  * Also change the ownership of the table's rowtype, if it has one
5589                  */
5590                 if (tuple_class->relkind != RELKIND_INDEX)
5591                         AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
5592
5593                 /*
5594                  * If we are operating on a table, also change the ownership of any
5595                  * indexes and sequences that belong to the table, as well as the
5596                  * table's toast table (if it has one)
5597                  */
5598                 if (tuple_class->relkind == RELKIND_RELATION ||
5599                         tuple_class->relkind == RELKIND_TOASTVALUE)
5600                 {
5601                         List       *index_oid_list;
5602                         ListCell   *i;
5603
5604                         /* Find all the indexes belonging to this relation */
5605                         index_oid_list = RelationGetIndexList(target_rel);
5606
5607                         /* For each index, recursively change its ownership */
5608                         foreach(i, index_oid_list)
5609                                 ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
5610
5611                         list_free(index_oid_list);
5612                 }
5613
5614                 if (tuple_class->relkind == RELKIND_RELATION)
5615                 {
5616                         /* If it has a toast table, recurse to change its ownership */
5617                         if (tuple_class->reltoastrelid != InvalidOid)
5618                                 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
5619                                                                   true);
5620
5621                         /* If it has dependent sequences, recurse to change them too */
5622                         change_owner_recurse_to_sequences(relationOid, newOwnerId);
5623                 }
5624         }
5625
             /* Done: drop the syscache reference and pg_class; keep the target's lock */
5626         ReleaseSysCache(tuple);
5627         heap_close(class_rel, RowExclusiveLock);
5628         relation_close(target_rel, NoLock);
5629 }
5630
5631 /*
5632  * change_owner_recurse_to_sequences
5633  *
5634  * Helper function for ATExecChangeOwner.  Examines pg_depend searching
5635  * for sequences that are dependent on serial columns, and changes their
5636  * ownership.
      *
      * relationOid is the table whose columns are the referenced objects in
      * pg_depend; newOwnerId is the role passed on to ATExecChangeOwner for
      * each sequence found.
5637  */
5638 static void
5639 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5640 {
5641         Relation        depRel;
5642         SysScanDesc scan;
5643         ScanKeyData key[2];
5644         HeapTuple       tup;
5645
5646         /*
5647          * SERIAL sequences are those having an internal dependency on one of the
5648          * table's columns (we don't care *which* column, exactly).
5649          */
5650         depRel = heap_open(DependRelationId, AccessShareLock);
5651
             /* Match pg_depend rows whose referenced object is this relation */
5652         ScanKeyInit(&key[0],
5653                                 Anum_pg_depend_refclassid,
5654                                 BTEqualStrategyNumber, F_OIDEQ,
5655                                 ObjectIdGetDatum(RelationRelationId));
5656         ScanKeyInit(&key[1],
5657                                 Anum_pg_depend_refobjid,
5658                                 BTEqualStrategyNumber, F_OIDEQ,
5659                                 ObjectIdGetDatum(relationOid));
5660         /* we leave refobjsubid unspecified */
5661
5662         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5663                                                           SnapshotNow, 2, key);
5664
5665         while (HeapTupleIsValid(tup = systable_getnext(scan)))
5666         {
5667                 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5668                 Relation        seqRel;
5669
5670                 /* skip dependencies other than internal dependencies on columns */
5671                 if (depForm->refobjsubid == 0 ||
5672                         depForm->classid != RelationRelationId ||
5673                         depForm->objsubid != 0 ||
5674                         depForm->deptype != DEPENDENCY_INTERNAL)
5675                         continue;
5676
5677                 /* Use relation_open just in case it's an index */
5678                 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5679
5680                 /* skip non-sequence relations */
5681                 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5682                 {
5683                         /* No need to keep the lock */
5684                         relation_close(seqRel, AccessExclusiveLock);
5685                         continue;
5686                 }
5687
                     /*
                      * Note recursing is passed as false, so ATExecChangeOwner applies
                      * its permission checks to each sequence.
                      */
5688                 /* We don't need to close the sequence while we alter it. */
5689                 ATExecChangeOwner(depForm->objid, newOwnerId, false);
5690
5691                 /* Now we can close it.  Keep the lock till end of transaction. */
5692                 relation_close(seqRel, NoLock);
5693         }
5694
5695         systable_endscan(scan);
5696
5697         relation_close(depRel, AccessShareLock);
5698 }
5699
5700 /*
5701  * ALTER TABLE CLUSTER ON
5702  *
5703  * The only thing we have to do is to change the indisclustered bits.
      *
      * rel is the table being altered and indexName the name of the index to
      * cluster on.  The index is looked up by name in the table's own namespace;
      * an error is raised if no relation of that name exists there.
5704  */
5705 static void
5706 ATExecClusterOn(Relation rel, const char *indexName)
5707 {
5708         Oid                     indexOid;
5709
5710         indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5711
5712         if (!OidIsValid(indexOid))
5713                 ereport(ERROR,
5714                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5715                                  errmsg("index \"%s\" for table \"%s\" does not exist",
5716                                                 indexName, RelationGetRelationName(rel))));
5717
5718         /* Check index is valid to cluster on */
5719         check_index_is_clusterable(rel, indexOid, false);
5720
5721         /* And do the work */
5722         mark_index_clustered(rel, indexOid);
5723 }
5724
5725 /*
5726  * ALTER TABLE SET WITHOUT CLUSTER
5727  *
5728  * We have to find any indexes on the table that have indisclustered bit
5729  * set and turn it off.
      *
      * rel is the table being altered.  All the work is delegated to
      * mark_index_clustered, called with InvalidOid in place of an index OID.
5730  */
5731 static void
5732 ATExecDropCluster(Relation rel)
5733 {
5734         mark_index_clustered(rel, InvalidOid);
5735 }
5736
5737 /*
5738  * ALTER TABLE SET TABLESPACE
      *
      * Preparation step: resolve tablespacename to an OID, verify that the
      * current user has CREATE privilege on that tablespace, and record the OID
      * in tab->newTableSpace for later use.  An error is raised if the
      * tablespace does not exist, if the privilege check fails, or if
      * tab->newTableSpace has already been set by another subcommand.
      *
      * The rel parameter is not referenced in this function.
5739  */
5740 static void
5741 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5742 {
5743         Oid                     tablespaceId;
5744         AclResult       aclresult;
5745
5746         /* Check that the tablespace exists */
5747         tablespaceId = get_tablespace_oid(tablespacename);
5748         if (!OidIsValid(tablespaceId))
5749                 ereport(ERROR,
5750                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5751                                  errmsg("tablespace \"%s\" does not exist", tablespacename)));
5752
5753         /* Check its permissions */
5754         aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5755         if (aclresult != ACLCHECK_OK)
5756                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5757
5758         /* Save info for Phase 3 to do the real work */
5759         if (OidIsValid(tab->newTableSpace))
5760                 ereport(ERROR,
5761                                 (errcode(ERRCODE_SYNTAX_ERROR),
5762                                  errmsg("cannot have multiple SET TABLESPACE subcommands")));
5763         tab->newTableSpace = tablespaceId;
5764 }
5765
5766 /*
5767  * ALTER TABLE/INDEX SET (...) or RESET (...)
      *
      * rel is the relation being altered, defList the list of options named in
      * the command, and isReset distinguishes RESET from SET (it is passed
      * straight through to transformRelOptions).  An empty defList is a no-op.
      *
      * The existing reloptions are read from the relation's pg_class row,
      * merged with defList, validated according to the relation's relkind, and
      * the result (or NULL, if no options remain) is stored back in pg_class.
5768  */
5769 static void
5770 ATExecSetRelOptions(Relation rel, List *defList, bool isReset)
5771 {
5772         Oid                     relid;
5773         Relation        pgclass;
5774         HeapTuple       tuple;
5775         HeapTuple       newtuple;
5776         Datum           datum;
5777         bool            isnull;
5778         Datum           newOptions;
5779         Datum           repl_val[Natts_pg_class];
5780         char            repl_null[Natts_pg_class];
5781         char            repl_repl[Natts_pg_class];
5782
5783         if (defList == NIL)
5784                 return;                                 /* nothing to do */
5785
5786         pgclass = heap_open(RelationRelationId, RowExclusiveLock);
5787
5788         /* Get the old reloptions */
5789         relid = RelationGetRelid(rel);
5790         tuple = SearchSysCache(RELOID,
5791                                                    ObjectIdGetDatum(relid),
5792                                                    0, 0, 0);
5793         if (!HeapTupleIsValid(tuple))
5794                 elog(ERROR, "cache lookup failed for relation %u", relid);
5795
5796         datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions, &isnull);
5797
5798         /* Generate new proposed reloptions (text array) */
5799         newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
5800                                                                          defList, false, isReset);
5801
5802         /* Validate */
5803         switch (rel->rd_rel->relkind)
5804         {
5805                 case RELKIND_RELATION:
5806                 case RELKIND_TOASTVALUE:
                             /* result is discarded; the call is made only for validation */
5807                         (void) heap_reloptions(rel->rd_rel->relkind, newOptions, true);
5808                         break;
5809                 case RELKIND_INDEX:
5810                         (void) index_reloptions(rel->rd_am->amoptions, newOptions, true);
5811                         break;
5812                 default:
5813                         ereport(ERROR,
5814                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5815                                          errmsg("\"%s\" is not a table, index, or TOAST table",
5816                                                         RelationGetRelationName(rel))));
5817                         break;
5818         }
5819
5820         /*
5821          * All we need do here is update the pg_class row; the new options will be
5822          * propagated into relcaches during post-commit cache inval.
5823          */
5824         memset(repl_val, 0, sizeof(repl_val));
5825         memset(repl_null, ' ', sizeof(repl_null));
5826         memset(repl_repl, ' ', sizeof(repl_repl));
5827
             /* A zero Datum from transformRelOptions means "store NULL" */
5828         if (newOptions != (Datum) 0)
5829                 repl_val[Anum_pg_class_reloptions - 1] = newOptions;
5830         else
5831                 repl_null[Anum_pg_class_reloptions - 1] = 'n';
5832
             /* reloptions is the only column flagged for replacement */
5833         repl_repl[Anum_pg_class_reloptions - 1] = 'r';
5834
5835         newtuple = heap_modifytuple(tuple, RelationGetDescr(pgclass),
5836                                                                 repl_val, repl_null, repl_repl);
5837
5838         simple_heap_update(pgclass, &newtuple->t_self, newtuple);
5839
5840         CatalogUpdateIndexes(pgclass, newtuple);
5841
5842         heap_freetuple(newtuple);
5843
5844         ReleaseSysCache(tuple);
5845
5846         heap_close(pgclass, RowExclusiveLock);
5847 }
5848
5849 /*
5850  * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5851  * rewriting to be done, so we just want to copy the data as fast as possible.
      *
      * tableOid is the relation to move and newTableSpace the OID of the
      * destination tablespace.  Shared or nailed relations, and temp tables of
      * other sessions, are rejected with an error.  If the relation is already
      * in the requested tablespace nothing is done.  Otherwise a new storage
      * file is created in the destination, the data is copied with
      * copy_relation_data, the old file is scheduled for unlinking, and
      * pg_class.reltablespace is updated.  The function then calls itself for
      * the relation's toast table and toast index, if it has them.
5852  */
5853 static void
5854 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5855 {
5856         Relation        rel;
5857         Oid                     oldTableSpace;
5858         Oid                     reltoastrelid;
5859         Oid                     reltoastidxid;
5860         RelFileNode newrnode;
5861         SMgrRelation dstrel;
5862         Relation        pg_class;
5863         HeapTuple       tuple;
5864         Form_pg_class rd_rel;
5865
5866         /*
5867          * Need lock here in case we are recursing to toast table or index
5868          */
5869         rel = relation_open(tableOid, AccessExclusiveLock);
5870
5871         /*
5872          * We can never allow moving of shared or nailed-in-cache relations,
5873          * because we can't support changing their reltablespace values.
5874          */
5875         if (rel->rd_rel->relisshared || rel->rd_isnailed)
5876                 ereport(ERROR,
5877                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5878                                  errmsg("cannot move system relation \"%s\"",
5879                                                 RelationGetRelationName(rel))));
5880
5881         /*
5882          * Don't allow moving temp tables of other backends ... their local buffer
5883          * manager is not going to cope.
5884          */
5885         if (isOtherTempNamespace(RelationGetNamespace(rel)))
5886                 ereport(ERROR,
5887                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5888                                  errmsg("cannot move temporary tables of other sessions")));
5889
5890         /*
5891          * No work if no change in tablespace.
              * A stored reltablespace of 0 is treated as equivalent to
              * MyDatabaseTableSpace (see the matching store further down).
5892          */
5893         oldTableSpace = rel->rd_rel->reltablespace;
5894         if (newTableSpace == oldTableSpace ||
5895                 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5896         {
5897                 relation_close(rel, NoLock);
5898                 return;
5899         }
5900
             /* Remember these now; they are used after rel has been closed */
5901         reltoastrelid = rel->rd_rel->reltoastrelid;
5902         reltoastidxid = rel->rd_rel->reltoastidxid;
5903
5904         /* Get a modifiable copy of the relation's pg_class row */
5905         pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5906
5907         tuple = SearchSysCacheCopy(RELOID,
5908                                                            ObjectIdGetDatum(tableOid),
5909                                                            0, 0, 0);
5910         if (!HeapTupleIsValid(tuple))
5911                 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5912         rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5913
5914         /* create another storage file. Is it a little ugly ? */
5915         /* NOTE: any conflict in relfilenode value will be caught here */
             /* The new file node differs from the old one only in its spcNode */
5916         newrnode = rel->rd_node;
5917         newrnode.spcNode = newTableSpace;
5918
5919         dstrel = smgropen(newrnode);
5920         smgrcreate(dstrel, rel->rd_istemp, false);
5921
5922         /* copy relation data to the new physical file */
5923         copy_relation_data(rel, dstrel);
5924
5925         /* schedule unlinking old physical file */
5926         RelationOpenSmgr(rel);
5927         smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5928
5929         /*
5930          * Now drop smgr references.  The source was already dropped by
5931          * smgrscheduleunlink.
5932          */
5933         smgrclose(dstrel);
5934
5935         /* update the pg_class row */
             /* the database's default tablespace is recorded as InvalidOid */
5936         rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5937         simple_heap_update(pg_class, &tuple->t_self, tuple);
5938         CatalogUpdateIndexes(pg_class, tuple);
5939
5940         heap_freetuple(tuple);
5941
5942         heap_close(pg_class, RowExclusiveLock);
5943
5944         relation_close(rel, NoLock);
5945
5946         /* Make sure the reltablespace change is visible */
5947         CommandCounterIncrement();
5948
5949         /* Move associated toast relation and/or index, too */
5950         if (OidIsValid(reltoastrelid))
5951                 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5952         if (OidIsValid(reltoastidxid))
5953                 ATExecSetTableSpace(reltoastidxid, newTableSpace);
5954 }
5955
5956 /*
5957  * Copy data, block by block
5958  */
5959 static void
5960 copy_relation_data(Relation rel, SMgrRelation dst)
5961 {
5962         SMgrRelation src;
5963         bool            use_wal;
5964         BlockNumber nblocks;
5965         BlockNumber blkno;
5966         char            buf[BLCKSZ];
5967         Page            page = (Page) buf;
5968
5969         /*
5970          * Since we copy the file directly without looking at the shared buffers,
5971          * we'd better first flush out any pages of the source relation that are
5972          * in shared buffers.  We assume no new changes will be made while we are
5973          * holding exclusive lock on the rel.
5974          */
5975         FlushRelationBuffers(rel);
5976
5977         /*
5978          * We need to log the copied data in WAL iff WAL archiving is enabled AND
5979          * it's not a temp rel.
5980          */
5981         use_wal = XLogArchivingActive() && !rel->rd_istemp;
5982
5983         nblocks = RelationGetNumberOfBlocks(rel);
5984         /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5985         src = rel->rd_smgr;
5986
5987         for (blkno = 0; blkno < nblocks; blkno++)
5988         {
5989                 smgrread(src, blkno, buf);
5990
5991                 /* XLOG stuff */
5992                 if (use_wal)
5993                 {
5994                         xl_heap_newpage xlrec;
5995                         XLogRecPtr      recptr;
5996                         XLogRecData rdata[2];
5997
5998                         /* NO ELOG(ERROR) from here till newpage op is logged */
5999                         START_CRIT_SECTION();
6000
6001                         xlrec.node = dst->smgr_rnode;
6002                         xlrec.blkno = blkno;
6003
6004                         rdata[0].data = (char *) &xlrec;
6005                         rdata[0].len = SizeOfHeapNewpage;
6006                         rdata[0].buffer = InvalidBuffer;
6007                         rdata[0].next = &(rdata[1]);
6008
6009                         rdata[1].data = (char *) page;
6010                         rdata[1].len = BLCKSZ;
6011                         rdata[1].buffer = InvalidBuffer;
6012                         rdata[1].next = NULL;
6013
6014                         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
6015
6016                         PageSetLSN(page, recptr);
6017                         PageSetTLI(page, ThisTimeLineID);
6018
6019                         END_CRIT_SECTION();
6020                 }
6021
6022                 /*
6023                  * Now write the page.  We say isTemp = true even if it's not a temp
6024                  * rel, because there's no need for smgr to schedule an fsync for this
6025                  * write; we'll do it ourselves below.
6026                  */
6027                 smgrwrite(dst, blkno, buf, true);
6028         }
6029
6030         /*
6031          * If the rel isn't temp, we must fsync it down to disk before it's safe
6032          * to commit the transaction.  (For a temp rel we don't care since the rel
6033          * will be uninteresting after a crash anyway.)
6034          *
6035          * It's obvious that we must do this when not WAL-logging the copy. It's
6036          * less obvious that we have to do it even if we did WAL-log the copied
6037          * pages. The reason is that since we're copying outside shared buffers, a
6038          * CHECKPOINT occurring during the copy has no way to flush the previously
6039          * written data to disk (indeed it won't know the new rel even exists).  A
6040          * crash later on would replay WAL from the checkpoint, therefore it
6041          * wouldn't replay our earlier WAL entries. If we do not fsync those pages
6042          * here, they might still not be on disk when the crash occurs.
6043          */
6044         if (!rel->rd_istemp)
6045                 smgrimmedsync(dst);
6046 }
6047
6048 /*
6049  * ALTER TABLE ENABLE/DISABLE TRIGGER
6050  *
6051  * We just pass this off to trigger.c.
6052  */
6053 static void
6054 ATExecEnableDisableTrigger(Relation rel, char *trigname,
6055                                                    bool enable, bool skip_system)
6056 {
6057         EnableDisableTrigger(rel, trigname, enable, skip_system);
6058 }
6059
6060 static char *
6061 decompile_conbin(HeapTuple contup, TupleDesc tupdesc) 
6062 {
6063         Form_pg_constraint      con;
6064         bool                            isnull;
6065         Datum                           attr;
6066         Datum                           expr;
6067
6068         con = (Form_pg_constraint) GETSTRUCT(contup);
6069         attr = heap_getattr(contup, Anum_pg_constraint_conbin, tupdesc, &isnull);
6070         if (isnull)
6071                 elog(ERROR, "null conbin for constraint %u", HeapTupleGetOid(contup));
6072
6073         expr = DirectFunctionCall2(pg_get_expr, attr,
6074                                                            ObjectIdGetDatum(con->conrelid));
6075         return DatumGetCString(DirectFunctionCall1(textout, expr));
6076 }
6077
6078 /*
6079  * ALTER TABLE INHERIT
6080  *
6081  * Add a parent to the child's parents. This verifies that all the columns and
6082  * check constraints of the parent appear in the child and that they have the
6083  * same data type and expressions.
6084  */
6085 static void
6086 ATExecAddInherits(Relation child_rel, RangeVar *parent)
6087 {
6088         Relation        parent_rel,
6089                                 catalogRelation;
6090         SysScanDesc scan;
6091         ScanKeyData key;
6092         HeapTuple       inheritsTuple;
6093         int4            inhseqno;
6094         List       *children;
6095
6096         parent_rel = heap_openrv(parent, AccessShareLock);
6097
6098         /*
6099          * Must be owner of both parent and child -- child is taken care of by
6100          * ATSimplePermissions call in ATPrepCmd
6101          */
6102         ATSimplePermissions(parent_rel, false);
6103
6104         /* Permanent rels cannot inherit from temporary ones */
6105         if (!isTempNamespace(RelationGetNamespace(child_rel)) &&
6106                 isTempNamespace(RelationGetNamespace(parent_rel)))
6107                 ereport(ERROR,
6108                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6109                                  errmsg("cannot inherit from temporary relation \"%s\"",
6110                                                 parent->relname)));
6111
6112         /* If parent has OIDs then all children must have OIDs */
6113         if (parent_rel->rd_rel->relhasoids && !child_rel->rd_rel->relhasoids)
6114                 ereport(ERROR,
6115                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6116                                  errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
6117                                                 RelationGetRelationName(child_rel), parent->relname)));
6118
6119         /*
6120          * Don't allow any duplicates in the list of parents. We scan through the
6121          * list of parents in pg_inherit and keep track of the first open inhseqno
6122          * slot found to use for the new parent.
6123          */
6124         catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
6125         ScanKeyInit(&key,
6126                                 Anum_pg_inherits_inhrelid,
6127                                 BTEqualStrategyNumber, F_OIDEQ,
6128                                 ObjectIdGetDatum(RelationGetRelid(child_rel)));
6129         scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
6130                                                           true, SnapshotNow, 1, &key);
6131
6132         /* inhseqno sequences start at 1 */
6133         inhseqno = 0;
6134         while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
6135         {
6136                 Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);
6137
6138                 if (inh->inhparent == RelationGetRelid(parent_rel))
6139                         ereport(ERROR,
6140                                         (errcode(ERRCODE_DUPLICATE_TABLE),
6141                                          errmsg("inherited relation \"%s\" duplicated",
6142                                                         parent->relname)));
6143                 if (inh->inhseqno == inhseqno + 1)
6144                         inhseqno = inh->inhseqno;
6145         }
6146         systable_endscan(scan);
6147         heap_close(catalogRelation, RowExclusiveLock);
6148
6149         /*
6150          * If the new parent is found in our list of inheritors, we have a circular
6151          * structure
6152          */
6153         children = find_all_inheritors(RelationGetRelid(child_rel));
6154
6155         if (list_member_oid(children, RelationGetRelid(parent_rel)))
6156                 ereport(ERROR,
6157                                 (errcode(ERRCODE_DUPLICATE_TABLE),
6158                                  errmsg("circular inheritance structure found"),
6159                                  errdetail("\"%s\" is already a child of \"%s\".",
6160                                                    parent->relname,
6161                                                    RelationGetRelationName(child_rel))));
6162
6163         /* Match up the columns and bump attinhcount and attislocal */
6164         MergeAttributesIntoExisting(child_rel, parent_rel);
6165
6166         /* Match up the constraints and make sure they're present in child */
6167         MergeConstraintsIntoExisting(child_rel, parent_rel);
6168
6169         catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
6170         StoreCatalogInheritance1(RelationGetRelid(child_rel),
6171                                                          RelationGetRelid(parent_rel),
6172                                                          inhseqno + 1, catalogRelation);
6173         heap_close(catalogRelation, RowExclusiveLock);
6174
6175         /* keep our lock on the parent relation until commit */
6176         heap_close(parent_rel, NoLock);
6177 }
6178
6179 /*
6180  * Check columns in child table match up with columns in parent
6181  *
6182  * Called by ATExecAddInherits
6183  *
6184  * Currently all columns must be found in child. Missing columns are an error.
6185  * One day we might consider creating new columns like CREATE TABLE does.
6186  *
6187  * The data type must match perfectly. If the parent column is NOT NULL then
6188  * the child table must be as well. Defaults are ignored however.
6189  */
6190 static void
6191 MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
6192 {
6193         Relation         attrdesc;
6194         AttrNumber       parent_attno;
6195         int                      parent_natts;
6196         TupleDesc        tupleDesc;
6197         TupleConstr *constr;
6198         HeapTuple        tuple;
6199
6200         tupleDesc = RelationGetDescr(parent_rel);
6201         parent_natts = tupleDesc->natts;
6202         constr = tupleDesc->constr;
6203
6204         for (parent_attno = 1; parent_attno <= parent_natts; parent_attno++)
6205         {
6206                 Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
6207                 char       *attributeName = NameStr(attribute->attname);
6208
6209                 /* Ignore dropped columns in the parent. */
6210                 if (attribute->attisdropped)
6211                         continue;
6212
6213                 /* Does it conflict with an existing column? */
6214                 attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
6215
6216                 tuple = SearchSysCacheCopyAttName(RelationGetRelid(child_rel),
6217                                                                                   attributeName);
6218                 if (HeapTupleIsValid(tuple))
6219                 {
6220                         /*
6221                          * Yes, try to merge the two column definitions. They must have
6222                          * the same type and typmod.
6223                          */
6224                         Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
6225
6226                         if (attribute->atttypid != childatt->atttypid ||
6227                                 attribute->atttypmod != childatt->atttypmod)
6228                                 ereport(ERROR,
6229                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
6230                                                  errmsg("child table \"%s\" has different type for column \"%s\"",
6231                                 RelationGetRelationName(child_rel), NameStr(attribute->attname))));
6232
6233                         if (attribute->attnotnull && !childatt->attnotnull)
6234                                 ereport(ERROR,
6235                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
6236                                                  errmsg("column \"%s\" in child table must be NOT NULL",
6237                                                                 NameStr(attribute->attname))));
6238
6239                         childatt->attinhcount++;
6240                         simple_heap_update(attrdesc, &tuple->t_self, tuple);
6241                         /* XXX strength reduce open indexes to outside loop? */
6242                         CatalogUpdateIndexes(attrdesc, tuple);
6243                         heap_freetuple(tuple);
6244
6245                         /*
6246                          * We don't touch default at all since we're not making any other
6247                          * DDL changes to the child
6248                          */
6249                 }
6250                 else
6251                 {
6252                         /*
6253                          * Creating inherited columns in this case seems to be unpopular.
6254                          * In the common use case of partitioned tables it's a foot-gun.
6255                          */
6256                         ereport(ERROR,
6257                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
6258                                          errmsg("child table missing column \"%s\"",
6259                                                         NameStr(attribute->attname))));
6260                 }
6261
6262                 heap_close(attrdesc, RowExclusiveLock);
6263         }
6264 }
6265
6266 /*
6267  * Check constraints in child table match up with constraints in parent
6268  *
6269  * Called by ATExecAddInherits
6270  *
6271  * Currently all constraints in parent must be present in the child. One day we
6272  * may consider adding new constraints like CREATE TABLE does. We may also want
6273  * to allow an optional flag on parent table constraints indicating they are
6274  * intended to ONLY apply to the master table, not to the children. That would
6275  * make it possible to ensure no records are mistakenly inserted into the
6276  * master in partitioned tables rather than the appropriate child.
6277  *
6278  * XXX this is O(n^2) which may be issue with tables with hundreds of
6279  * constraints. As long as tables have more like 10 constraints it shouldn't be
6280  * an issue though. Even 100 constraints ought not be the end of the world.
6281  */
6282 static void
6283 MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
6284 {
6285         Relation        catalogRelation;
6286         TupleDesc       tupleDesc;
6287         SysScanDesc scan;
6288         ScanKeyData key;
6289         HeapTuple       constraintTuple;
6290         ListCell   *elem;
6291         List       *constraints;
6292
6293         /* First gather up the child's constraint definitions */
6294         catalogRelation = heap_open(ConstraintRelationId, AccessShareLock);
6295         tupleDesc = RelationGetDescr(catalogRelation);
6296
6297         ScanKeyInit(&key,
6298                                 Anum_pg_constraint_conrelid,
6299                                 BTEqualStrategyNumber,
6300                                 F_OIDEQ,
6301                                 ObjectIdGetDatum(RelationGetRelid(child_rel)));
6302         scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
6303                                                           true, SnapshotNow, 1, &key);
6304         constraints = NIL;
6305
6306         while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
6307         {
6308                 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
6309
6310                 if (con->contype != CONSTRAINT_CHECK)
6311                         continue;
6312
6313                 constraints = lappend(constraints, heap_copytuple(constraintTuple));
6314         }
6315         systable_endscan(scan);
6316
6317         /* Then loop through the parent's constraints looking for them in the list */
6318         ScanKeyInit(&key,
6319                                 Anum_pg_constraint_conrelid,
6320                                 BTEqualStrategyNumber,
6321                                 F_OIDEQ,
6322                                 ObjectIdGetDatum(RelationGetRelid(parent_rel)));
6323         scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, true,
6324                                                           SnapshotNow, 1, &key);
6325         while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
6326         {
6327                 bool            found = false;
6328                 Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
6329                 Form_pg_constraint child_con = NULL;
6330                 HeapTuple       child_contuple = NULL;
6331
6332                 if (parent_con->contype != CONSTRAINT_CHECK)
6333                         continue;
6334
6335                 foreach(elem, constraints)
6336                 {
6337                         child_contuple = lfirst(elem);
6338                         child_con = (Form_pg_constraint) GETSTRUCT(child_contuple);
6339                         if (!strcmp(NameStr(parent_con->conname),
6340                                                 NameStr(child_con->conname)))
6341                         {
6342                                 found = true;
6343                                 break;
6344                         }
6345                 }
6346
6347                 if (!found)
6348                         ereport(ERROR,
6349                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
6350                                          errmsg("child table missing constraint matching parent table constraint \"%s\"",
6351                                                         NameStr(parent_con->conname))));
6352
6353                 if (parent_con->condeferrable != child_con->condeferrable ||
6354                         parent_con->condeferred != child_con->condeferred ||
6355                         parent_con->contypid != child_con->contypid ||
6356                         strcmp(decompile_conbin(constraintTuple, tupleDesc),
6357                                    decompile_conbin(child_contuple, tupleDesc)))
6358                         ereport(ERROR,
6359                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
6360                                          errmsg("constraint definition for CHECK constraint \"%s\" doesn't match",
6361                                                         NameStr(parent_con->conname))));
6362
6363                 /*
6364                  * TODO: add conislocal,coninhcount to constraints. This is where we
6365                  * would have to bump them just like attributes
6366                  */
6367         }
6368
6369         systable_endscan(scan);
6370         heap_close(catalogRelation, AccessShareLock);
6371 }
6372
6373 /*
6374  * ALTER TABLE NO INHERIT
6375  *
6376  * Drop a parent from the child's parents. This just adjusts the attinhcount
6377  * and attislocal of the columns and removes the pg_inherit and pg_depend
6378  * entries.
6379  *
6380  * If attinhcount goes to 0 then attislocal gets set to true. If it goes back up
6381  * attislocal stays 0 which means if a child is ever removed from a parent then
6382  * its columns will never be automatically dropped which may surprise. But at
6383  * least we'll never surprise by dropping columns someone isn't expecting to be
6384  * dropped which would actually mean data loss.
6385  */
6386 static void
6387 ATExecDropInherits(Relation rel, RangeVar *parent)
6388 {
6389         Relation        catalogRelation;
6390         SysScanDesc scan;
6391         ScanKeyData key[2];
6392         HeapTuple       inheritsTuple,
6393                                 attributeTuple,
6394                                 depTuple;
6395         Oid                     inhparent;
6396         Oid                     dropparent;
6397         int                     found = false;
6398
6399         /*
6400          * Get the OID of parent -- if no schema is specified use the regular
6401          * search path and only drop the one table that's found. We could try to
6402          * be clever and look at each parent and see if it matches but that would
6403          * be inconsistent with other operations I think.
6404          */
6405         dropparent = RangeVarGetRelid(parent, false);
6406
6407         /* Search through the direct parents of rel looking for dropparent oid */
6408
6409         catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
6410         ScanKeyInit(key,
6411                                 Anum_pg_inherits_inhrelid,
6412                                 BTEqualStrategyNumber, F_OIDEQ,
6413                                 ObjectIdGetDatum(RelationGetRelid(rel)));
6414         scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
6415                                                           true, SnapshotNow, 1, key);
6416
6417         while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
6418         {
6419                 inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
6420                 if (inhparent == dropparent)
6421                 {
6422                         simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
6423                         found = true;
6424                         break;
6425                 }
6426         }
6427
6428         systable_endscan(scan);
6429         heap_close(catalogRelation, RowExclusiveLock);
6430
6431         if (!found)
6432         {
6433                 if (parent->schemaname)
6434                         ereport(ERROR,
6435                                         (errcode(ERRCODE_UNDEFINED_TABLE),
6436                           errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
6437                                          parent->schemaname, parent->relname, RelationGetRelationName(rel))));
6438                 else
6439                         ereport(ERROR,
6440                                         (errcode(ERRCODE_UNDEFINED_TABLE),
6441                                  errmsg("relation \"%s\" is not a parent of relation \"%s\"",
6442                                                 parent->relname, RelationGetRelationName(rel))));
6443         }
6444
6445         /* Search through columns looking for matching columns from parent table */
6446
6447         catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
6448         ScanKeyInit(key,
6449                                 Anum_pg_attribute_attrelid,
6450                                 BTEqualStrategyNumber,
6451                                 F_OIDEQ,
6452                                 ObjectIdGetDatum(RelationGetRelid(rel)));
6453         scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId,
6454                                                           true, SnapshotNow, 1, key);
6455         while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
6456         {
6457                 Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
6458
6459                 /*
6460                  * Not an inherited column at all (do NOT use islocal for this
6461                  * test--it can be true for inherited columns)
6462                  */
6463                 if (att->attinhcount == 0)
6464                         continue;
6465                 if (att->attisdropped)
6466                         continue;
6467
6468                 if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname)))
6469                 {
6470                         /* Decrement inhcount and possibly set islocal to 1 */
6471                         HeapTuple       copyTuple = heap_copytuple(attributeTuple);
6472                         Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple);
6473
6474                         copy_att->attinhcount--;
6475                         if (copy_att->attinhcount == 0)
6476                                 copy_att->attislocal = true;
6477
6478                         simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple);
6479
6480                         /*
6481                          * XXX "Avoid using it for multiple tuples, since opening the
6482                          * indexes and building the index info structures is moderately
6483                          * expensive." Perhaps this can be moved outside the loop or else
6484                          * at least the CatalogOpenIndexes/CatalogCloseIndexes moved
6485                          * outside the loop but when I try that it seg faults?!
6486                          */
6487                         CatalogUpdateIndexes(catalogRelation, copyTuple);
6488                         heap_freetuple(copyTuple);
6489                 }
6490         }
6491         systable_endscan(scan);
6492         heap_close(catalogRelation, RowExclusiveLock);
6493
6494         /*
6495          * Drop the dependency
6496          *
6497          * There's no convenient way to do this, so go trawling through pg_depend
6498          */
6499
6500         catalogRelation = heap_open(DependRelationId, RowExclusiveLock);
6501
6502         ScanKeyInit(&key[0],
6503                                 Anum_pg_depend_classid,
6504                                 BTEqualStrategyNumber, F_OIDEQ,
6505                                 RelationRelationId);
6506         ScanKeyInit(&key[1],
6507                                 Anum_pg_depend_objid,
6508                                 BTEqualStrategyNumber, F_OIDEQ,
6509                                 ObjectIdGetDatum(RelationGetRelid(rel)));
6510
6511         scan = systable_beginscan(catalogRelation, DependDependerIndexId, true,
6512                                                           SnapshotNow, 2, key);
6513
6514         while (HeapTupleIsValid(depTuple = systable_getnext(scan)))
6515         {
6516                 Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple);
6517
6518                 if (dep->refclassid == RelationRelationId &&
6519                         dep->refobjid == dropparent &&
6520                         dep->deptype == DEPENDENCY_NORMAL)
6521                 {
6522                         /*
6523                          * Only delete a single dependency -- there shouldn't be more but
6524                          * just in case...
6525                          */
6526                         simple_heap_delete(catalogRelation, &depTuple->t_self);
6527                         break;
6528                 }
6529         }
6530
6531         systable_endscan(scan);
6532         heap_close(catalogRelation, RowExclusiveLock);
6533 }
6534
6535
6536 /*
6537  * Execute ALTER TABLE SET SCHEMA
6538  *
6539  * Note: caller must have checked ownership of the relation already
6540  */
6541 void
6542 AlterTableNamespace(RangeVar *relation, const char *newschema)
6543 {
6544         Relation        rel;
6545         Oid                     relid;
6546         Oid                     oldNspOid;
6547         Oid                     nspOid;
6548         Relation        classRel;
6549
6550         rel = heap_openrv(relation, AccessExclusiveLock);
6551
6552         /* heap_openrv allows TOAST, but we don't want to */
6553         if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
6554                 ereport(ERROR,
6555                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6556                                  errmsg("\"%s\" is a TOAST relation",
6557                                                 RelationGetRelationName(rel))));
6558
6559         relid = RelationGetRelid(rel);
6560         oldNspOid = RelationGetNamespace(rel);
6561
6562         /* get schema OID and check its permissions */
6563         nspOid = LookupCreationNamespace(newschema);
6564
6565         if (oldNspOid == nspOid)
6566                 ereport(ERROR,
6567                                 (errcode(ERRCODE_DUPLICATE_TABLE),
6568                                  errmsg("relation \"%s\" is already in schema \"%s\"",
6569                                                 RelationGetRelationName(rel),
6570                                                 newschema)));
6571
6572         /* disallow renaming into or out of temp schemas */
6573         if (isAnyTempNamespace(nspOid) || isAnyTempNamespace(oldNspOid))
6574                 ereport(ERROR,
6575                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6576                         errmsg("cannot move objects into or out of temporary schemas")));
6577
6578         /* same for TOAST schema */
6579         if (nspOid == PG_TOAST_NAMESPACE || oldNspOid == PG_TOAST_NAMESPACE)
6580                 ereport(ERROR,
6581                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6582                                  errmsg("cannot move objects into or out of TOAST schema")));
6583
6584         /* OK, modify the pg_class row and pg_depend entry */
6585         classRel = heap_open(RelationRelationId, RowExclusiveLock);
6586
6587         AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
6588
6589         /* Fix the table's rowtype too */
6590         AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
6591
6592         /* Fix other dependent stuff */
6593         if (rel->rd_rel->relkind == RELKIND_RELATION)
6594         {
6595                 AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
6596                 AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
6597                 AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
6598         }
6599
6600         heap_close(classRel, RowExclusiveLock);
6601
6602         /* close rel, but keep lock until commit */
6603         relation_close(rel, NoLock);
6604 }
6605
6606 /*
6607  * The guts of relocating a relation to another namespace: fix the pg_class
6608  * entry, and the pg_depend entry if any.  Caller must already have
6609  * opened and write-locked pg_class.
6610  */
6611 void
6612 AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
6613                                                            Oid oldNspOid, Oid newNspOid,
6614                                                            bool hasDependEntry)
6615 {
6616         HeapTuple       classTup;
6617         Form_pg_class classForm;
6618
6619         classTup = SearchSysCacheCopy(RELOID,
6620                                                                   ObjectIdGetDatum(relOid),
6621                                                                   0, 0, 0);
6622         if (!HeapTupleIsValid(classTup))
6623                 elog(ERROR, "cache lookup failed for relation %u", relOid);
6624         classForm = (Form_pg_class) GETSTRUCT(classTup);
6625
6626         Assert(classForm->relnamespace == oldNspOid);
6627
6628         /* check for duplicate name (more friendly than unique-index failure) */
6629         if (get_relname_relid(NameStr(classForm->relname),
6630                                                   newNspOid) != InvalidOid)
6631                 ereport(ERROR,
6632                                 (errcode(ERRCODE_DUPLICATE_TABLE),
6633                                  errmsg("relation \"%s\" already exists in schema \"%s\"",
6634                                                 NameStr(classForm->relname),
6635                                                 get_namespace_name(newNspOid))));
6636
6637         /* classTup is a copy, so OK to scribble on */
6638         classForm->relnamespace = newNspOid;
6639
6640         simple_heap_update(classRel, &classTup->t_self, classTup);
6641         CatalogUpdateIndexes(classRel, classTup);
6642
6643         /* Update dependency on schema if caller said so */
6644         if (hasDependEntry &&
6645                 changeDependencyFor(RelationRelationId, relOid,
6646                                                         NamespaceRelationId, oldNspOid, newNspOid) != 1)
6647                 elog(ERROR, "failed to change schema dependency for relation \"%s\"",
6648                          NameStr(classForm->relname));
6649
6650         heap_freetuple(classTup);
6651 }
6652
6653 /*
6654  * Move all indexes for the specified relation to another namespace.
6655  *
6656  * Note: we assume adequate permission checking was done by the caller,
6657  * and that the caller has a suitable lock on the owning relation.
6658  */
6659 static void
6660 AlterIndexNamespaces(Relation classRel, Relation rel,
6661                                          Oid oldNspOid, Oid newNspOid)
6662 {
6663         List       *indexList;
6664         ListCell   *l;
6665
6666         indexList = RelationGetIndexList(rel);
6667
6668         foreach(l, indexList)
6669         {
6670                 Oid                     indexOid = lfirst_oid(l);
6671
6672                 /*
6673                  * Note: currently, the index will not have its own dependency on the
6674                  * namespace, so we don't need to do changeDependencyFor(). There's no
6675                  * rowtype in pg_type, either.
6676                  */
6677                 AlterRelationNamespaceInternal(classRel, indexOid,
6678                                                                            oldNspOid, newNspOid,
6679                                                                            false);
6680         }
6681
6682         list_free(indexList);
6683 }
6684
6685 /*
6686  * Move all SERIAL-column sequences of the specified relation to another
6687  * namespace.
6688  *
6689  * Note: we assume adequate permission checking was done by the caller,
6690  * and that the caller has a suitable lock on the owning relation.
6691  */
6692 static void
6693 AlterSeqNamespaces(Relation classRel, Relation rel,
6694                                    Oid oldNspOid, Oid newNspOid, const char *newNspName)
6695 {
6696         Relation        depRel;
6697         SysScanDesc scan;
6698         ScanKeyData key[2];
6699         HeapTuple       tup;
6700
6701         /*
6702          * SERIAL sequences are those having an internal dependency on one of the
6703          * table's columns (we don't care *which* column, exactly).
6704          */
6705         depRel = heap_open(DependRelationId, AccessShareLock);
6706
6707         ScanKeyInit(&key[0],
6708                                 Anum_pg_depend_refclassid,
6709                                 BTEqualStrategyNumber, F_OIDEQ,
6710                                 ObjectIdGetDatum(RelationRelationId));
6711         ScanKeyInit(&key[1],
6712                                 Anum_pg_depend_refobjid,
6713                                 BTEqualStrategyNumber, F_OIDEQ,
6714                                 ObjectIdGetDatum(RelationGetRelid(rel)));
6715         /* we leave refobjsubid unspecified */
6716
6717         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
6718                                                           SnapshotNow, 2, key);
6719
6720         while (HeapTupleIsValid(tup = systable_getnext(scan)))
6721         {
6722                 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
6723                 Relation        seqRel;
6724
6725                 /* skip dependencies other than internal dependencies on columns */
6726                 if (depForm->refobjsubid == 0 ||
6727                         depForm->classid != RelationRelationId ||
6728                         depForm->objsubid != 0 ||
6729                         depForm->deptype != DEPENDENCY_INTERNAL)
6730                         continue;
6731
6732                 /* Use relation_open just in case it's an index */
6733                 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
6734
6735                 /* skip non-sequence relations */
6736                 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
6737                 {
6738                         /* No need to keep the lock */
6739                         relation_close(seqRel, AccessExclusiveLock);
6740                         continue;
6741                 }
6742
6743                 /* Fix the pg_class and pg_depend entries */
6744                 AlterRelationNamespaceInternal(classRel, depForm->objid,
6745                                                                            oldNspOid, newNspOid,
6746                                                                            true);
6747
6748                 /*
6749                  * Sequences have entries in pg_type. We need to be careful to move
6750                  * them to the new namespace, too.
6751                  */
6752                 AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
6753                                                                    newNspOid, false);
6754
6755                 /* Now we can close it.  Keep the lock till end of transaction. */
6756                 relation_close(seqRel, NoLock);
6757         }
6758
6759         systable_endscan(scan);
6760
6761         relation_close(depRel, AccessShareLock);
6762 }
6763
6764
6765 /*
6766  * This code supports
6767  *      CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
6768  *
6769  * Because we only support this for TEMP tables, it's sufficient to remember
6770  * the state in a backend-local data structure.
6771  */
6772
6773 /*
6774  * Register a newly-created relation's ON COMMIT action.
6775  */
6776 void
6777 register_on_commit_action(Oid relid, OnCommitAction action)
6778 {
6779         OnCommitItem *oc;
6780         MemoryContext oldcxt;
6781
6782         /*
6783          * We needn't bother registering the relation unless there is an ON COMMIT
6784          * action we need to take.
6785          */
6786         if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6787                 return;
6788
6789         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6790
6791         oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6792         oc->relid = relid;
6793         oc->oncommit = action;
6794         oc->creating_subid = GetCurrentSubTransactionId();
6795         oc->deleting_subid = InvalidSubTransactionId;
6796
6797         on_commits = lcons(oc, on_commits);
6798
6799         MemoryContextSwitchTo(oldcxt);
6800 }
6801
6802 /*
6803  * Unregister any ON COMMIT action when a relation is deleted.
6804  *
6805  * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
6806  */
6807 void
6808 remove_on_commit_action(Oid relid)
6809 {
6810         ListCell   *l;
6811
6812         foreach(l, on_commits)
6813         {
6814                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6815
6816                 if (oc->relid == relid)
6817                 {
6818                         oc->deleting_subid = GetCurrentSubTransactionId();
6819                         break;
6820                 }
6821         }
6822 }
6823
6824 /*
6825  * Perform ON COMMIT actions.
6826  *
6827  * This is invoked just before actually committing, since it's possible
6828  * to encounter errors.
6829  */
6830 void
6831 PreCommit_on_commit_actions(void)
6832 {
6833         ListCell   *l;
6834         List       *oids_to_truncate = NIL;
6835
6836         foreach(l, on_commits)
6837         {
6838                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6839
6840                 /* Ignore entry if already dropped in this xact */
6841                 if (oc->deleting_subid != InvalidSubTransactionId)
6842                         continue;
6843
6844                 switch (oc->oncommit)
6845                 {
6846                         case ONCOMMIT_NOOP:
6847                         case ONCOMMIT_PRESERVE_ROWS:
6848                                 /* Do nothing (there shouldn't be such entries, actually) */
6849                                 break;
6850                         case ONCOMMIT_DELETE_ROWS:
6851                                 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
6852                                 break;
6853                         case ONCOMMIT_DROP:
6854                                 {
6855                                         ObjectAddress object;
6856
6857                                         object.classId = RelationRelationId;
6858                                         object.objectId = oc->relid;
6859                                         object.objectSubId = 0;
6860                                         performDeletion(&object, DROP_CASCADE);
6861
6862                                         /*
6863                                          * Note that table deletion will call
6864                                          * remove_on_commit_action, so the entry should get marked
6865                                          * as deleted.
6866                                          */
6867                                         Assert(oc->deleting_subid != InvalidSubTransactionId);
6868                                         break;
6869                                 }
6870                 }
6871         }
6872         if (oids_to_truncate != NIL)
6873         {
6874                 heap_truncate(oids_to_truncate);
6875                 CommandCounterIncrement();              /* XXX needed? */
6876         }
6877 }
6878
6879 /*
6880  * Post-commit or post-abort cleanup for ON COMMIT management.
6881  *
6882  * All we do here is remove no-longer-needed OnCommitItem entries.
6883  *
6884  * During commit, remove entries that were deleted during this transaction;
6885  * during abort, remove those created during this transaction.
6886  */
6887 void
6888 AtEOXact_on_commit_actions(bool isCommit)
6889 {
6890         ListCell   *cur_item;
6891         ListCell   *prev_item;
6892
6893         prev_item = NULL;
6894         cur_item = list_head(on_commits);
6895
6896         while (cur_item != NULL)
6897         {
6898                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6899
6900                 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
6901                         oc->creating_subid != InvalidSubTransactionId)
6902                 {
6903                         /* cur_item must be removed */
6904                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6905                         pfree(oc);
6906                         if (prev_item)
6907                                 cur_item = lnext(prev_item);
6908                         else
6909                                 cur_item = list_head(on_commits);
6910                 }
6911                 else
6912                 {
6913                         /* cur_item must be preserved */
6914                         oc->creating_subid = InvalidSubTransactionId;
6915                         oc->deleting_subid = InvalidSubTransactionId;
6916                         prev_item = cur_item;
6917                         cur_item = lnext(prev_item);
6918                 }
6919         }
6920 }
6921
6922 /*
6923  * Post-subcommit or post-subabort cleanup for ON COMMIT management.
6924  *
6925  * During subabort, we can immediately remove entries created during this
6926  * subtransaction.      During subcommit, just relabel entries marked during
6927  * this subtransaction as being the parent's responsibility.
6928  */
6929 void
6930 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
6931                                                           SubTransactionId parentSubid)
6932 {
6933         ListCell   *cur_item;
6934         ListCell   *prev_item;
6935
6936         prev_item = NULL;
6937         cur_item = list_head(on_commits);
6938
6939         while (cur_item != NULL)
6940         {
6941                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6942
6943                 if (!isCommit && oc->creating_subid == mySubid)
6944                 {
6945                         /* cur_item must be removed */
6946                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6947                         pfree(oc);
6948                         if (prev_item)
6949                                 cur_item = lnext(prev_item);
6950                         else
6951                                 cur_item = list_head(on_commits);
6952                 }
6953                 else
6954                 {
6955                         /* cur_item must be preserved */
6956                         if (oc->creating_subid == mySubid)
6957                                 oc->creating_subid = parentSubid;
6958                         if (oc->deleting_subid == mySubid)
6959                                 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
6960                         prev_item = cur_item;
6961                         cur_item = lnext(prev_item);
6962                 }
6963         }
6964 }