OSDN Git Service

Replace pg_shadow and pg_group by new role-capable catalogs pg_authid
[pg-rex/syncrep.git] / src / backend / commands / tablecmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * tablecmds.c
4  *        Commands for creating and altering table structures and settings
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.162 2005/06/28 05:08:54 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/tuptoaster.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_constraint.h"
26 #include "catalog/pg_depend.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_opclass.h"
30 #include "catalog/pg_trigger.h"
31 #include "catalog/pg_type.h"
32 #include "commands/cluster.h"
33 #include "commands/defrem.h"
34 #include "commands/tablecmds.h"
35 #include "commands/tablespace.h"
36 #include "commands/trigger.h"
37 #include "commands/typecmds.h"
38 #include "executor/executor.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/clauses.h"
43 #include "optimizer/plancat.h"
44 #include "optimizer/prep.h"
45 #include "parser/analyze.h"
46 #include "parser/gramparse.h"
47 #include "parser/parser.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "rewrite/rewriteHandler.h"
55 #include "storage/smgr.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
64
65
66 /*
67  * ON COMMIT action list
68  */
69 typedef struct OnCommitItem
70 {
71         Oid                     relid;                  /* relid of relation */
72         OnCommitAction oncommit;        /* what to do at end of xact */
73
74         /*
75          * If this entry was created during the current transaction,
76          * creating_subid is the ID of the creating subxact; if created in a prior
77          * transaction, creating_subid is zero.  If deleted during the current
78          * transaction, deleting_subid is the ID of the deleting subxact; if no
79          * deletion request is pending, deleting_subid is zero.
80          */
81         SubTransactionId creating_subid;
82         SubTransactionId deleting_subid;
83 } OnCommitItem;
84
85 static List *on_commits = NIL;
86
87
88 /*
89  * State information for ALTER TABLE
90  *
91  * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
92  * structs, one for each table modified by the operation (the named table
93  * plus any child tables that are affected).  We save lists of subcommands
94  * to apply to this table (possibly modified by parse transformation steps);
95  * these lists will be executed in Phase 2.  If a Phase 3 step is needed,
96  * necessary information is stored in the constraints and newvals lists.
97  *
98  * Phase 2 is divided into multiple passes; subcommands are executed in
99  * a pass determined by subcommand type.
100  */
101
102 #define AT_PASS_DROP                    0               /* DROP (all flavors) */
103 #define AT_PASS_ALTER_TYPE              1               /* ALTER COLUMN TYPE */
104 #define AT_PASS_OLD_INDEX               2               /* re-add existing indexes */
105 #define AT_PASS_OLD_CONSTR              3               /* re-add existing constraints */
106 #define AT_PASS_COL_ATTRS               4               /* set other column attributes */
107 /* We could support a RENAME COLUMN pass here, but not currently used */
108 #define AT_PASS_ADD_COL                 5               /* ADD COLUMN */
109 #define AT_PASS_ADD_INDEX               6               /* ADD indexes */
110 #define AT_PASS_ADD_CONSTR              7               /* ADD constraints, defaults */
111 #define AT_PASS_MISC                    8               /* other stuff */
112 #define AT_NUM_PASSES                   9
113
114 typedef struct AlteredTableInfo
115 {
116         /* Information saved before any work commences: */
117         Oid                     relid;                  /* Relation to work on */
118         char            relkind;                /* Its relkind */
119         TupleDesc       oldDesc;                /* Pre-modification tuple descriptor */
120         /* Information saved by Phase 1 for Phase 2: */
121         List       *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
122         /* Information saved by Phases 1/2 for Phase 3: */
123         List       *constraints;        /* List of NewConstraint */
124         List       *newvals;            /* List of NewColumnValue */
125         Oid                     newTableSpace;  /* new tablespace; 0 means no change */
126         /* Objects to rebuild after completing ALTER TYPE operations */
127         List       *changedConstraintOids;      /* OIDs of constraints to rebuild */
128         List       *changedConstraintDefs;      /* string definitions of same */
129         List       *changedIndexOids;           /* OIDs of indexes to rebuild */
130         List       *changedIndexDefs;           /* string definitions of same */
131 } AlteredTableInfo;
132
133 /* Struct describing one new constraint to check in Phase 3 scan */
134 typedef struct NewConstraint
135 {
136         char       *name;                       /* Constraint name, or NULL if none */
137         ConstrType      contype;                /* CHECK, NOT_NULL, or FOREIGN */
138         AttrNumber      attnum;                 /* only relevant for NOT_NULL */
139         Oid                     refrelid;               /* PK rel, if FOREIGN */
140         Node       *qual;                       /* Check expr or FkConstraint struct */
141         List       *qualstate;          /* Execution state for CHECK */
142 } NewConstraint;
143
144 /*
145  * Struct describing one new column value that needs to be computed during
146  * Phase 3 copy (this could be either a new column with a non-null default, or
147  * a column that we're changing the type of).  Columns without such an entry
148  * are just copied from the old table during ATRewriteTable.  Note that the
149  * expr is an expression over *old* table values.
150  */
151 typedef struct NewColumnValue
152 {
153         AttrNumber      attnum;                 /* which column */
154         Expr       *expr;                       /* expression to compute */
155         ExprState  *exprstate;          /* execution state */
156 } NewColumnValue;
157
158
159 static List *MergeAttributes(List *schema, List *supers, bool istemp,
160                                 List **supOids, List **supconstr, int *supOidCount);
161 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
162 static void StoreCatalogInheritance(Oid relationId, List *supers);
163 static int      findAttrByName(const char *attributeName, List *schema);
164 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
165 static bool needs_toast_table(Relation rel);
166 static int transformColumnNameList(Oid relId, List *colList,
167                                                 int16 *attnums, Oid *atttypids);
168 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
169                                                    List **attnamelist,
170                                                    int16 *attnums, Oid *atttypids,
171                                                    Oid *opclasses);
172 static Oid transformFkeyCheckAttrs(Relation pkrel,
173                                                 int numattrs, int16 *attnums,
174                                                 Oid *opclasses);
175 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
176                                                          Relation rel, Relation pkrel);
177 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
178                                                  Oid constrOid);
179 static char *fkMatchTypeToString(char match_type);
180 static void ATController(Relation rel, List *cmds, bool recurse);
181 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
182                   bool recurse, bool recursing);
183 static void ATRewriteCatalogs(List **wqueue);
184 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
185 static void ATRewriteTables(List **wqueue);
186 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
187 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
188 static void ATSimplePermissions(Relation rel, bool allowView);
189 static void ATSimpleRecursion(List **wqueue, Relation rel,
190                                   AlterTableCmd *cmd, bool recurse);
191 static void ATOneLevelRecursion(List **wqueue, Relation rel,
192                                         AlterTableCmd *cmd);
193 static void find_composite_type_dependencies(Oid typeOid,
194                                                                  const char *origTblName);
195 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
196                                 AlterTableCmd *cmd);
197 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
198                                 ColumnDef *colDef);
199 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
200 static void add_column_support_dependency(Oid relid, int32 attnum,
201                                                           RangeVar *support);
202 static void ATExecDropNotNull(Relation rel, const char *colName);
203 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
204                                  const char *colName);
205 static void ATExecColumnDefault(Relation rel, const char *colName,
206                                         Node *newDefault);
207 static void ATPrepSetStatistics(Relation rel, const char *colName,
208                                         Node *flagValue);
209 static void ATExecSetStatistics(Relation rel, const char *colName,
210                                         Node *newValue);
211 static void ATExecSetStorage(Relation rel, const char *colName,
212                                  Node *newValue);
213 static void ATExecDropColumn(Relation rel, const char *colName,
214                                  DropBehavior behavior,
215                                  bool recurse, bool recursing);
216 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
217                            IndexStmt *stmt, bool is_rebuild);
218 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
219                                         Node *newConstraint);
220 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
221                                                   FkConstraint *fkconstraint);
222 static void ATPrepDropConstraint(List **wqueue, Relation rel,
223                                          bool recurse, AlterTableCmd *cmd);
224 static void ATExecDropConstraint(Relation rel, const char *constrName,
225                                          DropBehavior behavior, bool quiet);
226 static void ATPrepAlterColumnType(List **wqueue,
227                                           AlteredTableInfo *tab, Relation rel,
228                                           bool recurse, bool recursing,
229                                           AlterTableCmd *cmd);
230 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
231                                           const char *colName, TypeName *typename);
232 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
233 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
234 static void ATExecChangeOwner(Oid relationOid, Oid newOwnerId);
235 static void change_owner_recurse_to_sequences(Oid relationOid,
236                                                                                           Oid newOwnerId);
237 static void ATExecClusterOn(Relation rel, const char *indexName);
238 static void ATExecDropCluster(Relation rel);
239 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
240                                         char *tablespacename);
241 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
242 static void copy_relation_data(Relation rel, SMgrRelation dst);
243 static void update_ri_trigger_args(Oid relid,
244                                            const char *oldname,
245                                            const char *newname,
246                                            bool fk_scan,
247                                            bool update_relname);
248
249
250 /* ----------------------------------------------------------------
251  *              DefineRelation
252  *                              Creates a new relation.
253  *
254  * If successful, returns the OID of the new relation.
255  * ----------------------------------------------------------------
256  */
257 Oid
258 DefineRelation(CreateStmt *stmt, char relkind)
259 {
260         char            relname[NAMEDATALEN];
261         Oid                     namespaceId;
262         List       *schema = stmt->tableElts;
263         Oid                     relationId;
264         Oid                     tablespaceId;
265         Relation        rel;
266         TupleDesc       descriptor;
267         List       *inheritOids;
268         List       *old_constraints;
269         bool            localHasOids;
270         int                     parentOidCount;
271         List       *rawDefaults;
272         ListCell   *listptr;
273         int                     i;
274         AttrNumber      attnum;
275
276         /*
277          * Truncate relname to appropriate length (probably a waste of time,
278          * as parser should have done this already).
279          */
280         StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
281
282         /*
283          * Check consistency of arguments
284          */
285         if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
286                 ereport(ERROR,
287                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
288                           errmsg("ON COMMIT can only be used on temporary tables")));
289
290         /*
291          * Look up the namespace in which we are supposed to create the
292          * relation.  Check we have permission to create there. Skip check if
293          * bootstrapping, since permissions machinery may not be working yet.
294          */
295         namespaceId = RangeVarGetCreationNamespace(stmt->relation);
296
297         if (!IsBootstrapProcessingMode())
298         {
299                 AclResult       aclresult;
300
301                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
302                                                                                   ACL_CREATE);
303                 if (aclresult != ACLCHECK_OK)
304                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
305                                                    get_namespace_name(namespaceId));
306         }
307
308         /*
309          * Select tablespace to use.  If not specified, use default_tablespace
310          * (which may in turn default to database's default).
311          */
312         if (stmt->tablespacename)
313         {
314                 tablespaceId = get_tablespace_oid(stmt->tablespacename);
315                 if (!OidIsValid(tablespaceId))
316                         ereport(ERROR,
317                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
318                                          errmsg("tablespace \"%s\" does not exist",
319                                                         stmt->tablespacename)));
320         }
321         else
322         {
323                 tablespaceId = GetDefaultTablespace();
324                 /* note InvalidOid is OK in this case */
325         }
326
327         /* Check permissions except when using database's default */
328         if (OidIsValid(tablespaceId))
329         {
330                 AclResult       aclresult;
331
332                 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
333                                                                                    ACL_CREATE);
334                 if (aclresult != ACLCHECK_OK)
335                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
336                                                    get_tablespace_name(tablespaceId));
337         }
338
339         /*
340          * Look up inheritance ancestors and generate relation schema,
341          * including inherited attributes.
342          */
343         schema = MergeAttributes(schema, stmt->inhRelations,
344                                                          stmt->relation->istemp,
345                                                 &inheritOids, &old_constraints, &parentOidCount);
346
347         /*
348          * Create a relation descriptor from the relation schema and create
349          * the relation.  Note that in this stage only inherited (pre-cooked)
350          * defaults and constraints will be included into the new relation.
351          * (BuildDescForRelation takes care of the inherited defaults, but we
352          * have to copy inherited constraints here.)
353          */
354         descriptor = BuildDescForRelation(schema);
355
356         localHasOids = interpretOidsOption(stmt->hasoids);
357         descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
358
359         if (old_constraints != NIL)
360         {
361                 ConstrCheck *check = (ConstrCheck *)
362                 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
363                 int                     ncheck = 0;
364
365                 foreach(listptr, old_constraints)
366                 {
367                         Constraint *cdef = (Constraint *) lfirst(listptr);
368                         bool            dup = false;
369
370                         if (cdef->contype != CONSTR_CHECK)
371                                 continue;
372                         Assert(cdef->name != NULL);
373                         Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
374
375                         /*
376                          * In multiple-inheritance situations, it's possible to
377                          * inherit the same grandparent constraint through multiple
378                          * parents. Hence, discard inherited constraints that match as
379                          * to both name and expression.  Otherwise, gripe if the names
380                          * conflict.
381                          */
382                         for (i = 0; i < ncheck; i++)
383                         {
384                                 if (strcmp(check[i].ccname, cdef->name) != 0)
385                                         continue;
386                                 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
387                                 {
388                                         dup = true;
389                                         break;
390                                 }
391                                 ereport(ERROR,
392                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
393                                                  errmsg("duplicate check constraint name \"%s\"",
394                                                                 cdef->name)));
395                         }
396                         if (!dup)
397                         {
398                                 check[ncheck].ccname = cdef->name;
399                                 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
400                                 ncheck++;
401                         }
402                 }
403                 if (ncheck > 0)
404                 {
405                         if (descriptor->constr == NULL)
406                         {
407                                 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
408                                 descriptor->constr->defval = NULL;
409                                 descriptor->constr->num_defval = 0;
410                                 descriptor->constr->has_not_null = false;
411                         }
412                         descriptor->constr->num_check = ncheck;
413                         descriptor->constr->check = check;
414                 }
415         }
416
417         relationId = heap_create_with_catalog(relname,
418                                                                                   namespaceId,
419                                                                                   tablespaceId,
420                                                                                   InvalidOid,
421                                                                                   descriptor,
422                                                                                   relkind,
423                                                                                   false,
424                                                                                   localHasOids,
425                                                                                   parentOidCount,
426                                                                                   stmt->oncommit,
427                                                                                   allowSystemTableMods);
428
429         StoreCatalogInheritance(relationId, inheritOids);
430
431         /*
432          * We must bump the command counter to make the newly-created relation
433          * tuple visible for opening.
434          */
435         CommandCounterIncrement();
436
437         /*
438          * Open the new relation and acquire exclusive lock on it.      This isn't
439          * really necessary for locking out other backends (since they can't
440          * see the new rel anyway until we commit), but it keeps the lock
441          * manager from complaining about deadlock risks.
442          */
443         rel = relation_open(relationId, AccessExclusiveLock);
444
445         /*
446          * Now add any newly specified column default values and CHECK
447          * constraints to the new relation.  These are passed to us in the
448          * form of raw parsetrees; we need to transform them to executable
449          * expression trees before they can be added. The most convenient way
450          * to do that is to apply the parser's transformExpr routine, but
451          * transformExpr doesn't work unless we have a pre-existing relation.
452          * So, the transformation has to be postponed to this final step of
453          * CREATE TABLE.
454          *
455          * Another task that's conveniently done at this step is to add
456          * dependency links between columns and supporting relations (such as
457          * SERIAL sequences).
458          *
459          * First, scan schema to find new column defaults.
460          */
461         rawDefaults = NIL;
462         attnum = 0;
463
464         foreach(listptr, schema)
465         {
466                 ColumnDef  *colDef = lfirst(listptr);
467
468                 attnum++;
469
470                 if (colDef->raw_default != NULL)
471                 {
472                         RawColumnDefault *rawEnt;
473
474                         Assert(colDef->cooked_default == NULL);
475
476                         rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
477                         rawEnt->attnum = attnum;
478                         rawEnt->raw_default = colDef->raw_default;
479                         rawDefaults = lappend(rawDefaults, rawEnt);
480                 }
481
482                 /* Create dependency for supporting relation for this column */
483                 if (colDef->support != NULL)
484                         add_column_support_dependency(relationId, attnum, colDef->support);
485         }
486
487         /*
488          * Parse and add the defaults/constraints, if any.
489          */
490         if (rawDefaults || stmt->constraints)
491                 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
492
493         /*
494          * Clean up.  We keep lock on new relation (although it shouldn't be
495          * visible to anyone else anyway, until commit).
496          */
497         relation_close(rel, NoLock);
498
499         return relationId;
500 }
501
502 /*
503  * RemoveRelation
504  *              Deletes a relation.
505  */
506 void
507 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
508 {
509         Oid                     relOid;
510         ObjectAddress object;
511
512         relOid = RangeVarGetRelid(relation, false);
513
514         object.classId = RelationRelationId;
515         object.objectId = relOid;
516         object.objectSubId = 0;
517
518         performDeletion(&object, behavior);
519 }
520
521 /*
522  * ExecuteTruncate
523  *              Executes a TRUNCATE command.
524  *
525  * This is a multi-relation truncate.  It first opens and grabs exclusive
526  * locks on all relations involved, checking permissions and otherwise
527  * verifying that the relation is OK for truncation.  When they are all
528  * open, it checks foreign key references on them, namely that FK references
529  * are all internal to the group that's being truncated.  Finally all
530  * relations are truncated and reindexed.
531  */
532 void
533 ExecuteTruncate(List *relations)
534 {
535         List            *rels = NIL;
536         ListCell        *cell;
537
538         foreach(cell, relations)
539         {
540                 RangeVar   *rv = lfirst(cell);
541                 Relation        rel;
542
543                 /* Grab exclusive lock in preparation for truncate */
544                 rel = heap_openrv(rv, AccessExclusiveLock);
545
546                 /* Only allow truncate on regular tables */
547                 if (rel->rd_rel->relkind != RELKIND_RELATION)
548                         ereport(ERROR,
549                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
550                                          errmsg("\"%s\" is not a table",
551                                                  RelationGetRelationName(rel))));
552
553                 /* Permissions checks */
554                 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
555                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
556                                         RelationGetRelationName(rel));
557
558                 if (!allowSystemTableMods && IsSystemRelation(rel))
559                         ereport(ERROR,
560                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
561                                          errmsg("permission denied: \"%s\" is a system catalog",
562                                                  RelationGetRelationName(rel))));
563
564                 /*
565                  * We can never allow truncation of shared or nailed-in-cache
566                  * relations, because we can't support changing their relfilenode
567                  * values.
568                  */
569                 if (rel->rd_rel->relisshared || rel->rd_isnailed)
570                         ereport(ERROR,
571                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
572                                          errmsg("cannot truncate system relation \"%s\"",
573                                                  RelationGetRelationName(rel))));
574
575                 /*
576                  * Don't allow truncate on temp tables of other backends ... their
577                  * local buffer manager is not going to cope.
578                  */
579                 if (isOtherTempNamespace(RelationGetNamespace(rel)))
580                         ereport(ERROR,
581                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
582                                          errmsg("cannot truncate temporary tables of other sessions")));
583
584                 /* Save it into the list of rels to truncate */
585                 rels = lappend(rels, rel);
586         }
587
588         /*
589          * Check foreign key references.
590          */
591         heap_truncate_check_FKs(rels, false);
592
593         /*
594          * OK, truncate each table.
595          */
596         foreach(cell, rels)
597         {
598                 Relation        rel = lfirst(cell);
599                 Oid                     heap_relid;
600                 Oid                     toast_relid;
601
602                 /*
603                  * Create a new empty storage file for the relation, and assign it as
604                  * the relfilenode value.       The old storage file is scheduled for
605                  * deletion at commit.
606                  */
607                 setNewRelfilenode(rel);
608
609                 heap_relid = RelationGetRelid(rel);
610                 toast_relid = rel->rd_rel->reltoastrelid;
611
612                 heap_close(rel, NoLock);
613
614                 /*
615                  * The same for the toast table, if any.
616                  */
617                 if (OidIsValid(toast_relid))
618                 {
619                         rel = relation_open(toast_relid, AccessExclusiveLock);
620                         setNewRelfilenode(rel);
621                         heap_close(rel, NoLock);
622                 }
623
624                 /*
625                  * Reconstruct the indexes to match, and we're done.
626                  */
627                 reindex_relation(heap_relid, true);
628         }
629 }
630
631 /*----------
632  * MergeAttributes
633  *              Returns new schema given initial schema and superclasses.
634  *
635  * Input arguments:
636  * 'schema' is the column/attribute definition for the table. (It's a list
637  *              of ColumnDef's.) It is destructively changed.
638  * 'supers' is a list of names (as RangeVar nodes) of parent relations.
639  * 'istemp' is TRUE if we are creating a temp relation.
640  *
641  * Output arguments:
642  * 'supOids' receives a list of the OIDs of the parent relations.
643  * 'supconstr' receives a list of constraints belonging to the parents,
644  *              updated as necessary to be valid for the child.
645  * 'supOidCount' is set to the number of parents that have OID columns.
646  *
647  * Return value:
648  * Completed schema list.
649  *
650  * Notes:
651  *        The order in which the attributes are inherited is very important.
652  *        Intuitively, the inherited attributes should come first. If a table
653  *        inherits from multiple parents, the order of those attributes are
654  *        according to the order of the parents specified in CREATE TABLE.
655  *
656  *        Here's an example:
657  *
658  *              create table person (name text, age int4, location point);
659  *              create table emp (salary int4, manager text) inherits(person);
660  *              create table student (gpa float8) inherits (person);
661  *              create table stud_emp (percent int4) inherits (emp, student);
662  *
663  *        The order of the attributes of stud_emp is:
664  *
665  *                                                      person {1:name, 2:age, 3:location}
666  *                                                      /        \
667  *                         {6:gpa}      student   emp {4:salary, 5:manager}
668  *                                                      \        /
669  *                                                 stud_emp {7:percent}
670  *
671  *         If the same attribute name appears multiple times, then it appears
672  *         in the result table in the proper location for its first appearance.
673  *
674  *         Constraints (including NOT NULL constraints) for the child table
675  *         are the union of all relevant constraints, from both the child schema
676  *         and parent tables.
677  *
678  *         The default value for a child column is defined as:
679  *              (1) If the child schema specifies a default, that value is used.
680  *              (2) If neither the child nor any parent specifies a default, then
681  *                      the column will not have a default.
682  *              (3) If conflicting defaults are inherited from different parents
683  *                      (and not overridden by the child), an error is raised.
684  *              (4) Otherwise the inherited default is used.
685  *              Rule (3) is new in Postgres 7.1; in earlier releases you got a
686  *              rather arbitrary choice of which parent default to use.
687  *----------
688  */
689 static List *
690 MergeAttributes(List *schema, List *supers, bool istemp,
691                                 List **supOids, List **supconstr, int *supOidCount)
692 {
693         ListCell   *entry;
694         List       *inhSchema = NIL;
695         List       *parentOids = NIL;
696         List       *constraints = NIL;
697         int                     parentsWithOids = 0;
698         bool            have_bogus_defaults = false;
699         char       *bogus_marker = "Bogus!";            /* marks conflicting
700                                                                                                  * defaults */
701         int                     child_attno;
702
703         /*
704          * Check for and reject tables with too many columns. We perform
705          * this check relatively early for two reasons: (a) we don't run
706          * the risk of overflowing an AttrNumber in subsequent code (b) an
707          * O(n^2) algorithm is okay if we're processing <= 1600 columns,
708          * but could take minutes to execute if the user attempts to
709          * create a table with hundreds of thousands of columns.
710          *
711          * Note that we also need to check that any we do not exceed this
712          * figure after including columns from inherited relations.
713          */
714         if (list_length(schema) > MaxHeapAttributeNumber)
715                 ereport(ERROR,
716                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
717                                  errmsg("tables can have at most %d columns",
718                                                 MaxHeapAttributeNumber)));
719
720         /*
721          * Check for duplicate names in the explicit list of attributes.
722          *
723          * Although we might consider merging such entries in the same way that
724          * we handle name conflicts for inherited attributes, it seems to make
725          * more sense to assume such conflicts are errors.
726          */
727         foreach(entry, schema)
728         {
729                 ColumnDef  *coldef = lfirst(entry);
730                 ListCell   *rest;
731
732                 for_each_cell(rest, lnext(entry))
733                 {
734                         ColumnDef  *restdef = lfirst(rest);
735
736                         if (strcmp(coldef->colname, restdef->colname) == 0)
737                                 ereport(ERROR,
738                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
739                                                  errmsg("column \"%s\" duplicated",
740                                                                 coldef->colname)));
741                 }
742         }
743
744         /*
745          * Scan the parents left-to-right, and merge their attributes to form
746          * a list of inherited attributes (inhSchema).  Also check to see if
747          * we need to inherit an OID column.
748          */
749         child_attno = 0;
750         foreach(entry, supers)
751         {
752                 RangeVar   *parent = (RangeVar *) lfirst(entry);
753                 Relation        relation;
754                 TupleDesc       tupleDesc;
755                 TupleConstr *constr;
756                 AttrNumber *newattno;
757                 AttrNumber      parent_attno;
758
759                 relation = heap_openrv(parent, AccessShareLock);
760
761                 if (relation->rd_rel->relkind != RELKIND_RELATION)
762                         ereport(ERROR,
763                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
764                                          errmsg("inherited relation \"%s\" is not a table",
765                                                         parent->relname)));
766                 /* Permanent rels cannot inherit from temporary ones */
767                 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
768                         ereport(ERROR,
769                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
770                                   errmsg("cannot inherit from temporary relation \"%s\"",
771                                                  parent->relname)));
772
773                 /*
774                  * We should have an UNDER permission flag for this, but for now,
775                  * demand that creator of a child table own the parent.
776                  */
777                 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
778                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
779                                                    RelationGetRelationName(relation));
780
781                 /*
782                  * Reject duplications in the list of parents.
783                  */
784                 if (list_member_oid(parentOids, RelationGetRelid(relation)))
785                         ereport(ERROR,
786                                         (errcode(ERRCODE_DUPLICATE_TABLE),
787                                          errmsg("inherited relation \"%s\" duplicated",
788                                                         parent->relname)));
789
790                 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
791
792                 if (relation->rd_rel->relhasoids)
793                         parentsWithOids++;
794
795                 tupleDesc = RelationGetDescr(relation);
796                 constr = tupleDesc->constr;
797
798                 /*
799                  * newattno[] will contain the child-table attribute numbers for
800                  * the attributes of this parent table.  (They are not the same
801                  * for parents after the first one, nor if we have dropped
802                  * columns.)
803                  */
804                 newattno = (AttrNumber *)
805                         palloc(tupleDesc->natts * sizeof(AttrNumber));
806
807                 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
808                          parent_attno++)
809                 {
810                         Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
811                         char       *attributeName = NameStr(attribute->attname);
812                         int                     exist_attno;
813                         ColumnDef  *def;
814                         TypeName   *typename;
815
816                         /*
817                          * Ignore dropped columns in the parent.
818                          */
819                         if (attribute->attisdropped)
820                         {
821                                 /*
822                                  * change_varattnos_of_a_node asserts that this is greater
823                                  * than zero, so if anything tries to use it, we should
824                                  * find out.
825                                  */
826                                 newattno[parent_attno - 1] = 0;
827                                 continue;
828                         }
829
830                         /*
831                          * Does it conflict with some previously inherited column?
832                          */
833                         exist_attno = findAttrByName(attributeName, inhSchema);
834                         if (exist_attno > 0)
835                         {
836                                 /*
837                                  * Yes, try to merge the two column definitions. They must
838                                  * have the same type and typmod.
839                                  */
840                                 ereport(NOTICE,
841                                                 (errmsg("merging multiple inherited definitions of column \"%s\"",
842                                                                 attributeName)));
843                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
844                                 if (typenameTypeId(def->typename) != attribute->atttypid ||
845                                         def->typename->typmod != attribute->atttypmod)
846                                         ereport(ERROR,
847                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
848                                         errmsg("inherited column \"%s\" has a type conflict",
849                                                    attributeName),
850                                                          errdetail("%s versus %s",
851                                                                            TypeNameToString(def->typename),
852                                                                   format_type_be(attribute->atttypid))));
853                                 def->inhcount++;
854                                 /* Merge of NOT NULL constraints = OR 'em together */
855                                 def->is_not_null |= attribute->attnotnull;
856                                 /* Default and other constraints are handled below */
857                                 newattno[parent_attno - 1] = exist_attno;
858                         }
859                         else
860                         {
861                                 /*
862                                  * No, create a new inherited column
863                                  */
864                                 def = makeNode(ColumnDef);
865                                 def->colname = pstrdup(attributeName);
866                                 typename = makeNode(TypeName);
867                                 typename->typeid = attribute->atttypid;
868                                 typename->typmod = attribute->atttypmod;
869                                 def->typename = typename;
870                                 def->inhcount = 1;
871                                 def->is_local = false;
872                                 def->is_not_null = attribute->attnotnull;
873                                 def->raw_default = NULL;
874                                 def->cooked_default = NULL;
875                                 def->constraints = NIL;
876                                 def->support = NULL;
877                                 inhSchema = lappend(inhSchema, def);
878                                 newattno[parent_attno - 1] = ++child_attno;
879                         }
880
881                         /*
882                          * Copy default if any
883                          */
884                         if (attribute->atthasdef)
885                         {
886                                 char       *this_default = NULL;
887                                 AttrDefault *attrdef;
888                                 int                     i;
889
890                                 /* Find default in constraint structure */
891                                 Assert(constr != NULL);
892                                 attrdef = constr->defval;
893                                 for (i = 0; i < constr->num_defval; i++)
894                                 {
895                                         if (attrdef[i].adnum == parent_attno)
896                                         {
897                                                 this_default = attrdef[i].adbin;
898                                                 break;
899                                         }
900                                 }
901                                 Assert(this_default != NULL);
902
903                                 /*
904                                  * If default expr could contain any vars, we'd need to
905                                  * fix 'em, but it can't; so default is ready to apply to
906                                  * child.
907                                  *
908                                  * If we already had a default from some prior parent, check
909                                  * to see if they are the same.  If so, no problem; if
910                                  * not, mark the column as having a bogus default. Below,
911                                  * we will complain if the bogus default isn't overridden
912                                  * by the child schema.
913                                  */
914                                 Assert(def->raw_default == NULL);
915                                 if (def->cooked_default == NULL)
916                                         def->cooked_default = pstrdup(this_default);
917                                 else if (strcmp(def->cooked_default, this_default) != 0)
918                                 {
919                                         def->cooked_default = bogus_marker;
920                                         have_bogus_defaults = true;
921                                 }
922                         }
923                 }
924
925                 /*
926                  * Now copy the constraints of this parent, adjusting attnos using
927                  * the completed newattno[] map
928                  */
929                 if (constr && constr->num_check > 0)
930                 {
931                         ConstrCheck *check = constr->check;
932                         int                     i;
933
934                         for (i = 0; i < constr->num_check; i++)
935                         {
936                                 Constraint *cdef = makeNode(Constraint);
937                                 Node       *expr;
938
939                                 cdef->contype = CONSTR_CHECK;
940                                 cdef->name = pstrdup(check[i].ccname);
941                                 cdef->raw_expr = NULL;
942                                 /* adjust varattnos of ccbin here */
943                                 expr = stringToNode(check[i].ccbin);
944                                 change_varattnos_of_a_node(expr, newattno);
945                                 cdef->cooked_expr = nodeToString(expr);
946                                 constraints = lappend(constraints, cdef);
947                         }
948                 }
949
950                 pfree(newattno);
951
952                 /*
953                  * Close the parent rel, but keep our AccessShareLock on it until
954                  * xact commit.  That will prevent someone else from deleting or
955                  * ALTERing the parent before the child is committed.
956                  */
957                 heap_close(relation, NoLock);
958         }
959
960         /*
961          * If we had no inherited attributes, the result schema is just the
962          * explicitly declared columns.  Otherwise, we need to merge the
963          * declared columns into the inherited schema list.
964          */
965         if (inhSchema != NIL)
966         {
967                 foreach(entry, schema)
968                 {
969                         ColumnDef  *newdef = lfirst(entry);
970                         char       *attributeName = newdef->colname;
971                         int                     exist_attno;
972
973                         /*
974                          * Does it conflict with some previously inherited column?
975                          */
976                         exist_attno = findAttrByName(attributeName, inhSchema);
977                         if (exist_attno > 0)
978                         {
979                                 ColumnDef  *def;
980
981                                 /*
982                                  * Yes, try to merge the two column definitions. They must
983                                  * have the same type and typmod.
984                                  */
985                                 ereport(NOTICE,
986                                 (errmsg("merging column \"%s\" with inherited definition",
987                                                 attributeName)));
988                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
989                                 if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
990                                         def->typename->typmod != newdef->typename->typmod)
991                                         ereport(ERROR,
992                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
993                                                          errmsg("column \"%s\" has a type conflict",
994                                                                         attributeName),
995                                                          errdetail("%s versus %s",
996                                                                            TypeNameToString(def->typename),
997                                                                    TypeNameToString(newdef->typename))));
998                                 /* Mark the column as locally defined */
999                                 def->is_local = true;
1000                                 /* Merge of NOT NULL constraints = OR 'em together */
1001                                 def->is_not_null |= newdef->is_not_null;
1002                                 /* If new def has a default, override previous default */
1003                                 if (newdef->raw_default != NULL)
1004                                 {
1005                                         def->raw_default = newdef->raw_default;
1006                                         def->cooked_default = newdef->cooked_default;
1007                                 }
1008                         }
1009                         else
1010                         {
1011                                 /*
1012                                  * No, attach new column to result schema
1013                                  */
1014                                 inhSchema = lappend(inhSchema, newdef);
1015                         }
1016                 }
1017
1018                 schema = inhSchema;
1019
1020                 /*
1021                  * Check that we haven't exceeded the legal # of columns after
1022                  * merging in inherited columns.
1023                  */
1024                 if (list_length(schema) > MaxHeapAttributeNumber)
1025                         ereport(ERROR,
1026                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
1027                                          errmsg("tables can have at most %d columns",
1028                                                         MaxHeapAttributeNumber)));
1029         }
1030
1031         /*
1032          * If we found any conflicting parent default values, check to make
1033          * sure they were overridden by the child.
1034          */
1035         if (have_bogus_defaults)
1036         {
1037                 foreach(entry, schema)
1038                 {
1039                         ColumnDef  *def = lfirst(entry);
1040
1041                         if (def->cooked_default == bogus_marker)
1042                                 ereport(ERROR,
1043                                                 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1044                                                  errmsg("column \"%s\" inherits conflicting default values",
1045                                                                 def->colname),
1046                                                  errhint("To resolve the conflict, specify a default explicitly.")));
1047                 }
1048         }
1049
1050         *supOids = parentOids;
1051         *supconstr = constraints;
1052         *supOidCount = parentsWithOids;
1053         return schema;
1054 }
1055
1056 /*
1057  * complementary static functions for MergeAttributes().
1058  *
1059  * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1060  * constraints from parent classes, since the inherited attributes could
1061  * be given different column numbers in multiple-inheritance cases.
1062  *
1063  * Note that the passed node tree is modified in place!
1064  */
1065 static bool
1066 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1067 {
1068         if (node == NULL)
1069                 return false;
1070         if (IsA(node, Var))
1071         {
1072                 Var                *var = (Var *) node;
1073
1074                 if (var->varlevelsup == 0 && var->varno == 1 &&
1075                         var->varattno > 0)
1076                 {
1077                         /*
1078                          * ??? the following may be a problem when the node is
1079                          * multiply referenced though stringToNode() doesn't create
1080                          * such a node currently.
1081                          */
1082                         Assert(newattno[var->varattno - 1] > 0);
1083                         var->varattno = newattno[var->varattno - 1];
1084                 }
1085                 return false;
1086         }
1087         return expression_tree_walker(node, change_varattnos_walker,
1088                                                                   (void *) newattno);
1089 }
1090
1091 static bool
1092 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1093 {
1094         return change_varattnos_walker(node, newattno);
1095 }
1096
1097 /*
1098  * StoreCatalogInheritance
1099  *              Updates the system catalogs with proper inheritance information.
1100  *
1101  * supers is a list of the OIDs of the new relation's direct ancestors.
1102  */
1103 static void
1104 StoreCatalogInheritance(Oid relationId, List *supers)
1105 {
1106         Relation        relation;
1107         TupleDesc       desc;
1108         int16           seqNumber;
1109         ListCell   *entry;
1110         HeapTuple       tuple;
1111
1112         /*
1113          * sanity checks
1114          */
1115         AssertArg(OidIsValid(relationId));
1116
1117         if (supers == NIL)
1118                 return;
1119
1120         /*
1121          * Store INHERITS information in pg_inherits using direct ancestors
1122          * only. Also enter dependencies on the direct ancestors, and make
1123          * sure they are marked with relhassubclass = true.
1124          *
1125          * (Once upon a time, both direct and indirect ancestors were found here
1126          * and then entered into pg_ipl.  Since that catalog doesn't exist
1127          * anymore, there's no need to look for indirect ancestors.)
1128          */
1129         relation = heap_open(InheritsRelationId, RowExclusiveLock);
1130         desc = RelationGetDescr(relation);
1131
1132         seqNumber = 1;
1133         foreach(entry, supers)
1134         {
1135                 Oid                     parentOid = lfirst_oid(entry);
1136                 Datum           datum[Natts_pg_inherits];
1137                 char            nullarr[Natts_pg_inherits];
1138                 ObjectAddress childobject,
1139                                         parentobject;
1140
1141                 datum[0] = ObjectIdGetDatum(relationId);                /* inhrel */
1142                 datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
1143                 datum[2] = Int16GetDatum(seqNumber);    /* inhseqno */
1144
1145                 nullarr[0] = ' ';
1146                 nullarr[1] = ' ';
1147                 nullarr[2] = ' ';
1148
1149                 tuple = heap_formtuple(desc, datum, nullarr);
1150
1151                 simple_heap_insert(relation, tuple);
1152
1153                 CatalogUpdateIndexes(relation, tuple);
1154
1155                 heap_freetuple(tuple);
1156
1157                 /*
1158                  * Store a dependency too
1159                  */
1160                 parentobject.classId = RelationRelationId;
1161                 parentobject.objectId = parentOid;
1162                 parentobject.objectSubId = 0;
1163                 childobject.classId = RelationRelationId;
1164                 childobject.objectId = relationId;
1165                 childobject.objectSubId = 0;
1166
1167                 recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
1168
1169                 /*
1170                  * Mark the parent as having subclasses.
1171                  */
1172                 setRelhassubclassInRelation(parentOid, true);
1173
1174                 seqNumber += 1;
1175         }
1176
1177         heap_close(relation, RowExclusiveLock);
1178 }
1179
1180 /*
1181  * Look for an existing schema entry with the given name.
1182  *
1183  * Returns the index (starting with 1) if attribute already exists in schema,
1184  * 0 if it doesn't.
1185  */
1186 static int
1187 findAttrByName(const char *attributeName, List *schema)
1188 {
1189         ListCell   *s;
1190         int                     i = 1;
1191
1192         foreach(s, schema)
1193         {
1194                 ColumnDef  *def = lfirst(s);
1195
1196                 if (strcmp(attributeName, def->colname) == 0)
1197                         return i;
1198
1199                 i++;
1200         }
1201         return 0;
1202 }
1203
1204 /*
1205  * Update a relation's pg_class.relhassubclass entry to the given value
1206  */
1207 static void
1208 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1209 {
1210         Relation        relationRelation;
1211         HeapTuple       tuple;
1212         Form_pg_class classtuple;
1213
1214         /*
1215          * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1216          *
1217          * If the tuple already has the right relhassubclass setting, we don't
1218          * need to update it, but we still need to issue an SI inval message.
1219          */
1220         relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1221         tuple = SearchSysCacheCopy(RELOID,
1222                                                            ObjectIdGetDatum(relationId),
1223                                                            0, 0, 0);
1224         if (!HeapTupleIsValid(tuple))
1225                 elog(ERROR, "cache lookup failed for relation %u", relationId);
1226         classtuple = (Form_pg_class) GETSTRUCT(tuple);
1227
1228         if (classtuple->relhassubclass != relhassubclass)
1229         {
1230                 classtuple->relhassubclass = relhassubclass;
1231                 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1232
1233                 /* keep the catalog indexes up to date */
1234                 CatalogUpdateIndexes(relationRelation, tuple);
1235         }
1236         else
1237         {
1238                 /* no need to change tuple, but force relcache rebuild anyway */
1239                 CacheInvalidateRelcacheByTuple(tuple);
1240         }
1241
1242         heap_freetuple(tuple);
1243         heap_close(relationRelation, RowExclusiveLock);
1244 }
1245
1246
1247 /*
1248  *              renameatt               - changes the name of a attribute in a relation
1249  *
1250  *              Attname attribute is changed in attribute catalog.
1251  *              No record of the previous attname is kept (correct?).
1252  *
1253  *              get proper relrelation from relation catalog (if not arg)
1254  *              scan attribute catalog
1255  *                              for name conflict (within rel)
1256  *                              for original attribute (if not arg)
1257  *              modify attname in attribute tuple
1258  *              insert modified attribute in attribute catalog
1259  *              delete original attribute from attribute catalog
1260  */
1261 void
1262 renameatt(Oid myrelid,
1263                   const char *oldattname,
1264                   const char *newattname,
1265                   bool recurse,
1266                   bool recursing)
1267 {
1268         Relation        targetrelation;
1269         Relation        attrelation;
1270         HeapTuple       atttup;
1271         Form_pg_attribute attform;
1272         int                     attnum;
1273         List       *indexoidlist;
1274         ListCell   *indexoidscan;
1275
1276         /*
1277          * Grab an exclusive lock on the target table, which we will NOT
1278          * release until end of transaction.
1279          */
1280         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1281
1282         /*
1283          * permissions checking.  this would normally be done in utility.c,
1284          * but this particular routine is recursive.
1285          *
1286          * normally, only the owner of a class can change its schema.
1287          */
1288         if (!pg_class_ownercheck(myrelid, GetUserId()))
1289                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1290                                            RelationGetRelationName(targetrelation));
1291         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1292                 ereport(ERROR,
1293                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1294                                  errmsg("permission denied: \"%s\" is a system catalog",
1295                                                 RelationGetRelationName(targetrelation))));
1296
1297         /*
1298          * if the 'recurse' flag is set then we are supposed to rename this
1299          * attribute in all classes that inherit from 'relname' (as well as in
1300          * 'relname').
1301          *
1302          * any permissions or problems with duplicate attributes will cause the
1303          * whole transaction to abort, which is what we want -- all or
1304          * nothing.
1305          */
1306         if (recurse)
1307         {
1308                 ListCell   *child;
1309                 List       *children;
1310
1311                 /* this routine is actually in the planner */
1312                 children = find_all_inheritors(myrelid);
1313
1314                 /*
1315                  * find_all_inheritors does the recursive search of the
1316                  * inheritance hierarchy, so all we have to do is process all of
1317                  * the relids in the list that it returns.
1318                  */
1319                 foreach(child, children)
1320                 {
1321                         Oid                     childrelid = lfirst_oid(child);
1322
1323                         if (childrelid == myrelid)
1324                                 continue;
1325                         /* note we need not recurse again */
1326                         renameatt(childrelid, oldattname, newattname, false, true);
1327                 }
1328         }
1329         else
1330         {
1331                 /*
1332                  * If we are told not to recurse, there had better not be any
1333                  * child tables; else the rename would put them out of step.
1334                  */
1335                 if (!recursing &&
1336                         find_inheritance_children(myrelid) != NIL)
1337                         ereport(ERROR,
1338                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1339                                          errmsg("inherited column \"%s\" must be renamed in child tables too",
1340                                                         oldattname)));
1341         }
1342
1343         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1344
1345         atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
1346         if (!HeapTupleIsValid(atttup))
1347                 ereport(ERROR,
1348                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
1349                                  errmsg("column \"%s\" does not exist",
1350                                                 oldattname)));
1351         attform = (Form_pg_attribute) GETSTRUCT(atttup);
1352
1353         attnum = attform->attnum;
1354         if (attnum <= 0)
1355                 ereport(ERROR,
1356                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1357                                  errmsg("cannot rename system column \"%s\"",
1358                                                 oldattname)));
1359
1360         /*
1361          * if the attribute is inherited, forbid the renaming, unless we are
1362          * already inside a recursive rename.
1363          */
1364         if (attform->attinhcount > 0 && !recursing)
1365                 ereport(ERROR,
1366                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1367                                  errmsg("cannot rename inherited column \"%s\"",
1368                                                 oldattname)));
1369
1370         /* should not already exist */
1371         /* this test is deliberately not attisdropped-aware */
1372         if (SearchSysCacheExists(ATTNAME,
1373                                                          ObjectIdGetDatum(myrelid),
1374                                                          PointerGetDatum(newattname),
1375                                                          0, 0))
1376                 ereport(ERROR,
1377                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
1378                                  errmsg("column \"%s\" of relation \"%s\" already exists",
1379                                   newattname, RelationGetRelationName(targetrelation))));
1380
1381         namestrcpy(&(attform->attname), newattname);
1382
1383         simple_heap_update(attrelation, &atttup->t_self, atttup);
1384
1385         /* keep system catalog indexes current */
1386         CatalogUpdateIndexes(attrelation, atttup);
1387
1388         heap_freetuple(atttup);
1389
1390         /*
1391          * Update column names of indexes that refer to the column being
1392          * renamed.
1393          */
1394         indexoidlist = RelationGetIndexList(targetrelation);
1395
1396         foreach(indexoidscan, indexoidlist)
1397         {
1398                 Oid                     indexoid = lfirst_oid(indexoidscan);
1399                 HeapTuple       indextup;
1400                 Form_pg_index indexform;
1401                 int                     i;
1402
1403                 /*
1404                  * Scan through index columns to see if there's any simple index
1405                  * entries for this attribute.  We ignore expressional entries.
1406                  */
1407                 indextup = SearchSysCache(INDEXRELID,
1408                                                                   ObjectIdGetDatum(indexoid),
1409                                                                   0, 0, 0);
1410                 if (!HeapTupleIsValid(indextup))
1411                         elog(ERROR, "cache lookup failed for index %u", indexoid);
1412                 indexform = (Form_pg_index) GETSTRUCT(indextup);
1413
1414                 for (i = 0; i < indexform->indnatts; i++)
1415                 {
1416                         if (attnum != indexform->indkey.values[i])
1417                                 continue;
1418
1419                         /*
1420                          * Found one, rename it.
1421                          */
1422                         atttup = SearchSysCacheCopy(ATTNUM,
1423                                                                                 ObjectIdGetDatum(indexoid),
1424                                                                                 Int16GetDatum(i + 1),
1425                                                                                 0, 0);
1426                         if (!HeapTupleIsValid(atttup))
1427                                 continue;               /* should we raise an error? */
1428
1429                         /*
1430                          * Update the (copied) attribute tuple.
1431                          */
1432                         namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1433                                            newattname);
1434
1435                         simple_heap_update(attrelation, &atttup->t_self, atttup);
1436
1437                         /* keep system catalog indexes current */
1438                         CatalogUpdateIndexes(attrelation, atttup);
1439
1440                         heap_freetuple(atttup);
1441                 }
1442
1443                 ReleaseSysCache(indextup);
1444         }
1445
1446         list_free(indexoidlist);
1447
1448         heap_close(attrelation, RowExclusiveLock);
1449
1450         /*
1451          * Update att name in any RI triggers associated with the relation.
1452          */
1453         if (targetrelation->rd_rel->reltriggers > 0)
1454         {
1455                 /* update tgargs column reference where att is primary key */
1456                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1457                                                            oldattname, newattname,
1458                                                            false, false);
1459                 /* update tgargs column reference where att is foreign key */
1460                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1461                                                            oldattname, newattname,
1462                                                            true, false);
1463         }
1464
1465         relation_close(targetrelation, NoLock);         /* close rel but keep lock */
1466 }
1467
1468 /*
1469  *              renamerel               - change the name of a relation
1470  *
1471  *              XXX - When renaming sequences, we don't bother to modify the
1472  *                        sequence name that is stored within the sequence itself
1473  *                        (this would cause problems with MVCC). In the future,
1474  *                        the sequence name should probably be removed from the
1475  *                        sequence, AFAIK there's no need for it to be there.
1476  */
1477 void
1478 renamerel(Oid myrelid, const char *newrelname)
1479 {
1480         Relation        targetrelation;
1481         Relation        relrelation;    /* for RELATION relation */
1482         HeapTuple       reltup;
1483         Oid                     namespaceId;
1484         char       *oldrelname;
1485         char            relkind;
1486         bool            relhastriggers;
1487
1488         /*
1489          * Grab an exclusive lock on the target table or index, which we will
1490          * NOT release until end of transaction.
1491          */
1492         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1493
1494         oldrelname = pstrdup(RelationGetRelationName(targetrelation));
1495         namespaceId = RelationGetNamespace(targetrelation);
1496
1497         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1498                 ereport(ERROR,
1499                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1500                                  errmsg("permission denied: \"%s\" is a system catalog",
1501                                                 RelationGetRelationName(targetrelation))));
1502
1503         relkind = targetrelation->rd_rel->relkind;
1504         relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1505
1506         /*
1507          * Find relation's pg_class tuple, and make sure newrelname isn't in
1508          * use.
1509          */
1510         relrelation = heap_open(RelationRelationId, RowExclusiveLock);
1511
1512         reltup = SearchSysCacheCopy(RELOID,
1513                                                                 PointerGetDatum(myrelid),
1514                                                                 0, 0, 0);
1515         if (!HeapTupleIsValid(reltup))          /* shouldn't happen */
1516                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1517
1518         if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1519                 ereport(ERROR,
1520                                 (errcode(ERRCODE_DUPLICATE_TABLE),
1521                                  errmsg("relation \"%s\" already exists",
1522                                                 newrelname)));
1523
1524         /*
1525          * Update pg_class tuple with new relname.      (Scribbling on reltup is
1526          * OK because it's a copy...)
1527          */
1528         namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1529
1530         simple_heap_update(relrelation, &reltup->t_self, reltup);
1531
1532         /* keep the system catalog indexes current */
1533         CatalogUpdateIndexes(relrelation, reltup);
1534
1535         heap_freetuple(reltup);
1536         heap_close(relrelation, RowExclusiveLock);
1537
1538         /*
1539          * Also rename the associated type, if any.
1540          */
1541         if (relkind != RELKIND_INDEX)
1542                 TypeRename(oldrelname, namespaceId, newrelname);
1543
1544         /*
1545          * Update rel name in any RI triggers associated with the relation.
1546          */
1547         if (relhastriggers)
1548         {
1549                 /* update tgargs where relname is primary key */
1550                 update_ri_trigger_args(myrelid,
1551                                                            oldrelname,
1552                                                            newrelname,
1553                                                            false, true);
1554                 /* update tgargs where relname is foreign key */
1555                 update_ri_trigger_args(myrelid,
1556                                                            oldrelname,
1557                                                            newrelname,
1558                                                            true, true);
1559         }
1560
1561         /*
1562          * Close rel, but keep exclusive lock!
1563          */
1564         relation_close(targetrelation, NoLock);
1565 }
1566
1567 /*
1568  * Scan pg_trigger for RI triggers that are on the specified relation
1569  * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1570  * is true).  Update RI trigger args fields matching oldname to contain
1571  * newname instead.  If update_relname is true, examine the relname
1572  * fields; otherwise examine the attname fields.
1573  */
1574 static void
1575 update_ri_trigger_args(Oid relid,
1576                                            const char *oldname,
1577                                            const char *newname,
1578                                            bool fk_scan,
1579                                            bool update_relname)
1580 {
1581         Relation        tgrel;
1582         ScanKeyData skey[1];
1583         SysScanDesc trigscan;
1584         HeapTuple       tuple;
1585         Datum           values[Natts_pg_trigger];
1586         char            nulls[Natts_pg_trigger];
1587         char            replaces[Natts_pg_trigger];
1588
1589         tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1590         if (fk_scan)
1591         {
1592                 ScanKeyInit(&skey[0],
1593                                         Anum_pg_trigger_tgconstrrelid,
1594                                         BTEqualStrategyNumber, F_OIDEQ,
1595                                         ObjectIdGetDatum(relid));
1596                 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1597                                                                           true, SnapshotNow,
1598                                                                           1, skey);
1599         }
1600         else
1601         {
1602                 ScanKeyInit(&skey[0],
1603                                         Anum_pg_trigger_tgrelid,
1604                                         BTEqualStrategyNumber, F_OIDEQ,
1605                                         ObjectIdGetDatum(relid));
1606                 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1607                                                                           true, SnapshotNow,
1608                                                                           1, skey);
1609         }
1610
1611         while ((tuple = systable_getnext(trigscan)) != NULL)
1612         {
1613                 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1614                 bytea      *val;
1615                 bytea      *newtgargs;
1616                 bool            isnull;
1617                 int                     tg_type;
1618                 bool            examine_pk;
1619                 bool            changed;
1620                 int                     tgnargs;
1621                 int                     i;
1622                 int                     newlen;
1623                 const char *arga[RI_MAX_ARGUMENTS];
1624                 const char *argp;
1625
1626                 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1627                 if (tg_type == RI_TRIGGER_NONE)
1628                 {
1629                         /* Not an RI trigger, forget it */
1630                         continue;
1631                 }
1632
1633                 /*
1634                  * It is an RI trigger, so parse the tgargs bytea.
1635                  *
1636                  * NB: we assume the field will never be compressed or moved out of
1637                  * line; so does trigger.c ...
1638                  */
1639                 tgnargs = pg_trigger->tgnargs;
1640                 val = (bytea *)
1641                         DatumGetPointer(fastgetattr(tuple,
1642                                                                                 Anum_pg_trigger_tgargs,
1643                                                                                 tgrel->rd_att, &isnull));
1644                 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1645                         tgnargs > RI_MAX_ARGUMENTS)
1646                 {
1647                         /* This probably shouldn't happen, but ignore busted triggers */
1648                         continue;
1649                 }
1650                 argp = (const char *) VARDATA(val);
1651                 for (i = 0; i < tgnargs; i++)
1652                 {
1653                         arga[i] = argp;
1654                         argp += strlen(argp) + 1;
1655                 }
1656
1657                 /*
1658                  * Figure out which item(s) to look at.  If the trigger is
1659                  * primary-key type and attached to my rel, I should look at the
1660                  * PK fields; if it is foreign-key type and attached to my rel, I
1661                  * should look at the FK fields.  But the opposite rule holds when
1662                  * examining triggers found by tgconstrrel search.
1663                  */
1664                 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1665
1666                 changed = false;
1667                 if (update_relname)
1668                 {
1669                         /* Change the relname if needed */
1670                         i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1671                         if (strcmp(arga[i], oldname) == 0)
1672                         {
1673                                 arga[i] = newname;
1674                                 changed = true;
1675                         }
1676                 }
1677                 else
1678                 {
1679                         /* Change attname(s) if needed */
1680                         i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1681                                 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1682                         for (; i < tgnargs; i += 2)
1683                         {
1684                                 if (strcmp(arga[i], oldname) == 0)
1685                                 {
1686                                         arga[i] = newname;
1687                                         changed = true;
1688                                 }
1689                         }
1690                 }
1691
1692                 if (!changed)
1693                 {
1694                         /* Don't need to update this tuple */
1695                         continue;
1696                 }
1697
1698                 /*
1699                  * Construct modified tgargs bytea.
1700                  */
1701                 newlen = VARHDRSZ;
1702                 for (i = 0; i < tgnargs; i++)
1703                         newlen += strlen(arga[i]) + 1;
1704                 newtgargs = (bytea *) palloc(newlen);
1705                 VARATT_SIZEP(newtgargs) = newlen;
1706                 newlen = VARHDRSZ;
1707                 for (i = 0; i < tgnargs; i++)
1708                 {
1709                         strcpy(((char *) newtgargs) + newlen, arga[i]);
1710                         newlen += strlen(arga[i]) + 1;
1711                 }
1712
1713                 /*
1714                  * Build modified tuple.
1715                  */
1716                 for (i = 0; i < Natts_pg_trigger; i++)
1717                 {
1718                         values[i] = (Datum) 0;
1719                         replaces[i] = ' ';
1720                         nulls[i] = ' ';
1721                 }
1722                 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1723                 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1724
1725                 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1726
1727                 /*
1728                  * Update pg_trigger and its indexes
1729                  */
1730                 simple_heap_update(tgrel, &tuple->t_self, tuple);
1731
1732                 CatalogUpdateIndexes(tgrel, tuple);
1733
1734                 /*
1735                  * Invalidate trigger's relation's relcache entry so that other
1736                  * backends (and this one too!) are sent SI message to make them
1737                  * rebuild relcache entries.  (Ideally this should happen
1738                  * automatically...)
1739                  *
1740                  * We can skip this for triggers on relid itself, since that relcache
1741                  * flush will happen anyway due to the table or column rename.  We
1742                  * just need to catch the far ends of RI relationships.
1743                  */
1744                 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1745                 if (pg_trigger->tgrelid != relid)
1746                         CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1747
1748                 /* free up our scratch memory */
1749                 pfree(newtgargs);
1750                 heap_freetuple(tuple);
1751         }
1752
1753         systable_endscan(trigscan);
1754
1755         heap_close(tgrel, RowExclusiveLock);
1756
1757         /*
1758          * Increment cmd counter to make updates visible; this is needed in
1759          * case the same tuple has to be updated again by next pass (can
1760          * happen in case of a self-referential FK relationship).
1761          */
1762         CommandCounterIncrement();
1763 }
1764
1765 /*
1766  * AlterTable
1767  *              Execute ALTER TABLE, which can be a list of subcommands
1768  *
1769  * ALTER TABLE is performed in three phases:
1770  *              1. Examine subcommands and perform pre-transformation checking.
1771  *              2. Update system catalogs.
1772  *              3. Scan table(s) to check new constraints, and optionally recopy
1773  *                 the data into new table(s).
1774  * Phase 3 is not performed unless one or more of the subcommands requires
1775  * it.  The intention of this design is to allow multiple independent
1776  * updates of the table schema to be performed with only one pass over the
1777  * data.
1778  *
1779  * ATPrepCmd performs phase 1.  A "work queue" entry is created for
1780  * each table to be affected (there may be multiple affected tables if the
1781  * commands traverse a table inheritance hierarchy).  Also we do preliminary
1782  * validation of the subcommands, including parse transformation of those
1783  * expressions that need to be evaluated with respect to the old table
1784  * schema.
1785  *
1786  * ATRewriteCatalogs performs phase 2 for each affected table (note that
1787  * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1788  * Certain subcommands need to be performed before others to avoid
1789  * unnecessary conflicts; for example, DROP COLUMN should come before
1790  * ADD COLUMN.  Therefore phase 1 divides the subcommands into multiple
1791  * lists, one for each logical "pass" of phase 2.
1792  *
1793  * ATRewriteTables performs phase 3 for those tables that need it.
1794  *
1795  * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1796  * the whole operation; we don't have to do anything special to clean up.
1797  */
1798 void
1799 AlterTable(AlterTableStmt *stmt)
1800 {
1801         ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1802                                  stmt->cmds,
1803                                  interpretInhOption(stmt->relation->inhOpt));
1804 }
1805
1806 /*
1807  * AlterTableInternal
1808  *
1809  * ALTER TABLE with target specified by OID
1810  */
1811 void
1812 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1813 {
1814         ATController(relation_open(relid, AccessExclusiveLock),
1815                                  cmds,
1816                                  recurse);
1817 }
1818
1819 static void
1820 ATController(Relation rel, List *cmds, bool recurse)
1821 {
1822         List       *wqueue = NIL;
1823         ListCell   *lcmd;
1824
1825         /* Phase 1: preliminary examination of commands, create work queue */
1826         foreach(lcmd, cmds)
1827         {
1828                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1829
1830                 ATPrepCmd(&wqueue, rel, cmd, recurse, false);
1831         }
1832
1833         /* Close the relation, but keep lock until commit */
1834         relation_close(rel, NoLock);
1835
1836         /* Phase 2: update system catalogs */
1837         ATRewriteCatalogs(&wqueue);
1838
1839         /* Phase 3: scan/rewrite tables as needed */
1840         ATRewriteTables(&wqueue);
1841 }
1842
1843 /*
1844  * ATPrepCmd
1845  *
1846  * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1847  * recursion and permission checks.
1848  *
1849  * Caller must have acquired AccessExclusiveLock on relation already.
1850  * This lock should be held until commit.
1851  */
1852 static void
1853 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1854                   bool recurse, bool recursing)
1855 {
1856         AlteredTableInfo *tab;
1857         int                     pass;
1858
1859         /* Find or create work queue entry for this table */
1860         tab = ATGetQueueEntry(wqueue, rel);
1861
1862         /*
1863          * Copy the original subcommand for each table.  This avoids conflicts
1864          * when different child tables need to make different parse
1865          * transformations (for example, the same column may have different
1866          * column numbers in different children).
1867          */
1868         cmd = copyObject(cmd);
1869
1870         /*
1871          * Do permissions checking, recursion to child tables if needed, and
1872          * any additional phase-1 processing needed.
1873          */
1874         switch (cmd->subtype)
1875         {
1876                 case AT_AddColumn:              /* ADD COLUMN */
1877                         ATSimplePermissions(rel, false);
1878                         /* Performs own recursion */
1879                         ATPrepAddColumn(wqueue, rel, recurse, cmd);
1880                         pass = AT_PASS_ADD_COL;
1881                         break;
1882                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
1883
1884                         /*
1885                          * We allow defaults on views so that INSERT into a view can
1886                          * have default-ish behavior.  This works because the rewriter
1887                          * substitutes default values into INSERTs before it expands
1888                          * rules.
1889                          */
1890                         ATSimplePermissions(rel, true);
1891                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1892                         /* No command-specific prep needed */
1893                         pass = AT_PASS_ADD_CONSTR;
1894                         break;
1895                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
1896                         ATSimplePermissions(rel, false);
1897                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1898                         /* No command-specific prep needed */
1899                         pass = AT_PASS_DROP;
1900                         break;
1901                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
1902                         ATSimplePermissions(rel, false);
1903                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1904                         /* No command-specific prep needed */
1905                         pass = AT_PASS_ADD_CONSTR;
1906                         break;
1907                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
1908                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1909                         /* Performs own permission checks */
1910                         ATPrepSetStatistics(rel, cmd->name, cmd->def);
1911                         pass = AT_PASS_COL_ATTRS;
1912                         break;
1913                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
1914                         ATSimplePermissions(rel, false);
1915                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1916                         /* No command-specific prep needed */
1917                         pass = AT_PASS_COL_ATTRS;
1918                         break;
1919                 case AT_DropColumn:             /* DROP COLUMN */
1920                         ATSimplePermissions(rel, false);
1921                         /* Recursion occurs during execution phase */
1922                         /* No command-specific prep needed except saving recurse flag */
1923                         if (recurse)
1924                                 cmd->subtype = AT_DropColumnRecurse;
1925                         pass = AT_PASS_DROP;
1926                         break;
1927                 case AT_AddIndex:               /* ADD INDEX */
1928                         ATSimplePermissions(rel, false);
1929                         /* This command never recurses */
1930                         /* No command-specific prep needed */
1931                         pass = AT_PASS_ADD_INDEX;
1932                         break;
1933                 case AT_AddConstraint:  /* ADD CONSTRAINT */
1934                         ATSimplePermissions(rel, false);
1935
1936                         /*
1937                          * Currently we recurse only for CHECK constraints, never for
1938                          * foreign-key constraints.  UNIQUE/PKEY constraints won't be
1939                          * seen here.
1940                          */
1941                         if (IsA(cmd->def, Constraint))
1942                                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1943                         /* No command-specific prep needed */
1944                         pass = AT_PASS_ADD_CONSTR;
1945                         break;
1946                 case AT_DropConstraint: /* DROP CONSTRAINT */
1947                         ATSimplePermissions(rel, false);
1948                         /* Performs own recursion */
1949                         ATPrepDropConstraint(wqueue, rel, recurse, cmd);
1950                         pass = AT_PASS_DROP;
1951                         break;
1952                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
1953                         ATSimplePermissions(rel, false);
1954                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1955                         /* No command-specific prep needed */
1956                         pass = AT_PASS_DROP;
1957                         break;
1958                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
1959                         ATSimplePermissions(rel, false);
1960                         /* Performs own recursion */
1961                         ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
1962                         pass = AT_PASS_ALTER_TYPE;
1963                         break;
1964                 case AT_ToastTable:             /* CREATE TOAST TABLE */
1965                         ATSimplePermissions(rel, false);
1966                         /* This command never recurses */
1967                         /* No command-specific prep needed */
1968                         pass = AT_PASS_MISC;
1969                         break;
1970                 case AT_ChangeOwner:    /* ALTER OWNER */
1971                         /* This command never recurses */
1972                         /* No command-specific prep needed */
1973                         pass = AT_PASS_MISC;
1974                         break;
1975                 case AT_ClusterOn:              /* CLUSTER ON */
1976                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
1977                         ATSimplePermissions(rel, false);
1978                         /* These commands never recurse */
1979                         /* No command-specific prep needed */
1980                         pass = AT_PASS_MISC;
1981                         break;
1982                 case AT_DropOids:               /* SET WITHOUT OIDS */
1983                         ATSimplePermissions(rel, false);
1984                         /* Performs own recursion */
1985                         if (rel->rd_rel->relhasoids)
1986                         {
1987                                 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
1988
1989                                 dropCmd->subtype = AT_DropColumn;
1990                                 dropCmd->name = pstrdup("oid");
1991                                 dropCmd->behavior = cmd->behavior;
1992                                 ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
1993                         }
1994                         pass = AT_PASS_DROP;
1995                         break;
1996                 case AT_SetTableSpace:  /* SET TABLESPACE */
1997                         /* This command never recurses */
1998                         ATPrepSetTableSpace(tab, rel, cmd->name);
1999                         pass = AT_PASS_MISC;    /* doesn't actually matter */
2000                         break;
2001                 default:                                /* oops */
2002                         elog(ERROR, "unrecognized alter table type: %d",
2003                                  (int) cmd->subtype);
2004                         pass = 0;                       /* keep compiler quiet */
2005                         break;
2006         }
2007
2008         /* Add the subcommand to the appropriate list for phase 2 */
2009         tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2010 }
2011
2012 /*
2013  * ATRewriteCatalogs
2014  *
2015  * Traffic cop for ALTER TABLE Phase 2 operations.      Subcommands are
2016  * dispatched in a "safe" execution order (designed to avoid unnecessary
2017  * conflicts).
2018  */
2019 static void
2020 ATRewriteCatalogs(List **wqueue)
2021 {
2022         int                     pass;
2023         ListCell   *ltab;
2024
2025         /*
2026          * We process all the tables "in parallel", one pass at a time.  This
2027          * is needed because we may have to propagate work from one table to
2028          * another (specifically, ALTER TYPE on a foreign key's PK has to
2029          * dispatch the re-adding of the foreign key constraint to the other
2030          * table).      Work can only be propagated into later passes, however.
2031          */
2032         for (pass = 0; pass < AT_NUM_PASSES; pass++)
2033         {
2034                 /* Go through each table that needs to be processed */
2035                 foreach(ltab, *wqueue)
2036                 {
2037                         AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2038                         List       *subcmds = tab->subcmds[pass];
2039                         Relation        rel;
2040                         ListCell   *lcmd;
2041
2042                         if (subcmds == NIL)
2043                                 continue;
2044
2045                         /*
2046                          * Exclusive lock was obtained by phase 1, needn't get it
2047                          * again
2048                          */
2049                         rel = relation_open(tab->relid, NoLock);
2050
2051                         foreach(lcmd, subcmds)
2052                                 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2053
2054                         /*
2055                          * After the ALTER TYPE pass, do cleanup work (this is not
2056                          * done in ATExecAlterColumnType since it should be done only
2057                          * once if multiple columns of a table are altered).
2058                          */
2059                         if (pass == AT_PASS_ALTER_TYPE)
2060                                 ATPostAlterTypeCleanup(wqueue, tab);
2061
2062                         relation_close(rel, NoLock);
2063                 }
2064         }
2065
2066         /*
2067          * Do an implicit CREATE TOAST TABLE if we executed any subcommands
2068          * that might have added a column or changed column storage.
2069          */
2070         foreach(ltab, *wqueue)
2071         {
2072                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2073
2074                 if (tab->relkind == RELKIND_RELATION &&
2075                         (tab->subcmds[AT_PASS_ADD_COL] ||
2076                          tab->subcmds[AT_PASS_ALTER_TYPE] ||
2077                          tab->subcmds[AT_PASS_COL_ATTRS]))
2078                         AlterTableCreateToastTable(tab->relid, true);
2079         }
2080 }
2081
2082 /*
2083  * ATExecCmd: dispatch a subcommand to appropriate execution routine
2084  */
2085 static void
2086 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2087 {
2088         switch (cmd->subtype)
2089         {
2090                 case AT_AddColumn:              /* ADD COLUMN */
2091                         ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2092                         break;
2093                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
2094                         ATExecColumnDefault(rel, cmd->name, cmd->def);
2095                         break;
2096                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
2097                         ATExecDropNotNull(rel, cmd->name);
2098                         break;
2099                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
2100                         ATExecSetNotNull(tab, rel, cmd->name);
2101                         break;
2102                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
2103                         ATExecSetStatistics(rel, cmd->name, cmd->def);
2104                         break;
2105                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
2106                         ATExecSetStorage(rel, cmd->name, cmd->def);
2107                         break;
2108                 case AT_DropColumn:             /* DROP COLUMN */
2109                         ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2110                         break;
2111                 case AT_DropColumnRecurse:              /* DROP COLUMN with recursion */
2112                         ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2113                         break;
2114                 case AT_AddIndex:               /* ADD INDEX */
2115                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2116                         break;
2117                 case AT_ReAddIndex:             /* ADD INDEX */
2118                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2119                         break;
2120                 case AT_AddConstraint:  /* ADD CONSTRAINT */
2121                         ATExecAddConstraint(tab, rel, cmd->def);
2122                         break;
2123                 case AT_DropConstraint: /* DROP CONSTRAINT */
2124                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2125                         break;
2126                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
2127                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2128                         break;
2129                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
2130                         ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2131                         break;
2132                 case AT_ToastTable:             /* CREATE TOAST TABLE */
2133                         AlterTableCreateToastTable(RelationGetRelid(rel), false);
2134                         break;
2135                 case AT_ChangeOwner:    /* ALTER OWNER */
2136                         ATExecChangeOwner(RelationGetRelid(rel),
2137                                                           get_roleid_checked(cmd->name));
2138                         break;
2139                 case AT_ClusterOn:              /* CLUSTER ON */
2140                         ATExecClusterOn(rel, cmd->name);
2141                         break;
2142                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
2143                         ATExecDropCluster(rel);
2144                         break;
2145                 case AT_DropOids:               /* SET WITHOUT OIDS */
2146
2147                         /*
2148                          * Nothing to do here; we'll have generated a DropColumn
2149                          * subcommand to do the real work
2150                          */
2151                         break;
2152                 case AT_SetTableSpace:  /* SET TABLESPACE */
2153
2154                         /*
2155                          * Nothing to do here; Phase 3 does the work
2156                          */
2157                         break;
2158                 default:                                /* oops */
2159                         elog(ERROR, "unrecognized alter table type: %d",
2160                                  (int) cmd->subtype);
2161                         break;
2162         }
2163
2164         /*
2165          * Bump the command counter to ensure the next subcommand in the
2166          * sequence can see the changes so far
2167          */
2168         CommandCounterIncrement();
2169 }
2170
2171 /*
2172  * ATRewriteTables: ALTER TABLE phase 3
2173  */
2174 static void
2175 ATRewriteTables(List **wqueue)
2176 {
2177         ListCell   *ltab;
2178
2179         /* Go through each table that needs to be checked or rewritten */
2180         foreach(ltab, *wqueue)
2181         {
2182                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2183
2184                 /*
2185                  * We only need to rewrite the table if at least one column needs
2186                  * to be recomputed.
2187                  */
2188                 if (tab->newvals != NIL)
2189                 {
2190                         /* Build a temporary relation and copy data */
2191                         Oid                     OIDNewHeap;
2192                         char            NewHeapName[NAMEDATALEN];
2193                         Oid                     NewTableSpace;
2194                         Relation        OldHeap;
2195                         ObjectAddress object;
2196
2197                         OldHeap = heap_open(tab->relid, NoLock);
2198
2199                         /*
2200                          * We can never allow rewriting of shared or nailed-in-cache
2201                          * relations, because we can't support changing their
2202                          * relfilenode values.
2203                          */
2204                         if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2205                                 ereport(ERROR,
2206                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2207                                                  errmsg("cannot rewrite system relation \"%s\"",
2208                                                                 RelationGetRelationName(OldHeap))));
2209
2210                         /*
2211                          * Don't allow rewrite on temp tables of other backends ...
2212                          * their local buffer manager is not going to cope.
2213                          */
2214                         if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2215                                 ereport(ERROR,
2216                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2217                                                  errmsg("cannot rewrite temporary tables of other sessions")));
2218
2219                         /*
2220                          * Select destination tablespace (same as original unless user
2221                          * requested a change)
2222                          */
2223                         if (tab->newTableSpace)
2224                                 NewTableSpace = tab->newTableSpace;
2225                         else
2226                                 NewTableSpace = OldHeap->rd_rel->reltablespace;
2227
2228                         heap_close(OldHeap, NoLock);
2229
2230                         /*
2231                          * Create the new heap, using a temporary name in the same
2232                          * namespace as the existing table.  NOTE: there is some risk
2233                          * of collision with user relnames.  Working around this seems
2234                          * more trouble than it's worth; in particular, we can't
2235                          * create the new heap in a different namespace from the old,
2236                          * or we will have problems with the TEMP status of temp
2237                          * tables.
2238                          */
2239                         snprintf(NewHeapName, sizeof(NewHeapName),
2240                                          "pg_temp_%u", tab->relid);
2241
2242                         OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2243
2244                         /*
2245                          * Copy the heap data into the new table with the desired
2246                          * modifications, and test the current data within the table
2247                          * against new constraints generated by ALTER TABLE commands.
2248                          */
2249                         ATRewriteTable(tab, OIDNewHeap);
2250
2251                         /* Swap the physical files of the old and new heaps. */
2252                         swap_relation_files(tab->relid, OIDNewHeap);
2253
2254                         CommandCounterIncrement();
2255
2256                         /* Destroy new heap with old filenode */
2257                         object.classId = RelationRelationId;
2258                         object.objectId = OIDNewHeap;
2259                         object.objectSubId = 0;
2260
2261                         /*
2262                          * The new relation is local to our transaction and we know
2263                          * nothing depends on it, so DROP_RESTRICT should be OK.
2264                          */
2265                         performDeletion(&object, DROP_RESTRICT);
2266                         /* performDeletion does CommandCounterIncrement at end */
2267
2268                         /*
2269                          * Rebuild each index on the relation (but not the toast
2270                          * table, which is all-new anyway).  We do not need
2271                          * CommandCounterIncrement() because reindex_relation does it.
2272                          */
2273                         reindex_relation(tab->relid, false);
2274                 }
2275                 else
2276                 {
2277                         /*
2278                          * Test the current data within the table against new
2279                          * constraints generated by ALTER TABLE commands, but don't
2280                          * rebuild data.
2281                          */
2282                         if (tab->constraints != NIL)
2283                                 ATRewriteTable(tab, InvalidOid);
2284
2285                         /*
2286                          * If we had SET TABLESPACE but no reason to reconstruct
2287                          * tuples, just do a block-by-block copy.
2288                          */
2289                         if (tab->newTableSpace)
2290                                 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2291                 }
2292         }
2293
2294         /*
2295          * Foreign key constraints are checked in a final pass, since (a) it's
2296          * generally best to examine each one separately, and (b) it's at
2297          * least theoretically possible that we have changed both relations of
2298          * the foreign key, and we'd better have finished both rewrites before
2299          * we try to read the tables.
2300          */
2301         foreach(ltab, *wqueue)
2302         {
2303                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2304                 Relation        rel = NULL;
2305                 ListCell   *lcon;
2306
2307                 foreach(lcon, tab->constraints)
2308                 {
2309                         NewConstraint *con = lfirst(lcon);
2310
2311                         if (con->contype == CONSTR_FOREIGN)
2312                         {
2313                                 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2314                                 Relation        refrel;
2315
2316                                 if (rel == NULL)
2317                                 {
2318                                         /* Long since locked, no need for another */
2319                                         rel = heap_open(tab->relid, NoLock);
2320                                 }
2321
2322                                 refrel = heap_open(con->refrelid, RowShareLock);
2323
2324                                 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2325
2326                                 heap_close(refrel, NoLock);
2327                         }
2328                 }
2329
2330                 if (rel)
2331                         heap_close(rel, NoLock);
2332         }
2333 }
2334
2335 /*
2336  * ATRewriteTable: scan or rewrite one table
2337  *
2338  * OIDNewHeap is InvalidOid if we don't need to rewrite
2339  */
2340 static void
2341 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2342 {
2343         Relation        oldrel;
2344         Relation        newrel;
2345         TupleDesc       oldTupDesc;
2346         TupleDesc       newTupDesc;
2347         bool            needscan = false;
2348         int                     i;
2349         ListCell   *l;
2350         EState     *estate;
2351
2352         /*
2353          * Open the relation(s).  We have surely already locked the existing
2354          * table.
2355          */
2356         oldrel = heap_open(tab->relid, NoLock);
2357         oldTupDesc = tab->oldDesc;
2358         newTupDesc = RelationGetDescr(oldrel);          /* includes all mods */
2359
2360         if (OidIsValid(OIDNewHeap))
2361                 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2362         else
2363                 newrel = NULL;
2364
2365         /*
2366          * If we need to rewrite the table, the operation has to be propagated
2367          * to tables that use this table's rowtype as a column type.
2368          *
2369          * (Eventually this will probably become true for scans as well, but at
2370          * the moment a composite type does not enforce any constraints, so
2371          * it's not necessary/appropriate to enforce them just during ALTER.)
2372          */
2373         if (newrel)
2374                 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2375                                                                                  RelationGetRelationName(oldrel));
2376
2377         /*
2378          * Generate the constraint and default execution states
2379          */
2380
2381         estate = CreateExecutorState();
2382
2383         /* Build the needed expression execution states */
2384         foreach(l, tab->constraints)
2385         {
2386                 NewConstraint *con = lfirst(l);
2387
2388                 switch (con->contype)
2389                 {
2390                         case CONSTR_CHECK:
2391                                 needscan = true;
2392                                 con->qualstate = (List *)
2393                                         ExecPrepareExpr((Expr *) con->qual, estate);
2394                                 break;
2395                         case CONSTR_FOREIGN:
2396                                 /* Nothing to do here */
2397                                 break;
2398                         case CONSTR_NOTNULL:
2399                                 needscan = true;
2400                                 break;
2401                         default:
2402                                 elog(ERROR, "unrecognized constraint type: %d",
2403                                          (int) con->contype);
2404                 }
2405         }
2406
2407         foreach(l, tab->newvals)
2408         {
2409                 NewColumnValue *ex = lfirst(l);
2410
2411                 needscan = true;
2412
2413                 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2414         }
2415
2416         if (needscan)
2417         {
2418                 ExprContext *econtext;
2419                 Datum      *values;
2420                 bool       *isnull;
2421                 TupleTableSlot *oldslot;
2422                 TupleTableSlot *newslot;
2423                 HeapScanDesc scan;
2424                 HeapTuple       tuple;
2425                 MemoryContext oldCxt;
2426                 List *dropped_attrs = NIL;
2427                 ListCell *lc;
2428
2429                 econtext = GetPerTupleExprContext(estate);
2430
2431                 /*
2432                  * Make tuple slots for old and new tuples.  Note that even when
2433                  * the tuples are the same, the tupDescs might not be (consider
2434                  * ADD COLUMN without a default).
2435                  */
2436                 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2437                 newslot = MakeSingleTupleTableSlot(newTupDesc);
2438
2439                 /* Preallocate values/isnull arrays */
2440                 i = Max(newTupDesc->natts, oldTupDesc->natts);
2441                 values = (Datum *) palloc(i * sizeof(Datum));
2442                 isnull = (bool *) palloc(i * sizeof(bool));
2443                 memset(values, 0, i * sizeof(Datum));
2444                 memset(isnull, true, i * sizeof(bool));
2445
2446                 /*
2447                  * Any attributes that are dropped according to the new tuple
2448                  * descriptor can be set to NULL. We precompute the list of
2449                  * dropped attributes to avoid needing to do so in the
2450                  * per-tuple loop.
2451                  */
2452                 for (i = 0; i < newTupDesc->natts; i++)
2453                 {
2454                         if (newTupDesc->attrs[i]->attisdropped)
2455                                 dropped_attrs = lappend_int(dropped_attrs, i);
2456                 }
2457
2458                 /*
2459                  * Scan through the rows, generating a new row if needed and then
2460                  * checking all the constraints.
2461                  */
2462                 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2463
2464                 /*
2465                  * Switch to per-tuple memory context and reset it for each
2466                  * tuple produced, so we don't leak memory.
2467                  */
2468                 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2469
2470                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2471                 {
2472                         if (newrel)
2473                         {
2474                                 /* Extract data from old tuple */
2475                                 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2476
2477                                 /* Set dropped attributes to null in new tuple */
2478                                 foreach (lc, dropped_attrs)
2479                                         isnull[lfirst_int(lc)] = true;
2480
2481                                 /*
2482                                  * Process supplied expressions to replace selected
2483                                  * columns. Expression inputs come from the old tuple.
2484                                  */
2485                                 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2486                                 econtext->ecxt_scantuple = oldslot;
2487
2488                                 foreach(l, tab->newvals)
2489                                 {
2490                                         NewColumnValue *ex = lfirst(l);
2491
2492                                         values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2493                                                                                                                   econtext,
2494                                                                                                                   &isnull[ex->attnum - 1],
2495                                                                                                                   NULL);
2496                                 }
2497
2498                                 /*
2499                                  * Form the new tuple. Note that we don't explicitly
2500                                  * pfree it, since the per-tuple memory context will
2501                                  * be reset shortly.
2502                                  */
2503                                 tuple = heap_form_tuple(newTupDesc, values, isnull);
2504                         }
2505
2506                         /* Now check any constraints on the possibly-changed tuple */
2507                         ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2508                         econtext->ecxt_scantuple = newslot;
2509
2510                         foreach(l, tab->constraints)
2511                         {
2512                                 NewConstraint *con = lfirst(l);
2513
2514                                 switch (con->contype)
2515                                 {
2516                                         case CONSTR_CHECK:
2517                                                 if (!ExecQual(con->qualstate, econtext, true))
2518                                                         ereport(ERROR,
2519                                                                         (errcode(ERRCODE_CHECK_VIOLATION),
2520                                                                          errmsg("check constraint \"%s\" is violated by some row",
2521                                                                                         con->name)));
2522                                                 break;
2523                                         case CONSTR_NOTNULL:
2524                                                 {
2525                                                         Datum           d;
2526                                                         bool            isnull;
2527
2528                                                         d = heap_getattr(tuple, con->attnum, newTupDesc,
2529                                                                                          &isnull);
2530                                                         if (isnull)
2531                                                                 ereport(ERROR,
2532                                                                         (errcode(ERRCODE_NOT_NULL_VIOLATION),
2533                                                                          errmsg("column \"%s\" contains null values",
2534                                                                                         get_attname(tab->relid,
2535                                                                                                                 con->attnum))));
2536                                                 }
2537                                                 break;
2538                                         case CONSTR_FOREIGN:
2539                                                 /* Nothing to do here */
2540                                                 break;
2541                                         default:
2542                                                 elog(ERROR, "unrecognized constraint type: %d",
2543                                                          (int) con->contype);
2544                                 }
2545                         }
2546
2547                         /* Write the tuple out to the new relation */
2548                         if (newrel)
2549                                 simple_heap_insert(newrel, tuple);
2550
2551                         ResetExprContext(econtext);
2552
2553                         CHECK_FOR_INTERRUPTS();
2554                 }
2555
2556                 MemoryContextSwitchTo(oldCxt);
2557                 heap_endscan(scan);
2558         }
2559
2560         FreeExecutorState(estate);
2561
2562         heap_close(oldrel, NoLock);
2563         if (newrel)
2564                 heap_close(newrel, NoLock);
2565 }
2566
2567 /*
2568  * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2569  */
2570 static AlteredTableInfo *
2571 ATGetQueueEntry(List **wqueue, Relation rel)
2572 {
2573         Oid                     relid = RelationGetRelid(rel);
2574         AlteredTableInfo *tab;
2575         ListCell   *ltab;
2576
2577         foreach(ltab, *wqueue)
2578         {
2579                 tab = (AlteredTableInfo *) lfirst(ltab);
2580                 if (tab->relid == relid)
2581                         return tab;
2582         }
2583
2584         /*
2585          * Not there, so add it.  Note that we make a copy of the relation's
2586          * existing descriptor before anything interesting can happen to it.
2587          */
2588         tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2589         tab->relid = relid;
2590         tab->relkind = rel->rd_rel->relkind;
2591         tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2592
2593         *wqueue = lappend(*wqueue, tab);
2594
2595         return tab;
2596 }
2597
2598 /*
2599  * ATSimplePermissions
2600  *
2601  * - Ensure that it is a relation (or possibly a view)
2602  * - Ensure this user is the owner
2603  * - Ensure that it is not a system table
2604  */
2605 static void
2606 ATSimplePermissions(Relation rel, bool allowView)
2607 {
2608         if (rel->rd_rel->relkind != RELKIND_RELATION)
2609         {
2610                 if (allowView)
2611                 {
2612                         if (rel->rd_rel->relkind != RELKIND_VIEW)
2613                                 ereport(ERROR,
2614                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2615                                                  errmsg("\"%s\" is not a table or view",
2616                                                                 RelationGetRelationName(rel))));
2617                 }
2618                 else
2619                         ereport(ERROR,
2620                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2621                                          errmsg("\"%s\" is not a table",
2622                                                         RelationGetRelationName(rel))));
2623         }
2624
2625         /* Permissions checks */
2626         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2627                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2628                                            RelationGetRelationName(rel));
2629
2630         if (!allowSystemTableMods && IsSystemRelation(rel))
2631                 ereport(ERROR,
2632                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2633                                  errmsg("permission denied: \"%s\" is a system catalog",
2634                                                 RelationGetRelationName(rel))));
2635 }
2636
2637 /*
2638  * ATSimpleRecursion
2639  *
2640  * Simple table recursion sufficient for most ALTER TABLE operations.
2641  * All direct and indirect children are processed in an unspecified order.
2642  * Note that if a child inherits from the original table via multiple
2643  * inheritance paths, it will be visited just once.
2644  */
2645 static void
2646 ATSimpleRecursion(List **wqueue, Relation rel,
2647                                   AlterTableCmd *cmd, bool recurse)
2648 {
2649         /*
2650          * Propagate to children if desired.  Non-table relations never have
2651          * children, so no need to search in that case.
2652          */
2653         if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2654         {
2655                 Oid                     relid = RelationGetRelid(rel);
2656                 ListCell   *child;
2657                 List       *children;
2658
2659                 /* this routine is actually in the planner */
2660                 children = find_all_inheritors(relid);
2661
2662                 /*
2663                  * find_all_inheritors does the recursive search of the
2664                  * inheritance hierarchy, so all we have to do is process all of
2665                  * the relids in the list that it returns.
2666                  */
2667                 foreach(child, children)
2668                 {
2669                         Oid                     childrelid = lfirst_oid(child);
2670                         Relation        childrel;
2671
2672                         if (childrelid == relid)
2673                                 continue;
2674                         childrel = relation_open(childrelid, AccessExclusiveLock);
2675                         ATPrepCmd(wqueue, childrel, cmd, false, true);
2676                         relation_close(childrel, NoLock);
2677                 }
2678         }
2679 }
2680
2681 /*
2682  * ATOneLevelRecursion
2683  *
2684  * Here, we visit only direct inheritance children.  It is expected that
2685  * the command's prep routine will recurse again to find indirect children.
2686  * When using this technique, a multiply-inheriting child will be visited
2687  * multiple times.
2688  */
2689 static void
2690 ATOneLevelRecursion(List **wqueue, Relation rel,
2691                                         AlterTableCmd *cmd)
2692 {
2693         Oid                     relid = RelationGetRelid(rel);
2694         ListCell   *child;
2695         List       *children;
2696
2697         /* this routine is actually in the planner */
2698         children = find_inheritance_children(relid);
2699
2700         foreach(child, children)
2701         {
2702                 Oid                     childrelid = lfirst_oid(child);
2703                 Relation        childrel;
2704
2705                 childrel = relation_open(childrelid, AccessExclusiveLock);
2706                 ATPrepCmd(wqueue, childrel, cmd, true, true);
2707                 relation_close(childrel, NoLock);
2708         }
2709 }
2710
2711
2712 /*
2713  * find_composite_type_dependencies
2714  *
2715  * Check to see if a table's rowtype is being used as a column in some
2716  * other table (possibly nested several levels deep in composite types!).
2717  * Eventually, we'd like to propagate the check or rewrite operation
2718  * into other such tables, but for now, just error out if we find any.
2719  *
2720  * We assume that functions and views depending on the type are not reasons
2721  * to reject the ALTER.  (How safe is this really?)
2722  */
2723 static void
2724 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2725 {
2726         Relation        depRel;
2727         ScanKeyData key[2];
2728         SysScanDesc depScan;
2729         HeapTuple       depTup;
2730
2731         /*
2732          * We scan pg_depend to find those things that depend on the rowtype.
2733          * (We assume we can ignore refobjsubid for a rowtype.)
2734          */
2735         depRel = heap_open(DependRelationId, AccessShareLock);
2736
2737         ScanKeyInit(&key[0],
2738                                 Anum_pg_depend_refclassid,
2739                                 BTEqualStrategyNumber, F_OIDEQ,
2740                                 ObjectIdGetDatum(TypeRelationId));
2741         ScanKeyInit(&key[1],
2742                                 Anum_pg_depend_refobjid,
2743                                 BTEqualStrategyNumber, F_OIDEQ,
2744                                 ObjectIdGetDatum(typeOid));
2745
2746         depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2747                                                                  SnapshotNow, 2, key);
2748
2749         while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2750         {
2751                 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2752                 Relation        rel;
2753                 Form_pg_attribute att;
2754
2755                 /* Ignore dependees that aren't user columns of relations */
2756                 /* (we assume system columns are never of rowtypes) */
2757                 if (pg_depend->classid != RelationRelationId ||
2758                         pg_depend->objsubid <= 0)
2759                         continue;
2760
2761                 rel = relation_open(pg_depend->objid, AccessShareLock);
2762                 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2763
2764                 if (rel->rd_rel->relkind == RELKIND_RELATION)
2765                 {
2766                         ereport(ERROR,
2767                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2768                                          errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2769                                                         origTblName,
2770                                                         RelationGetRelationName(rel),
2771                                                         NameStr(att->attname))));
2772                 }
2773                 else if (OidIsValid(rel->rd_rel->reltype))
2774                 {
2775                         /*
2776                          * A view or composite type itself isn't a problem, but we
2777                          * must recursively check for indirect dependencies via its
2778                          * rowtype.
2779                          */
2780                         find_composite_type_dependencies(rel->rd_rel->reltype,
2781                                                                                          origTblName);
2782                 }
2783
2784                 relation_close(rel, AccessShareLock);
2785         }
2786
2787         systable_endscan(depScan);
2788
2789         relation_close(depRel, AccessShareLock);
2790 }
2791
2792
2793 /*
2794  * ALTER TABLE ADD COLUMN
2795  *
2796  * Adds an additional attribute to a relation making the assumption that
2797  * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2798  * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2799  * AlterTableCmd's.
2800  */
2801 static void
2802 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
2803                                 AlterTableCmd *cmd)
2804 {
2805         /*
2806          * Recurse to add the column to child classes, if requested.
2807          *
2808          * We must recurse one level at a time, so that multiply-inheriting
2809          * children are visited the right number of times and end up with the
2810          * right attinhcount.
2811          */
2812         if (recurse)
2813         {
2814                 AlterTableCmd *childCmd = copyObject(cmd);
2815                 ColumnDef  *colDefChild = (ColumnDef *) childCmd->def;
2816
2817                 /* Child should see column as singly inherited */
2818                 colDefChild->inhcount = 1;
2819                 colDefChild->is_local = false;
2820                 /* and don't make a support dependency on the child */
2821                 colDefChild->support = NULL;
2822
2823                 ATOneLevelRecursion(wqueue, rel, childCmd);
2824         }
2825         else
2826         {
2827                 /*
2828                  * If we are told not to recurse, there had better not be any
2829                  * child tables; else the addition would put them out of step.
2830                  */
2831                 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
2832                         ereport(ERROR,
2833                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2834                                          errmsg("column must be added to child tables too")));
2835         }
2836 }
2837
2838 static void
2839 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
2840                                 ColumnDef *colDef)
2841 {
2842         Oid                     myrelid = RelationGetRelid(rel);
2843         Relation        pgclass,
2844                                 attrdesc;
2845         HeapTuple       reltup;
2846         HeapTuple       attributeTuple;
2847         Form_pg_attribute attribute;
2848         FormData_pg_attribute attributeD;
2849         int                     i;
2850         int                     minattnum,
2851                                 maxatts;
2852         HeapTuple       typeTuple;
2853         Oid                     typeOid;
2854         Form_pg_type tform;
2855         Expr       *defval;
2856
2857         attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
2858
2859         /*
2860          * Are we adding the column to a recursion child?  If so, check
2861          * whether to merge with an existing definition for the column.
2862          */
2863         if (colDef->inhcount > 0)
2864         {
2865                 HeapTuple       tuple;
2866
2867                 /* Does child already have a column by this name? */
2868                 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
2869                 if (HeapTupleIsValid(tuple))
2870                 {
2871                         Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
2872
2873                         /* Okay if child matches by type */
2874                         if (typenameTypeId(colDef->typename) != childatt->atttypid ||
2875                                 colDef->typename->typmod != childatt->atttypmod)
2876                                 ereport(ERROR,
2877                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
2878                                                  errmsg("child table \"%s\" has different type for column \"%s\"",
2879                                                 RelationGetRelationName(rel), colDef->colname)));
2880
2881                         /* Bump the existing child att's inhcount */
2882                         childatt->attinhcount++;
2883                         simple_heap_update(attrdesc, &tuple->t_self, tuple);
2884                         CatalogUpdateIndexes(attrdesc, tuple);
2885
2886                         heap_freetuple(tuple);
2887
2888                         /* Inform the user about the merge */
2889                         ereport(NOTICE,
2890                                         (errmsg("merging definition of column \"%s\" for child \"%s\"",
2891                                                 colDef->colname, RelationGetRelationName(rel))));
2892
2893                         heap_close(attrdesc, RowExclusiveLock);
2894                         return;
2895                 }
2896         }
2897
2898         pgclass = heap_open(RelationRelationId, RowExclusiveLock);
2899
2900         reltup = SearchSysCacheCopy(RELOID,
2901                                                                 ObjectIdGetDatum(myrelid),
2902                                                                 0, 0, 0);
2903         if (!HeapTupleIsValid(reltup))
2904                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
2905
2906         /*
2907          * this test is deliberately not attisdropped-aware, since if one
2908          * tries to add a column matching a dropped column name, it's gonna
2909          * fail anyway.
2910          */
2911         if (SearchSysCacheExists(ATTNAME,
2912                                                          ObjectIdGetDatum(myrelid),
2913                                                          PointerGetDatum(colDef->colname),
2914                                                          0, 0))
2915                 ereport(ERROR,
2916                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
2917                                  errmsg("column \"%s\" of relation \"%s\" already exists",
2918                                                 colDef->colname, RelationGetRelationName(rel))));
2919
2920         minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
2921         maxatts = minattnum + 1;
2922         if (maxatts > MaxHeapAttributeNumber)
2923                 ereport(ERROR,
2924                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
2925                                  errmsg("tables can have at most %d columns",
2926                                                 MaxHeapAttributeNumber)));
2927         i = minattnum + 1;
2928
2929         typeTuple = typenameType(colDef->typename);
2930         tform = (Form_pg_type) GETSTRUCT(typeTuple);
2931         typeOid = HeapTupleGetOid(typeTuple);
2932
2933         /* make sure datatype is legal for a column */
2934         CheckAttributeType(colDef->colname, typeOid);
2935
2936         attributeTuple = heap_addheader(Natts_pg_attribute,
2937                                                                         false,
2938                                                                         ATTRIBUTE_TUPLE_SIZE,
2939                                                                         (void *) &attributeD);
2940
2941         attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
2942
2943         attribute->attrelid = myrelid;
2944         namestrcpy(&(attribute->attname), colDef->colname);
2945         attribute->atttypid = typeOid;
2946         attribute->attstattarget = -1;
2947         attribute->attlen = tform->typlen;
2948         attribute->attcacheoff = -1;
2949         attribute->atttypmod = colDef->typename->typmod;
2950         attribute->attnum = i;
2951         attribute->attbyval = tform->typbyval;
2952         attribute->attndims = list_length(colDef->typename->arrayBounds);
2953         attribute->attstorage = tform->typstorage;
2954         attribute->attalign = tform->typalign;
2955         attribute->attnotnull = colDef->is_not_null;
2956         attribute->atthasdef = false;
2957         attribute->attisdropped = false;
2958         attribute->attislocal = colDef->is_local;
2959         attribute->attinhcount = colDef->inhcount;
2960
2961         ReleaseSysCache(typeTuple);
2962
2963         simple_heap_insert(attrdesc, attributeTuple);
2964
2965         /* Update indexes on pg_attribute */
2966         CatalogUpdateIndexes(attrdesc, attributeTuple);
2967
2968         heap_close(attrdesc, RowExclusiveLock);
2969
2970         /*
2971          * Update number of attributes in pg_class tuple
2972          */
2973         ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
2974
2975         simple_heap_update(pgclass, &reltup->t_self, reltup);
2976
2977         /* keep catalog indexes current */
2978         CatalogUpdateIndexes(pgclass, reltup);
2979
2980         heap_freetuple(reltup);
2981
2982         heap_close(pgclass, RowExclusiveLock);
2983
2984         /* Make the attribute's catalog entry visible */
2985         CommandCounterIncrement();
2986
2987         /*
2988          * Store the DEFAULT, if any, in the catalogs
2989          */
2990         if (colDef->raw_default)
2991         {
2992                 RawColumnDefault *rawEnt;
2993
2994                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
2995                 rawEnt->attnum = attribute->attnum;
2996                 rawEnt->raw_default = copyObject(colDef->raw_default);
2997
2998                 /*
2999                  * This function is intended for CREATE TABLE, so it processes a
3000                  * _list_ of defaults, but we just do one.
3001                  */
3002                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3003
3004                 /* Make the additional catalog changes visible */
3005                 CommandCounterIncrement();
3006         }
3007
3008         /*
3009          * Tell Phase 3 to fill in the default expression, if there is one.
3010          *
3011          * If there is no default, Phase 3 doesn't have to do anything, because
3012          * that effectively means that the default is NULL.  The heap tuple
3013          * access routines always check for attnum > # of attributes in tuple,
3014          * and return NULL if so, so without any modification of the tuple
3015          * data we will get the effect of NULL values in the new column.
3016          *
3017          * An exception occurs when the new column is of a domain type: the
3018          * domain might have a NOT NULL constraint, or a check constraint that
3019          * indirectly rejects nulls.  If there are any domain constraints then
3020          * we construct an explicit NULL default value that will be passed through
3021          * CoerceToDomain processing.  (This is a tad inefficient, since it
3022          * causes rewriting the table which we really don't have to do, but
3023          * the present design of domain processing doesn't offer any simple way
3024          * of checking the constraints more directly.)
3025          *
3026          * Note: we use build_column_default, and not just the cooked default
3027          * returned by AddRelationRawConstraints, so that the right thing
3028          * happens when a datatype's default applies.
3029          */
3030         defval = (Expr *) build_column_default(rel, attribute->attnum);
3031
3032         if (!defval && GetDomainConstraints(typeOid) != NIL)
3033         {
3034                 Oid             basetype = getBaseType(typeOid);
3035
3036                 defval = (Expr *) makeNullConst(basetype);
3037                 defval = (Expr *) coerce_to_target_type(NULL,
3038                                                                                                 (Node *) defval,
3039                                                                                                 basetype,
3040                                                                                                 typeOid,
3041                                                                                                 colDef->typename->typmod,
3042                                                                                                 COERCION_ASSIGNMENT,
3043                                                                                                 COERCE_IMPLICIT_CAST);
3044                 if (defval == NULL)             /* should not happen */
3045                         elog(ERROR, "failed to coerce base type to domain");
3046         }
3047
3048         if (defval)
3049         {
3050                 NewColumnValue *newval;
3051
3052                 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3053                 newval->attnum = attribute->attnum;
3054                 newval->expr = defval;
3055
3056                 tab->newvals = lappend(tab->newvals, newval);
3057         }
3058
3059         /*
3060          * Add needed dependency entries for the new column.
3061          */
3062         add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3063         if (colDef->support != NULL)
3064                 add_column_support_dependency(myrelid, i, colDef->support);
3065 }
3066
3067 /*
3068  * Install a column's dependency on its datatype.
3069  */
3070 static void
3071 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3072 {
3073         ObjectAddress myself,
3074                                 referenced;
3075
3076         myself.classId = RelationRelationId;
3077         myself.objectId = relid;
3078         myself.objectSubId = attnum;
3079         referenced.classId = TypeRelationId;
3080         referenced.objectId = typid;
3081         referenced.objectSubId = 0;
3082         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3083 }
3084
3085 /*
3086  * Install a dependency for a column's supporting relation (serial sequence).
3087  */
3088 static void
3089 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3090 {
3091         ObjectAddress colobject,
3092                                 suppobject;
3093
3094         colobject.classId = RelationRelationId;
3095         colobject.objectId = relid;
3096         colobject.objectSubId = attnum;
3097         suppobject.classId = RelationRelationId;
3098         suppobject.objectId = RangeVarGetRelid(support, false);
3099         suppobject.objectSubId = 0;
3100         recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
3101 }
3102
3103 /*
3104  * ALTER TABLE ALTER COLUMN DROP NOT NULL
3105  */
3106 static void
3107 ATExecDropNotNull(Relation rel, const char *colName)
3108 {
3109         HeapTuple       tuple;
3110         AttrNumber      attnum;
3111         Relation        attr_rel;
3112         List       *indexoidlist;
3113         ListCell   *indexoidscan;
3114
3115         /*
3116          * lookup the attribute
3117          */
3118         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3119
3120         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3121
3122         if (!HeapTupleIsValid(tuple))
3123                 ereport(ERROR,
3124                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3125                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3126                                                 colName, RelationGetRelationName(rel))));
3127
3128         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3129
3130         /* Prevent them from altering a system attribute */
3131         if (attnum <= 0)
3132                 ereport(ERROR,
3133                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3134                                  errmsg("cannot alter system column \"%s\"",
3135                                                 colName)));
3136
3137         /*
3138          * Check that the attribute is not in a primary key
3139          */
3140
3141         /* Loop over all indexes on the relation */
3142         indexoidlist = RelationGetIndexList(rel);
3143
3144         foreach(indexoidscan, indexoidlist)
3145         {
3146                 Oid                     indexoid = lfirst_oid(indexoidscan);
3147                 HeapTuple       indexTuple;
3148                 Form_pg_index indexStruct;
3149                 int                     i;
3150
3151                 indexTuple = SearchSysCache(INDEXRELID,
3152                                                                         ObjectIdGetDatum(indexoid),
3153                                                                         0, 0, 0);
3154                 if (!HeapTupleIsValid(indexTuple))
3155                         elog(ERROR, "cache lookup failed for index %u", indexoid);
3156                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3157
3158                 /* If the index is not a primary key, skip the check */
3159                 if (indexStruct->indisprimary)
3160                 {
3161                         /*
3162                          * Loop over each attribute in the primary key and see if it
3163                          * matches the to-be-altered attribute
3164                          */
3165                         for (i = 0; i < indexStruct->indnatts; i++)
3166                         {
3167                                 if (indexStruct->indkey.values[i] == attnum)
3168                                         ereport(ERROR,
3169                                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3170                                                          errmsg("column \"%s\" is in a primary key",
3171                                                                         colName)));
3172                         }
3173                 }
3174
3175                 ReleaseSysCache(indexTuple);
3176         }
3177
3178         list_free(indexoidlist);
3179
3180         /*
3181          * Okay, actually perform the catalog change ... if needed
3182          */
3183         if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3184         {
3185                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3186
3187                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3188
3189                 /* keep the system catalog indexes current */
3190                 CatalogUpdateIndexes(attr_rel, tuple);
3191         }
3192
3193         heap_close(attr_rel, RowExclusiveLock);
3194 }
3195
3196 /*
3197  * ALTER TABLE ALTER COLUMN SET NOT NULL
3198  */
3199 static void
3200 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3201                                  const char *colName)
3202 {
3203         HeapTuple       tuple;
3204         AttrNumber      attnum;
3205         Relation        attr_rel;
3206         NewConstraint *newcon;
3207
3208         /*
3209          * lookup the attribute
3210          */
3211         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3212
3213         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3214
3215         if (!HeapTupleIsValid(tuple))
3216                 ereport(ERROR,
3217                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3218                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3219                                                 colName, RelationGetRelationName(rel))));
3220
3221         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3222
3223         /* Prevent them from altering a system attribute */
3224         if (attnum <= 0)
3225                 ereport(ERROR,
3226                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3227                                  errmsg("cannot alter system column \"%s\"",
3228                                                 colName)));
3229
3230         /*
3231          * Okay, actually perform the catalog change ... if needed
3232          */
3233         if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3234         {
3235                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3236
3237                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3238
3239                 /* keep the system catalog indexes current */
3240                 CatalogUpdateIndexes(attr_rel, tuple);
3241
3242                 /* Tell Phase 3 to test the constraint */
3243                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3244                 newcon->contype = CONSTR_NOTNULL;
3245                 newcon->attnum = attnum;
3246                 newcon->name = "NOT NULL";
3247
3248                 tab->constraints = lappend(tab->constraints, newcon);
3249         }
3250
3251         heap_close(attr_rel, RowExclusiveLock);
3252 }
3253
3254 /*
3255  * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3256  */
3257 static void
3258 ATExecColumnDefault(Relation rel, const char *colName,
3259                                         Node *newDefault)
3260 {
3261         AttrNumber      attnum;
3262
3263         /*
3264          * get the number of the attribute
3265          */
3266         attnum = get_attnum(RelationGetRelid(rel), colName);
3267         if (attnum == InvalidAttrNumber)
3268                 ereport(ERROR,
3269                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3270                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3271                                                 colName, RelationGetRelationName(rel))));
3272
3273         /* Prevent them from altering a system attribute */
3274         if (attnum <= 0)
3275                 ereport(ERROR,
3276                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3277                                  errmsg("cannot alter system column \"%s\"",
3278                                                 colName)));
3279
3280         /*
3281          * Remove any old default for the column.  We use RESTRICT here for
3282          * safety, but at present we do not expect anything to depend on the
3283          * default.
3284          */
3285         RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3286
3287         if (newDefault)
3288         {
3289                 /* SET DEFAULT */
3290                 RawColumnDefault *rawEnt;
3291
3292                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3293                 rawEnt->attnum = attnum;
3294                 rawEnt->raw_default = newDefault;
3295
3296                 /*
3297                  * This function is intended for CREATE TABLE, so it processes a
3298                  * _list_ of defaults, but we just do one.
3299                  */
3300                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3301         }
3302 }
3303
3304 /*
3305  * ALTER TABLE ALTER COLUMN SET STATISTICS
3306  */
3307 static void
3308 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3309 {
3310         /*
3311          * We do our own permission checking because (a) we want to allow SET
3312          * STATISTICS on indexes (for expressional index columns), and (b) we
3313          * want to allow SET STATISTICS on system catalogs without requiring
3314          * allowSystemTableMods to be turned on.
3315          */
3316         if (rel->rd_rel->relkind != RELKIND_RELATION &&
3317                 rel->rd_rel->relkind != RELKIND_INDEX)
3318                 ereport(ERROR,
3319                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3320                                  errmsg("\"%s\" is not a table or index",
3321                                                 RelationGetRelationName(rel))));
3322
3323         /* Permissions checks */
3324         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3325                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3326                                            RelationGetRelationName(rel));
3327 }
3328
3329 static void
3330 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3331 {
3332         int                     newtarget;
3333         Relation        attrelation;
3334         HeapTuple       tuple;
3335         Form_pg_attribute attrtuple;
3336
3337         Assert(IsA(newValue, Integer));
3338         newtarget = intVal(newValue);
3339
3340         /*
3341          * Limit target to a sane range
3342          */
3343         if (newtarget < -1)
3344         {
3345                 ereport(ERROR,
3346                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3347                                  errmsg("statistics target %d is too low",
3348                                                 newtarget)));
3349         }
3350         else if (newtarget > 1000)
3351         {
3352                 newtarget = 1000;
3353                 ereport(WARNING,
3354                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3355                                  errmsg("lowering statistics target to %d",
3356                                                 newtarget)));
3357         }
3358
3359         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3360
3361         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3362
3363         if (!HeapTupleIsValid(tuple))
3364                 ereport(ERROR,
3365                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3366                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3367                                                 colName, RelationGetRelationName(rel))));
3368         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3369
3370         if (attrtuple->attnum <= 0)
3371                 ereport(ERROR,
3372                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3373                                  errmsg("cannot alter system column \"%s\"",
3374                                                 colName)));
3375
3376         attrtuple->attstattarget = newtarget;
3377
3378         simple_heap_update(attrelation, &tuple->t_self, tuple);
3379
3380         /* keep system catalog indexes current */
3381         CatalogUpdateIndexes(attrelation, tuple);
3382
3383         heap_freetuple(tuple);
3384
3385         heap_close(attrelation, RowExclusiveLock);
3386 }
3387
3388 /*
3389  * ALTER TABLE ALTER COLUMN SET STORAGE
3390  */
3391 static void
3392 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3393 {
3394         char       *storagemode;
3395         char            newstorage;
3396         Relation        attrelation;
3397         HeapTuple       tuple;
3398         Form_pg_attribute attrtuple;
3399
3400         Assert(IsA(newValue, String));
3401         storagemode = strVal(newValue);
3402
3403         if (pg_strcasecmp(storagemode, "plain") == 0)
3404                 newstorage = 'p';
3405         else if (pg_strcasecmp(storagemode, "external") == 0)
3406                 newstorage = 'e';
3407         else if (pg_strcasecmp(storagemode, "extended") == 0)
3408                 newstorage = 'x';
3409         else if (pg_strcasecmp(storagemode, "main") == 0)
3410                 newstorage = 'm';
3411         else
3412         {
3413                 ereport(ERROR,
3414                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3415                                  errmsg("invalid storage type \"%s\"",
3416                                                 storagemode)));
3417                 newstorage = 0;                 /* keep compiler quiet */
3418         }
3419
3420         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3421
3422         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3423
3424         if (!HeapTupleIsValid(tuple))
3425                 ereport(ERROR,
3426                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3427                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3428                                                 colName, RelationGetRelationName(rel))));
3429         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3430
3431         if (attrtuple->attnum <= 0)
3432                 ereport(ERROR,
3433                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3434                                  errmsg("cannot alter system column \"%s\"",
3435                                                 colName)));
3436
3437         /*
3438          * safety check: do not allow toasted storage modes unless column
3439          * datatype is TOAST-aware.
3440          */
3441         if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3442                 attrtuple->attstorage = newstorage;
3443         else
3444                 ereport(ERROR,
3445                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3446                                  errmsg("column data type %s can only have storage PLAIN",
3447                                                 format_type_be(attrtuple->atttypid))));
3448
3449         simple_heap_update(attrelation, &tuple->t_self, tuple);
3450
3451         /* keep system catalog indexes current */
3452         CatalogUpdateIndexes(attrelation, tuple);
3453
3454         heap_freetuple(tuple);
3455
3456         heap_close(attrelation, RowExclusiveLock);
3457 }
3458
3459
3460 /*
3461  * ALTER TABLE DROP COLUMN
3462  *
3463  * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3464  * because we have to decide at runtime whether to recurse or not depending
3465  * on whether attinhcount goes to zero or not.  (We can't check this in a
3466  * static pre-pass because it won't handle multiple inheritance situations
3467  * correctly.)  Since DROP COLUMN doesn't need to create any work queue
3468  * entries for Phase 3, it's okay to recurse internally in this routine
3469  * without considering the work queue.
3470  */
3471 static void
3472 ATExecDropColumn(Relation rel, const char *colName,
3473                                  DropBehavior behavior,
3474                                  bool recurse, bool recursing)
3475 {
3476         HeapTuple       tuple;
3477         Form_pg_attribute targetatt;
3478         AttrNumber      attnum;
3479         List       *children;
3480         ObjectAddress object;
3481
3482         /* At top level, permission check was done in ATPrepCmd, else do it */
3483         if (recursing)
3484                 ATSimplePermissions(rel, false);
3485
3486         /*
3487          * get the number of the attribute
3488          */
3489         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3490         if (!HeapTupleIsValid(tuple))
3491                 ereport(ERROR,
3492                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3493                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3494                                                 colName, RelationGetRelationName(rel))));
3495         targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
3496
3497         attnum = targetatt->attnum;
3498
3499         /* Can't drop a system attribute, except OID */
3500         if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3501                 ereport(ERROR,
3502                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3503                                  errmsg("cannot drop system column \"%s\"",
3504                                                 colName)));
3505
3506         /* Don't drop inherited columns */
3507         if (targetatt->attinhcount > 0 && !recursing)
3508                 ereport(ERROR,
3509                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3510                                  errmsg("cannot drop inherited column \"%s\"",
3511                                                 colName)));
3512
3513         ReleaseSysCache(tuple);
3514
3515         /*
3516          * Propagate to children as appropriate.  Unlike most other ALTER
3517          * routines, we have to do this one level of recursion at a time; we
3518          * can't use find_all_inheritors to do it in one pass.
3519          */
3520         children = find_inheritance_children(RelationGetRelid(rel));
3521
3522         if (children)
3523         {
3524                 Relation        attr_rel;
3525                 ListCell   *child;
3526
3527                 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3528                 foreach(child, children)
3529                 {
3530                         Oid                     childrelid = lfirst_oid(child);
3531                         Relation        childrel;
3532                         Form_pg_attribute childatt;
3533
3534                         childrel = heap_open(childrelid, AccessExclusiveLock);
3535
3536                         tuple = SearchSysCacheCopyAttName(childrelid, colName);
3537                         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
3538                                 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3539                                          colName, childrelid);
3540                         childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3541
3542                         if (childatt->attinhcount <= 0)         /* shouldn't happen */
3543                                 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3544                                          childrelid, colName);
3545
3546                         if (recurse)
3547                         {
3548                                 /*
3549                                  * If the child column has other definition sources, just
3550                                  * decrement its inheritance count; if not, recurse to
3551                                  * delete it.
3552                                  */
3553                                 if (childatt->attinhcount == 1 && !childatt->attislocal)
3554                                 {
3555                                         /* Time to delete this child column, too */
3556                                         ATExecDropColumn(childrel, colName, behavior, true, true);
3557                                 }
3558                                 else
3559                                 {
3560                                         /* Child column must survive my deletion */
3561                                         childatt->attinhcount--;
3562
3563                                         simple_heap_update(attr_rel, &tuple->t_self, tuple);
3564
3565                                         /* keep the system catalog indexes current */
3566                                         CatalogUpdateIndexes(attr_rel, tuple);
3567
3568                                         /* Make update visible */
3569                                         CommandCounterIncrement();
3570                                 }
3571                         }
3572                         else
3573                         {
3574                                 /*
3575                                  * If we were told to drop ONLY in this table (no
3576                                  * recursion), we need to mark the inheritors' attribute
3577                                  * as locally defined rather than inherited.
3578                                  */
3579                                 childatt->attinhcount--;
3580                                 childatt->attislocal = true;
3581
3582                                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3583
3584                                 /* keep the system catalog indexes current */
3585                                 CatalogUpdateIndexes(attr_rel, tuple);
3586
3587                                 /* Make update visible */
3588                                 CommandCounterIncrement();
3589                         }
3590
3591                         heap_freetuple(tuple);
3592
3593                         heap_close(childrel, NoLock);
3594                 }
3595                 heap_close(attr_rel, RowExclusiveLock);
3596         }
3597
3598         /*
3599          * Perform the actual column deletion
3600          */
3601         object.classId = RelationRelationId;
3602         object.objectId = RelationGetRelid(rel);
3603         object.objectSubId = attnum;
3604
3605         performDeletion(&object, behavior);
3606
3607         /*
3608          * If we dropped the OID column, must adjust pg_class.relhasoids
3609          */
3610         if (attnum == ObjectIdAttributeNumber)
3611         {
3612                 Relation        class_rel;
3613                 Form_pg_class tuple_class;
3614
3615                 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
3616
3617                 tuple = SearchSysCacheCopy(RELOID,
3618                                                                  ObjectIdGetDatum(RelationGetRelid(rel)),
3619                                                                    0, 0, 0);
3620                 if (!HeapTupleIsValid(tuple))
3621                         elog(ERROR, "cache lookup failed for relation %u",
3622                                  RelationGetRelid(rel));
3623                 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3624
3625                 tuple_class->relhasoids = false;
3626                 simple_heap_update(class_rel, &tuple->t_self, tuple);
3627
3628                 /* Keep the catalog indexes up to date */
3629                 CatalogUpdateIndexes(class_rel, tuple);
3630
3631                 heap_close(class_rel, RowExclusiveLock);
3632         }
3633 }
3634
3635 /*
3636  * ALTER TABLE ADD INDEX
3637  *
3638  * There is no such command in the grammar, but the parser converts UNIQUE
3639  * and PRIMARY KEY constraints into AT_AddIndex subcommands.  This lets us
3640  * schedule creation of the index at the appropriate time during ALTER.
3641  */
3642 static void
3643 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3644                            IndexStmt *stmt, bool is_rebuild)
3645 {
3646         bool            check_rights;
3647         bool            skip_build;
3648         bool            quiet;
3649
3650         Assert(IsA(stmt, IndexStmt));
3651
3652         /* suppress schema rights check when rebuilding existing index */
3653         check_rights = !is_rebuild;
3654         /* skip index build if phase 3 will have to rewrite table anyway */
3655         skip_build = (tab->newvals != NIL);
3656         /* suppress notices when rebuilding existing index */
3657         quiet = is_rebuild;
3658
3659         DefineIndex(stmt->relation, /* relation */
3660                                 stmt->idxname,  /* index name */
3661                                 InvalidOid,             /* no predefined OID */
3662                                 stmt->accessMethod,             /* am name */
3663                                 stmt->tableSpace,
3664                                 stmt->indexParams,              /* parameters */
3665                                 (Expr *) stmt->whereClause,
3666                                 stmt->rangetable,
3667                                 stmt->unique,
3668                                 stmt->primary,
3669                                 stmt->isconstraint,
3670                                 true,                   /* is_alter_table */
3671                                 check_rights,
3672                                 skip_build,
3673                                 quiet);
3674 }
3675
3676 /*
3677  * ALTER TABLE ADD CONSTRAINT
3678  */
3679 static void
3680 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3681 {
3682         switch (nodeTag(newConstraint))
3683         {
3684                 case T_Constraint:
3685                         {
3686                                 Constraint *constr = (Constraint *) newConstraint;
3687
3688                                 /*
3689                                  * Currently, we only expect to see CONSTR_CHECK nodes
3690                                  * arriving here (see the preprocessing done in
3691                                  * parser/analyze.c).  Use a switch anyway to make it
3692                                  * easier to add more code later.
3693                                  */
3694                                 switch (constr->contype)
3695                                 {
3696                                         case CONSTR_CHECK:
3697                                                 {
3698                                                         List       *newcons;
3699                                                         ListCell   *lcon;
3700
3701                                                         /*
3702                                                          * Call AddRelationRawConstraints to do the
3703                                                          * work. It returns a list of cooked
3704                                                          * constraints.
3705                                                          */
3706                                                         newcons = AddRelationRawConstraints(rel, NIL,
3707                                                                                                          list_make1(constr));
3708                                                         /* Add each constraint to Phase 3's queue */
3709                                                         foreach(lcon, newcons)
3710                                                         {
3711                                                                 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3712                                                                 NewConstraint *newcon;
3713
3714                                                                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3715                                                                 newcon->name = ccon->name;
3716                                                                 newcon->contype = ccon->contype;
3717                                                                 newcon->attnum = ccon->attnum;
3718                                                                 /* ExecQual wants implicit-AND format */
3719                                                                 newcon->qual = (Node *)
3720                                                                         make_ands_implicit((Expr *) ccon->expr);
3721
3722                                                                 tab->constraints = lappend(tab->constraints,
3723                                                                                                                    newcon);
3724                                                         }
3725                                                         break;
3726                                                 }
3727                                         default:
3728                                                 elog(ERROR, "unrecognized constraint type: %d",
3729                                                          (int) constr->contype);
3730                                 }
3731                                 break;
3732                         }
3733                 case T_FkConstraint:
3734                         {
3735                                 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3736
3737                                 /*
3738                                  * Assign or validate constraint name
3739                                  */
3740                                 if (fkconstraint->constr_name)
3741                                 {
3742                                         if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3743                                                                                          RelationGetRelid(rel),
3744                                                                                          RelationGetNamespace(rel),
3745                                                                                          fkconstraint->constr_name))
3746                                                 ereport(ERROR,
3747                                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
3748                                                                  errmsg("constraint \"%s\" for relation \"%s\" already exists",
3749                                                                                 fkconstraint->constr_name,
3750                                                                                 RelationGetRelationName(rel))));
3751                                 }
3752                                 else
3753                                         fkconstraint->constr_name =
3754                                                 ChooseConstraintName(RelationGetRelationName(rel),
3755                                                                 strVal(linitial(fkconstraint->fk_attrs)),
3756                                                                                          "fkey",
3757                                                                                          RelationGetNamespace(rel),
3758                                                                                          NIL);
3759
3760                                 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3761
3762                                 break;
3763                         }
3764                 default:
3765                         elog(ERROR, "unrecognized node type: %d",
3766                                  (int) nodeTag(newConstraint));
3767         }
3768 }
3769
3770 /*
3771  * Add a foreign-key constraint to a single table
3772  *
3773  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
3774  * lock on the rel, and have done appropriate validity/permissions checks
3775  * for it.
3776  */
3777 static void
3778 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3779                                                   FkConstraint *fkconstraint)
3780 {
3781         Relation        pkrel;
3782         AclResult       aclresult;
3783         int16           pkattnum[INDEX_MAX_KEYS];
3784         int16           fkattnum[INDEX_MAX_KEYS];
3785         Oid                     pktypoid[INDEX_MAX_KEYS];
3786         Oid                     fktypoid[INDEX_MAX_KEYS];
3787         Oid                     opclasses[INDEX_MAX_KEYS];
3788         int                     i;
3789         int                     numfks,
3790                                 numpks;
3791         Oid                     indexOid;
3792         Oid                     constrOid;
3793
3794         /*
3795          * Grab an exclusive lock on the pk table, so that someone doesn't
3796          * delete rows out from under us. (Although a lesser lock would do for
3797          * that purpose, we'll need exclusive lock anyway to add triggers to
3798          * the pk table; trying to start with a lesser lock will just create a
3799          * risk of deadlock.)
3800          */
3801         pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3802
3803         /*
3804          * Validity and permissions checks
3805          *
3806          * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3807          * but we may as well error out sooner instead of later.
3808          */
3809         if (pkrel->rd_rel->relkind != RELKIND_RELATION)
3810                 ereport(ERROR,
3811                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3812                                  errmsg("referenced relation \"%s\" is not a table",
3813                                                 RelationGetRelationName(pkrel))));
3814
3815         aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
3816                                                                   ACL_REFERENCES);
3817         if (aclresult != ACLCHECK_OK)
3818                 aclcheck_error(aclresult, ACL_KIND_CLASS,
3819                                            RelationGetRelationName(pkrel));
3820
3821         if (!allowSystemTableMods && IsSystemRelation(pkrel))
3822                 ereport(ERROR,
3823                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
3824                                  errmsg("permission denied: \"%s\" is a system catalog",
3825                                                 RelationGetRelationName(pkrel))));
3826
3827         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
3828                                                                   ACL_REFERENCES);
3829         if (aclresult != ACLCHECK_OK)
3830                 aclcheck_error(aclresult, ACL_KIND_CLASS,
3831                                            RelationGetRelationName(rel));
3832
3833         /*
3834          * Disallow reference from permanent table to temp table or vice
3835          * versa. (The ban on perm->temp is for fairly obvious reasons.  The
3836          * ban on temp->perm is because other backends might need to run the
3837          * RI triggers on the perm table, but they can't reliably see tuples
3838          * the owning backend has created in the temp table, because
3839          * non-shared buffers are used for temp tables.)
3840          */
3841         if (isTempNamespace(RelationGetNamespace(pkrel)))
3842         {
3843                 if (!isTempNamespace(RelationGetNamespace(rel)))
3844                         ereport(ERROR,
3845                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3846                                          errmsg("cannot reference temporary table from permanent table constraint")));
3847         }
3848         else
3849         {
3850                 if (isTempNamespace(RelationGetNamespace(rel)))
3851                         ereport(ERROR,
3852                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3853                                          errmsg("cannot reference permanent table from temporary table constraint")));
3854         }
3855
3856         /*
3857          * Look up the referencing attributes to make sure they exist, and
3858          * record their attnums and type OIDs.
3859          */
3860         MemSet(pkattnum, 0, sizeof(pkattnum));
3861         MemSet(fkattnum, 0, sizeof(fkattnum));
3862         MemSet(pktypoid, 0, sizeof(pktypoid));
3863         MemSet(fktypoid, 0, sizeof(fktypoid));
3864         MemSet(opclasses, 0, sizeof(opclasses));
3865
3866         numfks = transformColumnNameList(RelationGetRelid(rel),
3867                                                                          fkconstraint->fk_attrs,
3868                                                                          fkattnum, fktypoid);
3869
3870         /*
3871          * If the attribute list for the referenced table was omitted, lookup
3872          * the definition of the primary key and use it.  Otherwise, validate
3873          * the supplied attribute list.  In either case, discover the index
3874          * OID and index opclasses, and the attnums and type OIDs of the
3875          * attributes.
3876          */
3877         if (fkconstraint->pk_attrs == NIL)
3878         {
3879                 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
3880                                                                                         &fkconstraint->pk_attrs,
3881                                                                                         pkattnum, pktypoid,
3882                                                                                         opclasses);
3883         }
3884         else
3885         {
3886                 numpks = transformColumnNameList(RelationGetRelid(pkrel),
3887                                                                                  fkconstraint->pk_attrs,
3888                                                                                  pkattnum, pktypoid);
3889                 /* Look for an index matching the column list */
3890                 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
3891                                                                                    opclasses);
3892         }
3893
3894         /* Be sure referencing and referenced column types are comparable */
3895         if (numfks != numpks)
3896                 ereport(ERROR,
3897                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
3898                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
3899
3900         for (i = 0; i < numpks; i++)
3901         {
3902                 /*
3903                  * pktypoid[i] is the primary key table's i'th key's type
3904                  * fktypoid[i] is the foreign key table's i'th key's type
3905                  *
3906                  * Note that we look for an operator with the PK type on the left;
3907                  * when the types are different this is critical because the PK
3908                  * index will need operators with the indexkey on the left.
3909                  * (Ordinarily both commutator operators will exist if either
3910                  * does, but we won't get the right answer from the test below on
3911                  * opclass membership unless we select the proper operator.)
3912                  */
3913                 Operator        o = oper(list_make1(makeString("=")),
3914                                                          pktypoid[i], fktypoid[i], true);
3915
3916                 if (o == NULL)
3917                         ereport(ERROR,
3918                                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
3919                                          errmsg("foreign key constraint \"%s\" "
3920                                                         "cannot be implemented",
3921                                                         fkconstraint->constr_name),
3922                                          errdetail("Key columns \"%s\" and \"%s\" "
3923                                                            "are of incompatible types: %s and %s.",
3924                                                          strVal(list_nth(fkconstraint->fk_attrs, i)),
3925                                                          strVal(list_nth(fkconstraint->pk_attrs, i)),
3926                                                            format_type_be(fktypoid[i]),
3927                                                            format_type_be(pktypoid[i]))));
3928
3929                 /*
3930                  * Check that the found operator is compatible with the PK index,
3931                  * and generate a warning if not, since otherwise costly seqscans
3932                  * will be incurred to check FK validity.
3933                  */
3934                 if (!op_in_opclass(oprid(o), opclasses[i]))
3935                         ereport(WARNING,
3936                                         (errmsg("foreign key constraint \"%s\" "
3937                                                         "will require costly sequential scans",
3938                                                         fkconstraint->constr_name),
3939                                          errdetail("Key columns \"%s\" and \"%s\" "
3940                                                            "are of different types: %s and %s.",
3941                                                          strVal(list_nth(fkconstraint->fk_attrs, i)),
3942                                                          strVal(list_nth(fkconstraint->pk_attrs, i)),
3943                                                            format_type_be(fktypoid[i]),
3944                                                            format_type_be(pktypoid[i]))));
3945
3946                 ReleaseSysCache(o);
3947         }
3948
3949         /*
3950          * Tell Phase 3 to check that the constraint is satisfied by existing
3951          * rows (we can skip this during table creation).
3952          */
3953         if (!fkconstraint->skip_validation)
3954         {
3955                 NewConstraint *newcon;
3956
3957                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3958                 newcon->name = fkconstraint->constr_name;
3959                 newcon->contype = CONSTR_FOREIGN;
3960                 newcon->refrelid = RelationGetRelid(pkrel);
3961                 newcon->qual = (Node *) fkconstraint;
3962
3963                 tab->constraints = lappend(tab->constraints, newcon);
3964         }
3965
3966         /*
3967          * Record the FK constraint in pg_constraint.
3968          */
3969         constrOid = CreateConstraintEntry(fkconstraint->constr_name,
3970                                                                           RelationGetNamespace(rel),
3971                                                                           CONSTRAINT_FOREIGN,
3972                                                                           fkconstraint->deferrable,
3973                                                                           fkconstraint->initdeferred,
3974                                                                           RelationGetRelid(rel),
3975                                                                           fkattnum,
3976                                                                           numfks,
3977                                                                           InvalidOid,           /* not a domain
3978                                                                                                                  * constraint */
3979                                                                           RelationGetRelid(pkrel),
3980                                                                           pkattnum,
3981                                                                           numpks,
3982                                                                           fkconstraint->fk_upd_action,
3983                                                                           fkconstraint->fk_del_action,
3984                                                                           fkconstraint->fk_matchtype,
3985                                                                           indexOid,
3986                                                                           NULL,         /* no check constraint */
3987                                                                           NULL,
3988                                                                           NULL);
3989
3990         /*
3991          * Create the triggers that will enforce the constraint.
3992          */
3993         createForeignKeyTriggers(rel, fkconstraint, constrOid);
3994
3995         /*
3996          * Close pk table, but keep lock until we've committed.
3997          */
3998         heap_close(pkrel, NoLock);
3999 }
4000
4001
4002 /*
4003  * transformColumnNameList - transform list of column names
4004  *
4005  * Lookup each name and return its attnum and type OID
4006  */
4007 static int
4008 transformColumnNameList(Oid relId, List *colList,
4009                                                 int16 *attnums, Oid *atttypids)
4010 {
4011         ListCell   *l;
4012         int                     attnum;
4013
4014         attnum = 0;
4015         foreach(l, colList)
4016         {
4017                 char       *attname = strVal(lfirst(l));
4018                 HeapTuple       atttuple;
4019
4020                 atttuple = SearchSysCacheAttName(relId, attname);
4021                 if (!HeapTupleIsValid(atttuple))
4022                         ereport(ERROR,
4023                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
4024                                          errmsg("column \"%s\" referenced in foreign key constraint does not exist",
4025                                                         attname)));
4026                 if (attnum >= INDEX_MAX_KEYS)
4027                         ereport(ERROR,
4028                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
4029                                  errmsg("cannot have more than %d keys in a foreign key",
4030                                                 INDEX_MAX_KEYS)));
4031                 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4032                 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4033                 ReleaseSysCache(atttuple);
4034                 attnum++;
4035         }
4036
4037         return attnum;
4038 }
4039
4040 /*
4041  * transformFkeyGetPrimaryKey -
4042  *
4043  *      Look up the names, attnums, and types of the primary key attributes
4044  *      for the pkrel.  Also return the index OID and index opclasses of the
4045  *      index supporting the primary key.
4046  *
4047  *      All parameters except pkrel are output parameters.      Also, the function
4048  *      return value is the number of attributes in the primary key.
4049  *
4050  *      Used when the column list in the REFERENCES specification is omitted.
4051  */
4052 static int
4053 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4054                                                    List **attnamelist,
4055                                                    int16 *attnums, Oid *atttypids,
4056                                                    Oid *opclasses)
4057 {
4058         List       *indexoidlist;
4059         ListCell   *indexoidscan;
4060         HeapTuple       indexTuple = NULL;
4061         Form_pg_index indexStruct = NULL;
4062         Datum           indclassDatum;
4063         bool            isnull;
4064         oidvector  *indclass;
4065         int                     i;
4066
4067         /*
4068          * Get the list of index OIDs for the table from the relcache, and
4069          * look up each one in the pg_index syscache until we find one marked
4070          * primary key (hopefully there isn't more than one such).
4071          */
4072         indexoidlist = RelationGetIndexList(pkrel);
4073
4074         foreach(indexoidscan, indexoidlist)
4075         {
4076                 Oid                     indexoid = lfirst_oid(indexoidscan);
4077
4078                 indexTuple = SearchSysCache(INDEXRELID,
4079                                                                         ObjectIdGetDatum(indexoid),
4080                                                                         0, 0, 0);
4081                 if (!HeapTupleIsValid(indexTuple))
4082                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4083                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4084                 if (indexStruct->indisprimary)
4085                 {
4086                         *indexOid = indexoid;
4087                         break;
4088                 }
4089                 ReleaseSysCache(indexTuple);
4090                 indexStruct = NULL;
4091         }
4092
4093         list_free(indexoidlist);
4094
4095         /*
4096          * Check that we found it
4097          */
4098         if (indexStruct == NULL)
4099                 ereport(ERROR,
4100                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
4101                         errmsg("there is no primary key for referenced table \"%s\"",
4102                                    RelationGetRelationName(pkrel))));
4103
4104         /* Must get indclass the hard way */
4105         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4106                                                                         Anum_pg_index_indclass, &isnull);
4107         Assert(!isnull);
4108         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4109
4110         /*
4111          * Now build the list of PK attributes from the indkey definition (we
4112          * assume a primary key cannot have expressional elements)
4113          */
4114         *attnamelist = NIL;
4115         for (i = 0; i < indexStruct->indnatts; i++)
4116         {
4117                 int                     pkattno = indexStruct->indkey.values[i];
4118
4119                 attnums[i] = pkattno;
4120                 atttypids[i] = attnumTypeId(pkrel, pkattno);
4121                 opclasses[i] = indclass->values[i];
4122                 *attnamelist = lappend(*attnamelist,
4123                    makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4124         }
4125
4126         ReleaseSysCache(indexTuple);
4127
4128         return i;
4129 }
4130
4131 /*
4132  * transformFkeyCheckAttrs -
4133  *
4134  *      Make sure that the attributes of a referenced table belong to a unique
4135  *      (or primary key) constraint.  Return the OID of the index supporting
4136  *      the constraint, as well as the opclasses associated with the index
4137  *      columns.
4138  */
4139 static Oid
4140 transformFkeyCheckAttrs(Relation pkrel,
4141                                                 int numattrs, int16 *attnums,
4142                                                 Oid *opclasses) /* output parameter */
4143 {
4144         Oid                     indexoid = InvalidOid;
4145         bool            found = false;
4146         List       *indexoidlist;
4147         ListCell   *indexoidscan;
4148
4149         /*
4150          * Get the list of index OIDs for the table from the relcache, and
4151          * look up each one in the pg_index syscache, and match unique indexes
4152          * to the list of attnums we are given.
4153          */
4154         indexoidlist = RelationGetIndexList(pkrel);
4155
4156         foreach(indexoidscan, indexoidlist)
4157         {
4158                 HeapTuple       indexTuple;
4159                 Form_pg_index indexStruct;
4160                 int                     i,
4161                                         j;
4162
4163                 indexoid = lfirst_oid(indexoidscan);
4164                 indexTuple = SearchSysCache(INDEXRELID,
4165                                                                         ObjectIdGetDatum(indexoid),
4166                                                                         0, 0, 0);
4167                 if (!HeapTupleIsValid(indexTuple))
4168                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4169                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4170
4171                 /*
4172                  * Must have the right number of columns; must be unique and not a
4173                  * partial index; forget it if there are any expressions, too
4174                  */
4175                 if (indexStruct->indnatts == numattrs &&
4176                         indexStruct->indisunique &&
4177                         heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4178                         heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4179                 {
4180                         /* Must get indclass the hard way */
4181                         Datum           indclassDatum;
4182                         bool            isnull;
4183                         oidvector  *indclass;
4184
4185                         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4186                                                                                         Anum_pg_index_indclass, &isnull);
4187                         Assert(!isnull);
4188                         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4189
4190                         /*
4191                          * The given attnum list may match the index columns in any
4192                          * order.  Check that each list is a subset of the other.
4193                          */
4194                         for (i = 0; i < numattrs; i++)
4195                         {
4196                                 found = false;
4197                                 for (j = 0; j < numattrs; j++)
4198                                 {
4199                                         if (attnums[i] == indexStruct->indkey.values[j])
4200                                         {
4201                                                 found = true;
4202                                                 break;
4203                                         }
4204                                 }
4205                                 if (!found)
4206                                         break;
4207                         }
4208                         if (found)
4209                         {
4210                                 for (i = 0; i < numattrs; i++)
4211                                 {
4212                                         found = false;
4213                                         for (j = 0; j < numattrs; j++)
4214                                         {
4215                                                 if (attnums[j] == indexStruct->indkey.values[i])
4216                                                 {
4217                                                         opclasses[j] = indclass->values[i];
4218                                                         found = true;
4219                                                         break;
4220                                                 }
4221                                         }
4222                                         if (!found)
4223                                                 break;
4224                                 }
4225                         }
4226                 }
4227                 ReleaseSysCache(indexTuple);
4228                 if (found)
4229                         break;
4230         }
4231
4232         if (!found)
4233                 ereport(ERROR,
4234                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4235                                  errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4236                                                 RelationGetRelationName(pkrel))));
4237
4238         list_free(indexoidlist);
4239
4240         return indexoid;
4241 }
4242
4243 /*
4244  * Scan the existing rows in a table to verify they meet a proposed FK
4245  * constraint.
4246  *
4247  * Caller must have opened and locked both relations.
4248  */
4249 static void
4250 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4251                                                          Relation rel,
4252                                                          Relation pkrel)
4253 {
4254         HeapScanDesc scan;
4255         HeapTuple       tuple;
4256         Trigger         trig;
4257         ListCell   *list;
4258         int                     count;
4259
4260         /*
4261          * See if we can do it with a single LEFT JOIN query.  A FALSE result
4262          * indicates we must proceed with the fire-the-trigger method.
4263          */
4264         if (RI_Initial_Check(fkconstraint, rel, pkrel))
4265                 return;
4266
4267         /*
4268          * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
4269          * as if that tuple had just been inserted.  If any of those fail, it
4270          * should ereport(ERROR) and that's that.
4271          */
4272         MemSet(&trig, 0, sizeof(trig));
4273         trig.tgoid = InvalidOid;
4274         trig.tgname = fkconstraint->constr_name;
4275         trig.tgenabled = TRUE;
4276         trig.tgisconstraint = TRUE;
4277         trig.tgconstrrelid = RelationGetRelid(pkrel);
4278         trig.tgdeferrable = FALSE;
4279         trig.tginitdeferred = FALSE;
4280
4281         trig.tgargs = (char **) palloc(sizeof(char *) *
4282                                                                  (4 + list_length(fkconstraint->fk_attrs)
4283                                                                   + list_length(fkconstraint->pk_attrs)));
4284
4285         trig.tgargs[0] = trig.tgname;
4286         trig.tgargs[1] = RelationGetRelationName(rel);
4287         trig.tgargs[2] = RelationGetRelationName(pkrel);
4288         trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4289         count = 4;
4290         foreach(list, fkconstraint->fk_attrs)
4291         {
4292                 char       *fk_at = strVal(lfirst(list));
4293
4294                 trig.tgargs[count] = fk_at;
4295                 count += 2;
4296         }
4297         count = 5;
4298         foreach(list, fkconstraint->pk_attrs)
4299         {
4300                 char       *pk_at = strVal(lfirst(list));
4301
4302                 trig.tgargs[count] = pk_at;
4303                 count += 2;
4304         }
4305         trig.tgnargs = count - 1;
4306
4307         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4308
4309         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4310         {
4311                 FunctionCallInfoData fcinfo;
4312                 TriggerData trigdata;
4313
4314                 /*
4315                  * Make a call to the trigger function
4316                  *
4317                  * No parameters are passed, but we do set a context
4318                  */
4319                 MemSet(&fcinfo, 0, sizeof(fcinfo));
4320
4321                 /*
4322                  * We assume RI_FKey_check_ins won't look at flinfo...
4323                  */
4324                 trigdata.type = T_TriggerData;
4325                 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4326                 trigdata.tg_relation = rel;
4327                 trigdata.tg_trigtuple = tuple;
4328                 trigdata.tg_newtuple = NULL;
4329                 trigdata.tg_trigger = &trig;
4330                 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4331                 trigdata.tg_newtuplebuf = InvalidBuffer;
4332
4333                 fcinfo.context = (Node *) &trigdata;
4334
4335                 RI_FKey_check_ins(&fcinfo);
4336         }
4337
4338         heap_endscan(scan);
4339
4340         pfree(trig.tgargs);
4341 }
4342
4343 static void
4344 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4345                                          ObjectAddress *constrobj, ObjectAddress *trigobj,
4346                                          bool on_insert)
4347 {
4348         CreateTrigStmt *fk_trigger;
4349         ListCell   *fk_attr;
4350         ListCell   *pk_attr;
4351
4352         fk_trigger = makeNode(CreateTrigStmt);
4353         fk_trigger->trigname = fkconstraint->constr_name;
4354         fk_trigger->relation = myRel;
4355         fk_trigger->before = false;
4356         fk_trigger->row = true;
4357
4358         /* Either ON INSERT or ON UPDATE */
4359         if (on_insert)
4360         {
4361                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4362                 fk_trigger->actions[0] = 'i';
4363         }
4364         else
4365         {
4366                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4367                 fk_trigger->actions[0] = 'u';
4368         }
4369         fk_trigger->actions[1] = '\0';
4370
4371         fk_trigger->isconstraint = true;
4372         fk_trigger->deferrable = fkconstraint->deferrable;
4373         fk_trigger->initdeferred = fkconstraint->initdeferred;
4374         fk_trigger->constrrel = fkconstraint->pktable;
4375
4376         fk_trigger->args = NIL;
4377         fk_trigger->args = lappend(fk_trigger->args,
4378                                                            makeString(fkconstraint->constr_name));
4379         fk_trigger->args = lappend(fk_trigger->args,
4380                                                            makeString(myRel->relname));
4381         fk_trigger->args = lappend(fk_trigger->args,
4382                                                          makeString(fkconstraint->pktable->relname));
4383         fk_trigger->args = lappend(fk_trigger->args,
4384                         makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4385         if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4386                 ereport(ERROR,
4387                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4388                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
4389
4390         forboth(fk_attr, fkconstraint->fk_attrs,
4391                         pk_attr, fkconstraint->pk_attrs)
4392         {
4393                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4394                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4395         }
4396
4397         trigobj->objectId = CreateTrigger(fk_trigger, true);
4398
4399         /* Register dependency from trigger to constraint */
4400         recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4401
4402         /* Make changes-so-far visible */
4403         CommandCounterIncrement();
4404 }
4405
4406 /*
4407  * Create the triggers that implement an FK constraint.
4408  */
4409 static void
4410 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4411                                                  Oid constrOid)
4412 {
4413         RangeVar   *myRel;
4414         CreateTrigStmt *fk_trigger;
4415         ListCell   *fk_attr;
4416         ListCell   *pk_attr;
4417         ObjectAddress trigobj,
4418                                 constrobj;
4419
4420         /*
4421          * Reconstruct a RangeVar for my relation (not passed in,
4422          * unfortunately).
4423          */
4424         myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4425                                                  pstrdup(RelationGetRelationName(rel)));
4426
4427         /*
4428          * Preset objectAddress fields
4429          */
4430         constrobj.classId = ConstraintRelationId;
4431         constrobj.objectId = constrOid;
4432         constrobj.objectSubId = 0;
4433         trigobj.classId = TriggerRelationId;
4434         trigobj.objectSubId = 0;
4435
4436         /* Make changes-so-far visible */
4437         CommandCounterIncrement();
4438
4439         /*
4440          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the
4441          * CHECK action for both INSERTs and UPDATEs on the referencing table.
4442          */
4443         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4444         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4445
4446         /*
4447          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4448          * DELETE action on the referenced table.
4449          */
4450         fk_trigger = makeNode(CreateTrigStmt);
4451         fk_trigger->trigname = fkconstraint->constr_name;
4452         fk_trigger->relation = fkconstraint->pktable;
4453         fk_trigger->before = false;
4454         fk_trigger->row = true;
4455         fk_trigger->actions[0] = 'd';
4456         fk_trigger->actions[1] = '\0';
4457
4458         fk_trigger->isconstraint = true;
4459         fk_trigger->constrrel = myRel;
4460         switch (fkconstraint->fk_del_action)
4461         {
4462                 case FKCONSTR_ACTION_NOACTION:
4463                         fk_trigger->deferrable = fkconstraint->deferrable;
4464                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4465                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4466                         break;
4467                 case FKCONSTR_ACTION_RESTRICT:
4468                         fk_trigger->deferrable = false;
4469                         fk_trigger->initdeferred = false;
4470                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4471                         break;
4472                 case FKCONSTR_ACTION_CASCADE:
4473                         fk_trigger->deferrable = false;
4474                         fk_trigger->initdeferred = false;
4475                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4476                         break;
4477                 case FKCONSTR_ACTION_SETNULL:
4478                         fk_trigger->deferrable = false;
4479                         fk_trigger->initdeferred = false;
4480                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4481                         break;
4482                 case FKCONSTR_ACTION_SETDEFAULT:
4483                         fk_trigger->deferrable = false;
4484                         fk_trigger->initdeferred = false;
4485                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4486                         break;
4487                 default:
4488                         elog(ERROR, "unrecognized FK action type: %d",
4489                                  (int) fkconstraint->fk_del_action);
4490                         break;
4491         }
4492
4493         fk_trigger->args = NIL;
4494         fk_trigger->args = lappend(fk_trigger->args,
4495                                                            makeString(fkconstraint->constr_name));
4496         fk_trigger->args = lappend(fk_trigger->args,
4497                                                            makeString(myRel->relname));
4498         fk_trigger->args = lappend(fk_trigger->args,
4499                                                          makeString(fkconstraint->pktable->relname));
4500         fk_trigger->args = lappend(fk_trigger->args,
4501                         makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4502         forboth(fk_attr, fkconstraint->fk_attrs,
4503                         pk_attr, fkconstraint->pk_attrs)
4504         {
4505                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4506                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4507         }
4508
4509         trigobj.objectId = CreateTrigger(fk_trigger, true);
4510
4511         /* Register dependency from trigger to constraint */
4512         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4513
4514         /* Make changes-so-far visible */
4515         CommandCounterIncrement();
4516
4517         /*
4518          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4519          * UPDATE action on the referenced table.
4520          */
4521         fk_trigger = makeNode(CreateTrigStmt);
4522         fk_trigger->trigname = fkconstraint->constr_name;
4523         fk_trigger->relation = fkconstraint->pktable;
4524         fk_trigger->before = false;
4525         fk_trigger->row = true;
4526         fk_trigger->actions[0] = 'u';
4527         fk_trigger->actions[1] = '\0';
4528         fk_trigger->isconstraint = true;
4529         fk_trigger->constrrel = myRel;
4530         switch (fkconstraint->fk_upd_action)
4531         {
4532                 case FKCONSTR_ACTION_NOACTION:
4533                         fk_trigger->deferrable = fkconstraint->deferrable;
4534                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4535                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4536                         break;
4537                 case FKCONSTR_ACTION_RESTRICT:
4538                         fk_trigger->deferrable = false;
4539                         fk_trigger->initdeferred = false;
4540                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4541                         break;
4542                 case FKCONSTR_ACTION_CASCADE:
4543                         fk_trigger->deferrable = false;
4544                         fk_trigger->initdeferred = false;
4545                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4546                         break;
4547                 case FKCONSTR_ACTION_SETNULL:
4548                         fk_trigger->deferrable = false;
4549                         fk_trigger->initdeferred = false;
4550                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4551                         break;
4552                 case FKCONSTR_ACTION_SETDEFAULT:
4553                         fk_trigger->deferrable = false;
4554                         fk_trigger->initdeferred = false;
4555                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4556                         break;
4557                 default:
4558                         elog(ERROR, "unrecognized FK action type: %d",
4559                                  (int) fkconstraint->fk_upd_action);
4560                         break;
4561         }
4562
4563         fk_trigger->args = NIL;
4564         fk_trigger->args = lappend(fk_trigger->args,
4565                                                            makeString(fkconstraint->constr_name));
4566         fk_trigger->args = lappend(fk_trigger->args,
4567                                                            makeString(myRel->relname));
4568         fk_trigger->args = lappend(fk_trigger->args,
4569                                                          makeString(fkconstraint->pktable->relname));
4570         fk_trigger->args = lappend(fk_trigger->args,
4571                         makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4572         forboth(fk_attr, fkconstraint->fk_attrs,
4573                         pk_attr, fkconstraint->pk_attrs)
4574         {
4575                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4576                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4577         }
4578
4579         trigobj.objectId = CreateTrigger(fk_trigger, true);
4580
4581         /* Register dependency from trigger to constraint */
4582         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4583 }
4584
4585 /*
4586  * fkMatchTypeToString -
4587  *        convert FKCONSTR_MATCH_xxx code to string to use in trigger args
4588  */
4589 static char *
4590 fkMatchTypeToString(char match_type)
4591 {
4592         switch (match_type)
4593         {
4594                 case FKCONSTR_MATCH_FULL:
4595                         return pstrdup("FULL");
4596                 case FKCONSTR_MATCH_PARTIAL:
4597                         return pstrdup("PARTIAL");
4598                 case FKCONSTR_MATCH_UNSPECIFIED:
4599                         return pstrdup("UNSPECIFIED");
4600                 default:
4601                         elog(ERROR, "unrecognized match type: %d",
4602                                  (int) match_type);
4603         }
4604         return NULL;                            /* can't get here */
4605 }
4606
4607 /*
4608  * ALTER TABLE DROP CONSTRAINT
4609  */
4610 static void
4611 ATPrepDropConstraint(List **wqueue, Relation rel,
4612                                          bool recurse, AlterTableCmd *cmd)
4613 {
4614         /*
4615          * We don't want errors or noise from child tables, so we have to pass
4616          * down a modified command.
4617          */
4618         if (recurse)
4619         {
4620                 AlterTableCmd *childCmd = copyObject(cmd);
4621
4622                 childCmd->subtype = AT_DropConstraintQuietly;
4623                 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
4624         }
4625 }
4626
4627 static void
4628 ATExecDropConstraint(Relation rel, const char *constrName,
4629                                          DropBehavior behavior, bool quiet)
4630 {
4631         int                     deleted;
4632
4633         deleted = RemoveRelConstraints(rel, constrName, behavior);
4634
4635         if (!quiet)
4636         {
4637                 /* If zero constraints deleted, complain */
4638                 if (deleted == 0)
4639                         ereport(ERROR,
4640                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
4641                                          errmsg("constraint \"%s\" does not exist",
4642                                                         constrName)));
4643                 /* Otherwise if more than one constraint deleted, notify */
4644                 else if (deleted > 1)
4645                         ereport(NOTICE,
4646                                 (errmsg("multiple constraints named \"%s\" were dropped",
4647                                                 constrName)));
4648         }
4649 }
4650
4651 /*
4652  * ALTER COLUMN TYPE
4653  */
4654 static void
4655 ATPrepAlterColumnType(List **wqueue,
4656                                           AlteredTableInfo *tab, Relation rel,
4657                                           bool recurse, bool recursing,
4658                                           AlterTableCmd *cmd)
4659 {
4660         char       *colName = cmd->name;
4661         TypeName   *typename = (TypeName *) cmd->def;
4662         HeapTuple       tuple;
4663         Form_pg_attribute attTup;
4664         AttrNumber      attnum;
4665         Oid                     targettype;
4666         Node       *transform;
4667         NewColumnValue *newval;
4668         ParseState *pstate = make_parsestate(NULL);
4669
4670         /* lookup the attribute so we can check inheritance status */
4671         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4672         if (!HeapTupleIsValid(tuple))
4673                 ereport(ERROR,
4674                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4675                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
4676                                                 colName, RelationGetRelationName(rel))));
4677         attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4678         attnum = attTup->attnum;
4679
4680         /* Can't alter a system attribute */
4681         if (attnum <= 0)
4682                 ereport(ERROR,
4683                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4684                                  errmsg("cannot alter system column \"%s\"",
4685                                                 colName)));
4686
4687         /* Don't alter inherited columns */
4688         if (attTup->attinhcount > 0 && !recursing)
4689                 ereport(ERROR,
4690                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4691                                  errmsg("cannot alter inherited column \"%s\"",
4692                                                 colName)));
4693
4694         /* Look up the target type */
4695         targettype = LookupTypeName(typename);
4696         if (!OidIsValid(targettype))
4697                 ereport(ERROR,
4698                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
4699                                  errmsg("type \"%s\" does not exist",
4700                                                 TypeNameToString(typename))));
4701
4702         /* make sure datatype is legal for a column */
4703         CheckAttributeType(colName, targettype);
4704
4705         /*
4706          * Set up an expression to transform the old data value to the new
4707          * type. If a USING option was given, transform and use that
4708          * expression, else just take the old value and try to coerce it.  We
4709          * do this first so that type incompatibility can be detected before
4710          * we waste effort, and because we need the expression to be parsed
4711          * against the original table rowtype.
4712          */
4713         if (cmd->transform)
4714         {
4715                 RangeTblEntry *rte;
4716
4717                 /* Expression must be able to access vars of old table */
4718                 rte = addRangeTableEntryForRelation(pstate,
4719                                                                                         rel,
4720                                                                                         NULL,
4721                                                                                         false,
4722                                                                                         true);
4723                 addRTEtoQuery(pstate, rte, false, true, true);
4724
4725                 transform = transformExpr(pstate, cmd->transform);
4726
4727                 /* It can't return a set */
4728                 if (expression_returns_set(transform))
4729                         ereport(ERROR,
4730                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
4731                                   errmsg("transform expression must not return a set")));
4732
4733                 /* No subplans or aggregates, either... */
4734                 if (pstate->p_hasSubLinks)
4735                         ereport(ERROR,
4736                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4737                                  errmsg("cannot use subquery in transform expression")));
4738                 if (pstate->p_hasAggs)
4739                         ereport(ERROR,
4740                                         (errcode(ERRCODE_GROUPING_ERROR),
4741                                          errmsg("cannot use aggregate function in transform expression")));
4742         }
4743         else
4744         {
4745                 transform = (Node *) makeVar(1, attnum,
4746                                                                          attTup->atttypid, attTup->atttypmod,
4747                                                                          0);
4748         }
4749
4750         transform = coerce_to_target_type(pstate,
4751                                                                           transform, exprType(transform),
4752                                                                           targettype, typename->typmod,
4753                                                                           COERCION_ASSIGNMENT,
4754                                                                           COERCE_IMPLICIT_CAST);
4755         if (transform == NULL)
4756                 ereport(ERROR,
4757                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
4758                                  errmsg("column \"%s\" cannot be cast to type \"%s\"",
4759                                                 colName, TypeNameToString(typename))));
4760
4761         /*
4762          * Add a work queue item to make ATRewriteTable update the column
4763          * contents.
4764          */
4765         newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4766         newval->attnum = attnum;
4767         newval->expr = (Expr *) transform;
4768
4769         tab->newvals = lappend(tab->newvals, newval);
4770
4771         ReleaseSysCache(tuple);
4772
4773         /*
4774          * The recursion case is handled by ATSimpleRecursion.  However, if we
4775          * are told not to recurse, there had better not be any child tables;
4776          * else the alter would put them out of step.
4777          */
4778         if (recurse)
4779                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4780         else if (!recursing &&
4781                          find_inheritance_children(RelationGetRelid(rel)) != NIL)
4782                 ereport(ERROR,
4783                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4784                                  errmsg("type of inherited column \"%s\" must be changed in child tables too",
4785                                                 colName)));
4786 }
4787
4788 static void
4789 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4790                                           const char *colName, TypeName *typename)
4791 {
4792         HeapTuple       heapTup;
4793         Form_pg_attribute attTup;
4794         AttrNumber      attnum;
4795         HeapTuple       typeTuple;
4796         Form_pg_type tform;
4797         Oid                     targettype;
4798         Node       *defaultexpr;
4799         Relation        attrelation;
4800         Relation        depRel;
4801         ScanKeyData key[3];
4802         SysScanDesc scan;
4803         HeapTuple       depTup;
4804
4805         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4806
4807         /* Look up the target column */
4808         heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4809         if (!HeapTupleIsValid(heapTup))         /* shouldn't happen */
4810                 ereport(ERROR,
4811                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4812                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
4813                                                 colName, RelationGetRelationName(rel))));
4814         attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
4815         attnum = attTup->attnum;
4816
4817         /* Check for multiple ALTER TYPE on same column --- can't cope */
4818         if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
4819                 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
4820                 ereport(ERROR,
4821                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4822                                  errmsg("cannot alter type of column \"%s\" twice",
4823                                                 colName)));
4824
4825         /* Look up the target type (should not fail, since prep found it) */
4826         typeTuple = typenameType(typename);
4827         tform = (Form_pg_type) GETSTRUCT(typeTuple);
4828         targettype = HeapTupleGetOid(typeTuple);
4829
4830         /*
4831          * If there is a default expression for the column, get it and ensure
4832          * we can coerce it to the new datatype.  (We must do this before
4833          * changing the column type, because build_column_default itself will
4834          * try to coerce, and will not issue the error message we want if it
4835          * fails.)
4836          *
4837          * We remove any implicit coercion steps at the top level of the old
4838          * default expression; this has been agreed to satisfy the principle
4839          * of least surprise.  (The conversion to the new column type should
4840          * act like it started from what the user sees as the stored expression,
4841          * and the implicit coercions aren't going to be shown.)
4842          */
4843         if (attTup->atthasdef)
4844         {
4845                 defaultexpr = build_column_default(rel, attnum);
4846                 Assert(defaultexpr);
4847                 defaultexpr = strip_implicit_coercions(defaultexpr);
4848                 defaultexpr = coerce_to_target_type(NULL,               /* no UNKNOWN params */
4849                                                                                         defaultexpr, exprType(defaultexpr),
4850                                                                                         targettype, typename->typmod,
4851                                                                                         COERCION_ASSIGNMENT,
4852                                                                                         COERCE_IMPLICIT_CAST);
4853                 if (defaultexpr == NULL)
4854                         ereport(ERROR,
4855                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
4856                                          errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
4857                                                         colName, TypeNameToString(typename))));
4858         }
4859         else
4860                 defaultexpr = NULL;
4861
4862         /*
4863          * Find everything that depends on the column (constraints, indexes,
4864          * etc), and record enough information to let us recreate the objects.
4865          *
4866          * The actual recreation does not happen here, but only after we have
4867          * performed all the individual ALTER TYPE operations.  We have to
4868          * save the info before executing ALTER TYPE, though, else the
4869          * deparser will get confused.
4870          *
4871          * There could be multiple entries for the same object, so we must check
4872          * to ensure we process each one only once.  Note: we assume that an
4873          * index that implements a constraint will not show a direct
4874          * dependency on the column.
4875          */
4876         depRel = heap_open(DependRelationId, RowExclusiveLock);
4877
4878         ScanKeyInit(&key[0],
4879                                 Anum_pg_depend_refclassid,
4880                                 BTEqualStrategyNumber, F_OIDEQ,
4881                                 ObjectIdGetDatum(RelationRelationId));
4882         ScanKeyInit(&key[1],
4883                                 Anum_pg_depend_refobjid,
4884                                 BTEqualStrategyNumber, F_OIDEQ,
4885                                 ObjectIdGetDatum(RelationGetRelid(rel)));
4886         ScanKeyInit(&key[2],
4887                                 Anum_pg_depend_refobjsubid,
4888                                 BTEqualStrategyNumber, F_INT4EQ,
4889                                 Int32GetDatum((int32) attnum));
4890
4891         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
4892                                                           SnapshotNow, 3, key);
4893
4894         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
4895         {
4896                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
4897                 ObjectAddress foundObject;
4898
4899                 /* We don't expect any PIN dependencies on columns */
4900                 if (foundDep->deptype == DEPENDENCY_PIN)
4901                         elog(ERROR, "cannot alter type of a pinned column");
4902
4903                 foundObject.classId = foundDep->classid;
4904                 foundObject.objectId = foundDep->objid;
4905                 foundObject.objectSubId = foundDep->objsubid;
4906
4907                 switch (getObjectClass(&foundObject))
4908                 {
4909                         case OCLASS_CLASS:
4910                                 {
4911                                         char            relKind = get_rel_relkind(foundObject.objectId);
4912
4913                                         if (relKind == RELKIND_INDEX)
4914                                         {
4915                                                 Assert(foundObject.objectSubId == 0);
4916                                                 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
4917                                                 {
4918                                                         tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
4919                                                                                                    foundObject.objectId);
4920                                                         tab->changedIndexDefs = lappend(tab->changedIndexDefs,
4921                                                         pg_get_indexdef_string(foundObject.objectId));
4922                                                 }
4923                                         }
4924                                         else if (relKind == RELKIND_SEQUENCE)
4925                                         {
4926                                                 /*
4927                                                  * This must be a SERIAL column's sequence.  We
4928                                                  * need not do anything to it.
4929                                                  */
4930                                                 Assert(foundObject.objectSubId == 0);
4931                                         }
4932                                         else
4933                                         {
4934                                                 /* Not expecting any other direct dependencies... */
4935                                                 elog(ERROR, "unexpected object depending on column: %s",
4936                                                          getObjectDescription(&foundObject));
4937                                         }
4938                                         break;
4939                                 }
4940
4941                         case OCLASS_CONSTRAINT:
4942                                 Assert(foundObject.objectSubId == 0);
4943                                 if (!list_member_oid(tab->changedConstraintOids, foundObject.objectId))
4944                                 {
4945                                         tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids,
4946                                                                                                    foundObject.objectId);
4947                                         tab->changedConstraintDefs = lappend(tab->changedConstraintDefs,
4948                                           pg_get_constraintdef_string(foundObject.objectId));
4949                                 }
4950                                 break;
4951
4952                         case OCLASS_REWRITE:
4953                                 /* XXX someday see if we can cope with revising views */
4954                                 ereport(ERROR,
4955                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4956                                                  errmsg("cannot alter type of a column used by a view or rule"),
4957                                                  errdetail("%s depends on column \"%s\"",
4958                                                                    getObjectDescription(&foundObject),
4959                                                                    colName)));
4960                                 break;
4961
4962                         case OCLASS_DEFAULT:
4963
4964                                 /*
4965                                  * Ignore the column's default expression, since we will
4966                                  * fix it below.
4967                                  */
4968                                 Assert(defaultexpr);
4969                                 break;
4970
4971                         case OCLASS_PROC:
4972                         case OCLASS_TYPE:
4973                         case OCLASS_CAST:
4974                         case OCLASS_CONVERSION:
4975                         case OCLASS_LANGUAGE:
4976                         case OCLASS_OPERATOR:
4977                         case OCLASS_OPCLASS:
4978                         case OCLASS_TRIGGER:
4979                         case OCLASS_SCHEMA:
4980
4981                                 /*
4982                                  * We don't expect any of these sorts of objects to depend
4983                                  * on a column.
4984                                  */
4985                                 elog(ERROR, "unexpected object depending on column: %s",
4986                                          getObjectDescription(&foundObject));
4987                                 break;
4988
4989                         default:
4990                                 elog(ERROR, "unrecognized object class: %u",
4991                                          foundObject.classId);
4992                 }
4993         }
4994
4995         systable_endscan(scan);
4996
4997         /*
4998          * Now scan for dependencies of this column on other things.  The only
4999          * thing we should find is the dependency on the column datatype,
5000          * which we want to remove.
5001          */
5002         ScanKeyInit(&key[0],
5003                                 Anum_pg_depend_classid,
5004                                 BTEqualStrategyNumber, F_OIDEQ,
5005                                 ObjectIdGetDatum(RelationRelationId));
5006         ScanKeyInit(&key[1],
5007                                 Anum_pg_depend_objid,
5008                                 BTEqualStrategyNumber, F_OIDEQ,
5009                                 ObjectIdGetDatum(RelationGetRelid(rel)));
5010         ScanKeyInit(&key[2],
5011                                 Anum_pg_depend_objsubid,
5012                                 BTEqualStrategyNumber, F_INT4EQ,
5013                                 Int32GetDatum((int32) attnum));
5014
5015         scan = systable_beginscan(depRel, DependDependerIndexId, true,
5016                                                           SnapshotNow, 3, key);
5017
5018         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5019         {
5020                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5021
5022                 if (foundDep->deptype != DEPENDENCY_NORMAL)
5023                         elog(ERROR, "found unexpected dependency type '%c'",
5024                                  foundDep->deptype);
5025                 if (foundDep->refclassid != TypeRelationId ||
5026                         foundDep->refobjid != attTup->atttypid)
5027                         elog(ERROR, "found unexpected dependency for column");
5028
5029                 simple_heap_delete(depRel, &depTup->t_self);
5030         }
5031
5032         systable_endscan(scan);
5033
5034         heap_close(depRel, RowExclusiveLock);
5035
5036         /*
5037          * Here we go --- change the recorded column type.      (Note heapTup is a
5038          * copy of the syscache entry, so okay to scribble on.)
5039          */
5040         attTup->atttypid = targettype;
5041         attTup->atttypmod = typename->typmod;
5042         attTup->attndims = list_length(typename->arrayBounds);
5043         attTup->attlen = tform->typlen;
5044         attTup->attbyval = tform->typbyval;
5045         attTup->attalign = tform->typalign;
5046         attTup->attstorage = tform->typstorage;
5047
5048         ReleaseSysCache(typeTuple);
5049
5050         simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5051
5052         /* keep system catalog indexes current */
5053         CatalogUpdateIndexes(attrelation, heapTup);
5054
5055         heap_close(attrelation, RowExclusiveLock);
5056
5057         /* Install dependency on new datatype */
5058         add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5059
5060         /*
5061          * Drop any pg_statistic entry for the column, since it's now wrong
5062          * type
5063          */
5064         RemoveStatistics(RelationGetRelid(rel), attnum);
5065
5066         /*
5067          * Update the default, if present, by brute force --- remove and
5068          * re-add the default.  Probably unsafe to take shortcuts, since the
5069          * new version may well have additional dependencies.  (It's okay to
5070          * do this now, rather than after other ALTER TYPE commands, since the
5071          * default won't depend on other column types.)
5072          */
5073         if (defaultexpr)
5074         {
5075                 /* Must make new row visible since it will be updated again */
5076                 CommandCounterIncrement();
5077
5078                 /*
5079                  * We use RESTRICT here for safety, but at present we do not
5080                  * expect anything to depend on the default.
5081                  */
5082                 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
5083
5084                 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
5085         }
5086
5087         /* Cleanup */
5088         heap_freetuple(heapTup);
5089 }
5090
5091 /*
5092  * Cleanup after we've finished all the ALTER TYPE operations for a
5093  * particular relation.  We have to drop and recreate all the indexes
5094  * and constraints that depend on the altered columns.
5095  */
5096 static void
5097 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5098 {
5099         ObjectAddress obj;
5100         ListCell   *l;
5101
5102         /*
5103          * Re-parse the index and constraint definitions, and attach them to
5104          * the appropriate work queue entries.  We do this before dropping
5105          * because in the case of a FOREIGN KEY constraint, we might not yet
5106          * have exclusive lock on the table the constraint is attached to, and
5107          * we need to get that before dropping.  It's safe because the parser
5108          * won't actually look at the catalogs to detect the existing entry.
5109          */
5110         foreach(l, tab->changedIndexDefs)
5111                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5112         foreach(l, tab->changedConstraintDefs)
5113                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5114
5115         /*
5116          * Now we can drop the existing constraints and indexes ---
5117          * constraints first, since some of them might depend on the indexes.
5118          * It should be okay to use DROP_RESTRICT here, since nothing else
5119          * should be depending on these objects.
5120          */
5121         foreach(l, tab->changedConstraintOids)
5122         {
5123                 obj.classId = ConstraintRelationId;
5124                 obj.objectId = lfirst_oid(l);
5125                 obj.objectSubId = 0;
5126                 performDeletion(&obj, DROP_RESTRICT);
5127         }
5128
5129         foreach(l, tab->changedIndexOids)
5130         {
5131                 obj.classId = RelationRelationId;
5132                 obj.objectId = lfirst_oid(l);
5133                 obj.objectSubId = 0;
5134                 performDeletion(&obj, DROP_RESTRICT);
5135         }
5136
5137         /*
5138          * The objects will get recreated during subsequent passes over the
5139          * work queue.
5140          */
5141 }
5142
5143 static void
5144 ATPostAlterTypeParse(char *cmd, List **wqueue)
5145 {
5146         List       *raw_parsetree_list;
5147         List       *querytree_list;
5148         ListCell   *list_item;
5149
5150         /*
5151          * We expect that we only have to do raw parsing and parse analysis,
5152          * not any rule rewriting, since these will all be utility statements.
5153          */
5154         raw_parsetree_list = raw_parser(cmd);
5155         querytree_list = NIL;
5156         foreach(list_item, raw_parsetree_list)
5157         {
5158                 Node       *parsetree = (Node *) lfirst(list_item);
5159
5160                 querytree_list = list_concat(querytree_list,
5161                                                                          parse_analyze(parsetree, NULL, 0));
5162         }
5163
5164         /*
5165          * Attach each generated command to the proper place in the work
5166          * queue. Note this could result in creation of entirely new
5167          * work-queue entries.
5168          */
5169         foreach(list_item, querytree_list)
5170         {
5171                 Query      *query = (Query *) lfirst(list_item);
5172                 Relation        rel;
5173                 AlteredTableInfo *tab;
5174
5175                 Assert(IsA(query, Query));
5176                 Assert(query->commandType == CMD_UTILITY);
5177                 switch (nodeTag(query->utilityStmt))
5178                 {
5179                         case T_IndexStmt:
5180                                 {
5181                                         IndexStmt  *stmt = (IndexStmt *) query->utilityStmt;
5182                                         AlterTableCmd *newcmd;
5183
5184                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5185                                         tab = ATGetQueueEntry(wqueue, rel);
5186                                         newcmd = makeNode(AlterTableCmd);
5187                                         newcmd->subtype = AT_ReAddIndex;
5188                                         newcmd->def = (Node *) stmt;
5189                                         tab->subcmds[AT_PASS_OLD_INDEX] =
5190                                                 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
5191                                         relation_close(rel, NoLock);
5192                                         break;
5193                                 }
5194                         case T_AlterTableStmt:
5195                                 {
5196                                         AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5197                                         ListCell   *lcmd;
5198
5199                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5200                                         tab = ATGetQueueEntry(wqueue, rel);
5201                                         foreach(lcmd, stmt->cmds)
5202                                         {
5203                                                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5204
5205                                                 switch (cmd->subtype)
5206                                                 {
5207                                                         case AT_AddIndex:
5208                                                                 cmd->subtype = AT_ReAddIndex;
5209                                                                 tab->subcmds[AT_PASS_OLD_INDEX] =
5210                                                                         lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5211                                                                 break;
5212                                                         case AT_AddConstraint:
5213                                                                 tab->subcmds[AT_PASS_OLD_CONSTR] =
5214                                                                         lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5215                                                                 break;
5216                                                         default:
5217                                                                 elog(ERROR, "unexpected statement type: %d",
5218                                                                          (int) cmd->subtype);
5219                                                 }
5220                                         }
5221                                         relation_close(rel, NoLock);
5222                                         break;
5223                                 }
5224                         default:
5225                                 elog(ERROR, "unexpected statement type: %d",
5226                                          (int) nodeTag(query->utilityStmt));
5227                 }
5228         }
5229 }
5230
5231
5232 /*
5233  * ALTER TABLE OWNER
5234  */
5235 static void
5236 ATExecChangeOwner(Oid relationOid, Oid newOwnerId)
5237 {
5238         Relation        target_rel;
5239         Relation        class_rel;
5240         HeapTuple       tuple;
5241         Form_pg_class tuple_class;
5242
5243         /*
5244          * Get exclusive lock till end of transaction on the target table.
5245          * Use relation_open so that we can work on indexes and sequences.
5246          */
5247         target_rel = relation_open(relationOid, AccessExclusiveLock);
5248
5249         /* Get its pg_class tuple, too */
5250         class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5251
5252         tuple = SearchSysCache(RELOID,
5253                                                    ObjectIdGetDatum(relationOid),
5254                                                    0, 0, 0);
5255         if (!HeapTupleIsValid(tuple))
5256                 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5257         tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5258
5259         /* Can we change the ownership of this tuple? */
5260         switch (tuple_class->relkind)
5261         {
5262                 case RELKIND_RELATION:
5263                 case RELKIND_INDEX:
5264                 case RELKIND_VIEW:
5265                 case RELKIND_SEQUENCE:
5266                 case RELKIND_TOASTVALUE:
5267                         /* ok to change owner */
5268                         break;
5269                 default:
5270                         ereport(ERROR,
5271                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5272                                          errmsg("\"%s\" is not a table, TOAST table, index, view, or sequence",
5273                                                         NameStr(tuple_class->relname))));
5274         }
5275
5276         /*
5277          * If the new owner is the same as the existing owner, consider the
5278          * command to have succeeded.  This is for dump restoration purposes.
5279          */
5280         if (tuple_class->relowner != newOwnerId)
5281         {
5282                 Datum           repl_val[Natts_pg_class];
5283                 char            repl_null[Natts_pg_class];
5284                 char            repl_repl[Natts_pg_class];
5285                 Acl                *newAcl;
5286                 Datum           aclDatum;
5287                 bool            isNull;
5288                 HeapTuple       newtuple;
5289
5290                 /* Otherwise, check that we are the superuser */
5291                 if (!superuser())
5292                         ereport(ERROR,
5293                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5294                                          errmsg("must be superuser to change owner")));
5295
5296                 memset(repl_null, ' ', sizeof(repl_null));
5297                 memset(repl_repl, ' ', sizeof(repl_repl));
5298
5299                 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5300                 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5301
5302                 /*
5303                  * Determine the modified ACL for the new owner.  This is only
5304                  * necessary when the ACL is non-null.
5305                  */
5306                 aclDatum = SysCacheGetAttr(RELOID, tuple,
5307                                                                    Anum_pg_class_relacl,
5308                                                                    &isNull);
5309                 if (!isNull)
5310                 {
5311                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
5312                                                                  tuple_class->relowner, newOwnerId);
5313                         repl_repl[Anum_pg_class_relacl - 1] = 'r';
5314                         repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
5315                 }
5316
5317                 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5318
5319                 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5320                 CatalogUpdateIndexes(class_rel, newtuple);
5321
5322                 heap_freetuple(newtuple);
5323
5324                 /*
5325                  * If we are operating on a table, also change the ownership of
5326                  * any indexes and sequences that belong to the table, as well as
5327                  * the table's toast table (if it has one)
5328                  */
5329                 if (tuple_class->relkind == RELKIND_RELATION ||
5330                         tuple_class->relkind == RELKIND_TOASTVALUE)
5331                 {
5332                         List       *index_oid_list;
5333                         ListCell   *i;
5334
5335                         /* Find all the indexes belonging to this relation */
5336                         index_oid_list = RelationGetIndexList(target_rel);
5337
5338                         /* For each index, recursively change its ownership */
5339                         foreach(i, index_oid_list)
5340                                 ATExecChangeOwner(lfirst_oid(i), newOwnerId);
5341
5342                         list_free(index_oid_list);
5343                 }
5344
5345                 if (tuple_class->relkind == RELKIND_RELATION)
5346                 {
5347                         /* If it has a toast table, recurse to change its ownership */
5348                         if (tuple_class->reltoastrelid != InvalidOid)
5349                                 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId);
5350
5351                         /* If it has dependent sequences, recurse to change them too */
5352                         change_owner_recurse_to_sequences(relationOid, newOwnerId);
5353                 }
5354         }
5355
5356         ReleaseSysCache(tuple);
5357         heap_close(class_rel, RowExclusiveLock);
5358         relation_close(target_rel, NoLock);
5359 }
5360
5361 /*
5362  * change_owner_recurse_to_sequences
5363  *
5364  * Helper function for ATExecChangeOwner.  Examines pg_depend searching
5365  * for sequences that are dependent on serial columns, and changes their
5366  * ownership.
5367  */
5368 static void
5369 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5370 {
5371         Relation        depRel;
5372         SysScanDesc scan;
5373         ScanKeyData     key[2];
5374         HeapTuple       tup;
5375
5376         /*
5377          * SERIAL sequences are those having an internal dependency on one
5378          * of the table's columns (we don't care *which* column, exactly).
5379          */
5380         depRel = heap_open(DependRelationId, AccessShareLock);
5381
5382         ScanKeyInit(&key[0],
5383                         Anum_pg_depend_refclassid,
5384                         BTEqualStrategyNumber, F_OIDEQ,
5385                         ObjectIdGetDatum(RelationRelationId));
5386         ScanKeyInit(&key[1],
5387                         Anum_pg_depend_refobjid,
5388                         BTEqualStrategyNumber, F_OIDEQ,
5389                         ObjectIdGetDatum(relationOid));
5390         /* we leave refobjsubid unspecified */
5391
5392         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5393                                                           SnapshotNow, 2, key);
5394
5395         while (HeapTupleIsValid(tup = systable_getnext(scan)))
5396         {
5397                 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5398                 Relation        seqRel;
5399
5400                 /* skip dependencies other than internal dependencies on columns */
5401                 if (depForm->refobjsubid == 0 ||
5402                         depForm->classid != RelationRelationId ||
5403                         depForm->objsubid != 0 ||
5404                         depForm->deptype != DEPENDENCY_INTERNAL)
5405                         continue;
5406
5407                 /* Use relation_open just in case it's an index */
5408                 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5409
5410                 /* skip non-sequence relations */
5411                 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5412                 {
5413                         /* No need to keep the lock */
5414                         relation_close(seqRel, AccessExclusiveLock);
5415                         continue;
5416                 }
5417
5418                 /* We don't need to close the sequence while we alter it. */
5419                 ATExecChangeOwner(depForm->objid, newOwnerId);
5420
5421                 /* Now we can close it.  Keep the lock till end of transaction. */
5422                 relation_close(seqRel, NoLock);
5423         }
5424
5425         systable_endscan(scan);
5426
5427         relation_close(depRel, AccessShareLock);
5428 }
5429
5430 /*
5431  * ALTER TABLE CLUSTER ON
5432  *
5433  * The only thing we have to do is to change the indisclustered bits.
5434  */
5435 static void
5436 ATExecClusterOn(Relation rel, const char *indexName)
5437 {
5438         Oid                     indexOid;
5439
5440         indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5441
5442         if (!OidIsValid(indexOid))
5443                 ereport(ERROR,
5444                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5445                                  errmsg("index \"%s\" for table \"%s\" does not exist",
5446                                                 indexName, RelationGetRelationName(rel))));
5447
5448         /* Check index is valid to cluster on */
5449         check_index_is_clusterable(rel, indexOid, false);
5450
5451         /* And do the work */
5452         mark_index_clustered(rel, indexOid);
5453 }
5454
5455 /*
5456  * ALTER TABLE SET WITHOUT CLUSTER
5457  *
5458  * We have to find any indexes on the table that have indisclustered bit
5459  * set and turn it off.
5460  */
5461 static void
5462 ATExecDropCluster(Relation rel)
5463 {
5464         mark_index_clustered(rel, InvalidOid);
5465 }
5466
5467 /*
5468  * ALTER TABLE SET TABLESPACE
5469  */
5470 static void
5471 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5472 {
5473         Oid                     tablespaceId;
5474         AclResult       aclresult;
5475
5476         /*
5477          * We do our own permission checking because we want to allow this on
5478          * indexes.
5479          */
5480         if (rel->rd_rel->relkind != RELKIND_RELATION &&
5481                 rel->rd_rel->relkind != RELKIND_INDEX)
5482                 ereport(ERROR,
5483                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5484                                  errmsg("\"%s\" is not a table or index",
5485                                                 RelationGetRelationName(rel))));
5486
5487         /* Permissions checks */
5488         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
5489                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5490                                            RelationGetRelationName(rel));
5491
5492         if (!allowSystemTableMods && IsSystemRelation(rel))
5493                 ereport(ERROR,
5494                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5495                                  errmsg("permission denied: \"%s\" is a system catalog",
5496                                                 RelationGetRelationName(rel))));
5497
5498         /* Check that the tablespace exists */
5499         tablespaceId = get_tablespace_oid(tablespacename);
5500         if (!OidIsValid(tablespaceId))
5501                 ereport(ERROR,
5502                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5503                         errmsg("tablespace \"%s\" does not exist", tablespacename)));
5504
5505         /* Check its permissions */
5506         aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5507         if (aclresult != ACLCHECK_OK)
5508                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5509
5510         /* Save info for Phase 3 to do the real work */
5511         if (OidIsValid(tab->newTableSpace))
5512                 ereport(ERROR,
5513                                 (errcode(ERRCODE_SYNTAX_ERROR),
5514                    errmsg("cannot have multiple SET TABLESPACE subcommands")));
5515         tab->newTableSpace = tablespaceId;
5516 }
5517
5518 /*
5519  * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5520  * rewriting to be done, so we just want to copy the data as fast as possible.
5521  */
5522 static void
5523 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5524 {
5525         Relation        rel;
5526         Oid                     oldTableSpace;
5527         Oid                     reltoastrelid;
5528         Oid                     reltoastidxid;
5529         RelFileNode newrnode;
5530         SMgrRelation dstrel;
5531         Relation        pg_class;
5532         HeapTuple       tuple;
5533         Form_pg_class rd_rel;
5534
5535         rel = relation_open(tableOid, NoLock);
5536
5537         /*
5538          * We can never allow moving of shared or nailed-in-cache relations,
5539          * because we can't support changing their reltablespace values.
5540          */
5541         if (rel->rd_rel->relisshared || rel->rd_isnailed)
5542                 ereport(ERROR,
5543                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5544                                  errmsg("cannot move system relation \"%s\"",
5545                                                 RelationGetRelationName(rel))));
5546
5547         /*
5548          * Don't allow moving temp tables of other backends ... their local
5549          * buffer manager is not going to cope.
5550          */
5551         if (isOtherTempNamespace(RelationGetNamespace(rel)))
5552                 ereport(ERROR,
5553                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5554                           errmsg("cannot move temporary tables of other sessions")));
5555
5556         /*
5557          * No work if no change in tablespace.
5558          */
5559         oldTableSpace = rel->rd_rel->reltablespace;
5560         if (newTableSpace == oldTableSpace ||
5561                 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5562         {
5563                 relation_close(rel, NoLock);
5564                 return;
5565         }
5566
5567         reltoastrelid = rel->rd_rel->reltoastrelid;
5568         reltoastidxid = rel->rd_rel->reltoastidxid;
5569
5570         /* Get a modifiable copy of the relation's pg_class row */
5571         pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5572
5573         tuple = SearchSysCacheCopy(RELOID,
5574                                                            ObjectIdGetDatum(tableOid),
5575                                                            0, 0, 0);
5576         if (!HeapTupleIsValid(tuple))
5577                 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5578         rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5579
5580         /* create another storage file. Is it a little ugly ? */
5581         /* NOTE: any conflict in relfilenode value will be caught here */
5582         newrnode = rel->rd_node;
5583         newrnode.spcNode = newTableSpace;
5584
5585         dstrel = smgropen(newrnode);
5586         smgrcreate(dstrel, rel->rd_istemp, false);
5587
5588         /* copy relation data to the new physical file */
5589         copy_relation_data(rel, dstrel);
5590
5591         /* schedule unlinking old physical file */
5592         RelationOpenSmgr(rel);
5593         smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5594
5595         /*
5596          * Now drop smgr references.  The source was already dropped by
5597          * smgrscheduleunlink.
5598          */
5599         smgrclose(dstrel);
5600
5601         /* update the pg_class row */
5602         rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5603         simple_heap_update(pg_class, &tuple->t_self, tuple);
5604         CatalogUpdateIndexes(pg_class, tuple);
5605
5606         heap_freetuple(tuple);
5607
5608         heap_close(pg_class, RowExclusiveLock);
5609
5610         relation_close(rel, NoLock);
5611
5612         /* Make sure the reltablespace change is visible */
5613         CommandCounterIncrement();
5614
5615         /* Move associated toast relation and/or index, too */
5616         if (OidIsValid(reltoastrelid))
5617                 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5618         if (OidIsValid(reltoastidxid))
5619                 ATExecSetTableSpace(reltoastidxid, newTableSpace);
5620 }
5621
5622 /*
5623  * Copy data, block by block
5624  */
5625 static void
5626 copy_relation_data(Relation rel, SMgrRelation dst)
5627 {
5628         SMgrRelation src;
5629         bool            use_wal;
5630         BlockNumber nblocks;
5631         BlockNumber blkno;
5632         char            buf[BLCKSZ];
5633         Page            page = (Page) buf;
5634
5635         /*
5636          * Since we copy the file directly without looking at the shared
5637          * buffers, we'd better first flush out any pages of the source
5638          * relation that are in shared buffers.  We assume no new changes
5639          * will be made while we are holding exclusive lock on the rel.
5640          */
5641         FlushRelationBuffers(rel);
5642
5643         /*
5644          * We need to log the copied data in WAL iff WAL archiving is enabled
5645          * AND it's not a temp rel.
5646          */
5647         use_wal = XLogArchivingActive() && !rel->rd_istemp;
5648
5649         nblocks = RelationGetNumberOfBlocks(rel);
5650         /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5651         src = rel->rd_smgr;
5652
5653         for (blkno = 0; blkno < nblocks; blkno++)
5654         {
5655                 smgrread(src, blkno, buf);
5656
5657                 /* XLOG stuff */
5658                 if (use_wal)
5659                 {
5660                         xl_heap_newpage xlrec;
5661                         XLogRecPtr      recptr;
5662                         XLogRecData rdata[2];
5663
5664                         /* NO ELOG(ERROR) from here till newpage op is logged */
5665                         START_CRIT_SECTION();
5666
5667                         xlrec.node = dst->smgr_rnode;
5668                         xlrec.blkno = blkno;
5669
5670                         rdata[0].data = (char *) &xlrec;
5671                         rdata[0].len = SizeOfHeapNewpage;
5672                         rdata[0].buffer = InvalidBuffer;
5673                         rdata[0].next = &(rdata[1]);
5674
5675                         rdata[1].data = (char *) page;
5676                         rdata[1].len = BLCKSZ;
5677                         rdata[1].buffer = InvalidBuffer;
5678                         rdata[1].next = NULL;
5679
5680                         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
5681
5682                         PageSetLSN(page, recptr);
5683                         PageSetTLI(page, ThisTimeLineID);
5684
5685                         END_CRIT_SECTION();
5686                 }
5687
5688                 /*
5689                  * Now write the page.  We say isTemp = true even if it's not a
5690                  * temp rel, because there's no need for smgr to schedule an fsync
5691                  * for this write; we'll do it ourselves below.
5692                  */
5693                 smgrwrite(dst, blkno, buf, true);
5694         }
5695
5696         /*
5697          * If the rel isn't temp, we must fsync it down to disk before it's
5698          * safe to commit the transaction.      (For a temp rel we don't care
5699          * since the rel will be uninteresting after a crash anyway.)
5700          *
5701          * It's obvious that we must do this when not WAL-logging the copy. It's
5702          * less obvious that we have to do it even if we did WAL-log the
5703          * copied pages.  The reason is that since we're copying outside
5704          * shared buffers, a CHECKPOINT occurring during the copy has no way
5705          * to flush the previously written data to disk (indeed it won't know
5706          * the new rel even exists).  A crash later on would replay WAL from
5707          * the checkpoint, therefore it wouldn't replay our earlier WAL
5708          * entries. If we do not fsync those pages here, they might still not
5709          * be on disk when the crash occurs.
5710          */
5711         if (!rel->rd_istemp)
5712                 smgrimmedsync(dst);
5713 }
5714
5715 /*
5716  * ALTER TABLE CREATE TOAST TABLE
5717  *
5718  * Note: this is also invoked from outside this module; in such cases we
5719  * expect the caller to have verified that the relation is a table and we
5720  * have all the right permissions.      Callers expect this function
5721  * to end with CommandCounterIncrement if it makes any changes.
5722  */
5723 void
5724 AlterTableCreateToastTable(Oid relOid, bool silent)
5725 {
5726         Relation        rel;
5727         HeapTuple       reltup;
5728         TupleDesc       tupdesc;
5729         bool            shared_relation;
5730         Relation        class_rel;
5731         Oid                     toast_relid;
5732         Oid                     toast_idxid;
5733         char            toast_relname[NAMEDATALEN];
5734         char            toast_idxname[NAMEDATALEN];
5735         IndexInfo  *indexInfo;
5736         Oid                     classObjectId[2];
5737         ObjectAddress baseobject,
5738                                 toastobject;
5739
5740         /*
5741          * Grab an exclusive lock on the target table, which we will NOT
5742          * release until end of transaction.  (This is probably redundant in
5743          * all present uses...)
5744          */
5745         rel = heap_open(relOid, AccessExclusiveLock);
5746
5747         /*
5748          * Toast table is shared if and only if its parent is.
5749          *
5750          * We cannot allow toasting a shared relation after initdb (because
5751          * there's no way to mark it toasted in other databases' pg_class).
5752          * Unfortunately we can't distinguish initdb from a manually started
5753          * standalone backend (toasting happens after the bootstrap phase, so
5754          * checking IsBootstrapProcessingMode() won't work).  However, we can
5755          * at least prevent this mistake under normal multi-user operation.
5756          */
5757         shared_relation = rel->rd_rel->relisshared;
5758         if (shared_relation && IsUnderPostmaster)
5759                 ereport(ERROR,
5760                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5761                                  errmsg("shared tables cannot be toasted after initdb")));
5762
5763         /*
5764          * Is it already toasted?
5765          */
5766         if (rel->rd_rel->reltoastrelid != InvalidOid)
5767         {
5768                 if (silent)
5769                 {
5770                         heap_close(rel, NoLock);
5771                         return;
5772                 }
5773
5774                 ereport(ERROR,
5775                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5776                                  errmsg("table \"%s\" already has a TOAST table",
5777                                                 RelationGetRelationName(rel))));
5778         }
5779
5780         /*
5781          * Check to see whether the table actually needs a TOAST table.
5782          */
5783         if (!needs_toast_table(rel))
5784         {
5785                 if (silent)
5786                 {
5787                         heap_close(rel, NoLock);
5788                         return;
5789                 }
5790
5791                 ereport(ERROR,
5792                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5793                                  errmsg("table \"%s\" does not need a TOAST table",
5794                                                 RelationGetRelationName(rel))));
5795         }
5796
5797         /*
5798          * Create the toast table and its index
5799          */
5800         snprintf(toast_relname, sizeof(toast_relname),
5801                          "pg_toast_%u", relOid);
5802         snprintf(toast_idxname, sizeof(toast_idxname),
5803                          "pg_toast_%u_index", relOid);
5804
5805         /* this is pretty painful...  need a tuple descriptor */
5806         tupdesc = CreateTemplateTupleDesc(3, false);
5807         TupleDescInitEntry(tupdesc, (AttrNumber) 1,
5808                                            "chunk_id",
5809                                            OIDOID,
5810                                            -1, 0);
5811         TupleDescInitEntry(tupdesc, (AttrNumber) 2,
5812                                            "chunk_seq",
5813                                            INT4OID,
5814                                            -1, 0);
5815         TupleDescInitEntry(tupdesc, (AttrNumber) 3,
5816                                            "chunk_data",
5817                                            BYTEAOID,
5818                                            -1, 0);
5819
5820         /*
5821          * Ensure that the toast table doesn't itself get toasted, or we'll be
5822          * toast :-(.  This is essential for chunk_data because type bytea is
5823          * toastable; hit the other two just to be sure.
5824          */
5825         tupdesc->attrs[0]->attstorage = 'p';
5826         tupdesc->attrs[1]->attstorage = 'p';
5827         tupdesc->attrs[2]->attstorage = 'p';
5828
5829         /*
5830          * Note: the toast relation is placed in the regular pg_toast
5831          * namespace even if its master relation is a temp table.  There
5832          * cannot be any naming collision, and the toast rel will be destroyed
5833          * when its master is, so there's no need to handle the toast rel as
5834          * temp.
5835          */
5836         toast_relid = heap_create_with_catalog(toast_relname,
5837                                                                                    PG_TOAST_NAMESPACE,
5838                                                                                    rel->rd_rel->reltablespace,
5839                                                                                    InvalidOid,
5840                                                                                    tupdesc,
5841                                                                                    RELKIND_TOASTVALUE,
5842                                                                                    shared_relation,
5843                                                                                    true,
5844                                                                                    0,
5845                                                                                    ONCOMMIT_NOOP,
5846                                                                                    true);
5847
5848         /* make the toast relation visible, else index creation will fail */
5849         CommandCounterIncrement();
5850
5851         /*
5852          * Create unique index on chunk_id, chunk_seq.
5853          *
5854          * NOTE: the normal TOAST access routines could actually function with a
5855          * single-column index on chunk_id only. However, the slice access
5856          * routines use both columns for faster access to an individual chunk.
5857          * In addition, we want it to be unique as a check against the
5858          * possibility of duplicate TOAST chunk OIDs. The index might also be
5859          * a little more efficient this way, since btree isn't all that happy
5860          * with large numbers of equal keys.
5861          */
5862
5863         indexInfo = makeNode(IndexInfo);
5864         indexInfo->ii_NumIndexAttrs = 2;
5865         indexInfo->ii_KeyAttrNumbers[0] = 1;
5866         indexInfo->ii_KeyAttrNumbers[1] = 2;
5867         indexInfo->ii_Expressions = NIL;
5868         indexInfo->ii_ExpressionsState = NIL;
5869         indexInfo->ii_Predicate = NIL;
5870         indexInfo->ii_PredicateState = NIL;
5871         indexInfo->ii_Unique = true;
5872
5873         classObjectId[0] = OID_BTREE_OPS_OID;
5874         classObjectId[1] = INT4_BTREE_OPS_OID;
5875
5876         toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
5877                                                            indexInfo,
5878                                                            BTREE_AM_OID,
5879                                                            rel->rd_rel->reltablespace,
5880                                                            classObjectId,
5881                                                            true, false, true, false);
5882
5883         /*
5884          * Update toast rel's pg_class entry to show that it has an index. The
5885          * index OID is stored into the reltoastidxid field for easy access by
5886          * the tuple toaster.
5887          */
5888         setRelhasindex(toast_relid, true, true, toast_idxid);
5889
5890         /*
5891          * Store the toast table's OID in the parent relation's pg_class row
5892          */
5893         class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5894
5895         reltup = SearchSysCacheCopy(RELOID,
5896                                                                 ObjectIdGetDatum(relOid),
5897                                                                 0, 0, 0);
5898         if (!HeapTupleIsValid(reltup))
5899                 elog(ERROR, "cache lookup failed for relation %u", relOid);
5900
5901         ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
5902
5903         simple_heap_update(class_rel, &reltup->t_self, reltup);
5904
5905         /* Keep catalog indexes current */
5906         CatalogUpdateIndexes(class_rel, reltup);
5907
5908         heap_freetuple(reltup);
5909
5910         heap_close(class_rel, RowExclusiveLock);
5911
5912         /*
5913          * Register dependency from the toast table to the master, so that the
5914          * toast table will be deleted if the master is.
5915          */
5916         baseobject.classId = RelationRelationId;
5917         baseobject.objectId = relOid;
5918         baseobject.objectSubId = 0;
5919         toastobject.classId = RelationRelationId;
5920         toastobject.objectId = toast_relid;
5921         toastobject.objectSubId = 0;
5922
5923         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
5924
5925         /*
5926          * Clean up and make changes visible
5927          */
5928         heap_close(rel, NoLock);
5929
5930         CommandCounterIncrement();
5931 }
5932
5933 /*
5934  * Check to see whether the table needs a TOAST table.  It does only if
5935  * (1) there are any toastable attributes, and (2) the maximum length
5936  * of a tuple could exceed TOAST_TUPLE_THRESHOLD.  (We don't want to
5937  * create a toast table for something like "f1 varchar(20)".)
5938  */
5939 static bool
5940 needs_toast_table(Relation rel)
5941 {
5942         int32           data_length = 0;
5943         bool            maxlength_unknown = false;
5944         bool            has_toastable_attrs = false;
5945         TupleDesc       tupdesc;
5946         Form_pg_attribute *att;
5947         int32           tuple_length;
5948         int                     i;
5949
5950         tupdesc = rel->rd_att;
5951         att = tupdesc->attrs;
5952
5953         for (i = 0; i < tupdesc->natts; i++)
5954         {
5955                 if (att[i]->attisdropped)
5956                         continue;
5957                 data_length = att_align(data_length, att[i]->attalign);
5958                 if (att[i]->attlen > 0)
5959                 {
5960                         /* Fixed-length types are never toastable */
5961                         data_length += att[i]->attlen;
5962                 }
5963                 else
5964                 {
5965                         int32           maxlen = type_maximum_size(att[i]->atttypid,
5966                                                                                                    att[i]->atttypmod);
5967
5968                         if (maxlen < 0)
5969                                 maxlength_unknown = true;
5970                         else
5971                                 data_length += maxlen;
5972                         if (att[i]->attstorage != 'p')
5973                                 has_toastable_attrs = true;
5974                 }
5975         }
5976         if (!has_toastable_attrs)
5977                 return false;                   /* nothing to toast? */
5978         if (maxlength_unknown)
5979                 return true;                    /* any unlimited-length attrs? */
5980         tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
5981                                                         BITMAPLEN(tupdesc->natts)) +
5982                 MAXALIGN(data_length);
5983         return (tuple_length > TOAST_TUPLE_THRESHOLD);
5984 }
5985
5986
5987 /*
5988  * This code supports
5989  *      CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
5990  *
5991  * Because we only support this for TEMP tables, it's sufficient to remember
5992  * the state in a backend-local data structure.
5993  */
5994
5995 /*
5996  * Register a newly-created relation's ON COMMIT action.
5997  */
5998 void
5999 register_on_commit_action(Oid relid, OnCommitAction action)
6000 {
6001         OnCommitItem *oc;
6002         MemoryContext oldcxt;
6003
6004         /*
6005          * We needn't bother registering the relation unless there is an ON
6006          * COMMIT action we need to take.
6007          */
6008         if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6009                 return;
6010
6011         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6012
6013         oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6014         oc->relid = relid;
6015         oc->oncommit = action;
6016         oc->creating_subid = GetCurrentSubTransactionId();
6017         oc->deleting_subid = InvalidSubTransactionId;
6018
6019         on_commits = lcons(oc, on_commits);
6020
6021         MemoryContextSwitchTo(oldcxt);
6022 }
6023
6024 /*
6025  * Unregister any ON COMMIT action when a relation is deleted.
6026  *
6027  * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
6028  */
6029 void
6030 remove_on_commit_action(Oid relid)
6031 {
6032         ListCell   *l;
6033
6034         foreach(l, on_commits)
6035         {
6036                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6037
6038                 if (oc->relid == relid)
6039                 {
6040                         oc->deleting_subid = GetCurrentSubTransactionId();
6041                         break;
6042                 }
6043         }
6044 }
6045
6046 /*
6047  * Perform ON COMMIT actions.
6048  *
6049  * This is invoked just before actually committing, since it's possible
6050  * to encounter errors.
6051  */
6052 void
6053 PreCommit_on_commit_actions(void)
6054 {
6055         ListCell   *l;
6056         List       *oids_to_truncate = NIL;
6057
6058         foreach(l, on_commits)
6059         {
6060                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6061
6062                 /* Ignore entry if already dropped in this xact */
6063                 if (oc->deleting_subid != InvalidSubTransactionId)
6064                         continue;
6065
6066                 switch (oc->oncommit)
6067                 {
6068                         case ONCOMMIT_NOOP:
6069                         case ONCOMMIT_PRESERVE_ROWS:
6070                                 /* Do nothing (there shouldn't be such entries, actually) */
6071                                 break;
6072                         case ONCOMMIT_DELETE_ROWS:
6073                                 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
6074                                 break;
6075                         case ONCOMMIT_DROP:
6076                                 {
6077                                         ObjectAddress object;
6078
6079                                         object.classId = RelationRelationId;
6080                                         object.objectId = oc->relid;
6081                                         object.objectSubId = 0;
6082                                         performDeletion(&object, DROP_CASCADE);
6083
6084                                         /*
6085                                          * Note that table deletion will call
6086                                          * remove_on_commit_action, so the entry should get
6087                                          * marked as deleted.
6088                                          */
6089                                         Assert(oc->deleting_subid != InvalidSubTransactionId);
6090                                         break;
6091                                 }
6092                 }
6093         }
6094         if (oids_to_truncate != NIL)
6095         {
6096                 heap_truncate(oids_to_truncate);
6097                 CommandCounterIncrement();                              /* XXX needed? */
6098         }
6099 }
6100
6101 /*
6102  * Post-commit or post-abort cleanup for ON COMMIT management.
6103  *
6104  * All we do here is remove no-longer-needed OnCommitItem entries.
6105  *
6106  * During commit, remove entries that were deleted during this transaction;
6107  * during abort, remove those created during this transaction.
6108  */
6109 void
6110 AtEOXact_on_commit_actions(bool isCommit)
6111 {
6112         ListCell   *cur_item;
6113         ListCell   *prev_item;
6114
6115         prev_item = NULL;
6116         cur_item = list_head(on_commits);
6117
6118         while (cur_item != NULL)
6119         {
6120                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6121
6122                 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
6123                         oc->creating_subid != InvalidSubTransactionId)
6124                 {
6125                         /* cur_item must be removed */
6126                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6127                         pfree(oc);
6128                         if (prev_item)
6129                                 cur_item = lnext(prev_item);
6130                         else
6131                                 cur_item = list_head(on_commits);
6132                 }
6133                 else
6134                 {
6135                         /* cur_item must be preserved */
6136                         oc->creating_subid = InvalidSubTransactionId;
6137                         oc->deleting_subid = InvalidSubTransactionId;
6138                         prev_item = cur_item;
6139                         cur_item = lnext(prev_item);
6140                 }
6141         }
6142 }
6143
6144 /*
6145  * Post-subcommit or post-subabort cleanup for ON COMMIT management.
6146  *
6147  * During subabort, we can immediately remove entries created during this
6148  * subtransaction.      During subcommit, just relabel entries marked during
6149  * this subtransaction as being the parent's responsibility.
6150  */
6151 void
6152 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
6153                                                           SubTransactionId parentSubid)
6154 {
6155         ListCell   *cur_item;
6156         ListCell   *prev_item;
6157
6158         prev_item = NULL;
6159         cur_item = list_head(on_commits);
6160
6161         while (cur_item != NULL)
6162         {
6163                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6164
6165                 if (!isCommit && oc->creating_subid == mySubid)
6166                 {
6167                         /* cur_item must be removed */
6168                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6169                         pfree(oc);
6170                         if (prev_item)
6171                                 cur_item = lnext(prev_item);
6172                         else
6173                                 cur_item = list_head(on_commits);
6174                 }
6175                 else
6176                 {
6177                         /* cur_item must be preserved */
6178                         if (oc->creating_subid == mySubid)
6179                                 oc->creating_subid = parentSubid;
6180                         if (oc->deleting_subid == mySubid)
6181                                 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
6182                         prev_item = cur_item;
6183                         cur_item = lnext(prev_item);
6184                 }
6185         }
6186 }