OSDN Git Service

Re-run pgindent, fixing a problem where comment lines after a blank
[pg-rex/syncrep.git] / src / backend / commands / tablecmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * tablecmds.c
4  *        Commands for creating and altering table structures and settings
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.176 2005/11/22 18:17:09 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/tuptoaster.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_constraint.h"
26 #include "catalog/pg_depend.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_opclass.h"
30 #include "catalog/pg_trigger.h"
31 #include "catalog/pg_type.h"
32 #include "commands/cluster.h"
33 #include "commands/defrem.h"
34 #include "commands/tablecmds.h"
35 #include "commands/tablespace.h"
36 #include "commands/trigger.h"
37 #include "commands/typecmds.h"
38 #include "executor/executor.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/clauses.h"
43 #include "optimizer/plancat.h"
44 #include "optimizer/prep.h"
45 #include "parser/analyze.h"
46 #include "parser/gramparse.h"
47 #include "parser/parser.h"
48 #include "parser/parse_clause.h"
49 #include "parser/parse_coerce.h"
50 #include "parser/parse_expr.h"
51 #include "parser/parse_oper.h"
52 #include "parser/parse_relation.h"
53 #include "parser/parse_type.h"
54 #include "rewrite/rewriteHandler.h"
55 #include "storage/smgr.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
64
65
66 /*
67  * ON COMMIT action list
 *
 * Each OnCommitItem ties one relation (identified by OID) to the action to
 * take for it at end of transaction, and records in which subtransaction
 * the entry itself was created and/or scheduled for deletion.
68  */
69 typedef struct OnCommitItem
70 {
71         Oid                     relid;                  /* relid of relation */
72         OnCommitAction oncommit;        /* what to do at end of xact */
73
74         /*
75          * If this entry was created during the current transaction,
76          * creating_subid is the ID of the creating subxact; if created in a prior
77          * transaction, creating_subid is zero.  If deleted during the current
78          * transaction, deleting_subid is the ID of the deleting subxact; if no
79          * deletion request is pending, deleting_subid is zero.
80          */
81         SubTransactionId creating_subid;
82         SubTransactionId deleting_subid;
83 } OnCommitItem;
84
85 static List *on_commits = NIL;	/* starts out empty; presumably a list of OnCommitItem -- TODO confirm */
86
87
88 /*
89  * State information for ALTER TABLE
90  *
91  * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
92  * structs, one for each table modified by the operation (the named table
93  * plus any child tables that are affected).  We save lists of subcommands
94  * to apply to this table (possibly modified by parse transformation steps);
95  * these lists will be executed in Phase 2.  If a Phase 3 step is needed,
96  * necessary information is stored in the constraints and newvals lists.
97  *
98  * Phase 2 is divided into multiple passes; subcommands are executed in
99  * a pass determined by subcommand type.
100  */
101
/* Phase 2 pass identifiers (a subcommand's pass is determined by its type) */
102 #define AT_PASS_DROP                    0               /* DROP (all flavors) */
103 #define AT_PASS_ALTER_TYPE              1               /* ALTER COLUMN TYPE */
104 #define AT_PASS_OLD_INDEX               2               /* re-add existing indexes */
105 #define AT_PASS_OLD_CONSTR              3               /* re-add existing constraints */
106 #define AT_PASS_COL_ATTRS               4               /* set other column attributes */
107 /* We could support a RENAME COLUMN pass here, but not currently used */
108 #define AT_PASS_ADD_COL                 5               /* ADD COLUMN */
109 #define AT_PASS_ADD_INDEX               6               /* ADD indexes */
110 #define AT_PASS_ADD_CONSTR              7               /* ADD constraints, defaults */
111 #define AT_PASS_MISC                    8               /* other stuff */
112 #define AT_NUM_PASSES                   9               /* number of passes; sizes AlteredTableInfo.subcmds[] */
113
/*
 * Work-queue entry for ALTER TABLE: one per table touched by the operation.
 * Fields are grouped by the phase that fills them in.
 */
114 typedef struct AlteredTableInfo
115 {
116         /* Information saved before any work commences: */
117         Oid                     relid;                  /* Relation to work on */
118         char            relkind;                /* Its relkind */
119         TupleDesc       oldDesc;                /* Pre-modification tuple descriptor */
120         /* Information saved by Phase 1 for Phase 2: */
121         List       *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd, one list per AT_PASS_* value */
122         /* Information saved by Phases 1/2 for Phase 3: */
123         List       *constraints;        /* List of NewConstraint */
124         List       *newvals;            /* List of NewColumnValue */
125         Oid                     newTableSpace;  /* new tablespace; 0 means no change */
126         /* Objects to rebuild after completing ALTER TYPE operations */
127         List       *changedConstraintOids;      /* OIDs of constraints to rebuild */
128         List       *changedConstraintDefs;      /* string definitions of same */
129         List       *changedIndexOids;           /* OIDs of indexes to rebuild */
130         List       *changedIndexDefs;           /* string definitions of same */
131 } AlteredTableInfo;
132
133 /* Struct describing one new constraint to check in Phase 3 scan */
/* (instances are the elements of AlteredTableInfo.constraints) */
134 typedef struct NewConstraint
135 {
136         char       *name;                       /* Constraint name, or NULL if none */
137         ConstrType      contype;                /* CHECK, NOT_NULL, or FOREIGN */
138         AttrNumber      attnum;                 /* only relevant for NOT_NULL */
139         Oid                     refrelid;               /* PK rel, if FOREIGN */
140         Node       *qual;                       /* Check expr or FkConstraint struct */
141         List       *qualstate;          /* Execution state for CHECK */
142 } NewConstraint;
143
144 /*
145  * Struct describing one new column value that needs to be computed during
146  * Phase 3 copy (this could be either a new column with a non-null default, or
147  * a column that we're changing the type of).  Columns without such an entry
148  * are just copied from the old table during ATRewriteTable.  Note that the
149  * expr is an expression over *old* table values.
 *
 * Instances are the elements of AlteredTableInfo.newvals.
150  */
151 typedef struct NewColumnValue
152 {
153         AttrNumber      attnum;                 /* which column */
154         Expr       *expr;                       /* expression to compute */
155         ExprState  *exprstate;          /* execution state */
156 } NewColumnValue;
157
158
/* Forward declarations of routines private to this file (all are static) */
159 static List *MergeAttributes(List *schema, List *supers, bool istemp,
160                                 List **supOids, List **supconstr, int *supOidCount);
161 static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
162 static void StoreCatalogInheritance(Oid relationId, List *supers);
163 static int      findAttrByName(const char *attributeName, List *schema);
164 static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
165 static bool needs_toast_table(Relation rel);
166 static void AlterIndexNamespaces(Relation classRel, Relation rel,
167                                          Oid oldNspOid, Oid newNspOid);
168 static void AlterSeqNamespaces(Relation classRel, Relation rel,
169                                    Oid oldNspOid, Oid newNspOid,
170                                    const char *newNspName);
171 static int transformColumnNameList(Oid relId, List *colList,
172                                                 int16 *attnums, Oid *atttypids);
173 static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
174                                                    List **attnamelist,
175                                                    int16 *attnums, Oid *atttypids,
176                                                    Oid *opclasses);
177 static Oid transformFkeyCheckAttrs(Relation pkrel,
178                                                 int numattrs, int16 *attnums,
179                                                 Oid *opclasses);
180 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
181                                                          Relation rel, Relation pkrel);
182 static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
183                                                  Oid constrOid);
184 static char *fkMatchTypeToString(char match_type);
185 static void ATController(Relation rel, List *cmds, bool recurse);
186 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
187                   bool recurse, bool recursing);
188 static void ATRewriteCatalogs(List **wqueue);
189 static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
190 static void ATRewriteTables(List **wqueue);
191 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
192 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
193 static void ATSimplePermissions(Relation rel, bool allowView);
194 static void ATSimpleRecursion(List **wqueue, Relation rel,
195                                   AlterTableCmd *cmd, bool recurse);
196 static void ATOneLevelRecursion(List **wqueue, Relation rel,
197                                         AlterTableCmd *cmd);
198 static void find_composite_type_dependencies(Oid typeOid,
199                                                                  const char *origTblName);
200 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
201                                 AlterTableCmd *cmd);
202 static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
203                                 ColumnDef *colDef);
204 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
205 static void add_column_support_dependency(Oid relid, int32 attnum,
206                                                           RangeVar *support);
207 static void ATExecDropNotNull(Relation rel, const char *colName);
208 static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
209                                  const char *colName);
210 static void ATExecColumnDefault(Relation rel, const char *colName,
211                                         Node *newDefault);
212 static void ATPrepSetStatistics(Relation rel, const char *colName,
213                                         Node *flagValue);
214 static void ATExecSetStatistics(Relation rel, const char *colName,
215                                         Node *newValue);
216 static void ATExecSetStorage(Relation rel, const char *colName,
217                                  Node *newValue);
218 static void ATExecDropColumn(Relation rel, const char *colName,
219                                  DropBehavior behavior,
220                                  bool recurse, bool recursing);
221 static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
222                            IndexStmt *stmt, bool is_rebuild);
223 static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
224                                         Node *newConstraint);
225 static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
226                                                   FkConstraint *fkconstraint);
227 static void ATPrepDropConstraint(List **wqueue, Relation rel,
228                                          bool recurse, AlterTableCmd *cmd);
229 static void ATExecDropConstraint(Relation rel, const char *constrName,
230                                          DropBehavior behavior, bool quiet);
231 static void ATPrepAlterColumnType(List **wqueue,
232                                           AlteredTableInfo *tab, Relation rel,
233                                           bool recurse, bool recursing,
234                                           AlterTableCmd *cmd);
235 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
236                                           const char *colName, TypeName *typename);
237 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
238 static void ATPostAlterTypeParse(char *cmd, List **wqueue);
239 static void change_owner_recurse_to_sequences(Oid relationOid,
240                                                                   Oid newOwnerId);
241 static void ATExecClusterOn(Relation rel, const char *indexName);
242 static void ATExecDropCluster(Relation rel);
243 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
244                                         char *tablespacename);
245 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
246 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
247                                                    bool enable, bool skip_system);
248 static void copy_relation_data(Relation rel, SMgrRelation dst);
249 static void update_ri_trigger_args(Oid relid,
250                                            const char *oldname,
251                                            const char *newname,
252                                            bool fk_scan,
253                                            bool update_relname);
254
255
256 /* ----------------------------------------------------------------
257  *              DefineRelation
258  *                              Creates a new relation.
259  *
 * 'stmt' supplies the relation name, column list, parent relations,
 * tablespace name, OIDs option, ON COMMIT action and constraints.
 * 'relkind' is handed unchanged to heap_create_with_catalog.
 *
 * An error is raised (ereport ERROR) if ON COMMIT is requested for a
 * non-temporary table, if the named tablespace does not exist, or if two
 * inherited CHECK constraints share a name but differ in expression.
 * Permission failures on the namespace or tablespace are reported through
 * aclcheck_error.
 *
 * The new relation is opened with AccessExclusiveLock and closed with
 * NoLock, so the lock is still held when this function returns.
 *
260  * If successful, returns the OID of the new relation.
261  * ----------------------------------------------------------------
262  */
263 Oid
264 DefineRelation(CreateStmt *stmt, char relkind)
265 {
266         char            relname[NAMEDATALEN];
267         Oid                     namespaceId;
268         List       *schema = stmt->tableElts;
269         Oid                     relationId;
270         Oid                     tablespaceId;
271         Relation        rel;
272         TupleDesc       descriptor;
273         List       *inheritOids;
274         List       *old_constraints;
275         bool            localHasOids;
276         int                     parentOidCount;
277         List       *rawDefaults;
278         ListCell   *listptr;
279         int                     i;
280         AttrNumber      attnum;
281
282         /*
283          * Truncate relname to appropriate length (probably a waste of time, as
284          * parser should have done this already).
285          */
286         StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
287
288         /*
289          * Check consistency of arguments
290          */
291         if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
292                 ereport(ERROR,
293                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
294                                  errmsg("ON COMMIT can only be used on temporary tables")));
295
296         /*
297          * Look up the namespace in which we are supposed to create the relation.
298          * Check we have permission to create there. Skip check if bootstrapping,
299          * since permissions machinery may not be working yet.
300          */
301         namespaceId = RangeVarGetCreationNamespace(stmt->relation);
302
303         if (!IsBootstrapProcessingMode())
304         {
305                 AclResult       aclresult;
306
307                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
308                                                                                   ACL_CREATE);
309                 if (aclresult != ACLCHECK_OK)
310                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
311                                                    get_namespace_name(namespaceId));
312         }
313
314         /*
315          * Select tablespace to use.  If not specified, use default_tablespace
316          * (which may in turn default to database's default).
317          */
318         if (stmt->tablespacename)
319         {
320                 tablespaceId = get_tablespace_oid(stmt->tablespacename);
321                 if (!OidIsValid(tablespaceId))
322                         ereport(ERROR,
323                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
324                                          errmsg("tablespace \"%s\" does not exist",
325                                                         stmt->tablespacename)));
326         }
327         else
328         {
329                 tablespaceId = GetDefaultTablespace();
330                 /* note InvalidOid is OK in this case */
331         }
332
333         /* Check permissions except when using database's default */
334         if (OidIsValid(tablespaceId))
335         {
336                 AclResult       aclresult;
337
338                 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
339                                                                                    ACL_CREATE);
340                 if (aclresult != ACLCHECK_OK)
341                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
342                                                    get_tablespace_name(tablespaceId));
343         }
344
345         /*
346          * Look up inheritance ancestors and generate relation schema, including
347          * inherited attributes.
         *
         * MergeAttributes also fills in inheritOids, old_constraints and
         * parentOidCount through its output arguments.
348          */
349         schema = MergeAttributes(schema, stmt->inhRelations,
350                                                          stmt->relation->istemp,
351                                                          &inheritOids, &old_constraints, &parentOidCount);
352
353         /*
354          * Create a relation descriptor from the relation schema and create the
355          * relation.  Note that in this stage only inherited (pre-cooked) defaults
356          * and constraints will be included into the new relation.
357          * (BuildDescForRelation takes care of the inherited defaults, but we have
358          * to copy inherited constraints here.)
359          */
360         descriptor = BuildDescForRelation(schema);
361
362         localHasOids = interpretOidsOption(stmt->hasoids);
        /* OIDs are present if requested locally or if any parent has them */
363         descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
364
        /* Copy inherited CHECK constraints into the descriptor */
365         if (old_constraints != NIL)
366         {
                /* Sized for the worst case: every entry is a distinct CHECK */
367                 ConstrCheck *check = (ConstrCheck *)
368                 palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
369                 int                     ncheck = 0;
370
371                 foreach(listptr, old_constraints)
372                 {
373                         Constraint *cdef = (Constraint *) lfirst(listptr);
374                         bool            dup = false;
375
                        /* Only CHECK constraints are copied by this loop */
376                         if (cdef->contype != CONSTR_CHECK)
377                                 continue;
378                         Assert(cdef->name != NULL);
379                         Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
380
381                         /*
382                          * In multiple-inheritance situations, it's possible to inherit
383                          * the same grandparent constraint through multiple parents.
384                          * Hence, discard inherited constraints that match as to both name
385                          * and expression.  Otherwise, gripe if the names conflict.
386                          */
387                         for (i = 0; i < ncheck; i++)
388                         {
389                                 if (strcmp(check[i].ccname, cdef->name) != 0)
390                                         continue;
391                                 if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
392                                 {
393                                         dup = true;
394                                         break;
395                                 }
                                /* Same name, different expression: reject */
396                                 ereport(ERROR,
397                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
398                                                  errmsg("duplicate check constraint name \"%s\"",
399                                                                 cdef->name)));
400                         }
401                         if (!dup)
402                         {
403                                 check[ncheck].ccname = cdef->name;
404                                 check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
405                                 ncheck++;
406                         }
407                 }
408                 if (ncheck > 0)
409                 {
                        /*
                         * BuildDescForRelation left no TupleConstr; create an
                         * otherwise-empty one to hold the check constraints.
                         */
410                         if (descriptor->constr == NULL)
411                         {
412                                 descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
413                                 descriptor->constr->defval = NULL;
414                                 descriptor->constr->num_defval = 0;
415                                 descriptor->constr->has_not_null = false;
416                         }
417                         descriptor->constr->num_check = ncheck;
418                         descriptor->constr->check = check;
419                 }
420         }
421
422         relationId = heap_create_with_catalog(relname,
423                                                                                   namespaceId,
424                                                                                   tablespaceId,
425                                                                                   InvalidOid,
426                                                                                   GetUserId(),
427                                                                                   descriptor,
428                                                                                   relkind,
429                                                                                   false,
430                                                                                   localHasOids,
431                                                                                   parentOidCount,
432                                                                                   stmt->oncommit,
433                                                                                   allowSystemTableMods);
434
435         StoreCatalogInheritance(relationId, inheritOids);
436
437         /*
438          * We must bump the command counter to make the newly-created relation
439          * tuple visible for opening.
440          */
441         CommandCounterIncrement();
442
443         /*
444          * Open the new relation and acquire exclusive lock on it.  This isn't
445          * really necessary for locking out other backends (since they can't see
446          * the new rel anyway until we commit), but it keeps the lock manager from
447          * complaining about deadlock risks.
448          */
449         rel = relation_open(relationId, AccessExclusiveLock);
450
451         /*
452          * Now add any newly specified column default values and CHECK constraints
453          * to the new relation.  These are passed to us in the form of raw
454          * parsetrees; we need to transform them to executable expression trees
455          * before they can be added. The most convenient way to do that is to
456          * apply the parser's transformExpr routine, but transformExpr doesn't
457          * work unless we have a pre-existing relation. So, the transformation has
458          * to be postponed to this final step of CREATE TABLE.
459          *
460          * Another task that's conveniently done at this step is to add dependency
461          * links between columns and supporting relations (such as SERIAL
462          * sequences).
463          *
464          * First, scan schema to find new column defaults.
465          */
466         rawDefaults = NIL;
467         attnum = 0;
468
469         foreach(listptr, schema)
470         {
471                 ColumnDef  *colDef = lfirst(listptr);
472
                /* attnum counts columns from 1, in schema-list order */
473                 attnum++;
474
475                 if (colDef->raw_default != NULL)
476                 {
477                         RawColumnDefault *rawEnt;
478
479                         Assert(colDef->cooked_default == NULL);
480
481                         rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
482                         rawEnt->attnum = attnum;
483                         rawEnt->raw_default = colDef->raw_default;
484                         rawDefaults = lappend(rawDefaults, rawEnt);
485                 }
486
487                 /* Create dependency for supporting relation for this column */
488                 if (colDef->support != NULL)
489                         add_column_support_dependency(relationId, attnum, colDef->support);
490         }
491
492         /*
493          * Parse and add the defaults/constraints, if any.
494          */
495         if (rawDefaults || stmt->constraints)
496                 AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
497
498         /*
499          * Clean up.  We keep lock on new relation (although it shouldn't be
500          * visible to anyone else anyway, until commit).
501          */
502         relation_close(rel, NoLock);
503
504         return relationId;
505 }
506
507 /*
508  * RemoveRelation
509  *              Deletes a relation.
 *
 * 'relation' names the relation to drop; it is resolved to an OID with
 * RangeVarGetRelid.
 * 'behavior' is passed unchanged to performDeletion.
510  */
511 void
512 RemoveRelation(const RangeVar *relation, DropBehavior behavior)
513 {
514         Oid                     relOid;
515         ObjectAddress object;
516
        /* NOTE(review): 'false' presumably means "error if not found" -- confirm */
517         relOid = RangeVarGetRelid(relation, false);
518
        /* Describe the whole relation as the object to delete (sub-id 0) */
519         object.classId = RelationRelationId;
520         object.objectId = relOid;
521         object.objectSubId = 0;
522
523         performDeletion(&object, behavior);
524 }
525
526 /*
527  * ExecuteTruncate
528  *              Executes a TRUNCATE command.
529  *
530  * This is a multi-relation truncate.  It first opens and grabs exclusive
531  * locks on all relations involved, checking permissions and otherwise
532  * verifying that the relation is OK for truncation.  When they are all
533  * open, it checks foreign key references on them, namely that FK references
534  * are all internal to the group that's being truncated.  Finally all
535  * relations are truncated and reindexed.
 *
 * 'relations' is a list of RangeVar nodes naming the tables to truncate.
 *
 * All per-relation checks and the foreign-key check run before the first
 * table is truncated, so a failed check raises its error while every table
 * is still untouched.
536  */
537 void
538 ExecuteTruncate(List *relations)
539 {
540         List       *rels = NIL;
541         ListCell   *cell;
542
        /* Pass 1: open, lock and validate every named relation */
543         foreach(cell, relations)
544         {
545                 RangeVar   *rv = lfirst(cell);
546                 Relation        rel;
547
548                 /* Grab exclusive lock in preparation for truncate */
549                 rel = heap_openrv(rv, AccessExclusiveLock);
550
551                 /* Only allow truncate on regular tables */
552                 if (rel->rd_rel->relkind != RELKIND_RELATION)
553                         ereport(ERROR,
554                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
555                                          errmsg("\"%s\" is not a table",
556                                                         RelationGetRelationName(rel))));
557
558                 /* Permissions checks */
559                 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
560                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
561                                                    RelationGetRelationName(rel));
562
563                 if (!allowSystemTableMods && IsSystemRelation(rel))
564                         ereport(ERROR,
565                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
566                                          errmsg("permission denied: \"%s\" is a system catalog",
567                                                         RelationGetRelationName(rel))));
568
569                 /*
570                  * We can never allow truncation of shared or nailed-in-cache
571                  * relations, because we can't support changing their relfilenode
572                  * values.
573                  */
574                 if (rel->rd_rel->relisshared || rel->rd_isnailed)
575                         ereport(ERROR,
576                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
577                                          errmsg("cannot truncate system relation \"%s\"",
578                                                         RelationGetRelationName(rel))));
579
580                 /*
581                  * Don't allow truncate on temp tables of other backends ... their
582                  * local buffer manager is not going to cope.
583                  */
584                 if (isOtherTempNamespace(RelationGetNamespace(rel)))
585                         ereport(ERROR,
586                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
587                           errmsg("cannot truncate temporary tables of other sessions")));
588
589                 /* Save it into the list of rels to truncate */
590                 rels = lappend(rels, rel);
591         }
592
593         /*
594          * Check foreign key references.
595          */
596         heap_truncate_check_FKs(rels, false);
597
598         /*
599          * OK, truncate each table.
600          */
601         foreach(cell, rels)
602         {
603                 Relation        rel = lfirst(cell);
604                 Oid                     heap_relid;
605                 Oid                     toast_relid;
606
607                 /*
608                  * Create a new empty storage file for the relation, and assign it as
609                  * the relfilenode value.  The old storage file is scheduled for
610                  * deletion at commit.
611                  */
612                 setNewRelfilenode(rel);
613
                /* Fetch the OIDs still needed before closing rel */
614                 heap_relid = RelationGetRelid(rel);
615                 toast_relid = rel->rd_rel->reltoastrelid;
616
                /* Close the relation but keep the lock (NoLock) */
617                 heap_close(rel, NoLock);
618
619                 /*
620                  * The same for the toast table, if any.
621                  */
622                 if (OidIsValid(toast_relid))
623                 {
624                         rel = relation_open(toast_relid, AccessExclusiveLock);
625                         setNewRelfilenode(rel);
626                         heap_close(rel, NoLock);
627                 }
628
629                 /*
630                  * Reconstruct the indexes to match, and we're done.
631                  */
632                 reindex_relation(heap_relid, true);
633         }
634 }
635
636 /*----------
637  * MergeAttributes
638  *              Returns new schema given initial schema and superclasses.
639  *
640  * Input arguments:
641  * 'schema' is the column/attribute definition for the table. (It's a list
642  *              of ColumnDef's.) It is destructively changed.
643  * 'supers' is a list of names (as RangeVar nodes) of parent relations.
644  * 'istemp' is TRUE if we are creating a temp relation.
645  *
646  * Output arguments:
647  * 'supOids' receives a list of the OIDs of the parent relations.
648  * 'supconstr' receives a list of constraints belonging to the parents,
649  *              updated as necessary to be valid for the child.
650  * 'supOidCount' is set to the number of parents that have OID columns.
651  *
652  * Return value:
653  * Completed schema list.
654  *
655  * Notes:
656  *        The order in which the attributes are inherited is very important.
657  *        Intuitively, the inherited attributes should come first. If a table
658  *        inherits from multiple parents, the order of those attributes are
659  *        according to the order of the parents specified in CREATE TABLE.
660  *
661  *        Here's an example:
662  *
663  *              create table person (name text, age int4, location point);
664  *              create table emp (salary int4, manager text) inherits(person);
665  *              create table student (gpa float8) inherits (person);
666  *              create table stud_emp (percent int4) inherits (emp, student);
667  *
668  *        The order of the attributes of stud_emp is:
669  *
670  *                                                      person {1:name, 2:age, 3:location}
671  *                                                      /        \
672  *                         {6:gpa}      student   emp {4:salary, 5:manager}
673  *                                                      \        /
674  *                                                 stud_emp {7:percent}
675  *
676  *         If the same attribute name appears multiple times, then it appears
677  *         in the result table in the proper location for its first appearance.
678  *
679  *         Constraints (including NOT NULL constraints) for the child table
680  *         are the union of all relevant constraints, from both the child schema
681  *         and parent tables.
682  *
683  *         The default value for a child column is defined as:
684  *              (1) If the child schema specifies a default, that value is used.
685  *              (2) If neither the child nor any parent specifies a default, then
686  *                      the column will not have a default.
687  *              (3) If conflicting defaults are inherited from different parents
688  *                      (and not overridden by the child), an error is raised.
689  *              (4) Otherwise the inherited default is used.
690  *              Rule (3) is new in Postgres 7.1; in earlier releases you got a
691  *              rather arbitrary choice of which parent default to use.
692  *----------
693  */
694 static List *
695 MergeAttributes(List *schema, List *supers, bool istemp,
696                                 List **supOids, List **supconstr, int *supOidCount)
697 {
698         ListCell   *entry;
699         List       *inhSchema = NIL;
700         List       *parentOids = NIL;
701         List       *constraints = NIL;
702         int                     parentsWithOids = 0;
703         bool            have_bogus_defaults = false;
704         char       *bogus_marker = "Bogus!";            /* marks conflicting defaults */
705         int                     child_attno;
706
707         /*
708          * Check for and reject tables with too many columns. We perform this
709          * check relatively early for two reasons: (a) we don't run the risk of
710          * overflowing an AttrNumber in subsequent code (b) an O(n^2) algorithm is
711          * okay if we're processing <= 1600 columns, but could take minutes to
712          * execute if the user attempts to create a table with hundreds of
713          * thousands of columns.
714          *
715          * Note that we also need to check that any we do not exceed this figure
716          * after including columns from inherited relations.
717          */
718         if (list_length(schema) > MaxHeapAttributeNumber)
719                 ereport(ERROR,
720                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
721                                  errmsg("tables can have at most %d columns",
722                                                 MaxHeapAttributeNumber)));
723
724         /*
725          * Check for duplicate names in the explicit list of attributes.
726          *
727          * Although we might consider merging such entries in the same way that we
728          * handle name conflicts for inherited attributes, it seems to make more
729          * sense to assume such conflicts are errors.
730          */
731         foreach(entry, schema)
732         {
733                 ColumnDef  *coldef = lfirst(entry);
734                 ListCell   *rest;
735
736                 for_each_cell(rest, lnext(entry))
737                 {
738                         ColumnDef  *restdef = lfirst(rest);
739
740                         if (strcmp(coldef->colname, restdef->colname) == 0)
741                                 ereport(ERROR,
742                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
743                                                  errmsg("column \"%s\" duplicated",
744                                                                 coldef->colname)));
745                 }
746         }
747
748         /*
749          * Scan the parents left-to-right, and merge their attributes to form a
750          * list of inherited attributes (inhSchema).  Also check to see if we need
751          * to inherit an OID column.
752          */
753         child_attno = 0;
754         foreach(entry, supers)
755         {
756                 RangeVar   *parent = (RangeVar *) lfirst(entry);
757                 Relation        relation;
758                 TupleDesc       tupleDesc;
759                 TupleConstr *constr;
760                 AttrNumber *newattno;
761                 AttrNumber      parent_attno;
762
763                 relation = heap_openrv(parent, AccessShareLock);
764
765                 if (relation->rd_rel->relkind != RELKIND_RELATION)
766                         ereport(ERROR,
767                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
768                                          errmsg("inherited relation \"%s\" is not a table",
769                                                         parent->relname)));
770                 /* Permanent rels cannot inherit from temporary ones */
771                 if (!istemp && isTempNamespace(RelationGetNamespace(relation)))
772                         ereport(ERROR,
773                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
774                                          errmsg("cannot inherit from temporary relation \"%s\"",
775                                                         parent->relname)));
776
777                 /*
778                  * We should have an UNDER permission flag for this, but for now,
779                  * demand that creator of a child table own the parent.
780                  */
781                 if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
782                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
783                                                    RelationGetRelationName(relation));
784
785                 /*
786                  * Reject duplications in the list of parents.
787                  */
788                 if (list_member_oid(parentOids, RelationGetRelid(relation)))
789                         ereport(ERROR,
790                                         (errcode(ERRCODE_DUPLICATE_TABLE),
791                                          errmsg("inherited relation \"%s\" duplicated",
792                                                         parent->relname)));
793
794                 parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
795
796                 if (relation->rd_rel->relhasoids)
797                         parentsWithOids++;
798
799                 tupleDesc = RelationGetDescr(relation);
800                 constr = tupleDesc->constr;
801
802                 /*
803                  * newattno[] will contain the child-table attribute numbers for the
804                  * attributes of this parent table.  (They are not the same for
805                  * parents after the first one, nor if we have dropped columns.)
806                  */
807                 newattno = (AttrNumber *)
808                         palloc(tupleDesc->natts * sizeof(AttrNumber));
809
810                 for (parent_attno = 1; parent_attno <= tupleDesc->natts;
811                          parent_attno++)
812                 {
813                         Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
814                         char       *attributeName = NameStr(attribute->attname);
815                         int                     exist_attno;
816                         ColumnDef  *def;
817                         TypeName   *typename;
818
819                         /*
820                          * Ignore dropped columns in the parent.
821                          */
822                         if (attribute->attisdropped)
823                         {
824                                 /*
825                                  * change_varattnos_of_a_node asserts that this is greater
826                                  * than zero, so if anything tries to use it, we should find
827                                  * out.
828                                  */
829                                 newattno[parent_attno - 1] = 0;
830                                 continue;
831                         }
832
833                         /*
834                          * Does it conflict with some previously inherited column?
835                          */
836                         exist_attno = findAttrByName(attributeName, inhSchema);
837                         if (exist_attno > 0)
838                         {
839                                 /*
840                                  * Yes, try to merge the two column definitions. They must
841                                  * have the same type and typmod.
842                                  */
843                                 ereport(NOTICE,
844                                                 (errmsg("merging multiple inherited definitions of column \"%s\"",
845                                                                 attributeName)));
846                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
847                                 if (typenameTypeId(def->typename) != attribute->atttypid ||
848                                         def->typename->typmod != attribute->atttypmod)
849                                         ereport(ERROR,
850                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
851                                                 errmsg("inherited column \"%s\" has a type conflict",
852                                                            attributeName),
853                                                          errdetail("%s versus %s",
854                                                                            TypeNameToString(def->typename),
855                                                                            format_type_be(attribute->atttypid))));
856                                 def->inhcount++;
857                                 /* Merge of NOT NULL constraints = OR 'em together */
858                                 def->is_not_null |= attribute->attnotnull;
859                                 /* Default and other constraints are handled below */
860                                 newattno[parent_attno - 1] = exist_attno;
861                         }
862                         else
863                         {
864                                 /*
865                                  * No, create a new inherited column
866                                  */
867                                 def = makeNode(ColumnDef);
868                                 def->colname = pstrdup(attributeName);
869                                 typename = makeNode(TypeName);
870                                 typename->typeid = attribute->atttypid;
871                                 typename->typmod = attribute->atttypmod;
872                                 def->typename = typename;
873                                 def->inhcount = 1;
874                                 def->is_local = false;
875                                 def->is_not_null = attribute->attnotnull;
876                                 def->raw_default = NULL;
877                                 def->cooked_default = NULL;
878                                 def->constraints = NIL;
879                                 def->support = NULL;
880                                 inhSchema = lappend(inhSchema, def);
881                                 newattno[parent_attno - 1] = ++child_attno;
882                         }
883
884                         /*
885                          * Copy default if any
886                          */
887                         if (attribute->atthasdef)
888                         {
889                                 char       *this_default = NULL;
890                                 AttrDefault *attrdef;
891                                 int                     i;
892
893                                 /* Find default in constraint structure */
894                                 Assert(constr != NULL);
895                                 attrdef = constr->defval;
896                                 for (i = 0; i < constr->num_defval; i++)
897                                 {
898                                         if (attrdef[i].adnum == parent_attno)
899                                         {
900                                                 this_default = attrdef[i].adbin;
901                                                 break;
902                                         }
903                                 }
904                                 Assert(this_default != NULL);
905
906                                 /*
907                                  * If default expr could contain any vars, we'd need to fix
908                                  * 'em, but it can't; so default is ready to apply to child.
909                                  *
910                                  * If we already had a default from some prior parent, check
911                                  * to see if they are the same.  If so, no problem; if not,
912                                  * mark the column as having a bogus default. Below, we will
913                                  * complain if the bogus default isn't overridden by the child
914                                  * schema.
915                                  */
916                                 Assert(def->raw_default == NULL);
917                                 if (def->cooked_default == NULL)
918                                         def->cooked_default = pstrdup(this_default);
919                                 else if (strcmp(def->cooked_default, this_default) != 0)
920                                 {
921                                         def->cooked_default = bogus_marker;
922                                         have_bogus_defaults = true;
923                                 }
924                         }
925                 }
926
927                 /*
928                  * Now copy the constraints of this parent, adjusting attnos using the
929                  * completed newattno[] map
930                  */
931                 if (constr && constr->num_check > 0)
932                 {
933                         ConstrCheck *check = constr->check;
934                         int                     i;
935
936                         for (i = 0; i < constr->num_check; i++)
937                         {
938                                 Constraint *cdef = makeNode(Constraint);
939                                 Node       *expr;
940
941                                 cdef->contype = CONSTR_CHECK;
942                                 cdef->name = pstrdup(check[i].ccname);
943                                 cdef->raw_expr = NULL;
944                                 /* adjust varattnos of ccbin here */
945                                 expr = stringToNode(check[i].ccbin);
946                                 change_varattnos_of_a_node(expr, newattno);
947                                 cdef->cooked_expr = nodeToString(expr);
948                                 constraints = lappend(constraints, cdef);
949                         }
950                 }
951
952                 pfree(newattno);
953
954                 /*
955                  * Close the parent rel, but keep our AccessShareLock on it until xact
956                  * commit.      That will prevent someone else from deleting or ALTERing
957                  * the parent before the child is committed.
958                  */
959                 heap_close(relation, NoLock);
960         }
961
962         /*
963          * If we had no inherited attributes, the result schema is just the
964          * explicitly declared columns.  Otherwise, we need to merge the declared
965          * columns into the inherited schema list.
966          */
967         if (inhSchema != NIL)
968         {
969                 foreach(entry, schema)
970                 {
971                         ColumnDef  *newdef = lfirst(entry);
972                         char       *attributeName = newdef->colname;
973                         int                     exist_attno;
974
975                         /*
976                          * Does it conflict with some previously inherited column?
977                          */
978                         exist_attno = findAttrByName(attributeName, inhSchema);
979                         if (exist_attno > 0)
980                         {
981                                 ColumnDef  *def;
982
983                                 /*
984                                  * Yes, try to merge the two column definitions. They must
985                                  * have the same type and typmod.
986                                  */
987                                 ereport(NOTICE,
988                                    (errmsg("merging column \"%s\" with inherited definition",
989                                                    attributeName)));
990                                 def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
991                                 if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
992                                         def->typename->typmod != newdef->typename->typmod)
993                                         ereport(ERROR,
994                                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
995                                                          errmsg("column \"%s\" has a type conflict",
996                                                                         attributeName),
997                                                          errdetail("%s versus %s",
998                                                                            TypeNameToString(def->typename),
999                                                                            TypeNameToString(newdef->typename))));
1000                                 /* Mark the column as locally defined */
1001                                 def->is_local = true;
1002                                 /* Merge of NOT NULL constraints = OR 'em together */
1003                                 def->is_not_null |= newdef->is_not_null;
1004                                 /* If new def has a default, override previous default */
1005                                 if (newdef->raw_default != NULL)
1006                                 {
1007                                         def->raw_default = newdef->raw_default;
1008                                         def->cooked_default = newdef->cooked_default;
1009                                 }
1010                         }
1011                         else
1012                         {
1013                                 /*
1014                                  * No, attach new column to result schema
1015                                  */
1016                                 inhSchema = lappend(inhSchema, newdef);
1017                         }
1018                 }
1019
1020                 schema = inhSchema;
1021
1022                 /*
1023                  * Check that we haven't exceeded the legal # of columns after merging
1024                  * in inherited columns.
1025                  */
1026                 if (list_length(schema) > MaxHeapAttributeNumber)
1027                         ereport(ERROR,
1028                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
1029                                          errmsg("tables can have at most %d columns",
1030                                                         MaxHeapAttributeNumber)));
1031         }
1032
1033         /*
1034          * If we found any conflicting parent default values, check to make sure
1035          * they were overridden by the child.
1036          */
1037         if (have_bogus_defaults)
1038         {
1039                 foreach(entry, schema)
1040                 {
1041                         ColumnDef  *def = lfirst(entry);
1042
1043                         if (def->cooked_default == bogus_marker)
1044                                 ereport(ERROR,
1045                                                 (errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
1046                                   errmsg("column \"%s\" inherits conflicting default values",
1047                                                  def->colname),
1048                                                  errhint("To resolve the conflict, specify a default explicitly.")));
1049                 }
1050         }
1051
1052         *supOids = parentOids;
1053         *supconstr = constraints;
1054         *supOidCount = parentsWithOids;
1055         return schema;
1056 }
1057
1058 /*
1059  * complementary static functions for MergeAttributes().
1060  *
1061  * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
1062  * constraints from parent classes, since the inherited attributes could
1063  * be given different column numbers in multiple-inheritance cases.
1064  *
1065  * Note that the passed node tree is modified in place!
1066  */
1067 static bool
1068 change_varattnos_walker(Node *node, const AttrNumber *newattno)
1069 {
1070         if (node == NULL)
1071                 return false;
1072         if (IsA(node, Var))
1073         {
1074                 Var                *var = (Var *) node;
1075
1076                 if (var->varlevelsup == 0 && var->varno == 1 &&
1077                         var->varattno > 0)
1078                 {
1079                         /*
1080                          * ??? the following may be a problem when the node is multiply
1081                          * referenced though stringToNode() doesn't create such a node
1082                          * currently.
1083                          */
1084                         Assert(newattno[var->varattno - 1] > 0);
1085                         var->varattno = newattno[var->varattno - 1];
1086                 }
1087                 return false;
1088         }
1089         return expression_tree_walker(node, change_varattnos_walker,
1090                                                                   (void *) newattno);
1091 }
1092
1093 static bool
1094 change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1095 {
1096         return change_varattnos_walker(node, newattno);
1097 }
1098
1099 /*
1100  * StoreCatalogInheritance
1101  *              Updates the system catalogs with proper inheritance information.
1102  *
1103  * supers is a list of the OIDs of the new relation's direct ancestors.
1104  */
1105 static void
1106 StoreCatalogInheritance(Oid relationId, List *supers)
1107 {
1108         Relation        relation;
1109         TupleDesc       desc;
1110         int16           seqNumber;
1111         ListCell   *entry;
1112         HeapTuple       tuple;
1113
1114         /*
1115          * sanity checks
1116          */
1117         AssertArg(OidIsValid(relationId));
1118
1119         if (supers == NIL)
1120                 return;
1121
1122         /*
1123          * Store INHERITS information in pg_inherits using direct ancestors only.
1124          * Also enter dependencies on the direct ancestors, and make sure they are
1125          * marked with relhassubclass = true.
1126          *
1127          * (Once upon a time, both direct and indirect ancestors were found here
1128          * and then entered into pg_ipl.  Since that catalog doesn't exist
1129          * anymore, there's no need to look for indirect ancestors.)
1130          */
1131         relation = heap_open(InheritsRelationId, RowExclusiveLock);
1132         desc = RelationGetDescr(relation);
1133
1134         seqNumber = 1;
1135         foreach(entry, supers)
1136         {
1137                 Oid                     parentOid = lfirst_oid(entry);
1138                 Datum           datum[Natts_pg_inherits];
1139                 char            nullarr[Natts_pg_inherits];
1140                 ObjectAddress childobject,
1141                                         parentobject;
1142
1143                 datum[0] = ObjectIdGetDatum(relationId);                /* inhrel */
1144                 datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
1145                 datum[2] = Int16GetDatum(seqNumber);    /* inhseqno */
1146
1147                 nullarr[0] = ' ';
1148                 nullarr[1] = ' ';
1149                 nullarr[2] = ' ';
1150
1151                 tuple = heap_formtuple(desc, datum, nullarr);
1152
1153                 simple_heap_insert(relation, tuple);
1154
1155                 CatalogUpdateIndexes(relation, tuple);
1156
1157                 heap_freetuple(tuple);
1158
1159                 /*
1160                  * Store a dependency too
1161                  */
1162                 parentobject.classId = RelationRelationId;
1163                 parentobject.objectId = parentOid;
1164                 parentobject.objectSubId = 0;
1165                 childobject.classId = RelationRelationId;
1166                 childobject.objectId = relationId;
1167                 childobject.objectSubId = 0;
1168
1169                 recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
1170
1171                 /*
1172                  * Mark the parent as having subclasses.
1173                  */
1174                 setRelhassubclassInRelation(parentOid, true);
1175
1176                 seqNumber += 1;
1177         }
1178
1179         heap_close(relation, RowExclusiveLock);
1180 }
1181
1182 /*
1183  * Look for an existing schema entry with the given name.
1184  *
1185  * Returns the index (starting with 1) if attribute already exists in schema,
1186  * 0 if it doesn't.
1187  */
1188 static int
1189 findAttrByName(const char *attributeName, List *schema)
1190 {
1191         ListCell   *s;
1192         int                     i = 1;
1193
1194         foreach(s, schema)
1195         {
1196                 ColumnDef  *def = lfirst(s);
1197
1198                 if (strcmp(attributeName, def->colname) == 0)
1199                         return i;
1200
1201                 i++;
1202         }
1203         return 0;
1204 }
1205
1206 /*
1207  * Update a relation's pg_class.relhassubclass entry to the given value
1208  */
1209 static void
1210 setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
1211 {
1212         Relation        relationRelation;
1213         HeapTuple       tuple;
1214         Form_pg_class classtuple;
1215
1216         /*
1217          * Fetch a modifiable copy of the tuple, modify it, update pg_class.
1218          *
1219          * If the tuple already has the right relhassubclass setting, we don't
1220          * need to update it, but we still need to issue an SI inval message.
1221          */
1222         relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
1223         tuple = SearchSysCacheCopy(RELOID,
1224                                                            ObjectIdGetDatum(relationId),
1225                                                            0, 0, 0);
1226         if (!HeapTupleIsValid(tuple))
1227                 elog(ERROR, "cache lookup failed for relation %u", relationId);
1228         classtuple = (Form_pg_class) GETSTRUCT(tuple);
1229
1230         if (classtuple->relhassubclass != relhassubclass)
1231         {
1232                 classtuple->relhassubclass = relhassubclass;
1233                 simple_heap_update(relationRelation, &tuple->t_self, tuple);
1234
1235                 /* keep the catalog indexes up to date */
1236                 CatalogUpdateIndexes(relationRelation, tuple);
1237         }
1238         else
1239         {
1240                 /* no need to change tuple, but force relcache rebuild anyway */
1241                 CacheInvalidateRelcacheByTuple(tuple);
1242         }
1243
1244         heap_freetuple(tuple);
1245         heap_close(relationRelation, RowExclusiveLock);
1246 }
1247
1248
1249 /*
1250  *              renameatt               - changes the name of an attribute in a relation
1251  *
1252  *              Attname attribute is changed in attribute catalog.
1253  *              No record of the previous attname is kept (correct?).
1254  *
1255  *              get proper relrelation from relation catalog (if not arg)
1256  *              scan attribute catalog
1257  *                              for name conflict (within rel)
1258  *                              for original attribute (if not arg)
1259  *              modify attname in attribute tuple
1260  *              insert modified attribute in attribute catalog
1261  *              delete original attribute from attribute catalog
1262  */
1263 void
1264 renameatt(Oid myrelid,
1265                   const char *oldattname,
1266                   const char *newattname,
1267                   bool recurse,
1268                   bool recursing)
1269 {
1270         Relation        targetrelation;
1271         Relation        attrelation;
1272         HeapTuple       atttup;
1273         Form_pg_attribute attform;
1274         int                     attnum;
1275         List       *indexoidlist;
1276         ListCell   *indexoidscan;
1277
1278         /*
1279          * Grab an exclusive lock on the target table, which we will NOT release
1280          * until end of transaction.
1281          */
1282         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1283
1284         /*
1285          * permissions checking.  this would normally be done in utility.c, but
1286          * this particular routine is recursive.
1287          *
1288          * normally, only the owner of a class can change its schema.
1289          */
1290         if (!pg_class_ownercheck(myrelid, GetUserId()))
1291                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1292                                            RelationGetRelationName(targetrelation));
1293         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1294                 ereport(ERROR,
1295                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1296                                  errmsg("permission denied: \"%s\" is a system catalog",
1297                                                 RelationGetRelationName(targetrelation))));
1298
1299         /*
1300          * if the 'recurse' flag is set then we are supposed to rename this
1301          * attribute in all classes that inherit from 'relname' (as well as in
1302          * 'relname').
1303          *
1304          * any permissions or problems with duplicate attributes will cause the
1305          * whole transaction to abort, which is what we want -- all or nothing.
1306          */
1307         if (recurse)
1308         {
1309                 ListCell   *child;
1310                 List       *children;
1311
1312                 /* this routine is actually in the planner */
1313                 children = find_all_inheritors(myrelid);
1314
1315                 /*
1316                  * find_all_inheritors does the recursive search of the inheritance
1317                  * hierarchy, so all we have to do is process all of the relids in the
1318                  * list that it returns.
1319                  */
1320                 foreach(child, children)
1321                 {
1322                         Oid                     childrelid = lfirst_oid(child);
1323
1324                         if (childrelid == myrelid)
1325                                 continue;
1326                         /* note we need not recurse again */
1327                         renameatt(childrelid, oldattname, newattname, false, true);
1328                 }
1329         }
1330         else
1331         {
1332                 /*
1333                  * If we are told not to recurse, there had better not be any child
1334                  * tables; else the rename would put them out of step.
1335                  */
1336                 if (!recursing &&
1337                         find_inheritance_children(myrelid) != NIL)
1338                         ereport(ERROR,
1339                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1340                                          errmsg("inherited column \"%s\" must be renamed in child tables too",
1341                                                         oldattname)));
1342         }
1343
1344         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
1345
1346         atttup = SearchSysCacheCopyAttName(myrelid, oldattname);
1347         if (!HeapTupleIsValid(atttup))
1348                 ereport(ERROR,
1349                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
1350                                  errmsg("column \"%s\" does not exist",
1351                                                 oldattname)));
1352         attform = (Form_pg_attribute) GETSTRUCT(atttup);
1353
1354         attnum = attform->attnum;
1355         if (attnum <= 0)
1356                 ereport(ERROR,
1357                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1358                                  errmsg("cannot rename system column \"%s\"",
1359                                                 oldattname)));
1360
1361         /*
1362          * if the attribute is inherited, forbid the renaming, unless we are
1363          * already inside a recursive rename.
1364          */
1365         if (attform->attinhcount > 0 && !recursing)
1366                 ereport(ERROR,
1367                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1368                                  errmsg("cannot rename inherited column \"%s\"",
1369                                                 oldattname)));
1370
1371         /* should not already exist */
1372         /* this test is deliberately not attisdropped-aware */
1373         if (SearchSysCacheExists(ATTNAME,
1374                                                          ObjectIdGetDatum(myrelid),
1375                                                          PointerGetDatum(newattname),
1376                                                          0, 0))
1377                 ereport(ERROR,
1378                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
1379                                  errmsg("column \"%s\" of relation \"%s\" already exists",
1380                                           newattname, RelationGetRelationName(targetrelation))));
1381
1382         namestrcpy(&(attform->attname), newattname);
1383
1384         simple_heap_update(attrelation, &atttup->t_self, atttup);
1385
1386         /* keep system catalog indexes current */
1387         CatalogUpdateIndexes(attrelation, atttup);
1388
1389         heap_freetuple(atttup);
1390
1391         /*
1392          * Update column names of indexes that refer to the column being renamed.
1393          */
1394         indexoidlist = RelationGetIndexList(targetrelation);
1395
1396         foreach(indexoidscan, indexoidlist)
1397         {
1398                 Oid                     indexoid = lfirst_oid(indexoidscan);
1399                 HeapTuple       indextup;
1400                 Form_pg_index indexform;
1401                 int                     i;
1402
1403                 /*
1404                  * Scan through index columns to see if there's any simple index
1405                  * entries for this attribute.  We ignore expressional entries.
1406                  */
1407                 indextup = SearchSysCache(INDEXRELID,
1408                                                                   ObjectIdGetDatum(indexoid),
1409                                                                   0, 0, 0);
1410                 if (!HeapTupleIsValid(indextup))
1411                         elog(ERROR, "cache lookup failed for index %u", indexoid);
1412                 indexform = (Form_pg_index) GETSTRUCT(indextup);
1413
1414                 for (i = 0; i < indexform->indnatts; i++)
1415                 {
1416                         if (attnum != indexform->indkey.values[i])
1417                                 continue;
1418
1419                         /*
1420                          * Found one, rename it.
1421                          */
1422                         atttup = SearchSysCacheCopy(ATTNUM,
1423                                                                                 ObjectIdGetDatum(indexoid),
1424                                                                                 Int16GetDatum(i + 1),
1425                                                                                 0, 0);
1426                         if (!HeapTupleIsValid(atttup))
1427                                 continue;               /* should we raise an error? */
1428
1429                         /*
1430                          * Update the (copied) attribute tuple.
1431                          */
1432                         namestrcpy(&(((Form_pg_attribute) GETSTRUCT(atttup))->attname),
1433                                            newattname);
1434
1435                         simple_heap_update(attrelation, &atttup->t_self, atttup);
1436
1437                         /* keep system catalog indexes current */
1438                         CatalogUpdateIndexes(attrelation, atttup);
1439
1440                         heap_freetuple(atttup);
1441                 }
1442
1443                 ReleaseSysCache(indextup);
1444         }
1445
1446         list_free(indexoidlist);
1447
1448         heap_close(attrelation, RowExclusiveLock);
1449
1450         /*
1451          * Update att name in any RI triggers associated with the relation.
1452          */
1453         if (targetrelation->rd_rel->reltriggers > 0)
1454         {
1455                 /* update tgargs column reference where att is primary key */
1456                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1457                                                            oldattname, newattname,
1458                                                            false, false);
1459                 /* update tgargs column reference where att is foreign key */
1460                 update_ri_trigger_args(RelationGetRelid(targetrelation),
1461                                                            oldattname, newattname,
1462                                                            true, false);
1463         }
1464
1465         relation_close(targetrelation, NoLock);         /* close rel but keep lock */
1466 }
1467
1468 /*
1469  *              renamerel               - change the name of a relation
1470  *
1471  *              XXX - When renaming sequences, we don't bother to modify the
1472  *                        sequence name that is stored within the sequence itself
1473  *                        (this would cause problems with MVCC). In the future,
1474  *                        the sequence name should probably be removed from the
1475  *                        sequence, AFAIK there's no need for it to be there.
1476  */
1477 void
1478 renamerel(Oid myrelid, const char *newrelname)
1479 {
1480         Relation        targetrelation;
1481         Relation        relrelation;    /* for RELATION relation */
1482         HeapTuple       reltup;
1483         Oid                     namespaceId;
1484         char       *oldrelname;
1485         char            relkind;
1486         bool            relhastriggers;
1487
1488         /*
1489          * Grab an exclusive lock on the target table or index, which we will NOT
1490          * release until end of transaction.
1491          */
1492         targetrelation = relation_open(myrelid, AccessExclusiveLock);
1493
1494         oldrelname = pstrdup(RelationGetRelationName(targetrelation));
1495         namespaceId = RelationGetNamespace(targetrelation);
1496
1497         if (!allowSystemTableMods && IsSystemRelation(targetrelation))
1498                 ereport(ERROR,
1499                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1500                                  errmsg("permission denied: \"%s\" is a system catalog",
1501                                                 RelationGetRelationName(targetrelation))));
1502
1503         relkind = targetrelation->rd_rel->relkind;
1504         relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
1505
1506         /*
1507          * Find relation's pg_class tuple, and make sure newrelname isn't in use.
1508          */
1509         relrelation = heap_open(RelationRelationId, RowExclusiveLock);
1510
1511         reltup = SearchSysCacheCopy(RELOID,
1512                                                                 PointerGetDatum(myrelid),
1513                                                                 0, 0, 0);
1514         if (!HeapTupleIsValid(reltup))          /* shouldn't happen */
1515                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
1516
1517         if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
1518                 ereport(ERROR,
1519                                 (errcode(ERRCODE_DUPLICATE_TABLE),
1520                                  errmsg("relation \"%s\" already exists",
1521                                                 newrelname)));
1522
1523         /*
1524          * Update pg_class tuple with new relname.      (Scribbling on reltup is OK
1525          * because it's a copy...)
1526          */
1527         namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
1528
1529         simple_heap_update(relrelation, &reltup->t_self, reltup);
1530
1531         /* keep the system catalog indexes current */
1532         CatalogUpdateIndexes(relrelation, reltup);
1533
1534         heap_freetuple(reltup);
1535         heap_close(relrelation, RowExclusiveLock);
1536
1537         /*
1538          * Also rename the associated type, if any.
1539          */
1540         if (relkind != RELKIND_INDEX)
1541                 TypeRename(oldrelname, namespaceId, newrelname);
1542
1543         /*
1544          * Update rel name in any RI triggers associated with the relation.
1545          */
1546         if (relhastriggers)
1547         {
1548                 /* update tgargs where relname is primary key */
1549                 update_ri_trigger_args(myrelid,
1550                                                            oldrelname,
1551                                                            newrelname,
1552                                                            false, true);
1553                 /* update tgargs where relname is foreign key */
1554                 update_ri_trigger_args(myrelid,
1555                                                            oldrelname,
1556                                                            newrelname,
1557                                                            true, true);
1558         }
1559
1560         /*
1561          * Close rel, but keep exclusive lock!
1562          */
1563         relation_close(targetrelation, NoLock);
1564 }
1565
1566 /*
1567  * Scan pg_trigger for RI triggers that are on the specified relation
1568  * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
1569  * is true).  Update RI trigger args fields matching oldname to contain
1570  * newname instead.  If update_relname is true, examine the relname
1571  * fields; otherwise examine the attname fields.
1572  */
1573 static void
1574 update_ri_trigger_args(Oid relid,
1575                                            const char *oldname,
1576                                            const char *newname,
1577                                            bool fk_scan,
1578                                            bool update_relname)
1579 {
1580         Relation        tgrel;
1581         ScanKeyData skey[1];
1582         SysScanDesc trigscan;
1583         HeapTuple       tuple;
1584         Datum           values[Natts_pg_trigger];
1585         char            nulls[Natts_pg_trigger];
1586         char            replaces[Natts_pg_trigger];
1587
1588         tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
1589         if (fk_scan)
1590         {
1591                 ScanKeyInit(&skey[0],
1592                                         Anum_pg_trigger_tgconstrrelid,
1593                                         BTEqualStrategyNumber, F_OIDEQ,
1594                                         ObjectIdGetDatum(relid));
1595                 trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
1596                                                                           true, SnapshotNow,
1597                                                                           1, skey);
1598         }
1599         else
1600         {
1601                 ScanKeyInit(&skey[0],
1602                                         Anum_pg_trigger_tgrelid,
1603                                         BTEqualStrategyNumber, F_OIDEQ,
1604                                         ObjectIdGetDatum(relid));
1605                 trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
1606                                                                           true, SnapshotNow,
1607                                                                           1, skey);
1608         }
1609
1610         while ((tuple = systable_getnext(trigscan)) != NULL)
1611         {
1612                 Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1613                 bytea      *val;
1614                 bytea      *newtgargs;
1615                 bool            isnull;
1616                 int                     tg_type;
1617                 bool            examine_pk;
1618                 bool            changed;
1619                 int                     tgnargs;
1620                 int                     i;
1621                 int                     newlen;
1622                 const char *arga[RI_MAX_ARGUMENTS];
1623                 const char *argp;
1624
1625                 tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
1626                 if (tg_type == RI_TRIGGER_NONE)
1627                 {
1628                         /* Not an RI trigger, forget it */
1629                         continue;
1630                 }
1631
1632                 /*
1633                  * It is an RI trigger, so parse the tgargs bytea.
1634                  *
1635                  * NB: we assume the field will never be compressed or moved out of
1636                  * line; so does trigger.c ...
1637                  */
1638                 tgnargs = pg_trigger->tgnargs;
1639                 val = (bytea *)
1640                         DatumGetPointer(fastgetattr(tuple,
1641                                                                                 Anum_pg_trigger_tgargs,
1642                                                                                 tgrel->rd_att, &isnull));
1643                 if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
1644                         tgnargs > RI_MAX_ARGUMENTS)
1645                 {
1646                         /* This probably shouldn't happen, but ignore busted triggers */
1647                         continue;
1648                 }
1649                 argp = (const char *) VARDATA(val);
1650                 for (i = 0; i < tgnargs; i++)
1651                 {
1652                         arga[i] = argp;
1653                         argp += strlen(argp) + 1;
1654                 }
1655
1656                 /*
1657                  * Figure out which item(s) to look at.  If the trigger is primary-key
1658                  * type and attached to my rel, I should look at the PK fields; if it
1659                  * is foreign-key type and attached to my rel, I should look at the FK
1660                  * fields.      But the opposite rule holds when examining triggers found
1661                  * by tgconstrrel search.
1662                  */
1663                 examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
1664
1665                 changed = false;
1666                 if (update_relname)
1667                 {
1668                         /* Change the relname if needed */
1669                         i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
1670                         if (strcmp(arga[i], oldname) == 0)
1671                         {
1672                                 arga[i] = newname;
1673                                 changed = true;
1674                         }
1675                 }
1676                 else
1677                 {
1678                         /* Change attname(s) if needed */
1679                         i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
1680                                 RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
1681                         for (; i < tgnargs; i += 2)
1682                         {
1683                                 if (strcmp(arga[i], oldname) == 0)
1684                                 {
1685                                         arga[i] = newname;
1686                                         changed = true;
1687                                 }
1688                         }
1689                 }
1690
1691                 if (!changed)
1692                 {
1693                         /* Don't need to update this tuple */
1694                         continue;
1695                 }
1696
1697                 /*
1698                  * Construct modified tgargs bytea.
1699                  */
1700                 newlen = VARHDRSZ;
1701                 for (i = 0; i < tgnargs; i++)
1702                         newlen += strlen(arga[i]) + 1;
1703                 newtgargs = (bytea *) palloc(newlen);
1704                 VARATT_SIZEP(newtgargs) = newlen;
1705                 newlen = VARHDRSZ;
1706                 for (i = 0; i < tgnargs; i++)
1707                 {
1708                         strcpy(((char *) newtgargs) + newlen, arga[i]);
1709                         newlen += strlen(arga[i]) + 1;
1710                 }
1711
1712                 /*
1713                  * Build modified tuple.
1714                  */
1715                 for (i = 0; i < Natts_pg_trigger; i++)
1716                 {
1717                         values[i] = (Datum) 0;
1718                         replaces[i] = ' ';
1719                         nulls[i] = ' ';
1720                 }
1721                 values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
1722                 replaces[Anum_pg_trigger_tgargs - 1] = 'r';
1723
1724                 tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
1725
1726                 /*
1727                  * Update pg_trigger and its indexes
1728                  */
1729                 simple_heap_update(tgrel, &tuple->t_self, tuple);
1730
1731                 CatalogUpdateIndexes(tgrel, tuple);
1732
1733                 /*
1734                  * Invalidate trigger's relation's relcache entry so that other
1735                  * backends (and this one too!) are sent SI message to make them
1736                  * rebuild relcache entries.  (Ideally this should happen
1737                  * automatically...)
1738                  *
1739                  * We can skip this for triggers on relid itself, since that relcache
1740                  * flush will happen anyway due to the table or column rename.  We
1741                  * just need to catch the far ends of RI relationships.
1742                  */
1743                 pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
1744                 if (pg_trigger->tgrelid != relid)
1745                         CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
1746
1747                 /* free up our scratch memory */
1748                 pfree(newtgargs);
1749                 heap_freetuple(tuple);
1750         }
1751
1752         systable_endscan(trigscan);
1753
1754         heap_close(tgrel, RowExclusiveLock);
1755
1756         /*
1757          * Increment cmd counter to make updates visible; this is needed in case
1758          * the same tuple has to be updated again by next pass (can happen in case
1759          * of a self-referential FK relationship).
1760          */
1761         CommandCounterIncrement();
1762 }
1763
1764 /*
1765  * AlterTable
1766  *              Execute ALTER TABLE, which can be a list of subcommands
1767  *
1768  * ALTER TABLE is performed in three phases:
1769  *              1. Examine subcommands and perform pre-transformation checking.
1770  *              2. Update system catalogs.
1771  *              3. Scan table(s) to check new constraints, and optionally recopy
1772  *                 the data into new table(s).
1773  * Phase 3 is not performed unless one or more of the subcommands requires
1774  * it.  The intention of this design is to allow multiple independent
1775  * updates of the table schema to be performed with only one pass over the
1776  * data.
1777  *
1778  * ATPrepCmd performs phase 1.  A "work queue" entry is created for
1779  * each table to be affected (there may be multiple affected tables if the
1780  * commands traverse a table inheritance hierarchy).  Also we do preliminary
1781  * validation of the subcommands, including parse transformation of those
1782  * expressions that need to be evaluated with respect to the old table
1783  * schema.
1784  *
1785  * ATRewriteCatalogs performs phase 2 for each affected table (note that
1786  * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
1787  * Certain subcommands need to be performed before others to avoid
1788  * unnecessary conflicts; for example, DROP COLUMN should come before
1789  * ADD COLUMN.  Therefore phase 1 divides the subcommands into multiple
1790  * lists, one for each logical "pass" of phase 2.
1791  *
1792  * ATRewriteTables performs phase 3 for those tables that need it.
1793  *
1794  * Thanks to the magic of MVCC, an error anywhere along the way rolls back
1795  * the whole operation; we don't have to do anything special to clean up.
1796  */
1797 void
1798 AlterTable(AlterTableStmt *stmt)
1799 {
1800         ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
1801                                  stmt->cmds,
1802                                  interpretInhOption(stmt->relation->inhOpt));
1803 }
1804
1805 /*
1806  * AlterTableInternal
1807  *
1808  * ALTER TABLE with target specified by OID
1809  */
1810 void
1811 AlterTableInternal(Oid relid, List *cmds, bool recurse)
1812 {
1813         ATController(relation_open(relid, AccessExclusiveLock),
1814                                  cmds,
1815                                  recurse);
1816 }
1817
1818 static void
1819 ATController(Relation rel, List *cmds, bool recurse)
1820 {
1821         List       *wqueue = NIL;
1822         ListCell   *lcmd;
1823
1824         /* Phase 1: preliminary examination of commands, create work queue */
1825         foreach(lcmd, cmds)
1826         {
1827                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
1828
1829                 ATPrepCmd(&wqueue, rel, cmd, recurse, false);
1830         }
1831
1832         /* Close the relation, but keep lock until commit */
1833         relation_close(rel, NoLock);
1834
1835         /* Phase 2: update system catalogs */
1836         ATRewriteCatalogs(&wqueue);
1837
1838         /* Phase 3: scan/rewrite tables as needed */
1839         ATRewriteTables(&wqueue);
1840 }
1841
1842 /*
1843  * ATPrepCmd
1844  *
1845  * Traffic cop for ALTER TABLE Phase 1 operations, including simple
1846  * recursion and permission checks.
1847  *
1848  * Caller must have acquired AccessExclusiveLock on relation already.
1849  * This lock should be held until commit.
1850  */
1851 static void
1852 ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
1853                   bool recurse, bool recursing)
1854 {
1855         AlteredTableInfo *tab;
1856         int                     pass;
1857
1858         /* Find or create work queue entry for this table */
1859         tab = ATGetQueueEntry(wqueue, rel);
1860
1861         /*
1862          * Copy the original subcommand for each table.  This avoids conflicts
1863          * when different child tables need to make different parse
1864          * transformations (for example, the same column may have different column
1865          * numbers in different children).
1866          */
1867         cmd = copyObject(cmd);
1868
1869         /*
1870          * Do permissions checking, recursion to child tables if needed, and any
1871          * additional phase-1 processing needed.
1872          */
1873         switch (cmd->subtype)
1874         {
1875                 case AT_AddColumn:              /* ADD COLUMN */
1876                         ATSimplePermissions(rel, false);
1877                         /* Performs own recursion */
1878                         ATPrepAddColumn(wqueue, rel, recurse, cmd);
1879                         pass = AT_PASS_ADD_COL;
1880                         break;
1881                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
1882
1883                         /*
1884                          * We allow defaults on views so that INSERT into a view can have
1885                          * default-ish behavior.  This works because the rewriter
1886                          * substitutes default values into INSERTs before it expands
1887                          * rules.
1888                          */
1889                         ATSimplePermissions(rel, true);
1890                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1891                         /* No command-specific prep needed */
1892                         pass = AT_PASS_ADD_CONSTR;
1893                         break;
1894                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
1895                         ATSimplePermissions(rel, false);
1896                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1897                         /* No command-specific prep needed */
1898                         pass = AT_PASS_DROP;
1899                         break;
1900                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
1901                         ATSimplePermissions(rel, false);
1902                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1903                         /* No command-specific prep needed */
1904                         pass = AT_PASS_ADD_CONSTR;
1905                         break;
1906                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
1907                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1908                         /* Performs own permission checks */
1909                         ATPrepSetStatistics(rel, cmd->name, cmd->def);
1910                         pass = AT_PASS_COL_ATTRS;
1911                         break;
1912                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
1913                         ATSimplePermissions(rel, false);
1914                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1915                         /* No command-specific prep needed */
1916                         pass = AT_PASS_COL_ATTRS;
1917                         break;
1918                 case AT_DropColumn:             /* DROP COLUMN */
1919                         ATSimplePermissions(rel, false);
1920                         /* Recursion occurs during execution phase */
1921                         /* No command-specific prep needed except saving recurse flag */
1922                         if (recurse)
1923                                 cmd->subtype = AT_DropColumnRecurse;
1924                         pass = AT_PASS_DROP;
1925                         break;
1926                 case AT_AddIndex:               /* ADD INDEX */
1927                         ATSimplePermissions(rel, false);
1928                         /* This command never recurses */
1929                         /* No command-specific prep needed */
1930                         pass = AT_PASS_ADD_INDEX;
1931                         break;
1932                 case AT_AddConstraint:  /* ADD CONSTRAINT */
1933                         ATSimplePermissions(rel, false);
1934
1935                         /*
1936                          * Currently we recurse only for CHECK constraints, never for
1937                          * foreign-key constraints.  UNIQUE/PKEY constraints won't be seen
1938                          * here.
1939                          */
1940                         if (IsA(cmd->def, Constraint))
1941                                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
1942                         /* No command-specific prep needed */
1943                         pass = AT_PASS_ADD_CONSTR;
1944                         break;
1945                 case AT_DropConstraint: /* DROP CONSTRAINT */
1946                         ATSimplePermissions(rel, false);
1947                         /* Performs own recursion */
1948                         ATPrepDropConstraint(wqueue, rel, recurse, cmd);
1949                         pass = AT_PASS_DROP;
1950                         break;
1951                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
1952                         ATSimplePermissions(rel, false);
1953                         ATSimpleRecursion(wqueue, rel, cmd, recurse);
1954                         /* No command-specific prep needed */
1955                         pass = AT_PASS_DROP;
1956                         break;
1957                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
1958                         ATSimplePermissions(rel, false);
1959                         /* Performs own recursion */
1960                         ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
1961                         pass = AT_PASS_ALTER_TYPE;
1962                         break;
1963                 case AT_ToastTable:             /* CREATE TOAST TABLE */
1964                         ATSimplePermissions(rel, false);
1965                         /* This command never recurses */
1966                         /* No command-specific prep needed */
1967                         pass = AT_PASS_MISC;
1968                         break;
1969                 case AT_ChangeOwner:    /* ALTER OWNER */
1970                         /* This command never recurses */
1971                         /* No command-specific prep needed */
1972                         pass = AT_PASS_MISC;
1973                         break;
1974                 case AT_ClusterOn:              /* CLUSTER ON */
1975                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
1976                         ATSimplePermissions(rel, false);
1977                         /* These commands never recurse */
1978                         /* No command-specific prep needed */
1979                         pass = AT_PASS_MISC;
1980                         break;
1981                 case AT_DropOids:               /* SET WITHOUT OIDS */
1982                         ATSimplePermissions(rel, false);
1983                         /* Performs own recursion */
1984                         if (rel->rd_rel->relhasoids)
1985                         {
1986                                 AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
1987
1988                                 dropCmd->subtype = AT_DropColumn;
1989                                 dropCmd->name = pstrdup("oid");
1990                                 dropCmd->behavior = cmd->behavior;
1991                                 ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
1992                         }
1993                         pass = AT_PASS_DROP;
1994                         break;
1995                 case AT_SetTableSpace:  /* SET TABLESPACE */
1996                         /* This command never recurses */
1997                         ATPrepSetTableSpace(tab, rel, cmd->name);
1998                         pass = AT_PASS_MISC;    /* doesn't actually matter */
1999                         break;
2000                 case AT_EnableTrig:             /* ENABLE TRIGGER variants */
2001                 case AT_EnableTrigAll:
2002                 case AT_EnableTrigUser:
2003                 case AT_DisableTrig:    /* DISABLE TRIGGER variants */
2004                 case AT_DisableTrigAll:
2005                 case AT_DisableTrigUser:
2006                         ATSimplePermissions(rel, false);
2007                         /* These commands never recurse */
2008                         /* No command-specific prep needed */
2009                         pass = AT_PASS_MISC;
2010                         break;
2011                 default:                                /* oops */
2012                         elog(ERROR, "unrecognized alter table type: %d",
2013                                  (int) cmd->subtype);
2014                         pass = 0;                       /* keep compiler quiet */
2015                         break;
2016         }
2017
2018         /* Add the subcommand to the appropriate list for phase 2 */
2019         tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
2020 }
2021
2022 /*
2023  * ATRewriteCatalogs
2024  *
2025  * Traffic cop for ALTER TABLE Phase 2 operations.      Subcommands are
2026  * dispatched in a "safe" execution order (designed to avoid unnecessary
2027  * conflicts).
2028  */
2029 static void
2030 ATRewriteCatalogs(List **wqueue)
2031 {
2032         int                     pass;
2033         ListCell   *ltab;
2034
2035         /*
2036          * We process all the tables "in parallel", one pass at a time.  This is
2037          * needed because we may have to propagate work from one table to another
2038          * (specifically, ALTER TYPE on a foreign key's PK has to dispatch the
2039          * re-adding of the foreign key constraint to the other table).  Work can
2040          * only be propagated into later passes, however.
2041          */
2042         for (pass = 0; pass < AT_NUM_PASSES; pass++)
2043         {
2044                 /* Go through each table that needs to be processed */
2045                 foreach(ltab, *wqueue)
2046                 {
2047                         AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2048                         List       *subcmds = tab->subcmds[pass];
2049                         Relation        rel;
2050                         ListCell   *lcmd;
2051
2052                         if (subcmds == NIL)
2053                                 continue;
2054
2055                         /*
2056                          * Exclusive lock was obtained by phase 1, needn't get it again
2057                          */
2058                         rel = relation_open(tab->relid, NoLock);
2059
2060                         foreach(lcmd, subcmds)
2061                                 ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
2062
2063                         /*
2064                          * After the ALTER TYPE pass, do cleanup work (this is not done in
2065                          * ATExecAlterColumnType since it should be done only once if
2066                          * multiple columns of a table are altered).
2067                          */
2068                         if (pass == AT_PASS_ALTER_TYPE)
2069                                 ATPostAlterTypeCleanup(wqueue, tab);
2070
2071                         relation_close(rel, NoLock);
2072                 }
2073         }
2074
2075         /*
2076          * Do an implicit CREATE TOAST TABLE if we executed any subcommands that
2077          * might have added a column or changed column storage.
2078          */
2079         foreach(ltab, *wqueue)
2080         {
2081                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2082
2083                 if (tab->relkind == RELKIND_RELATION &&
2084                         (tab->subcmds[AT_PASS_ADD_COL] ||
2085                          tab->subcmds[AT_PASS_ALTER_TYPE] ||
2086                          tab->subcmds[AT_PASS_COL_ATTRS]))
2087                         AlterTableCreateToastTable(tab->relid, true);
2088         }
2089 }
2090
2091 /*
2092  * ATExecCmd: dispatch a subcommand to appropriate execution routine
2093  */
2094 static void
2095 ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
2096 {
2097         switch (cmd->subtype)
2098         {
2099                 case AT_AddColumn:              /* ADD COLUMN */
2100                         ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
2101                         break;
2102                 case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
2103                         ATExecColumnDefault(rel, cmd->name, cmd->def);
2104                         break;
2105                 case AT_DropNotNull:    /* ALTER COLUMN DROP NOT NULL */
2106                         ATExecDropNotNull(rel, cmd->name);
2107                         break;
2108                 case AT_SetNotNull:             /* ALTER COLUMN SET NOT NULL */
2109                         ATExecSetNotNull(tab, rel, cmd->name);
2110                         break;
2111                 case AT_SetStatistics:  /* ALTER COLUMN STATISTICS */
2112                         ATExecSetStatistics(rel, cmd->name, cmd->def);
2113                         break;
2114                 case AT_SetStorage:             /* ALTER COLUMN STORAGE */
2115                         ATExecSetStorage(rel, cmd->name, cmd->def);
2116                         break;
2117                 case AT_DropColumn:             /* DROP COLUMN */
2118                         ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
2119                         break;
2120                 case AT_DropColumnRecurse:              /* DROP COLUMN with recursion */
2121                         ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
2122                         break;
2123                 case AT_AddIndex:               /* ADD INDEX */
2124                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
2125                         break;
2126                 case AT_ReAddIndex:             /* ADD INDEX */
2127                         ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
2128                         break;
2129                 case AT_AddConstraint:  /* ADD CONSTRAINT */
2130                         ATExecAddConstraint(tab, rel, cmd->def);
2131                         break;
2132                 case AT_DropConstraint: /* DROP CONSTRAINT */
2133                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
2134                         break;
2135                 case AT_DropConstraintQuietly:  /* DROP CONSTRAINT for child */
2136                         ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
2137                         break;
2138                 case AT_AlterColumnType:                /* ALTER COLUMN TYPE */
2139                         ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
2140                         break;
2141                 case AT_ToastTable:             /* CREATE TOAST TABLE */
2142                         AlterTableCreateToastTable(RelationGetRelid(rel), false);
2143                         break;
2144                 case AT_ChangeOwner:    /* ALTER OWNER */
2145                         ATExecChangeOwner(RelationGetRelid(rel),
2146                                                           get_roleid_checked(cmd->name),
2147                                                           false);
2148                         break;
2149                 case AT_ClusterOn:              /* CLUSTER ON */
2150                         ATExecClusterOn(rel, cmd->name);
2151                         break;
2152                 case AT_DropCluster:    /* SET WITHOUT CLUSTER */
2153                         ATExecDropCluster(rel);
2154                         break;
2155                 case AT_DropOids:               /* SET WITHOUT OIDS */
2156
2157                         /*
2158                          * Nothing to do here; we'll have generated a DropColumn
2159                          * subcommand to do the real work
2160                          */
2161                         break;
2162                 case AT_SetTableSpace:  /* SET TABLESPACE */
2163
2164                         /*
2165                          * Nothing to do here; Phase 3 does the work
2166                          */
2167                         break;
2168                 case AT_EnableTrig:             /* ENABLE TRIGGER name */
2169                         ATExecEnableDisableTrigger(rel, cmd->name, true, false);
2170                         break;
2171                 case AT_DisableTrig:    /* DISABLE TRIGGER name */
2172                         ATExecEnableDisableTrigger(rel, cmd->name, false, false);
2173                         break;
2174                 case AT_EnableTrigAll:  /* ENABLE TRIGGER ALL */
2175                         ATExecEnableDisableTrigger(rel, NULL, true, false);
2176                         break;
2177                 case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
2178                         ATExecEnableDisableTrigger(rel, NULL, false, false);
2179                         break;
2180                 case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
2181                         ATExecEnableDisableTrigger(rel, NULL, true, true);
2182                         break;
2183                 case AT_DisableTrigUser:                /* DISABLE TRIGGER USER */
2184                         ATExecEnableDisableTrigger(rel, NULL, false, true);
2185                         break;
2186                 default:                                /* oops */
2187                         elog(ERROR, "unrecognized alter table type: %d",
2188                                  (int) cmd->subtype);
2189                         break;
2190         }
2191
2192         /*
2193          * Bump the command counter to ensure the next subcommand in the sequence
2194          * can see the changes so far
2195          */
2196         CommandCounterIncrement();
2197 }
2198
2199 /*
2200  * ATRewriteTables: ALTER TABLE phase 3
2201  */
2202 static void
2203 ATRewriteTables(List **wqueue)
2204 {
2205         ListCell   *ltab;
2206
2207         /* Go through each table that needs to be checked or rewritten */
2208         foreach(ltab, *wqueue)
2209         {
2210                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2211
2212                 /*
2213                  * We only need to rewrite the table if at least one column needs to
2214                  * be recomputed.
2215                  */
2216                 if (tab->newvals != NIL)
2217                 {
2218                         /* Build a temporary relation and copy data */
2219                         Oid                     OIDNewHeap;
2220                         char            NewHeapName[NAMEDATALEN];
2221                         Oid                     NewTableSpace;
2222                         Relation        OldHeap;
2223                         ObjectAddress object;
2224
2225                         OldHeap = heap_open(tab->relid, NoLock);
2226
2227                         /*
2228                          * We can never allow rewriting of shared or nailed-in-cache
2229                          * relations, because we can't support changing their relfilenode
2230                          * values.
2231                          */
2232                         if (OldHeap->rd_rel->relisshared || OldHeap->rd_isnailed)
2233                                 ereport(ERROR,
2234                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2235                                                  errmsg("cannot rewrite system relation \"%s\"",
2236                                                                 RelationGetRelationName(OldHeap))));
2237
2238                         /*
2239                          * Don't allow rewrite on temp tables of other backends ... their
2240                          * local buffer manager is not going to cope.
2241                          */
2242                         if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
2243                                 ereport(ERROR,
2244                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2245                                 errmsg("cannot rewrite temporary tables of other sessions")));
2246
2247                         /*
2248                          * Select destination tablespace (same as original unless user
2249                          * requested a change)
2250                          */
2251                         if (tab->newTableSpace)
2252                                 NewTableSpace = tab->newTableSpace;
2253                         else
2254                                 NewTableSpace = OldHeap->rd_rel->reltablespace;
2255
2256                         heap_close(OldHeap, NoLock);
2257
2258                         /*
2259                          * Create the new heap, using a temporary name in the same
2260                          * namespace as the existing table.  NOTE: there is some risk of
2261                          * collision with user relnames.  Working around this seems more
2262                          * trouble than it's worth; in particular, we can't create the new
2263                          * heap in a different namespace from the old, or we will have
2264                          * problems with the TEMP status of temp tables.
2265                          */
2266                         snprintf(NewHeapName, sizeof(NewHeapName),
2267                                          "pg_temp_%u", tab->relid);
2268
2269                         OIDNewHeap = make_new_heap(tab->relid, NewHeapName, NewTableSpace);
2270
2271                         /*
2272                          * Copy the heap data into the new table with the desired
2273                          * modifications, and test the current data within the table
2274                          * against new constraints generated by ALTER TABLE commands.
2275                          */
2276                         ATRewriteTable(tab, OIDNewHeap);
2277
2278                         /* Swap the physical files of the old and new heaps. */
2279                         swap_relation_files(tab->relid, OIDNewHeap);
2280
2281                         CommandCounterIncrement();
2282
2283                         /* Destroy new heap with old filenode */
2284                         object.classId = RelationRelationId;
2285                         object.objectId = OIDNewHeap;
2286                         object.objectSubId = 0;
2287
2288                         /*
2289                          * The new relation is local to our transaction and we know
2290                          * nothing depends on it, so DROP_RESTRICT should be OK.
2291                          */
2292                         performDeletion(&object, DROP_RESTRICT);
2293                         /* performDeletion does CommandCounterIncrement at end */
2294
2295                         /*
2296                          * Rebuild each index on the relation (but not the toast table,
2297                          * which is all-new anyway).  We do not need
2298                          * CommandCounterIncrement() because reindex_relation does it.
2299                          */
2300                         reindex_relation(tab->relid, false);
2301                 }
2302                 else
2303                 {
2304                         /*
2305                          * Test the current data within the table against new constraints
2306                          * generated by ALTER TABLE commands, but don't rebuild data.
2307                          */
2308                         if (tab->constraints != NIL)
2309                                 ATRewriteTable(tab, InvalidOid);
2310
2311                         /*
2312                          * If we had SET TABLESPACE but no reason to reconstruct tuples,
2313                          * just do a block-by-block copy.
2314                          */
2315                         if (tab->newTableSpace)
2316                                 ATExecSetTableSpace(tab->relid, tab->newTableSpace);
2317                 }
2318         }
2319
2320         /*
2321          * Foreign key constraints are checked in a final pass, since (a) it's
2322          * generally best to examine each one separately, and (b) it's at least
2323          * theoretically possible that we have changed both relations of the
2324          * foreign key, and we'd better have finished both rewrites before we try
2325          * to read the tables.
2326          */
2327         foreach(ltab, *wqueue)
2328         {
2329                 AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
2330                 Relation        rel = NULL;
2331                 ListCell   *lcon;
2332
2333                 foreach(lcon, tab->constraints)
2334                 {
2335                         NewConstraint *con = lfirst(lcon);
2336
2337                         if (con->contype == CONSTR_FOREIGN)
2338                         {
2339                                 FkConstraint *fkconstraint = (FkConstraint *) con->qual;
2340                                 Relation        refrel;
2341
2342                                 if (rel == NULL)
2343                                 {
2344                                         /* Long since locked, no need for another */
2345                                         rel = heap_open(tab->relid, NoLock);
2346                                 }
2347
2348                                 refrel = heap_open(con->refrelid, RowShareLock);
2349
2350                                 validateForeignKeyConstraint(fkconstraint, rel, refrel);
2351
2352                                 heap_close(refrel, NoLock);
2353                         }
2354                 }
2355
2356                 if (rel)
2357                         heap_close(rel, NoLock);
2358         }
2359 }
2360
2361 /*
2362  * ATRewriteTable: scan or rewrite one table
2363  *
2364  * OIDNewHeap is InvalidOid if we don't need to rewrite
2365  */
2366 static void
2367 ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
2368 {
2369         Relation        oldrel;
2370         Relation        newrel;
2371         TupleDesc       oldTupDesc;
2372         TupleDesc       newTupDesc;
2373         bool            needscan = false;
2374         int                     i;
2375         ListCell   *l;
2376         EState     *estate;
2377
2378         /*
2379          * Open the relation(s).  We have surely already locked the existing
2380          * table.
2381          */
2382         oldrel = heap_open(tab->relid, NoLock);
2383         oldTupDesc = tab->oldDesc;
2384         newTupDesc = RelationGetDescr(oldrel);          /* includes all mods */
2385
2386         if (OidIsValid(OIDNewHeap))
2387                 newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
2388         else
2389                 newrel = NULL;
2390
2391         /*
2392          * If we need to rewrite the table, the operation has to be propagated to
2393          * tables that use this table's rowtype as a column type.
2394          *
2395          * (Eventually this will probably become true for scans as well, but at
2396          * the moment a composite type does not enforce any constraints, so it's
2397          * not necessary/appropriate to enforce them just during ALTER.)
2398          */
2399         if (newrel)
2400                 find_composite_type_dependencies(oldrel->rd_rel->reltype,
2401                                                                                  RelationGetRelationName(oldrel));
2402
2403         /*
2404          * Generate the constraint and default execution states
2405          */
2406
2407         estate = CreateExecutorState();
2408
2409         /* Build the needed expression execution states */
2410         foreach(l, tab->constraints)
2411         {
2412                 NewConstraint *con = lfirst(l);
2413
2414                 switch (con->contype)
2415                 {
2416                         case CONSTR_CHECK:
2417                                 needscan = true;
2418                                 con->qualstate = (List *)
2419                                         ExecPrepareExpr((Expr *) con->qual, estate);
2420                                 break;
2421                         case CONSTR_FOREIGN:
2422                                 /* Nothing to do here */
2423                                 break;
2424                         case CONSTR_NOTNULL:
2425                                 needscan = true;
2426                                 break;
2427                         default:
2428                                 elog(ERROR, "unrecognized constraint type: %d",
2429                                          (int) con->contype);
2430                 }
2431         }
2432
2433         foreach(l, tab->newvals)
2434         {
2435                 NewColumnValue *ex = lfirst(l);
2436
2437                 needscan = true;
2438
2439                 ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
2440         }
2441
2442         if (needscan)
2443         {
2444                 ExprContext *econtext;
2445                 Datum      *values;
2446                 bool       *isnull;
2447                 TupleTableSlot *oldslot;
2448                 TupleTableSlot *newslot;
2449                 HeapScanDesc scan;
2450                 HeapTuple       tuple;
2451                 MemoryContext oldCxt;
2452                 List       *dropped_attrs = NIL;
2453                 ListCell   *lc;
2454
2455                 econtext = GetPerTupleExprContext(estate);
2456
2457                 /*
2458                  * Make tuple slots for old and new tuples.  Note that even when the
2459                  * tuples are the same, the tupDescs might not be (consider ADD COLUMN
2460                  * without a default).
2461                  */
2462                 oldslot = MakeSingleTupleTableSlot(oldTupDesc);
2463                 newslot = MakeSingleTupleTableSlot(newTupDesc);
2464
2465                 /* Preallocate values/isnull arrays */
2466                 i = Max(newTupDesc->natts, oldTupDesc->natts);
2467                 values = (Datum *) palloc(i * sizeof(Datum));
2468                 isnull = (bool *) palloc(i * sizeof(bool));
2469                 memset(values, 0, i * sizeof(Datum));
2470                 memset(isnull, true, i * sizeof(bool));
2471
2472                 /*
2473                  * Any attributes that are dropped according to the new tuple
2474                  * descriptor can be set to NULL. We precompute the list of dropped
2475                  * attributes to avoid needing to do so in the per-tuple loop.
2476                  */
2477                 for (i = 0; i < newTupDesc->natts; i++)
2478                 {
2479                         if (newTupDesc->attrs[i]->attisdropped)
2480                                 dropped_attrs = lappend_int(dropped_attrs, i);
2481                 }
2482
2483                 /*
2484                  * Scan through the rows, generating a new row if needed and then
2485                  * checking all the constraints.
2486                  */
2487                 scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
2488
2489                 /*
2490                  * Switch to per-tuple memory context and reset it for each tuple
2491                  * produced, so we don't leak memory.
2492                  */
2493                 oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2494
2495                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2496                 {
2497                         if (newrel)
2498                         {
2499                                 Oid                     tupOid = InvalidOid;
2500
2501                                 /* Extract data from old tuple */
2502                                 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
2503                                 if (oldTupDesc->tdhasoid)
2504                                         tupOid = HeapTupleGetOid(tuple);
2505
2506                                 /* Set dropped attributes to null in new tuple */
2507                                 foreach(lc, dropped_attrs)
2508                                         isnull[lfirst_int(lc)] = true;
2509
2510                                 /*
2511                                  * Process supplied expressions to replace selected columns.
2512                                  * Expression inputs come from the old tuple.
2513                                  */
2514                                 ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
2515                                 econtext->ecxt_scantuple = oldslot;
2516
2517                                 foreach(l, tab->newvals)
2518                                 {
2519                                         NewColumnValue *ex = lfirst(l);
2520
2521                                         values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
2522                                                                                                                   econtext,
2523                                                                                                          &isnull[ex->attnum - 1],
2524                                                                                                                   NULL);
2525                                 }
2526
2527                                 /*
2528                                  * Form the new tuple. Note that we don't explicitly pfree it,
2529                                  * since the per-tuple memory context will be reset shortly.
2530                                  */
2531                                 tuple = heap_form_tuple(newTupDesc, values, isnull);
2532
2533                                 /* Preserve OID, if any */
2534                                 if (newTupDesc->tdhasoid)
2535                                         HeapTupleSetOid(tuple, tupOid);
2536                         }
2537
2538                         /* Now check any constraints on the possibly-changed tuple */
2539                         ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
2540                         econtext->ecxt_scantuple = newslot;
2541
2542                         foreach(l, tab->constraints)
2543                         {
2544                                 NewConstraint *con = lfirst(l);
2545
2546                                 switch (con->contype)
2547                                 {
2548                                         case CONSTR_CHECK:
2549                                                 if (!ExecQual(con->qualstate, econtext, true))
2550                                                         ereport(ERROR,
2551                                                                         (errcode(ERRCODE_CHECK_VIOLATION),
2552                                                                          errmsg("check constraint \"%s\" is violated by some row",
2553                                                                                         con->name)));
2554                                                 break;
2555                                         case CONSTR_NOTNULL:
2556                                                 {
2557                                                         Datum           d;
2558                                                         bool            isnull;
2559
2560                                                         d = heap_getattr(tuple, con->attnum, newTupDesc,
2561                                                                                          &isnull);
2562                                                         if (isnull)
2563                                                                 ereport(ERROR,
2564                                                                                 (errcode(ERRCODE_NOT_NULL_VIOLATION),
2565                                                                  errmsg("column \"%s\" contains null values",
2566                                                                                 get_attname(tab->relid,
2567                                                                                                         con->attnum))));
2568                                                 }
2569                                                 break;
2570                                         case CONSTR_FOREIGN:
2571                                                 /* Nothing to do here */
2572                                                 break;
2573                                         default:
2574                                                 elog(ERROR, "unrecognized constraint type: %d",
2575                                                          (int) con->contype);
2576                                 }
2577                         }
2578
2579                         /* Write the tuple out to the new relation */
2580                         if (newrel)
2581                                 simple_heap_insert(newrel, tuple);
2582
2583                         ResetExprContext(econtext);
2584
2585                         CHECK_FOR_INTERRUPTS();
2586                 }
2587
2588                 MemoryContextSwitchTo(oldCxt);
2589                 heap_endscan(scan);
2590         }
2591
2592         FreeExecutorState(estate);
2593
2594         heap_close(oldrel, NoLock);
2595         if (newrel)
2596                 heap_close(newrel, NoLock);
2597 }
2598
2599 /*
2600  * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
2601  */
2602 static AlteredTableInfo *
2603 ATGetQueueEntry(List **wqueue, Relation rel)
2604 {
2605         Oid                     relid = RelationGetRelid(rel);
2606         AlteredTableInfo *tab;
2607         ListCell   *ltab;
2608
2609         foreach(ltab, *wqueue)
2610         {
2611                 tab = (AlteredTableInfo *) lfirst(ltab);
2612                 if (tab->relid == relid)
2613                         return tab;
2614         }
2615
2616         /*
2617          * Not there, so add it.  Note that we make a copy of the relation's
2618          * existing descriptor before anything interesting can happen to it.
2619          */
2620         tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
2621         tab->relid = relid;
2622         tab->relkind = rel->rd_rel->relkind;
2623         tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
2624
2625         *wqueue = lappend(*wqueue, tab);
2626
2627         return tab;
2628 }
2629
2630 /*
2631  * ATSimplePermissions
2632  *
2633  * - Ensure that it is a relation (or possibly a view)
2634  * - Ensure this user is the owner
2635  * - Ensure that it is not a system table
2636  */
2637 static void
2638 ATSimplePermissions(Relation rel, bool allowView)
2639 {
2640         if (rel->rd_rel->relkind != RELKIND_RELATION)
2641         {
2642                 if (allowView)
2643                 {
2644                         if (rel->rd_rel->relkind != RELKIND_VIEW)
2645                                 ereport(ERROR,
2646                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2647                                                  errmsg("\"%s\" is not a table or view",
2648                                                                 RelationGetRelationName(rel))));
2649                 }
2650                 else
2651                         ereport(ERROR,
2652                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2653                                          errmsg("\"%s\" is not a table",
2654                                                         RelationGetRelationName(rel))));
2655         }
2656
2657         /* Permissions checks */
2658         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
2659                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
2660                                            RelationGetRelationName(rel));
2661
2662         if (!allowSystemTableMods && IsSystemRelation(rel))
2663                 ereport(ERROR,
2664                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2665                                  errmsg("permission denied: \"%s\" is a system catalog",
2666                                                 RelationGetRelationName(rel))));
2667 }
2668
2669 /*
2670  * ATSimpleRecursion
2671  *
2672  * Simple table recursion sufficient for most ALTER TABLE operations.
2673  * All direct and indirect children are processed in an unspecified order.
2674  * Note that if a child inherits from the original table via multiple
2675  * inheritance paths, it will be visited just once.
2676  */
2677 static void
2678 ATSimpleRecursion(List **wqueue, Relation rel,
2679                                   AlterTableCmd *cmd, bool recurse)
2680 {
2681         /*
2682          * Propagate to children if desired.  Non-table relations never have
2683          * children, so no need to search in that case.
2684          */
2685         if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
2686         {
2687                 Oid                     relid = RelationGetRelid(rel);
2688                 ListCell   *child;
2689                 List       *children;
2690
2691                 /* this routine is actually in the planner */
2692                 children = find_all_inheritors(relid);
2693
2694                 /*
2695                  * find_all_inheritors does the recursive search of the inheritance
2696                  * hierarchy, so all we have to do is process all of the relids in the
2697                  * list that it returns.
2698                  */
2699                 foreach(child, children)
2700                 {
2701                         Oid                     childrelid = lfirst_oid(child);
2702                         Relation        childrel;
2703
2704                         if (childrelid == relid)
2705                                 continue;
2706                         childrel = relation_open(childrelid, AccessExclusiveLock);
2707                         ATPrepCmd(wqueue, childrel, cmd, false, true);
2708                         relation_close(childrel, NoLock);
2709                 }
2710         }
2711 }
2712
2713 /*
2714  * ATOneLevelRecursion
2715  *
2716  * Here, we visit only direct inheritance children.  It is expected that
2717  * the command's prep routine will recurse again to find indirect children.
2718  * When using this technique, a multiply-inheriting child will be visited
2719  * multiple times.
2720  */
2721 static void
2722 ATOneLevelRecursion(List **wqueue, Relation rel,
2723                                         AlterTableCmd *cmd)
2724 {
2725         Oid                     relid = RelationGetRelid(rel);
2726         ListCell   *child;
2727         List       *children;
2728
2729         /* this routine is actually in the planner */
2730         children = find_inheritance_children(relid);
2731
2732         foreach(child, children)
2733         {
2734                 Oid                     childrelid = lfirst_oid(child);
2735                 Relation        childrel;
2736
2737                 childrel = relation_open(childrelid, AccessExclusiveLock);
2738                 ATPrepCmd(wqueue, childrel, cmd, true, true);
2739                 relation_close(childrel, NoLock);
2740         }
2741 }
2742
2743
2744 /*
2745  * find_composite_type_dependencies
2746  *
2747  * Check to see if a table's rowtype is being used as a column in some
2748  * other table (possibly nested several levels deep in composite types!).
2749  * Eventually, we'd like to propagate the check or rewrite operation
2750  * into other such tables, but for now, just error out if we find any.
2751  *
2752  * We assume that functions and views depending on the type are not reasons
2753  * to reject the ALTER.  (How safe is this really?)
2754  */
2755 static void
2756 find_composite_type_dependencies(Oid typeOid, const char *origTblName)
2757 {
2758         Relation        depRel;
2759         ScanKeyData key[2];
2760         SysScanDesc depScan;
2761         HeapTuple       depTup;
2762
2763         /*
2764          * We scan pg_depend to find those things that depend on the rowtype. (We
2765          * assume we can ignore refobjsubid for a rowtype.)
2766          */
2767         depRel = heap_open(DependRelationId, AccessShareLock);
2768
2769         ScanKeyInit(&key[0],
2770                                 Anum_pg_depend_refclassid,
2771                                 BTEqualStrategyNumber, F_OIDEQ,
2772                                 ObjectIdGetDatum(TypeRelationId));
2773         ScanKeyInit(&key[1],
2774                                 Anum_pg_depend_refobjid,
2775                                 BTEqualStrategyNumber, F_OIDEQ,
2776                                 ObjectIdGetDatum(typeOid));
2777
2778         depScan = systable_beginscan(depRel, DependReferenceIndexId, true,
2779                                                                  SnapshotNow, 2, key);
2780
2781         while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
2782         {
2783                 Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
2784                 Relation        rel;
2785                 Form_pg_attribute att;
2786
2787                 /* Ignore dependees that aren't user columns of relations */
2788                 /* (we assume system columns are never of rowtypes) */
2789                 if (pg_depend->classid != RelationRelationId ||
2790                         pg_depend->objsubid <= 0)
2791                         continue;
2792
2793                 rel = relation_open(pg_depend->objid, AccessShareLock);
2794                 att = rel->rd_att->attrs[pg_depend->objsubid - 1];
2795
2796                 if (rel->rd_rel->relkind == RELKIND_RELATION)
2797                 {
2798                         ereport(ERROR,
2799                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2800                                          errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
2801                                                         origTblName,
2802                                                         RelationGetRelationName(rel),
2803                                                         NameStr(att->attname))));
2804                 }
2805                 else if (OidIsValid(rel->rd_rel->reltype))
2806                 {
2807                         /*
2808                          * A view or composite type itself isn't a problem, but we must
2809                          * recursively check for indirect dependencies via its rowtype.
2810                          */
2811                         find_composite_type_dependencies(rel->rd_rel->reltype,
2812                                                                                          origTblName);
2813                 }
2814
2815                 relation_close(rel, AccessShareLock);
2816         }
2817
2818         systable_endscan(depScan);
2819
2820         relation_close(depRel, AccessShareLock);
2821 }
2822
2823
2824 /*
2825  * ALTER TABLE ADD COLUMN
2826  *
2827  * Adds an additional attribute to a relation making the assumption that
2828  * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
2829  * AT_AddColumn AlterTableCmd by analyze.c and added as independent
2830  * AlterTableCmd's.
2831  */
2832 static void
2833 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
2834                                 AlterTableCmd *cmd)
2835 {
2836         /*
2837          * Recurse to add the column to child classes, if requested.
2838          *
2839          * We must recurse one level at a time, so that multiply-inheriting
2840          * children are visited the right number of times and end up with the
2841          * right attinhcount.
2842          */
2843         if (recurse)
2844         {
2845                 AlterTableCmd *childCmd = copyObject(cmd);
2846                 ColumnDef  *colDefChild = (ColumnDef *) childCmd->def;
2847
2848                 /* Child should see column as singly inherited */
2849                 colDefChild->inhcount = 1;
2850                 colDefChild->is_local = false;
2851                 /* and don't make a support dependency on the child */
2852                 colDefChild->support = NULL;
2853
2854                 ATOneLevelRecursion(wqueue, rel, childCmd);
2855         }
2856         else
2857         {
2858                 /*
2859                  * If we are told not to recurse, there had better not be any child
2860                  * tables; else the addition would put them out of step.
2861                  */
2862                 if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
2863                         ereport(ERROR,
2864                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2865                                          errmsg("column must be added to child tables too")));
2866         }
2867 }
2868
2869 static void
2870 ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
2871                                 ColumnDef *colDef)
2872 {
2873         Oid                     myrelid = RelationGetRelid(rel);
2874         Relation        pgclass,
2875                                 attrdesc;
2876         HeapTuple       reltup;
2877         HeapTuple       attributeTuple;
2878         Form_pg_attribute attribute;
2879         FormData_pg_attribute attributeD;
2880         int                     i;
2881         int                     minattnum,
2882                                 maxatts;
2883         HeapTuple       typeTuple;
2884         Oid                     typeOid;
2885         Form_pg_type tform;
2886         Expr       *defval;
2887
2888         attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
2889
2890         /*
2891          * Are we adding the column to a recursion child?  If so, check whether to
2892          * merge with an existing definition for the column.
2893          */
2894         if (colDef->inhcount > 0)
2895         {
2896                 HeapTuple       tuple;
2897
2898                 /* Does child already have a column by this name? */
2899                 tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
2900                 if (HeapTupleIsValid(tuple))
2901                 {
2902                         Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
2903
2904                         /* Okay if child matches by type */
2905                         if (typenameTypeId(colDef->typename) != childatt->atttypid ||
2906                                 colDef->typename->typmod != childatt->atttypmod)
2907                                 ereport(ERROR,
2908                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
2909                                                  errmsg("child table \"%s\" has different type for column \"%s\"",
2910                                                         RelationGetRelationName(rel), colDef->colname)));
2911
2912                         /* Bump the existing child att's inhcount */
2913                         childatt->attinhcount++;
2914                         simple_heap_update(attrdesc, &tuple->t_self, tuple);
2915                         CatalogUpdateIndexes(attrdesc, tuple);
2916
2917                         heap_freetuple(tuple);
2918
2919                         /* Inform the user about the merge */
2920                         ereport(NOTICE,
2921                           (errmsg("merging definition of column \"%s\" for child \"%s\"",
2922                                           colDef->colname, RelationGetRelationName(rel))));
2923
2924                         heap_close(attrdesc, RowExclusiveLock);
2925                         return;
2926                 }
2927         }
2928
2929         pgclass = heap_open(RelationRelationId, RowExclusiveLock);
2930
2931         reltup = SearchSysCacheCopy(RELOID,
2932                                                                 ObjectIdGetDatum(myrelid),
2933                                                                 0, 0, 0);
2934         if (!HeapTupleIsValid(reltup))
2935                 elog(ERROR, "cache lookup failed for relation %u", myrelid);
2936
2937         /*
2938          * this test is deliberately not attisdropped-aware, since if one tries to
2939          * add a column matching a dropped column name, it's gonna fail anyway.
2940          */
2941         if (SearchSysCacheExists(ATTNAME,
2942                                                          ObjectIdGetDatum(myrelid),
2943                                                          PointerGetDatum(colDef->colname),
2944                                                          0, 0))
2945                 ereport(ERROR,
2946                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
2947                                  errmsg("column \"%s\" of relation \"%s\" already exists",
2948                                                 colDef->colname, RelationGetRelationName(rel))));
2949
2950         minattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts;
2951         maxatts = minattnum + 1;
2952         if (maxatts > MaxHeapAttributeNumber)
2953                 ereport(ERROR,
2954                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
2955                                  errmsg("tables can have at most %d columns",
2956                                                 MaxHeapAttributeNumber)));
2957         i = minattnum + 1;
2958
2959         typeTuple = typenameType(colDef->typename);
2960         tform = (Form_pg_type) GETSTRUCT(typeTuple);
2961         typeOid = HeapTupleGetOid(typeTuple);
2962
2963         /* make sure datatype is legal for a column */
2964         CheckAttributeType(colDef->colname, typeOid);
2965
2966         attributeTuple = heap_addheader(Natts_pg_attribute,
2967                                                                         false,
2968                                                                         ATTRIBUTE_TUPLE_SIZE,
2969                                                                         (void *) &attributeD);
2970
2971         attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple);
2972
2973         attribute->attrelid = myrelid;
2974         namestrcpy(&(attribute->attname), colDef->colname);
2975         attribute->atttypid = typeOid;
2976         attribute->attstattarget = -1;
2977         attribute->attlen = tform->typlen;
2978         attribute->attcacheoff = -1;
2979         attribute->atttypmod = colDef->typename->typmod;
2980         attribute->attnum = i;
2981         attribute->attbyval = tform->typbyval;
2982         attribute->attndims = list_length(colDef->typename->arrayBounds);
2983         attribute->attstorage = tform->typstorage;
2984         attribute->attalign = tform->typalign;
2985         attribute->attnotnull = colDef->is_not_null;
2986         attribute->atthasdef = false;
2987         attribute->attisdropped = false;
2988         attribute->attislocal = colDef->is_local;
2989         attribute->attinhcount = colDef->inhcount;
2990
2991         ReleaseSysCache(typeTuple);
2992
2993         simple_heap_insert(attrdesc, attributeTuple);
2994
2995         /* Update indexes on pg_attribute */
2996         CatalogUpdateIndexes(attrdesc, attributeTuple);
2997
2998         heap_close(attrdesc, RowExclusiveLock);
2999
3000         /*
3001          * Update number of attributes in pg_class tuple
3002          */
3003         ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
3004
3005         simple_heap_update(pgclass, &reltup->t_self, reltup);
3006
3007         /* keep catalog indexes current */
3008         CatalogUpdateIndexes(pgclass, reltup);
3009
3010         heap_freetuple(reltup);
3011
3012         heap_close(pgclass, RowExclusiveLock);
3013
3014         /* Make the attribute's catalog entry visible */
3015         CommandCounterIncrement();
3016
3017         /*
3018          * Store the DEFAULT, if any, in the catalogs
3019          */
3020         if (colDef->raw_default)
3021         {
3022                 RawColumnDefault *rawEnt;
3023
3024                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3025                 rawEnt->attnum = attribute->attnum;
3026                 rawEnt->raw_default = copyObject(colDef->raw_default);
3027
3028                 /*
3029                  * This function is intended for CREATE TABLE, so it processes a
3030                  * _list_ of defaults, but we just do one.
3031                  */
3032                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3033
3034                 /* Make the additional catalog changes visible */
3035                 CommandCounterIncrement();
3036         }
3037
3038         /*
3039          * Tell Phase 3 to fill in the default expression, if there is one.
3040          *
3041          * If there is no default, Phase 3 doesn't have to do anything, because
3042          * that effectively means that the default is NULL.  The heap tuple access
3043          * routines always check for attnum > # of attributes in tuple, and return
3044          * NULL if so, so without any modification of the tuple data we will get
3045          * the effect of NULL values in the new column.
3046          *
3047          * An exception occurs when the new column is of a domain type: the domain
3048          * might have a NOT NULL constraint, or a check constraint that indirectly
3049          * rejects nulls.  If there are any domain constraints then we construct
3050          * an explicit NULL default value that will be passed through
3051          * CoerceToDomain processing.  (This is a tad inefficient, since it causes
3052          * rewriting the table which we really don't have to do, but the present
3053          * design of domain processing doesn't offer any simple way of checking
3054          * the constraints more directly.)
3055          *
3056          * Note: we use build_column_default, and not just the cooked default
3057          * returned by AddRelationRawConstraints, so that the right thing happens
3058          * when a datatype's default applies.
3059          */
3060         defval = (Expr *) build_column_default(rel, attribute->attnum);
3061
3062         if (!defval && GetDomainConstraints(typeOid) != NIL)
3063         {
3064                 Oid                     basetype = getBaseType(typeOid);
3065
3066                 defval = (Expr *) makeNullConst(basetype);
3067                 defval = (Expr *) coerce_to_target_type(NULL,
3068                                                                                                 (Node *) defval,
3069                                                                                                 basetype,
3070                                                                                                 typeOid,
3071                                                                                                 colDef->typename->typmod,
3072                                                                                                 COERCION_ASSIGNMENT,
3073                                                                                                 COERCE_IMPLICIT_CAST);
3074                 if (defval == NULL)             /* should not happen */
3075                         elog(ERROR, "failed to coerce base type to domain");
3076         }
3077
3078         if (defval)
3079         {
3080                 NewColumnValue *newval;
3081
3082                 newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
3083                 newval->attnum = attribute->attnum;
3084                 newval->expr = defval;
3085
3086                 tab->newvals = lappend(tab->newvals, newval);
3087         }
3088
3089         /*
3090          * Add needed dependency entries for the new column.
3091          */
3092         add_column_datatype_dependency(myrelid, i, attribute->atttypid);
3093         if (colDef->support != NULL)
3094                 add_column_support_dependency(myrelid, i, colDef->support);
3095 }
3096
3097 /*
3098  * Install a column's dependency on its datatype.
3099  */
3100 static void
3101 add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
3102 {
3103         ObjectAddress myself,
3104                                 referenced;
3105
3106         myself.classId = RelationRelationId;
3107         myself.objectId = relid;
3108         myself.objectSubId = attnum;
3109         referenced.classId = TypeRelationId;
3110         referenced.objectId = typid;
3111         referenced.objectSubId = 0;
3112         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3113 }
3114
3115 /*
3116  * Install a dependency for a column's supporting relation (serial sequence).
3117  */
3118 static void
3119 add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
3120 {
3121         ObjectAddress colobject,
3122                                 suppobject;
3123
3124         colobject.classId = RelationRelationId;
3125         colobject.objectId = relid;
3126         colobject.objectSubId = attnum;
3127         suppobject.classId = RelationRelationId;
3128         suppobject.objectId = RangeVarGetRelid(support, false);
3129         suppobject.objectSubId = 0;
3130         recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
3131 }
3132
3133 /*
3134  * ALTER TABLE ALTER COLUMN DROP NOT NULL
3135  */
3136 static void
3137 ATExecDropNotNull(Relation rel, const char *colName)
3138 {
3139         HeapTuple       tuple;
3140         AttrNumber      attnum;
3141         Relation        attr_rel;
3142         List       *indexoidlist;
3143         ListCell   *indexoidscan;
3144
3145         /*
3146          * lookup the attribute
3147          */
3148         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3149
3150         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3151
3152         if (!HeapTupleIsValid(tuple))
3153                 ereport(ERROR,
3154                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3155                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3156                                                 colName, RelationGetRelationName(rel))));
3157
3158         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3159
3160         /* Prevent them from altering a system attribute */
3161         if (attnum <= 0)
3162                 ereport(ERROR,
3163                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3164                                  errmsg("cannot alter system column \"%s\"",
3165                                                 colName)));
3166
3167         /*
3168          * Check that the attribute is not in a primary key
3169          */
3170
3171         /* Loop over all indexes on the relation */
3172         indexoidlist = RelationGetIndexList(rel);
3173
3174         foreach(indexoidscan, indexoidlist)
3175         {
3176                 Oid                     indexoid = lfirst_oid(indexoidscan);
3177                 HeapTuple       indexTuple;
3178                 Form_pg_index indexStruct;
3179                 int                     i;
3180
3181                 indexTuple = SearchSysCache(INDEXRELID,
3182                                                                         ObjectIdGetDatum(indexoid),
3183                                                                         0, 0, 0);
3184                 if (!HeapTupleIsValid(indexTuple))
3185                         elog(ERROR, "cache lookup failed for index %u", indexoid);
3186                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
3187
3188                 /* If the index is not a primary key, skip the check */
3189                 if (indexStruct->indisprimary)
3190                 {
3191                         /*
3192                          * Loop over each attribute in the primary key and see if it
3193                          * matches the to-be-altered attribute
3194                          */
3195                         for (i = 0; i < indexStruct->indnatts; i++)
3196                         {
3197                                 if (indexStruct->indkey.values[i] == attnum)
3198                                         ereport(ERROR,
3199                                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3200                                                          errmsg("column \"%s\" is in a primary key",
3201                                                                         colName)));
3202                         }
3203                 }
3204
3205                 ReleaseSysCache(indexTuple);
3206         }
3207
3208         list_free(indexoidlist);
3209
3210         /*
3211          * Okay, actually perform the catalog change ... if needed
3212          */
3213         if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3214         {
3215                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
3216
3217                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3218
3219                 /* keep the system catalog indexes current */
3220                 CatalogUpdateIndexes(attr_rel, tuple);
3221         }
3222
3223         heap_close(attr_rel, RowExclusiveLock);
3224 }
3225
3226 /*
3227  * ALTER TABLE ALTER COLUMN SET NOT NULL
3228  */
3229 static void
3230 ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
3231                                  const char *colName)
3232 {
3233         HeapTuple       tuple;
3234         AttrNumber      attnum;
3235         Relation        attr_rel;
3236         NewConstraint *newcon;
3237
3238         /*
3239          * lookup the attribute
3240          */
3241         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3242
3243         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3244
3245         if (!HeapTupleIsValid(tuple))
3246                 ereport(ERROR,
3247                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3248                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3249                                                 colName, RelationGetRelationName(rel))));
3250
3251         attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
3252
3253         /* Prevent them from altering a system attribute */
3254         if (attnum <= 0)
3255                 ereport(ERROR,
3256                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3257                                  errmsg("cannot alter system column \"%s\"",
3258                                                 colName)));
3259
3260         /*
3261          * Okay, actually perform the catalog change ... if needed
3262          */
3263         if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
3264         {
3265                 ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
3266
3267                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3268
3269                 /* keep the system catalog indexes current */
3270                 CatalogUpdateIndexes(attr_rel, tuple);
3271
3272                 /* Tell Phase 3 to test the constraint */
3273                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3274                 newcon->contype = CONSTR_NOTNULL;
3275                 newcon->attnum = attnum;
3276                 newcon->name = "NOT NULL";
3277
3278                 tab->constraints = lappend(tab->constraints, newcon);
3279         }
3280
3281         heap_close(attr_rel, RowExclusiveLock);
3282 }
3283
3284 /*
3285  * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
3286  */
3287 static void
3288 ATExecColumnDefault(Relation rel, const char *colName,
3289                                         Node *newDefault)
3290 {
3291         AttrNumber      attnum;
3292
3293         /*
3294          * get the number of the attribute
3295          */
3296         attnum = get_attnum(RelationGetRelid(rel), colName);
3297         if (attnum == InvalidAttrNumber)
3298                 ereport(ERROR,
3299                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3300                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3301                                                 colName, RelationGetRelationName(rel))));
3302
3303         /* Prevent them from altering a system attribute */
3304         if (attnum <= 0)
3305                 ereport(ERROR,
3306                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3307                                  errmsg("cannot alter system column \"%s\"",
3308                                                 colName)));
3309
3310         /*
3311          * Remove any old default for the column.  We use RESTRICT here for
3312          * safety, but at present we do not expect anything to depend on the
3313          * default.
3314          */
3315         RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
3316
3317         if (newDefault)
3318         {
3319                 /* SET DEFAULT */
3320                 RawColumnDefault *rawEnt;
3321
3322                 rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
3323                 rawEnt->attnum = attnum;
3324                 rawEnt->raw_default = newDefault;
3325
3326                 /*
3327                  * This function is intended for CREATE TABLE, so it processes a
3328                  * _list_ of defaults, but we just do one.
3329                  */
3330                 AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
3331         }
3332 }
3333
3334 /*
3335  * ALTER TABLE ALTER COLUMN SET STATISTICS
3336  */
3337 static void
3338 ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
3339 {
3340         /*
3341          * We do our own permission checking because (a) we want to allow SET
3342          * STATISTICS on indexes (for expressional index columns), and (b) we want
3343          * to allow SET STATISTICS on system catalogs without requiring
3344          * allowSystemTableMods to be turned on.
3345          */
3346         if (rel->rd_rel->relkind != RELKIND_RELATION &&
3347                 rel->rd_rel->relkind != RELKIND_INDEX)
3348                 ereport(ERROR,
3349                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3350                                  errmsg("\"%s\" is not a table or index",
3351                                                 RelationGetRelationName(rel))));
3352
3353         /* Permissions checks */
3354         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
3355                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
3356                                            RelationGetRelationName(rel));
3357 }
3358
3359 static void
3360 ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
3361 {
3362         int                     newtarget;
3363         Relation        attrelation;
3364         HeapTuple       tuple;
3365         Form_pg_attribute attrtuple;
3366
3367         Assert(IsA(newValue, Integer));
3368         newtarget = intVal(newValue);
3369
3370         /*
3371          * Limit target to a sane range
3372          */
3373         if (newtarget < -1)
3374         {
3375                 ereport(ERROR,
3376                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3377                                  errmsg("statistics target %d is too low",
3378                                                 newtarget)));
3379         }
3380         else if (newtarget > 1000)
3381         {
3382                 newtarget = 1000;
3383                 ereport(WARNING,
3384                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3385                                  errmsg("lowering statistics target to %d",
3386                                                 newtarget)));
3387         }
3388
3389         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3390
3391         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3392
3393         if (!HeapTupleIsValid(tuple))
3394                 ereport(ERROR,
3395                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3396                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3397                                                 colName, RelationGetRelationName(rel))));
3398         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3399
3400         if (attrtuple->attnum <= 0)
3401                 ereport(ERROR,
3402                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3403                                  errmsg("cannot alter system column \"%s\"",
3404                                                 colName)));
3405
3406         attrtuple->attstattarget = newtarget;
3407
3408         simple_heap_update(attrelation, &tuple->t_self, tuple);
3409
3410         /* keep system catalog indexes current */
3411         CatalogUpdateIndexes(attrelation, tuple);
3412
3413         heap_freetuple(tuple);
3414
3415         heap_close(attrelation, RowExclusiveLock);
3416 }
3417
3418 /*
3419  * ALTER TABLE ALTER COLUMN SET STORAGE
3420  */
3421 static void
3422 ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
3423 {
3424         char       *storagemode;
3425         char            newstorage;
3426         Relation        attrelation;
3427         HeapTuple       tuple;
3428         Form_pg_attribute attrtuple;
3429
3430         Assert(IsA(newValue, String));
3431         storagemode = strVal(newValue);
3432
3433         if (pg_strcasecmp(storagemode, "plain") == 0)
3434                 newstorage = 'p';
3435         else if (pg_strcasecmp(storagemode, "external") == 0)
3436                 newstorage = 'e';
3437         else if (pg_strcasecmp(storagemode, "extended") == 0)
3438                 newstorage = 'x';
3439         else if (pg_strcasecmp(storagemode, "main") == 0)
3440                 newstorage = 'm';
3441         else
3442         {
3443                 ereport(ERROR,
3444                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3445                                  errmsg("invalid storage type \"%s\"",
3446                                                 storagemode)));
3447                 newstorage = 0;                 /* keep compiler quiet */
3448         }
3449
3450         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
3451
3452         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
3453
3454         if (!HeapTupleIsValid(tuple))
3455                 ereport(ERROR,
3456                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3457                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3458                                                 colName, RelationGetRelationName(rel))));
3459         attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
3460
3461         if (attrtuple->attnum <= 0)
3462                 ereport(ERROR,
3463                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3464                                  errmsg("cannot alter system column \"%s\"",
3465                                                 colName)));
3466
3467         /*
3468          * safety check: do not allow toasted storage modes unless column datatype
3469          * is TOAST-aware.
3470          */
3471         if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
3472                 attrtuple->attstorage = newstorage;
3473         else
3474                 ereport(ERROR,
3475                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3476                                  errmsg("column data type %s can only have storage PLAIN",
3477                                                 format_type_be(attrtuple->atttypid))));
3478
3479         simple_heap_update(attrelation, &tuple->t_self, tuple);
3480
3481         /* keep system catalog indexes current */
3482         CatalogUpdateIndexes(attrelation, tuple);
3483
3484         heap_freetuple(tuple);
3485
3486         heap_close(attrelation, RowExclusiveLock);
3487 }
3488
3489
3490 /*
3491  * ALTER TABLE DROP COLUMN
3492  *
3493  * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
3494  * because we have to decide at runtime whether to recurse or not depending
3495  * on whether attinhcount goes to zero or not.  (We can't check this in a
3496  * static pre-pass because it won't handle multiple inheritance situations
3497  * correctly.)  Since DROP COLUMN doesn't need to create any work queue
3498  * entries for Phase 3, it's okay to recurse internally in this routine
3499  * without considering the work queue.
3500  */
3501 static void
3502 ATExecDropColumn(Relation rel, const char *colName,
3503                                  DropBehavior behavior,
3504                                  bool recurse, bool recursing)
3505 {
3506         HeapTuple       tuple;
3507         Form_pg_attribute targetatt;
3508         AttrNumber      attnum;
3509         List       *children;
3510         ObjectAddress object;
3511
3512         /* At top level, permission check was done in ATPrepCmd, else do it */
3513         if (recursing)
3514                 ATSimplePermissions(rel, false);
3515
3516         /*
3517          * get the number of the attribute
3518          */
3519         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
3520         if (!HeapTupleIsValid(tuple))
3521                 ereport(ERROR,
3522                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
3523                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
3524                                                 colName, RelationGetRelationName(rel))));
3525         targetatt = (Form_pg_attribute) GETSTRUCT(tuple);
3526
3527         attnum = targetatt->attnum;
3528
3529         /* Can't drop a system attribute, except OID */
3530         if (attnum <= 0 && attnum != ObjectIdAttributeNumber)
3531                 ereport(ERROR,
3532                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3533                                  errmsg("cannot drop system column \"%s\"",
3534                                                 colName)));
3535
3536         /* Don't drop inherited columns */
3537         if (targetatt->attinhcount > 0 && !recursing)
3538                 ereport(ERROR,
3539                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3540                                  errmsg("cannot drop inherited column \"%s\"",
3541                                                 colName)));
3542
3543         ReleaseSysCache(tuple);
3544
3545         /*
3546          * Propagate to children as appropriate.  Unlike most other ALTER
3547          * routines, we have to do this one level of recursion at a time; we can't
3548          * use find_all_inheritors to do it in one pass.
3549          */
3550         children = find_inheritance_children(RelationGetRelid(rel));
3551
3552         if (children)
3553         {
3554                 Relation        attr_rel;
3555                 ListCell   *child;
3556
3557                 attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
3558                 foreach(child, children)
3559                 {
3560                         Oid                     childrelid = lfirst_oid(child);
3561                         Relation        childrel;
3562                         Form_pg_attribute childatt;
3563
3564                         childrel = heap_open(childrelid, AccessExclusiveLock);
3565
3566                         tuple = SearchSysCacheCopyAttName(childrelid, colName);
3567                         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
3568                                 elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
3569                                          colName, childrelid);
3570                         childatt = (Form_pg_attribute) GETSTRUCT(tuple);
3571
3572                         if (childatt->attinhcount <= 0)         /* shouldn't happen */
3573                                 elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
3574                                          childrelid, colName);
3575
3576                         if (recurse)
3577                         {
3578                                 /*
3579                                  * If the child column has other definition sources, just
3580                                  * decrement its inheritance count; if not, recurse to delete
3581                                  * it.
3582                                  */
3583                                 if (childatt->attinhcount == 1 && !childatt->attislocal)
3584                                 {
3585                                         /* Time to delete this child column, too */
3586                                         ATExecDropColumn(childrel, colName, behavior, true, true);
3587                                 }
3588                                 else
3589                                 {
3590                                         /* Child column must survive my deletion */
3591                                         childatt->attinhcount--;
3592
3593                                         simple_heap_update(attr_rel, &tuple->t_self, tuple);
3594
3595                                         /* keep the system catalog indexes current */
3596                                         CatalogUpdateIndexes(attr_rel, tuple);
3597
3598                                         /* Make update visible */
3599                                         CommandCounterIncrement();
3600                                 }
3601                         }
3602                         else
3603                         {
3604                                 /*
3605                                  * If we were told to drop ONLY in this table (no recursion),
3606                                  * we need to mark the inheritors' attribute as locally
3607                                  * defined rather than inherited.
3608                                  */
3609                                 childatt->attinhcount--;
3610                                 childatt->attislocal = true;
3611
3612                                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
3613
3614                                 /* keep the system catalog indexes current */
3615                                 CatalogUpdateIndexes(attr_rel, tuple);
3616
3617                                 /* Make update visible */
3618                                 CommandCounterIncrement();
3619                         }
3620
3621                         heap_freetuple(tuple);
3622
3623                         heap_close(childrel, NoLock);
3624                 }
3625                 heap_close(attr_rel, RowExclusiveLock);
3626         }
3627
3628         /*
3629          * Perform the actual column deletion
3630          */
3631         object.classId = RelationRelationId;
3632         object.objectId = RelationGetRelid(rel);
3633         object.objectSubId = attnum;
3634
3635         performDeletion(&object, behavior);
3636
3637         /*
3638          * If we dropped the OID column, must adjust pg_class.relhasoids
3639          */
3640         if (attnum == ObjectIdAttributeNumber)
3641         {
3642                 Relation        class_rel;
3643                 Form_pg_class tuple_class;
3644
3645                 class_rel = heap_open(RelationRelationId, RowExclusiveLock);
3646
3647                 tuple = SearchSysCacheCopy(RELOID,
3648                                                                    ObjectIdGetDatum(RelationGetRelid(rel)),
3649                                                                    0, 0, 0);
3650                 if (!HeapTupleIsValid(tuple))
3651                         elog(ERROR, "cache lookup failed for relation %u",
3652                                  RelationGetRelid(rel));
3653                 tuple_class = (Form_pg_class) GETSTRUCT(tuple);
3654
3655                 tuple_class->relhasoids = false;
3656                 simple_heap_update(class_rel, &tuple->t_self, tuple);
3657
3658                 /* Keep the catalog indexes up to date */
3659                 CatalogUpdateIndexes(class_rel, tuple);
3660
3661                 heap_close(class_rel, RowExclusiveLock);
3662         }
3663 }
3664
3665 /*
3666  * ALTER TABLE ADD INDEX
3667  *
3668  * There is no such command in the grammar, but the parser converts UNIQUE
3669  * and PRIMARY KEY constraints into AT_AddIndex subcommands.  This lets us
3670  * schedule creation of the index at the appropriate time during ALTER.
3671  */
3672 static void
3673 ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
3674                            IndexStmt *stmt, bool is_rebuild)
3675 {
3676         bool            check_rights;
3677         bool            skip_build;
3678         bool            quiet;
3679
3680         Assert(IsA(stmt, IndexStmt));
3681
3682         /* suppress schema rights check when rebuilding existing index */
3683         check_rights = !is_rebuild;
3684         /* skip index build if phase 3 will have to rewrite table anyway */
3685         skip_build = (tab->newvals != NIL);
3686         /* suppress notices when rebuilding existing index */
3687         quiet = is_rebuild;
3688
3689         DefineIndex(stmt->relation, /* relation */
3690                                 stmt->idxname,  /* index name */
3691                                 InvalidOid,             /* no predefined OID */
3692                                 stmt->accessMethod,             /* am name */
3693                                 stmt->tableSpace,
3694                                 stmt->indexParams,              /* parameters */
3695                                 (Expr *) stmt->whereClause,
3696                                 stmt->rangetable,
3697                                 stmt->unique,
3698                                 stmt->primary,
3699                                 stmt->isconstraint,
3700                                 true,                   /* is_alter_table */
3701                                 check_rights,
3702                                 skip_build,
3703                                 quiet);
3704 }
3705
3706 /*
3707  * ALTER TABLE ADD CONSTRAINT
3708  */
3709 static void
3710 ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
3711 {
3712         switch (nodeTag(newConstraint))
3713         {
3714                 case T_Constraint:
3715                         {
3716                                 Constraint *constr = (Constraint *) newConstraint;
3717
3718                                 /*
3719                                  * Currently, we only expect to see CONSTR_CHECK nodes
3720                                  * arriving here (see the preprocessing done in
3721                                  * parser/analyze.c).  Use a switch anyway to make it easier
3722                                  * to add more code later.
3723                                  */
3724                                 switch (constr->contype)
3725                                 {
3726                                         case CONSTR_CHECK:
3727                                                 {
3728                                                         List       *newcons;
3729                                                         ListCell   *lcon;
3730
3731                                                         /*
3732                                                          * Call AddRelationRawConstraints to do the work.
3733                                                          * It returns a list of cooked constraints.
3734                                                          */
3735                                                         newcons = AddRelationRawConstraints(rel, NIL,
3736                                                                                                                  list_make1(constr));
3737                                                         /* Add each constraint to Phase 3's queue */
3738                                                         foreach(lcon, newcons)
3739                                                         {
3740                                                                 CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
3741                                                                 NewConstraint *newcon;
3742
3743                                                                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3744                                                                 newcon->name = ccon->name;
3745                                                                 newcon->contype = ccon->contype;
3746                                                                 newcon->attnum = ccon->attnum;
3747                                                                 /* ExecQual wants implicit-AND format */
3748                                                                 newcon->qual = (Node *)
3749                                                                         make_ands_implicit((Expr *) ccon->expr);
3750
3751                                                                 tab->constraints = lappend(tab->constraints,
3752                                                                                                                    newcon);
3753                                                         }
3754                                                         break;
3755                                                 }
3756                                         default:
3757                                                 elog(ERROR, "unrecognized constraint type: %d",
3758                                                          (int) constr->contype);
3759                                 }
3760                                 break;
3761                         }
3762                 case T_FkConstraint:
3763                         {
3764                                 FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
3765
3766                                 /*
3767                                  * Assign or validate constraint name
3768                                  */
3769                                 if (fkconstraint->constr_name)
3770                                 {
3771                                         if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
3772                                                                                          RelationGetRelid(rel),
3773                                                                                          RelationGetNamespace(rel),
3774                                                                                          fkconstraint->constr_name))
3775                                                 ereport(ERROR,
3776                                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
3777                                                                  errmsg("constraint \"%s\" for relation \"%s\" already exists",
3778                                                                                 fkconstraint->constr_name,
3779                                                                                 RelationGetRelationName(rel))));
3780                                 }
3781                                 else
3782                                         fkconstraint->constr_name =
3783                                                 ChooseConstraintName(RelationGetRelationName(rel),
3784                                                                         strVal(linitial(fkconstraint->fk_attrs)),
3785                                                                                          "fkey",
3786                                                                                          RelationGetNamespace(rel),
3787                                                                                          NIL);
3788
3789                                 ATAddForeignKeyConstraint(tab, rel, fkconstraint);
3790
3791                                 break;
3792                         }
3793                 default:
3794                         elog(ERROR, "unrecognized node type: %d",
3795                                  (int) nodeTag(newConstraint));
3796         }
3797 }
3798
3799 /*
3800  * Add a foreign-key constraint to a single table
3801  *
3802  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
3803  * lock on the rel, and have done appropriate validity/permissions checks
3804  * for it.
3805  */
3806 static void
3807 ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
3808                                                   FkConstraint *fkconstraint)
3809 {
3810         Relation        pkrel;
3811         AclResult       aclresult;
3812         int16           pkattnum[INDEX_MAX_KEYS];
3813         int16           fkattnum[INDEX_MAX_KEYS];
3814         Oid                     pktypoid[INDEX_MAX_KEYS];
3815         Oid                     fktypoid[INDEX_MAX_KEYS];
3816         Oid                     opclasses[INDEX_MAX_KEYS];
3817         int                     i;
3818         int                     numfks,
3819                                 numpks;
3820         Oid                     indexOid;
3821         Oid                     constrOid;
3822
3823         /*
3824          * Grab an exclusive lock on the pk table, so that someone doesn't delete
3825          * rows out from under us. (Although a lesser lock would do for that
3826          * purpose, we'll need exclusive lock anyway to add triggers to the pk
3827          * table; trying to start with a lesser lock will just create a risk of
3828          * deadlock.)
3829          */
3830         pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
3831
3832         /*
3833          * Validity and permissions checks
3834          *
3835          * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER,
3836          * but we may as well error out sooner instead of later.
3837          */
3838         if (pkrel->rd_rel->relkind != RELKIND_RELATION)
3839                 ereport(ERROR,
3840                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3841                                  errmsg("referenced relation \"%s\" is not a table",
3842                                                 RelationGetRelationName(pkrel))));
3843
3844         aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(),
3845                                                                   ACL_REFERENCES);
3846         if (aclresult != ACLCHECK_OK)
3847                 aclcheck_error(aclresult, ACL_KIND_CLASS,
3848                                            RelationGetRelationName(pkrel));
3849
3850         if (!allowSystemTableMods && IsSystemRelation(pkrel))
3851                 ereport(ERROR,
3852                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
3853                                  errmsg("permission denied: \"%s\" is a system catalog",
3854                                                 RelationGetRelationName(pkrel))));
3855
3856         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
3857                                                                   ACL_REFERENCES);
3858         if (aclresult != ACLCHECK_OK)
3859                 aclcheck_error(aclresult, ACL_KIND_CLASS,
3860                                            RelationGetRelationName(rel));
3861
3862         /*
3863          * Disallow reference from permanent table to temp table or vice versa.
3864          * (The ban on perm->temp is for fairly obvious reasons.  The ban on
3865          * temp->perm is because other backends might need to run the RI triggers
3866          * on the perm table, but they can't reliably see tuples the owning
3867          * backend has created in the temp table, because non-shared buffers are
3868          * used for temp tables.)
3869          */
3870         if (isTempNamespace(RelationGetNamespace(pkrel)))
3871         {
3872                 if (!isTempNamespace(RelationGetNamespace(rel)))
3873                         ereport(ERROR,
3874                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3875                                          errmsg("cannot reference temporary table from permanent table constraint")));
3876         }
3877         else
3878         {
3879                 if (isTempNamespace(RelationGetNamespace(rel)))
3880                         ereport(ERROR,
3881                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
3882                                          errmsg("cannot reference permanent table from temporary table constraint")));
3883         }
3884
3885         /*
3886          * Look up the referencing attributes to make sure they exist, and record
3887          * their attnums and type OIDs.
3888          */
3889         MemSet(pkattnum, 0, sizeof(pkattnum));
3890         MemSet(fkattnum, 0, sizeof(fkattnum));
3891         MemSet(pktypoid, 0, sizeof(pktypoid));
3892         MemSet(fktypoid, 0, sizeof(fktypoid));
3893         MemSet(opclasses, 0, sizeof(opclasses));
3894
3895         numfks = transformColumnNameList(RelationGetRelid(rel),
3896                                                                          fkconstraint->fk_attrs,
3897                                                                          fkattnum, fktypoid);
3898
3899         /*
3900          * If the attribute list for the referenced table was omitted, lookup the
3901          * definition of the primary key and use it.  Otherwise, validate the
3902          * supplied attribute list.  In either case, discover the index OID and
3903          * index opclasses, and the attnums and type OIDs of the attributes.
3904          */
3905         if (fkconstraint->pk_attrs == NIL)
3906         {
3907                 numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
3908                                                                                         &fkconstraint->pk_attrs,
3909                                                                                         pkattnum, pktypoid,
3910                                                                                         opclasses);
3911         }
3912         else
3913         {
3914                 numpks = transformColumnNameList(RelationGetRelid(pkrel),
3915                                                                                  fkconstraint->pk_attrs,
3916                                                                                  pkattnum, pktypoid);
3917                 /* Look for an index matching the column list */
3918                 indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
3919                                                                                    opclasses);
3920         }
3921
3922         /* Be sure referencing and referenced column types are comparable */
3923         if (numfks != numpks)
3924                 ereport(ERROR,
3925                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
3926                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
3927
3928         for (i = 0; i < numpks; i++)
3929         {
3930                 /*
3931                  * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
3932                  * is the foreign key table's i'th key's type
3933                  *
3934                  * Note that we look for an operator with the PK type on the left;
3935                  * when the types are different this is critical because the PK index
3936                  * will need operators with the indexkey on the left. (Ordinarily both
3937                  * commutator operators will exist if either does, but we won't get
3938                  * the right answer from the test below on opclass membership unless
3939                  * we select the proper operator.)
3940                  */
3941                 Operator        o = oper(list_make1(makeString("=")),
3942                                                          pktypoid[i], fktypoid[i], true);
3943
3944                 if (o == NULL)
3945                         ereport(ERROR,
3946                                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
3947                                          errmsg("foreign key constraint \"%s\" "
3948                                                         "cannot be implemented",
3949                                                         fkconstraint->constr_name),
3950                                          errdetail("Key columns \"%s\" and \"%s\" "
3951                                                            "are of incompatible types: %s and %s.",
3952                                                            strVal(list_nth(fkconstraint->fk_attrs, i)),
3953                                                            strVal(list_nth(fkconstraint->pk_attrs, i)),
3954                                                            format_type_be(fktypoid[i]),
3955                                                            format_type_be(pktypoid[i]))));
3956
3957                 /*
3958                  * Check that the found operator is compatible with the PK index, and
3959                  * generate a warning if not, since otherwise costly seqscans will be
3960                  * incurred to check FK validity.
3961                  */
3962                 if (!op_in_opclass(oprid(o), opclasses[i]))
3963                         ereport(WARNING,
3964                                         (errmsg("foreign key constraint \"%s\" "
3965                                                         "will require costly sequential scans",
3966                                                         fkconstraint->constr_name),
3967                                          errdetail("Key columns \"%s\" and \"%s\" "
3968                                                            "are of different types: %s and %s.",
3969                                                            strVal(list_nth(fkconstraint->fk_attrs, i)),
3970                                                            strVal(list_nth(fkconstraint->pk_attrs, i)),
3971                                                            format_type_be(fktypoid[i]),
3972                                                            format_type_be(pktypoid[i]))));
3973
3974                 ReleaseSysCache(o);
3975         }
3976
3977         /*
3978          * Tell Phase 3 to check that the constraint is satisfied by existing rows
3979          * (we can skip this during table creation).
3980          */
3981         if (!fkconstraint->skip_validation)
3982         {
3983                 NewConstraint *newcon;
3984
3985                 newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
3986                 newcon->name = fkconstraint->constr_name;
3987                 newcon->contype = CONSTR_FOREIGN;
3988                 newcon->refrelid = RelationGetRelid(pkrel);
3989                 newcon->qual = (Node *) fkconstraint;
3990
3991                 tab->constraints = lappend(tab->constraints, newcon);
3992         }
3993
3994         /*
3995          * Record the FK constraint in pg_constraint.
3996          */
3997         constrOid = CreateConstraintEntry(fkconstraint->constr_name,
3998                                                                           RelationGetNamespace(rel),
3999                                                                           CONSTRAINT_FOREIGN,
4000                                                                           fkconstraint->deferrable,
4001                                                                           fkconstraint->initdeferred,
4002                                                                           RelationGetRelid(rel),
4003                                                                           fkattnum,
4004                                                                           numfks,
4005                                                                           InvalidOid,           /* not a domain
4006                                                                                                                  * constraint */
4007                                                                           RelationGetRelid(pkrel),
4008                                                                           pkattnum,
4009                                                                           numpks,
4010                                                                           fkconstraint->fk_upd_action,
4011                                                                           fkconstraint->fk_del_action,
4012                                                                           fkconstraint->fk_matchtype,
4013                                                                           indexOid,
4014                                                                           NULL,         /* no check constraint */
4015                                                                           NULL,
4016                                                                           NULL);
4017
4018         /*
4019          * Create the triggers that will enforce the constraint.
4020          */
4021         createForeignKeyTriggers(rel, fkconstraint, constrOid);
4022
4023         /*
4024          * Close pk table, but keep lock until we've committed.
4025          */
4026         heap_close(pkrel, NoLock);
4027 }
4028
4029
4030 /*
4031  * transformColumnNameList - transform list of column names
4032  *
4033  * Lookup each name and return its attnum and type OID
4034  */
4035 static int
4036 transformColumnNameList(Oid relId, List *colList,
4037                                                 int16 *attnums, Oid *atttypids)
4038 {
4039         ListCell   *l;
4040         int                     attnum;
4041
4042         attnum = 0;
4043         foreach(l, colList)
4044         {
4045                 char       *attname = strVal(lfirst(l));
4046                 HeapTuple       atttuple;
4047
4048                 atttuple = SearchSysCacheAttName(relId, attname);
4049                 if (!HeapTupleIsValid(atttuple))
4050                         ereport(ERROR,
4051                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
4052                                          errmsg("column \"%s\" referenced in foreign key constraint does not exist",
4053                                                         attname)));
4054                 if (attnum >= INDEX_MAX_KEYS)
4055                         ereport(ERROR,
4056                                         (errcode(ERRCODE_TOO_MANY_COLUMNS),
4057                                          errmsg("cannot have more than %d keys in a foreign key",
4058                                                         INDEX_MAX_KEYS)));
4059                 attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
4060                 atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
4061                 ReleaseSysCache(atttuple);
4062                 attnum++;
4063         }
4064
4065         return attnum;
4066 }
4067
4068 /*
4069  * transformFkeyGetPrimaryKey -
4070  *
4071  *      Look up the names, attnums, and types of the primary key attributes
4072  *      for the pkrel.  Also return the index OID and index opclasses of the
4073  *      index supporting the primary key.
4074  *
4075  *      All parameters except pkrel are output parameters.      Also, the function
4076  *      return value is the number of attributes in the primary key.
4077  *
4078  *      Used when the column list in the REFERENCES specification is omitted.
4079  */
4080 static int
4081 transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
4082                                                    List **attnamelist,
4083                                                    int16 *attnums, Oid *atttypids,
4084                                                    Oid *opclasses)
4085 {
4086         List       *indexoidlist;
4087         ListCell   *indexoidscan;
4088         HeapTuple       indexTuple = NULL;
4089         Form_pg_index indexStruct = NULL;
4090         Datum           indclassDatum;
4091         bool            isnull;
4092         oidvector  *indclass;
4093         int                     i;
4094
4095         /*
4096          * Get the list of index OIDs for the table from the relcache, and look up
4097          * each one in the pg_index syscache until we find one marked primary key
4098          * (hopefully there isn't more than one such).
4099          */
4100         *indexOid = InvalidOid;
4101
4102         indexoidlist = RelationGetIndexList(pkrel);
4103
4104         foreach(indexoidscan, indexoidlist)
4105         {
4106                 Oid                     indexoid = lfirst_oid(indexoidscan);
4107
4108                 indexTuple = SearchSysCache(INDEXRELID,
4109                                                                         ObjectIdGetDatum(indexoid),
4110                                                                         0, 0, 0);
4111                 if (!HeapTupleIsValid(indexTuple))
4112                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4113                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4114                 if (indexStruct->indisprimary)
4115                 {
4116                         *indexOid = indexoid;
4117                         break;
4118                 }
4119                 ReleaseSysCache(indexTuple);
4120         }
4121
4122         list_free(indexoidlist);
4123
4124         /*
4125          * Check that we found it
4126          */
4127         if (!OidIsValid(*indexOid))
4128                 ereport(ERROR,
4129                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
4130                                  errmsg("there is no primary key for referenced table \"%s\"",
4131                                                 RelationGetRelationName(pkrel))));
4132
4133         /* Must get indclass the hard way */
4134         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4135                                                                         Anum_pg_index_indclass, &isnull);
4136         Assert(!isnull);
4137         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4138
4139         /*
4140          * Now build the list of PK attributes from the indkey definition (we
4141          * assume a primary key cannot have expressional elements)
4142          */
4143         *attnamelist = NIL;
4144         for (i = 0; i < indexStruct->indnatts; i++)
4145         {
4146                 int                     pkattno = indexStruct->indkey.values[i];
4147
4148                 attnums[i] = pkattno;
4149                 atttypids[i] = attnumTypeId(pkrel, pkattno);
4150                 opclasses[i] = indclass->values[i];
4151                 *attnamelist = lappend(*attnamelist,
4152                            makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
4153         }
4154
4155         ReleaseSysCache(indexTuple);
4156
4157         return i;
4158 }
4159
4160 /*
4161  * transformFkeyCheckAttrs -
4162  *
4163  *      Make sure that the attributes of a referenced table belong to a unique
4164  *      (or primary key) constraint.  Return the OID of the index supporting
4165  *      the constraint, as well as the opclasses associated with the index
4166  *      columns.
4167  */
4168 static Oid
4169 transformFkeyCheckAttrs(Relation pkrel,
4170                                                 int numattrs, int16 *attnums,
4171                                                 Oid *opclasses) /* output parameter */
4172 {
4173         Oid                     indexoid = InvalidOid;
4174         bool            found = false;
4175         List       *indexoidlist;
4176         ListCell   *indexoidscan;
4177
4178         /*
4179          * Get the list of index OIDs for the table from the relcache, and look up
4180          * each one in the pg_index syscache, and match unique indexes to the list
4181          * of attnums we are given.
4182          */
4183         indexoidlist = RelationGetIndexList(pkrel);
4184
4185         foreach(indexoidscan, indexoidlist)
4186         {
4187                 HeapTuple       indexTuple;
4188                 Form_pg_index indexStruct;
4189                 int                     i,
4190                                         j;
4191
4192                 indexoid = lfirst_oid(indexoidscan);
4193                 indexTuple = SearchSysCache(INDEXRELID,
4194                                                                         ObjectIdGetDatum(indexoid),
4195                                                                         0, 0, 0);
4196                 if (!HeapTupleIsValid(indexTuple))
4197                         elog(ERROR, "cache lookup failed for index %u", indexoid);
4198                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
4199
4200                 /*
4201                  * Must have the right number of columns; must be unique and not a
4202                  * partial index; forget it if there are any expressions, too
4203                  */
4204                 if (indexStruct->indnatts == numattrs &&
4205                         indexStruct->indisunique &&
4206                         heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
4207                         heap_attisnull(indexTuple, Anum_pg_index_indexprs))
4208                 {
4209                         /* Must get indclass the hard way */
4210                         Datum           indclassDatum;
4211                         bool            isnull;
4212                         oidvector  *indclass;
4213
4214                         indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
4215                                                                                         Anum_pg_index_indclass, &isnull);
4216                         Assert(!isnull);
4217                         indclass = (oidvector *) DatumGetPointer(indclassDatum);
4218
4219                         /*
4220                          * The given attnum list may match the index columns in any order.
4221                          * Check that each list is a subset of the other.
4222                          */
4223                         for (i = 0; i < numattrs; i++)
4224                         {
4225                                 found = false;
4226                                 for (j = 0; j < numattrs; j++)
4227                                 {
4228                                         if (attnums[i] == indexStruct->indkey.values[j])
4229                                         {
4230                                                 found = true;
4231                                                 break;
4232                                         }
4233                                 }
4234                                 if (!found)
4235                                         break;
4236                         }
4237                         if (found)
4238                         {
4239                                 for (i = 0; i < numattrs; i++)
4240                                 {
4241                                         found = false;
4242                                         for (j = 0; j < numattrs; j++)
4243                                         {
4244                                                 if (attnums[j] == indexStruct->indkey.values[i])
4245                                                 {
4246                                                         opclasses[j] = indclass->values[i];
4247                                                         found = true;
4248                                                         break;
4249                                                 }
4250                                         }
4251                                         if (!found)
4252                                                 break;
4253                                 }
4254                         }
4255                 }
4256                 ReleaseSysCache(indexTuple);
4257                 if (found)
4258                         break;
4259         }
4260
4261         if (!found)
4262                 ereport(ERROR,
4263                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4264                                  errmsg("there is no unique constraint matching given keys for referenced table \"%s\"",
4265                                                 RelationGetRelationName(pkrel))));
4266
4267         list_free(indexoidlist);
4268
4269         return indexoid;
4270 }
4271
4272 /*
4273  * Scan the existing rows in a table to verify they meet a proposed FK
4274  * constraint.
4275  *
4276  * Caller must have opened and locked both relations.
4277  */
4278 static void
4279 validateForeignKeyConstraint(FkConstraint *fkconstraint,
4280                                                          Relation rel,
4281                                                          Relation pkrel)
4282 {
4283         HeapScanDesc scan;
4284         HeapTuple       tuple;
4285         Trigger         trig;
4286         ListCell   *list;
4287         int                     count;
4288
4289         /*
4290          * See if we can do it with a single LEFT JOIN query.  A FALSE result
4291          * indicates we must proceed with the fire-the-trigger method.
4292          */
4293         if (RI_Initial_Check(fkconstraint, rel, pkrel))
4294                 return;
4295
4296         /*
4297          * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
4298          * if that tuple had just been inserted.  If any of those fail, it should
4299          * ereport(ERROR) and that's that.
4300          */
4301         MemSet(&trig, 0, sizeof(trig));
4302         trig.tgoid = InvalidOid;
4303         trig.tgname = fkconstraint->constr_name;
4304         trig.tgenabled = TRUE;
4305         trig.tgisconstraint = TRUE;
4306         trig.tgconstrrelid = RelationGetRelid(pkrel);
4307         trig.tgdeferrable = FALSE;
4308         trig.tginitdeferred = FALSE;
4309
4310         trig.tgargs = (char **) palloc(sizeof(char *) *
4311                                                                    (4 + list_length(fkconstraint->fk_attrs)
4312                                                                         + list_length(fkconstraint->pk_attrs)));
4313
4314         trig.tgargs[0] = trig.tgname;
4315         trig.tgargs[1] = RelationGetRelationName(rel);
4316         trig.tgargs[2] = RelationGetRelationName(pkrel);
4317         trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
4318         count = 4;
4319         foreach(list, fkconstraint->fk_attrs)
4320         {
4321                 char       *fk_at = strVal(lfirst(list));
4322
4323                 trig.tgargs[count] = fk_at;
4324                 count += 2;
4325         }
4326         count = 5;
4327         foreach(list, fkconstraint->pk_attrs)
4328         {
4329                 char       *pk_at = strVal(lfirst(list));
4330
4331                 trig.tgargs[count] = pk_at;
4332                 count += 2;
4333         }
4334         trig.tgnargs = count - 1;
4335
4336         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
4337
4338         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
4339         {
4340                 FunctionCallInfoData fcinfo;
4341                 TriggerData trigdata;
4342
4343                 /*
4344                  * Make a call to the trigger function
4345                  *
4346                  * No parameters are passed, but we do set a context
4347                  */
4348                 MemSet(&fcinfo, 0, sizeof(fcinfo));
4349
4350                 /*
4351                  * We assume RI_FKey_check_ins won't look at flinfo...
4352                  */
4353                 trigdata.type = T_TriggerData;
4354                 trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
4355                 trigdata.tg_relation = rel;
4356                 trigdata.tg_trigtuple = tuple;
4357                 trigdata.tg_newtuple = NULL;
4358                 trigdata.tg_trigger = &trig;
4359                 trigdata.tg_trigtuplebuf = scan->rs_cbuf;
4360                 trigdata.tg_newtuplebuf = InvalidBuffer;
4361
4362                 fcinfo.context = (Node *) &trigdata;
4363
4364                 RI_FKey_check_ins(&fcinfo);
4365         }
4366
4367         heap_endscan(scan);
4368
4369         pfree(trig.tgargs);
4370 }
4371
4372 static void
4373 CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
4374                                          ObjectAddress *constrobj, ObjectAddress *trigobj,
4375                                          bool on_insert)
4376 {
4377         CreateTrigStmt *fk_trigger;
4378         ListCell   *fk_attr;
4379         ListCell   *pk_attr;
4380
4381         fk_trigger = makeNode(CreateTrigStmt);
4382         fk_trigger->trigname = fkconstraint->constr_name;
4383         fk_trigger->relation = myRel;
4384         fk_trigger->before = false;
4385         fk_trigger->row = true;
4386
4387         /* Either ON INSERT or ON UPDATE */
4388         if (on_insert)
4389         {
4390                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_ins");
4391                 fk_trigger->actions[0] = 'i';
4392         }
4393         else
4394         {
4395                 fk_trigger->funcname = SystemFuncName("RI_FKey_check_upd");
4396                 fk_trigger->actions[0] = 'u';
4397         }
4398         fk_trigger->actions[1] = '\0';
4399
4400         fk_trigger->isconstraint = true;
4401         fk_trigger->deferrable = fkconstraint->deferrable;
4402         fk_trigger->initdeferred = fkconstraint->initdeferred;
4403         fk_trigger->constrrel = fkconstraint->pktable;
4404
4405         fk_trigger->args = NIL;
4406         fk_trigger->args = lappend(fk_trigger->args,
4407                                                            makeString(fkconstraint->constr_name));
4408         fk_trigger->args = lappend(fk_trigger->args,
4409                                                            makeString(myRel->relname));
4410         fk_trigger->args = lappend(fk_trigger->args,
4411                                                            makeString(fkconstraint->pktable->relname));
4412         fk_trigger->args = lappend(fk_trigger->args,
4413                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4414         if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
4415                 ereport(ERROR,
4416                                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
4417                                  errmsg("number of referencing and referenced columns for foreign key disagree")));
4418
4419         forboth(fk_attr, fkconstraint->fk_attrs,
4420                         pk_attr, fkconstraint->pk_attrs)
4421         {
4422                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4423                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4424         }
4425
4426         trigobj->objectId = CreateTrigger(fk_trigger, true);
4427
4428         /* Register dependency from trigger to constraint */
4429         recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
4430
4431         /* Make changes-so-far visible */
4432         CommandCounterIncrement();
4433 }
4434
4435 /*
4436  * Create the triggers that implement an FK constraint.
4437  */
4438 static void
4439 createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
4440                                                  Oid constrOid)
4441 {
4442         RangeVar   *myRel;
4443         CreateTrigStmt *fk_trigger;
4444         ListCell   *fk_attr;
4445         ListCell   *pk_attr;
4446         ObjectAddress trigobj,
4447                                 constrobj;
4448
4449         /*
4450          * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
4451          */
4452         myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
4453                                                  pstrdup(RelationGetRelationName(rel)));
4454
4455         /*
4456          * Preset objectAddress fields
4457          */
4458         constrobj.classId = ConstraintRelationId;
4459         constrobj.objectId = constrOid;
4460         constrobj.objectSubId = 0;
4461         trigobj.classId = TriggerRelationId;
4462         trigobj.objectSubId = 0;
4463
4464         /* Make changes-so-far visible */
4465         CommandCounterIncrement();
4466
4467         /*
4468          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
4469          * action for both INSERTs and UPDATEs on the referencing table.
4470          */
4471         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
4472         CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
4473
4474         /*
4475          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4476          * DELETE action on the referenced table.
4477          */
4478         fk_trigger = makeNode(CreateTrigStmt);
4479         fk_trigger->trigname = fkconstraint->constr_name;
4480         fk_trigger->relation = fkconstraint->pktable;
4481         fk_trigger->before = false;
4482         fk_trigger->row = true;
4483         fk_trigger->actions[0] = 'd';
4484         fk_trigger->actions[1] = '\0';
4485
4486         fk_trigger->isconstraint = true;
4487         fk_trigger->constrrel = myRel;
4488         switch (fkconstraint->fk_del_action)
4489         {
4490                 case FKCONSTR_ACTION_NOACTION:
4491                         fk_trigger->deferrable = fkconstraint->deferrable;
4492                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4493                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_del");
4494                         break;
4495                 case FKCONSTR_ACTION_RESTRICT:
4496                         fk_trigger->deferrable = false;
4497                         fk_trigger->initdeferred = false;
4498                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_del");
4499                         break;
4500                 case FKCONSTR_ACTION_CASCADE:
4501                         fk_trigger->deferrable = false;
4502                         fk_trigger->initdeferred = false;
4503                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del");
4504                         break;
4505                 case FKCONSTR_ACTION_SETNULL:
4506                         fk_trigger->deferrable = false;
4507                         fk_trigger->initdeferred = false;
4508                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del");
4509                         break;
4510                 case FKCONSTR_ACTION_SETDEFAULT:
4511                         fk_trigger->deferrable = false;
4512                         fk_trigger->initdeferred = false;
4513                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_del");
4514                         break;
4515                 default:
4516                         elog(ERROR, "unrecognized FK action type: %d",
4517                                  (int) fkconstraint->fk_del_action);
4518                         break;
4519         }
4520
4521         fk_trigger->args = NIL;
4522         fk_trigger->args = lappend(fk_trigger->args,
4523                                                            makeString(fkconstraint->constr_name));
4524         fk_trigger->args = lappend(fk_trigger->args,
4525                                                            makeString(myRel->relname));
4526         fk_trigger->args = lappend(fk_trigger->args,
4527                                                            makeString(fkconstraint->pktable->relname));
4528         fk_trigger->args = lappend(fk_trigger->args,
4529                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4530         forboth(fk_attr, fkconstraint->fk_attrs,
4531                         pk_attr, fkconstraint->pk_attrs)
4532         {
4533                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4534                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4535         }
4536
4537         trigobj.objectId = CreateTrigger(fk_trigger, true);
4538
4539         /* Register dependency from trigger to constraint */
4540         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4541
4542         /* Make changes-so-far visible */
4543         CommandCounterIncrement();
4544
4545         /*
4546          * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
4547          * UPDATE action on the referenced table.
4548          */
4549         fk_trigger = makeNode(CreateTrigStmt);
4550         fk_trigger->trigname = fkconstraint->constr_name;
4551         fk_trigger->relation = fkconstraint->pktable;
4552         fk_trigger->before = false;
4553         fk_trigger->row = true;
4554         fk_trigger->actions[0] = 'u';
4555         fk_trigger->actions[1] = '\0';
4556         fk_trigger->isconstraint = true;
4557         fk_trigger->constrrel = myRel;
4558         switch (fkconstraint->fk_upd_action)
4559         {
4560                 case FKCONSTR_ACTION_NOACTION:
4561                         fk_trigger->deferrable = fkconstraint->deferrable;
4562                         fk_trigger->initdeferred = fkconstraint->initdeferred;
4563                         fk_trigger->funcname = SystemFuncName("RI_FKey_noaction_upd");
4564                         break;
4565                 case FKCONSTR_ACTION_RESTRICT:
4566                         fk_trigger->deferrable = false;
4567                         fk_trigger->initdeferred = false;
4568                         fk_trigger->funcname = SystemFuncName("RI_FKey_restrict_upd");
4569                         break;
4570                 case FKCONSTR_ACTION_CASCADE:
4571                         fk_trigger->deferrable = false;
4572                         fk_trigger->initdeferred = false;
4573                         fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd");
4574                         break;
4575                 case FKCONSTR_ACTION_SETNULL:
4576                         fk_trigger->deferrable = false;
4577                         fk_trigger->initdeferred = false;
4578                         fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd");
4579                         break;
4580                 case FKCONSTR_ACTION_SETDEFAULT:
4581                         fk_trigger->deferrable = false;
4582                         fk_trigger->initdeferred = false;
4583                         fk_trigger->funcname = SystemFuncName("RI_FKey_setdefault_upd");
4584                         break;
4585                 default:
4586                         elog(ERROR, "unrecognized FK action type: %d",
4587                                  (int) fkconstraint->fk_upd_action);
4588                         break;
4589         }
4590
4591         fk_trigger->args = NIL;
4592         fk_trigger->args = lappend(fk_trigger->args,
4593                                                            makeString(fkconstraint->constr_name));
4594         fk_trigger->args = lappend(fk_trigger->args,
4595                                                            makeString(myRel->relname));
4596         fk_trigger->args = lappend(fk_trigger->args,
4597                                                            makeString(fkconstraint->pktable->relname));
4598         fk_trigger->args = lappend(fk_trigger->args,
4599                                 makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
4600         forboth(fk_attr, fkconstraint->fk_attrs,
4601                         pk_attr, fkconstraint->pk_attrs)
4602         {
4603                 fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
4604                 fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
4605         }
4606
4607         trigobj.objectId = CreateTrigger(fk_trigger, true);
4608
4609         /* Register dependency from trigger to constraint */
4610         recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
4611 }
4612
4613 /*
4614  * fkMatchTypeToString -
4615  *        convert FKCONSTR_MATCH_xxx code to string to use in trigger args
4616  */
4617 static char *
4618 fkMatchTypeToString(char match_type)
4619 {
4620         switch (match_type)
4621         {
4622                 case FKCONSTR_MATCH_FULL:
4623                         return pstrdup("FULL");
4624                 case FKCONSTR_MATCH_PARTIAL:
4625                         return pstrdup("PARTIAL");
4626                 case FKCONSTR_MATCH_UNSPECIFIED:
4627                         return pstrdup("UNSPECIFIED");
4628                 default:
4629                         elog(ERROR, "unrecognized match type: %d",
4630                                  (int) match_type);
4631         }
4632         return NULL;                            /* can't get here */
4633 }
4634
4635 /*
4636  * ALTER TABLE DROP CONSTRAINT
4637  */
4638 static void
4639 ATPrepDropConstraint(List **wqueue, Relation rel,
4640                                          bool recurse, AlterTableCmd *cmd)
4641 {
4642         /*
4643          * We don't want errors or noise from child tables, so we have to pass
4644          * down a modified command.
4645          */
4646         if (recurse)
4647         {
4648                 AlterTableCmd *childCmd = copyObject(cmd);
4649
4650                 childCmd->subtype = AT_DropConstraintQuietly;
4651                 ATSimpleRecursion(wqueue, rel, childCmd, recurse);
4652         }
4653 }
4654
4655 static void
4656 ATExecDropConstraint(Relation rel, const char *constrName,
4657                                          DropBehavior behavior, bool quiet)
4658 {
4659         int                     deleted;
4660
4661         deleted = RemoveRelConstraints(rel, constrName, behavior);
4662
4663         if (!quiet)
4664         {
4665                 /* If zero constraints deleted, complain */
4666                 if (deleted == 0)
4667                         ereport(ERROR,
4668                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
4669                                          errmsg("constraint \"%s\" does not exist",
4670                                                         constrName)));
4671                 /* Otherwise if more than one constraint deleted, notify */
4672                 else if (deleted > 1)
4673                         ereport(NOTICE,
4674                                         (errmsg("multiple constraints named \"%s\" were dropped",
4675                                                         constrName)));
4676         }
4677 }
4678
4679 /*
4680  * ALTER COLUMN TYPE
4681  */
4682 static void
4683 ATPrepAlterColumnType(List **wqueue,
4684                                           AlteredTableInfo *tab, Relation rel,
4685                                           bool recurse, bool recursing,
4686                                           AlterTableCmd *cmd)
4687 {
4688         char       *colName = cmd->name;
4689         TypeName   *typename = (TypeName *) cmd->def;
4690         HeapTuple       tuple;
4691         Form_pg_attribute attTup;
4692         AttrNumber      attnum;
4693         Oid                     targettype;
4694         Node       *transform;
4695         NewColumnValue *newval;
4696         ParseState *pstate = make_parsestate(NULL);
4697
4698         /* lookup the attribute so we can check inheritance status */
4699         tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
4700         if (!HeapTupleIsValid(tuple))
4701                 ereport(ERROR,
4702                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4703                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
4704                                                 colName, RelationGetRelationName(rel))));
4705         attTup = (Form_pg_attribute) GETSTRUCT(tuple);
4706         attnum = attTup->attnum;
4707
4708         /* Can't alter a system attribute */
4709         if (attnum <= 0)
4710                 ereport(ERROR,
4711                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4712                                  errmsg("cannot alter system column \"%s\"",
4713                                                 colName)));
4714
4715         /* Don't alter inherited columns */
4716         if (attTup->attinhcount > 0 && !recursing)
4717                 ereport(ERROR,
4718                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4719                                  errmsg("cannot alter inherited column \"%s\"",
4720                                                 colName)));
4721
4722         /* Look up the target type */
4723         targettype = LookupTypeName(typename);
4724         if (!OidIsValid(targettype))
4725                 ereport(ERROR,
4726                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
4727                                  errmsg("type \"%s\" does not exist",
4728                                                 TypeNameToString(typename))));
4729
4730         /* make sure datatype is legal for a column */
4731         CheckAttributeType(colName, targettype);
4732
4733         /*
4734          * Set up an expression to transform the old data value to the new type.
4735          * If a USING option was given, transform and use that expression, else
4736          * just take the old value and try to coerce it.  We do this first so that
4737          * type incompatibility can be detected before we waste effort, and
4738          * because we need the expression to be parsed against the original table
4739          * rowtype.
4740          */
4741         if (cmd->transform)
4742         {
4743                 RangeTblEntry *rte;
4744
4745                 /* Expression must be able to access vars of old table */
4746                 rte = addRangeTableEntryForRelation(pstate,
4747                                                                                         rel,
4748                                                                                         NULL,
4749                                                                                         false,
4750                                                                                         true);
4751                 addRTEtoQuery(pstate, rte, false, true, true);
4752
4753                 transform = transformExpr(pstate, cmd->transform);
4754
4755                 /* It can't return a set */
4756                 if (expression_returns_set(transform))
4757                         ereport(ERROR,
4758                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
4759                                          errmsg("transform expression must not return a set")));
4760
4761                 /* No subplans or aggregates, either... */
4762                 if (pstate->p_hasSubLinks)
4763                         ereport(ERROR,
4764                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4765                                          errmsg("cannot use subquery in transform expression")));
4766                 if (pstate->p_hasAggs)
4767                         ereport(ERROR,
4768                                         (errcode(ERRCODE_GROUPING_ERROR),
4769                         errmsg("cannot use aggregate function in transform expression")));
4770         }
4771         else
4772         {
4773                 transform = (Node *) makeVar(1, attnum,
4774                                                                          attTup->atttypid, attTup->atttypmod,
4775                                                                          0);
4776         }
4777
4778         transform = coerce_to_target_type(pstate,
4779                                                                           transform, exprType(transform),
4780                                                                           targettype, typename->typmod,
4781                                                                           COERCION_ASSIGNMENT,
4782                                                                           COERCE_IMPLICIT_CAST);
4783         if (transform == NULL)
4784                 ereport(ERROR,
4785                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
4786                                  errmsg("column \"%s\" cannot be cast to type \"%s\"",
4787                                                 colName, TypeNameToString(typename))));
4788
4789         /*
4790          * Add a work queue item to make ATRewriteTable update the column
4791          * contents.
4792          */
4793         newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
4794         newval->attnum = attnum;
4795         newval->expr = (Expr *) transform;
4796
4797         tab->newvals = lappend(tab->newvals, newval);
4798
4799         ReleaseSysCache(tuple);
4800
4801         /*
4802          * The recursion case is handled by ATSimpleRecursion.  However, if we are
4803          * told not to recurse, there had better not be any child tables; else the
4804          * alter would put them out of step.
4805          */
4806         if (recurse)
4807                 ATSimpleRecursion(wqueue, rel, cmd, recurse);
4808         else if (!recursing &&
4809                          find_inheritance_children(RelationGetRelid(rel)) != NIL)
4810                 ereport(ERROR,
4811                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
4812                                  errmsg("type of inherited column \"%s\" must be changed in child tables too",
4813                                                 colName)));
4814 }
4815
4816 static void
4817 ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
4818                                           const char *colName, TypeName *typename)
4819 {
4820         HeapTuple       heapTup;
4821         Form_pg_attribute attTup;
4822         AttrNumber      attnum;
4823         HeapTuple       typeTuple;
4824         Form_pg_type tform;
4825         Oid                     targettype;
4826         Node       *defaultexpr;
4827         Relation        attrelation;
4828         Relation        depRel;
4829         ScanKeyData key[3];
4830         SysScanDesc scan;
4831         HeapTuple       depTup;
4832
4833         attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
4834
4835         /* Look up the target column */
4836         heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
4837         if (!HeapTupleIsValid(heapTup))         /* shouldn't happen */
4838                 ereport(ERROR,
4839                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
4840                                  errmsg("column \"%s\" of relation \"%s\" does not exist",
4841                                                 colName, RelationGetRelationName(rel))));
4842         attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
4843         attnum = attTup->attnum;
4844
4845         /* Check for multiple ALTER TYPE on same column --- can't cope */
4846         if (attTup->atttypid != tab->oldDesc->attrs[attnum - 1]->atttypid ||
4847                 attTup->atttypmod != tab->oldDesc->attrs[attnum - 1]->atttypmod)
4848                 ereport(ERROR,
4849                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4850                                  errmsg("cannot alter type of column \"%s\" twice",
4851                                                 colName)));
4852
4853         /* Look up the target type (should not fail, since prep found it) */
4854         typeTuple = typenameType(typename);
4855         tform = (Form_pg_type) GETSTRUCT(typeTuple);
4856         targettype = HeapTupleGetOid(typeTuple);
4857
4858         /*
4859          * If there is a default expression for the column, get it and ensure we
4860          * can coerce it to the new datatype.  (We must do this before changing
4861          * the column type, because build_column_default itself will try to
4862          * coerce, and will not issue the error message we want if it fails.)
4863          *
4864          * We remove any implicit coercion steps at the top level of the old
4865          * default expression; this has been agreed to satisfy the principle of
4866          * least surprise.      (The conversion to the new column type should act like
4867          * it started from what the user sees as the stored expression, and the
4868          * implicit coercions aren't going to be shown.)
4869          */
4870         if (attTup->atthasdef)
4871         {
4872                 defaultexpr = build_column_default(rel, attnum);
4873                 Assert(defaultexpr);
4874                 defaultexpr = strip_implicit_coercions(defaultexpr);
4875                 defaultexpr = coerce_to_target_type(NULL,               /* no UNKNOWN params */
4876                                                                                   defaultexpr, exprType(defaultexpr),
4877                                                                                         targettype, typename->typmod,
4878                                                                                         COERCION_ASSIGNMENT,
4879                                                                                         COERCE_IMPLICIT_CAST);
4880                 if (defaultexpr == NULL)
4881                         ereport(ERROR,
4882                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
4883                         errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
4884                                    colName, TypeNameToString(typename))));
4885         }
4886         else
4887                 defaultexpr = NULL;
4888
4889         /*
4890          * Find everything that depends on the column (constraints, indexes, etc),
4891          * and record enough information to let us recreate the objects.
4892          *
4893          * The actual recreation does not happen here, but only after we have
4894          * performed all the individual ALTER TYPE operations.  We have to save
4895          * the info before executing ALTER TYPE, though, else the deparser will
4896          * get confused.
4897          *
4898          * There could be multiple entries for the same object, so we must check
4899          * to ensure we process each one only once.  Note: we assume that an index
4900          * that implements a constraint will not show a direct dependency on the
4901          * column.
4902          */
4903         depRel = heap_open(DependRelationId, RowExclusiveLock);
4904
4905         ScanKeyInit(&key[0],
4906                                 Anum_pg_depend_refclassid,
4907                                 BTEqualStrategyNumber, F_OIDEQ,
4908                                 ObjectIdGetDatum(RelationRelationId));
4909         ScanKeyInit(&key[1],
4910                                 Anum_pg_depend_refobjid,
4911                                 BTEqualStrategyNumber, F_OIDEQ,
4912                                 ObjectIdGetDatum(RelationGetRelid(rel)));
4913         ScanKeyInit(&key[2],
4914                                 Anum_pg_depend_refobjsubid,
4915                                 BTEqualStrategyNumber, F_INT4EQ,
4916                                 Int32GetDatum((int32) attnum));
4917
4918         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
4919                                                           SnapshotNow, 3, key);
4920
4921         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
4922         {
4923                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
4924                 ObjectAddress foundObject;
4925
4926                 /* We don't expect any PIN dependencies on columns */
4927                 if (foundDep->deptype == DEPENDENCY_PIN)
4928                         elog(ERROR, "cannot alter type of a pinned column");
4929
4930                 foundObject.classId = foundDep->classid;
4931                 foundObject.objectId = foundDep->objid;
4932                 foundObject.objectSubId = foundDep->objsubid;
4933
4934                 switch (getObjectClass(&foundObject))
4935                 {
4936                         case OCLASS_CLASS:
4937                                 {
4938                                         char            relKind = get_rel_relkind(foundObject.objectId);
4939
4940                                         if (relKind == RELKIND_INDEX)
4941                                         {
4942                                                 Assert(foundObject.objectSubId == 0);
4943                                                 if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
4944                                                 {
4945                                                         tab->changedIndexOids = lappend_oid(tab->changedIndexOids,
4946                                                                                                            foundObject.objectId);
4947                                                         tab->changedIndexDefs = lappend(tab->changedIndexDefs,
4948                                                            pg_get_indexdef_string(foundObject.objectId));
4949                                                 }
4950                                         }
4951                                         else if (relKind == RELKIND_SEQUENCE)
4952                                         {
4953                                                 /*
4954                                                  * This must be a SERIAL column's sequence.  We need
4955                                                  * not do anything to it.
4956                                                  */
4957                                                 Assert(foundObject.objectSubId == 0);
4958                                         }
4959                                         else
4960                                         {
4961                                                 /* Not expecting any other direct dependencies... */
4962                                                 elog(ERROR, "unexpected object depending on column: %s",
4963                                                          getObjectDescription(&foundObject));
4964                                         }
4965                                         break;
4966                                 }
4967
4968                         case OCLASS_CONSTRAINT:
4969                                 Assert(foundObject.objectSubId == 0);
4970                                 if (!list_member_oid(tab->changedConstraintOids, foundObject.objectId))
4971                                 {
4972                                         tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids,
4973                                                                                                            foundObject.objectId);
4974                                         tab->changedConstraintDefs = lappend(tab->changedConstraintDefs,
4975                                                   pg_get_constraintdef_string(foundObject.objectId));
4976                                 }
4977                                 break;
4978
4979                         case OCLASS_REWRITE:
4980                                 /* XXX someday see if we can cope with revising views */
4981                                 ereport(ERROR,
4982                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4983                                                  errmsg("cannot alter type of a column used by a view or rule"),
4984                                                  errdetail("%s depends on column \"%s\"",
4985                                                                    getObjectDescription(&foundObject),
4986                                                                    colName)));
4987                                 break;
4988
4989                         case OCLASS_DEFAULT:
4990
4991                                 /*
4992                                  * Ignore the column's default expression, since we will fix
4993                                  * it below.
4994                                  */
4995                                 Assert(defaultexpr);
4996                                 break;
4997
4998                         case OCLASS_PROC:
4999                         case OCLASS_TYPE:
5000                         case OCLASS_CAST:
5001                         case OCLASS_CONVERSION:
5002                         case OCLASS_LANGUAGE:
5003                         case OCLASS_OPERATOR:
5004                         case OCLASS_OPCLASS:
5005                         case OCLASS_TRIGGER:
5006                         case OCLASS_SCHEMA:
5007
5008                                 /*
5009                                  * We don't expect any of these sorts of objects to depend on
5010                                  * a column.
5011                                  */
5012                                 elog(ERROR, "unexpected object depending on column: %s",
5013                                          getObjectDescription(&foundObject));
5014                                 break;
5015
5016                         default:
5017                                 elog(ERROR, "unrecognized object class: %u",
5018                                          foundObject.classId);
5019                 }
5020         }
5021
5022         systable_endscan(scan);
5023
5024         /*
5025          * Now scan for dependencies of this column on other things.  The only
5026          * thing we should find is the dependency on the column datatype, which we
5027          * want to remove.
5028          */
5029         ScanKeyInit(&key[0],
5030                                 Anum_pg_depend_classid,
5031                                 BTEqualStrategyNumber, F_OIDEQ,
5032                                 ObjectIdGetDatum(RelationRelationId));
5033         ScanKeyInit(&key[1],
5034                                 Anum_pg_depend_objid,
5035                                 BTEqualStrategyNumber, F_OIDEQ,
5036                                 ObjectIdGetDatum(RelationGetRelid(rel)));
5037         ScanKeyInit(&key[2],
5038                                 Anum_pg_depend_objsubid,
5039                                 BTEqualStrategyNumber, F_INT4EQ,
5040                                 Int32GetDatum((int32) attnum));
5041
5042         scan = systable_beginscan(depRel, DependDependerIndexId, true,
5043                                                           SnapshotNow, 3, key);
5044
5045         while (HeapTupleIsValid(depTup = systable_getnext(scan)))
5046         {
5047                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
5048
5049                 if (foundDep->deptype != DEPENDENCY_NORMAL)
5050                         elog(ERROR, "found unexpected dependency type '%c'",
5051                                  foundDep->deptype);
5052                 if (foundDep->refclassid != TypeRelationId ||
5053                         foundDep->refobjid != attTup->atttypid)
5054                         elog(ERROR, "found unexpected dependency for column");
5055
5056                 simple_heap_delete(depRel, &depTup->t_self);
5057         }
5058
5059         systable_endscan(scan);
5060
5061         heap_close(depRel, RowExclusiveLock);
5062
5063         /*
5064          * Here we go --- change the recorded column type.      (Note heapTup is a
5065          * copy of the syscache entry, so okay to scribble on.)
5066          */
5067         attTup->atttypid = targettype;
5068         attTup->atttypmod = typename->typmod;
5069         attTup->attndims = list_length(typename->arrayBounds);
5070         attTup->attlen = tform->typlen;
5071         attTup->attbyval = tform->typbyval;
5072         attTup->attalign = tform->typalign;
5073         attTup->attstorage = tform->typstorage;
5074
5075         ReleaseSysCache(typeTuple);
5076
5077         simple_heap_update(attrelation, &heapTup->t_self, heapTup);
5078
5079         /* keep system catalog indexes current */
5080         CatalogUpdateIndexes(attrelation, heapTup);
5081
5082         heap_close(attrelation, RowExclusiveLock);
5083
5084         /* Install dependency on new datatype */
5085         add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
5086
5087         /*
5088          * Drop any pg_statistic entry for the column, since it's now wrong type
5089          */
5090         RemoveStatistics(RelationGetRelid(rel), attnum);
5091
5092         /*
5093          * Update the default, if present, by brute force --- remove and re-add
5094          * the default.  Probably unsafe to take shortcuts, since the new version
5095          * may well have additional dependencies.  (It's okay to do this now,
5096          * rather than after other ALTER TYPE commands, since the default won't
5097          * depend on other column types.)
5098          */
5099         if (defaultexpr)
5100         {
5101                 /* Must make new row visible since it will be updated again */
5102                 CommandCounterIncrement();
5103
5104                 /*
5105                  * We use RESTRICT here for safety, but at present we do not expect
5106                  * anything to depend on the default.
5107                  */
5108                 RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
5109
5110                 StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
5111         }
5112
5113         /* Cleanup */
5114         heap_freetuple(heapTup);
5115 }
5116
5117 /*
5118  * Cleanup after we've finished all the ALTER TYPE operations for a
5119  * particular relation.  We have to drop and recreate all the indexes
5120  * and constraints that depend on the altered columns.
5121  */
5122 static void
5123 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
5124 {
5125         ObjectAddress obj;
5126         ListCell   *l;
5127
5128         /*
5129          * Re-parse the index and constraint definitions, and attach them to the
5130          * appropriate work queue entries.      We do this before dropping because in
5131          * the case of a FOREIGN KEY constraint, we might not yet have exclusive
5132          * lock on the table the constraint is attached to, and we need to get
5133          * that before dropping.  It's safe because the parser won't actually look
5134          * at the catalogs to detect the existing entry.
5135          */
5136         foreach(l, tab->changedIndexDefs)
5137                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5138         foreach(l, tab->changedConstraintDefs)
5139                 ATPostAlterTypeParse((char *) lfirst(l), wqueue);
5140
5141         /*
5142          * Now we can drop the existing constraints and indexes --- constraints
5143          * first, since some of them might depend on the indexes. It should be
5144          * okay to use DROP_RESTRICT here, since nothing else should be depending
5145          * on these objects.
5146          */
5147         foreach(l, tab->changedConstraintOids)
5148         {
5149                 obj.classId = ConstraintRelationId;
5150                 obj.objectId = lfirst_oid(l);
5151                 obj.objectSubId = 0;
5152                 performDeletion(&obj, DROP_RESTRICT);
5153         }
5154
5155         foreach(l, tab->changedIndexOids)
5156         {
5157                 obj.classId = RelationRelationId;
5158                 obj.objectId = lfirst_oid(l);
5159                 obj.objectSubId = 0;
5160                 performDeletion(&obj, DROP_RESTRICT);
5161         }
5162
5163         /*
5164          * The objects will get recreated during subsequent passes over the work
5165          * queue.
5166          */
5167 }
5168
5169 static void
5170 ATPostAlterTypeParse(char *cmd, List **wqueue)
5171 {
5172         List       *raw_parsetree_list;
5173         List       *querytree_list;
5174         ListCell   *list_item;
5175
5176         /*
5177          * We expect that we only have to do raw parsing and parse analysis, not
5178          * any rule rewriting, since these will all be utility statements.
5179          */
5180         raw_parsetree_list = raw_parser(cmd);
5181         querytree_list = NIL;
5182         foreach(list_item, raw_parsetree_list)
5183         {
5184                 Node       *parsetree = (Node *) lfirst(list_item);
5185
5186                 querytree_list = list_concat(querytree_list,
5187                                                                          parse_analyze(parsetree, NULL, 0));
5188         }
5189
5190         /*
5191          * Attach each generated command to the proper place in the work queue.
5192          * Note this could result in creation of entirely new work-queue entries.
5193          */
5194         foreach(list_item, querytree_list)
5195         {
5196                 Query      *query = (Query *) lfirst(list_item);
5197                 Relation        rel;
5198                 AlteredTableInfo *tab;
5199
5200                 Assert(IsA(query, Query));
5201                 Assert(query->commandType == CMD_UTILITY);
5202                 switch (nodeTag(query->utilityStmt))
5203                 {
5204                         case T_IndexStmt:
5205                                 {
5206                                         IndexStmt  *stmt = (IndexStmt *) query->utilityStmt;
5207                                         AlterTableCmd *newcmd;
5208
5209                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5210                                         tab = ATGetQueueEntry(wqueue, rel);
5211                                         newcmd = makeNode(AlterTableCmd);
5212                                         newcmd->subtype = AT_ReAddIndex;
5213                                         newcmd->def = (Node *) stmt;
5214                                         tab->subcmds[AT_PASS_OLD_INDEX] =
5215                                                 lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
5216                                         relation_close(rel, NoLock);
5217                                         break;
5218                                 }
5219                         case T_AlterTableStmt:
5220                                 {
5221                                         AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
5222                                         ListCell   *lcmd;
5223
5224                                         rel = relation_openrv(stmt->relation, AccessExclusiveLock);
5225                                         tab = ATGetQueueEntry(wqueue, rel);
5226                                         foreach(lcmd, stmt->cmds)
5227                                         {
5228                                                 AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
5229
5230                                                 switch (cmd->subtype)
5231                                                 {
5232                                                         case AT_AddIndex:
5233                                                                 cmd->subtype = AT_ReAddIndex;
5234                                                                 tab->subcmds[AT_PASS_OLD_INDEX] =
5235                                                                         lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
5236                                                                 break;
5237                                                         case AT_AddConstraint:
5238                                                                 tab->subcmds[AT_PASS_OLD_CONSTR] =
5239                                                                         lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
5240                                                                 break;
5241                                                         default:
5242                                                                 elog(ERROR, "unexpected statement type: %d",
5243                                                                          (int) cmd->subtype);
5244                                                 }
5245                                         }
5246                                         relation_close(rel, NoLock);
5247                                         break;
5248                                 }
5249                         default:
5250                                 elog(ERROR, "unexpected statement type: %d",
5251                                          (int) nodeTag(query->utilityStmt));
5252                 }
5253         }
5254 }
5255
5256
5257 /*
5258  * ALTER TABLE OWNER
5259  *
      * Make newOwnerId the owner of the relation relationOid.  Tables, views
      * and sequences are accepted directly; a toast table is accepted only
      * when recursing; a direct request on an index draws a WARNING (or
      * nothing at all, if the owner would not change) and is otherwise
      * ignored; any other relkind is an error.
      *
      * When the owner actually changes, this rewrites relowner (and relacl, if
      * it is non-null) in the relation's pg_class row, updates the owner
      * dependency, changes the owner of the rowtype for anything but an index,
      * and then recurses to the indexes of a table or toast table and to the
      * toast table and column-dependent sequences of a table.
      *
5260  * recursing is true if we are recursing from a table to its indexes or
5261  * toast table.  We don't allow the ownership of those things to be
5262  * changed separately from the parent table.  Also, we can skip permission
5263  * checks (this is necessary not just an optimization, else we'd fail to
5264  * handle toast tables properly).
5265  */
5266 void
5267 ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
5268 {
5269         Relation        target_rel;
5270         Relation        class_rel;
5271         HeapTuple       tuple;
5272         Form_pg_class tuple_class;
5273
5274         /*
5275          * Get exclusive lock till end of transaction on the target table. Use
5276          * relation_open so that we can work on indexes and sequences.
5277          */
5278         target_rel = relation_open(relationOid, AccessExclusiveLock);
5279
5280         /* Get its pg_class tuple, too */
5281         class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5282
5283         tuple = SearchSysCache(RELOID,
5284                                                    ObjectIdGetDatum(relationOid),
5285                                                    0, 0, 0);
5286         if (!HeapTupleIsValid(tuple))
5287                 elog(ERROR, "cache lookup failed for relation %u", relationOid);
5288         tuple_class = (Form_pg_class) GETSTRUCT(tuple);
5289
5290         /* Can we change the ownership of this tuple? */
5291         switch (tuple_class->relkind)
5292         {
5293                 case RELKIND_RELATION:
5294                 case RELKIND_VIEW:
5295                 case RELKIND_SEQUENCE:
5296                         /* ok to change owner */
5297                         break;
5298                 case RELKIND_INDEX:
5299                         if (!recursing)
5300                         {
5301                                 /*
5302                                  * Because ALTER INDEX OWNER used to be allowed, and in fact
5303                                  * is generated by old versions of pg_dump, we give a warning
5304                                  * and do nothing rather than erroring out.  Also, to avoid
5305                                  * unnecessary chatter while restoring those old dumps, say
5306                                  * nothing at all if the command would be a no-op anyway.
5307                                  */
5308                                 if (tuple_class->relowner != newOwnerId)
5309                                         ereport(WARNING,
5310                                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5311                                                          errmsg("cannot change owner of index \"%s\"",
5312                                                                         NameStr(tuple_class->relname)),
5313                                                          errhint("Change the ownership of the index's table, instead.")));
5314                                 /* quick hack to exit via the no-op path */
5315                                 newOwnerId = tuple_class->relowner;
5316                         }
5317                         break;
5318                 case RELKIND_TOASTVALUE:
                             /* a toast table may only be reached by recursion */
5319                         if (recursing)
5320                                 break;
5321                         /* FALL THRU */
5322                 default:
5323                         ereport(ERROR,
5324                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5325                                          errmsg("\"%s\" is not a table, view, or sequence",
5326                                                         NameStr(tuple_class->relname))));
5327         }
5328
5329         /*
5330          * If the new owner is the same as the existing owner, consider the
5331          * command to have succeeded.  This is for dump restoration purposes.
5332          */
5333         if (tuple_class->relowner != newOwnerId)
5334         {
5335                 Datum           repl_val[Natts_pg_class];
5336                 char            repl_null[Natts_pg_class];
5337                 char            repl_repl[Natts_pg_class];
5338                 Acl                *newAcl;
5339                 Datum           aclDatum;
5340                 bool            isNull;
5341                 HeapTuple       newtuple;
5342
5343                 /* skip permission checks when recursing to index or toast table */
5344                 if (!recursing)
5345                 {
5346                         /* Superusers can always do it */
5347                         if (!superuser())
5348                         {
5349                                 Oid                     namespaceOid = tuple_class->relnamespace;
5350                                 AclResult       aclresult;
5351
5352                                 /* Otherwise, must be owner of the existing object */
5353                                 if (!pg_class_ownercheck(relationOid, GetUserId()))
5354                                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5355                                                                    RelationGetRelationName(target_rel));
5356
5357                                 /* Must be able to become new owner */
5358                                 check_is_member_of_role(GetUserId(), newOwnerId);
5359
5360                                 /* New owner must have CREATE privilege on namespace */
5361                                 aclresult = pg_namespace_aclcheck(namespaceOid, newOwnerId,
5362                                                                                                   ACL_CREATE);
5363                                 if (aclresult != ACLCHECK_OK)
5364                                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
5365                                                                    get_namespace_name(namespaceOid));
5366                         }
5367                 }
5368
                     /*
                      * Start with every column flagged ' ' in both arrays, then flag
                      * with 'r' only the columns for which a new value is supplied.
                      */
5369                 memset(repl_null, ' ', sizeof(repl_null));
5370                 memset(repl_repl, ' ', sizeof(repl_repl));
5371
5372                 repl_repl[Anum_pg_class_relowner - 1] = 'r';
5373                 repl_val[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(newOwnerId);
5374
5375                 /*
5376                  * Determine the modified ACL for the new owner.  This is only
5377                  * necessary when the ACL is non-null.
5378                  */
5379                 aclDatum = SysCacheGetAttr(RELOID, tuple,
5380                                                                    Anum_pg_class_relacl,
5381                                                                    &isNull);
5382                 if (!isNull)
5383                 {
5384                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
5385                                                                  tuple_class->relowner, newOwnerId);
5386                         repl_repl[Anum_pg_class_relacl - 1] = 'r';
5387                         repl_val[Anum_pg_class_relacl - 1] = PointerGetDatum(newAcl);
5388                 }
5389
                     /* Build the updated pg_class row and store it, indexes included */
5390                 newtuple = heap_modifytuple(tuple, RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
5391
5392                 simple_heap_update(class_rel, &newtuple->t_self, newtuple);
5393                 CatalogUpdateIndexes(class_rel, newtuple);
5394
5395                 heap_freetuple(newtuple);
5396
5397                 /* Update owner dependency reference */
5398                 changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
5399
5400                 /*
5401                  * Also change the ownership of the table's rowtype, if it has one
5402                  */
5403                 if (tuple_class->relkind != RELKIND_INDEX)
5404                         AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
5405
5406                 /*
5407                  * If we are operating on a table or a toast table, also change the
5408                  * ownership of all the indexes that belong to it.  (Sequences and
5409                  * the table's own toast table are handled further down.)
5410                  */
5411                 if (tuple_class->relkind == RELKIND_RELATION ||
5412                         tuple_class->relkind == RELKIND_TOASTVALUE)
5413                 {
5414                         List       *index_oid_list;
5415                         ListCell   *i;
5416
5417                         /* Find all the indexes belonging to this relation */
5418                         index_oid_list = RelationGetIndexList(target_rel);
5419
5420                         /* For each index, recursively change its ownership */
5421                         foreach(i, index_oid_list)
5422                                 ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
5423
5424                         list_free(index_oid_list);
5425                 }
5426
5427                 if (tuple_class->relkind == RELKIND_RELATION)
5428                 {
5429                         /* If it has a toast table, recurse to change its ownership */
5430                         if (tuple_class->reltoastrelid != InvalidOid)
5431                                 ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
5432                                                                   true);
5433
5434                         /* If it has dependent sequences, recurse to change them too */
5435                         change_owner_recurse_to_sequences(relationOid, newOwnerId);
5436                 }
5437         }
5438
             /*
              * Drop the syscache reference and pg_class, but close the target with
              * NoLock so that the lock taken at the top outlives this function.
              */
5439         ReleaseSysCache(tuple);
5440         heap_close(class_rel, RowExclusiveLock);
5441         relation_close(target_rel, NoLock);
5442 }
5443
5444 /*
5445  * change_owner_recurse_to_sequences
5446  *
5447  * Helper function for ATExecChangeOwner.  Examines pg_depend searching
5448  * for sequences that are dependent on serial columns, and changes their
5449  * ownership.
      *
      * relationOid is the table whose dependent sequences are wanted and
      * newOwnerId is the role they are handed to.  Every pg_depend row that
      * references the table is visited; rows that are not an internal
      * dependency of a whole relation on one of the table's columns are
      * skipped, as are dependent relations that turn out not to be sequences.
      * Each remaining sequence is passed to ATExecChangeOwner with
      * recursing = false.
5450  */
5451 static void
5452 change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
5453 {
5454         Relation        depRel;
5455         SysScanDesc scan;
5456         ScanKeyData key[2];
5457         HeapTuple       tup;
5458
5459         /*
5460          * SERIAL sequences are those having an internal dependency on one of the
5461          * table's columns (we don't care *which* column, exactly).
5462          */
5463         depRel = heap_open(DependRelationId, AccessShareLock);
5464
             /* Scan keys: referenced object is a relation, namely relationOid */
5465         ScanKeyInit(&key[0],
5466                                 Anum_pg_depend_refclassid,
5467                                 BTEqualStrategyNumber, F_OIDEQ,
5468                                 ObjectIdGetDatum(RelationRelationId));
5469         ScanKeyInit(&key[1],
5470                                 Anum_pg_depend_refobjid,
5471                                 BTEqualStrategyNumber, F_OIDEQ,
5472                                 ObjectIdGetDatum(relationOid));
5473         /* we leave refobjsubid unspecified */
5474
5475         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
5476                                                           SnapshotNow, 2, key);
5477
5478         while (HeapTupleIsValid(tup = systable_getnext(scan)))
5479         {
5480                 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
5481                 Relation        seqRel;
5482
5483                 /* skip dependencies other than internal dependencies on columns */
5484                 if (depForm->refobjsubid == 0 ||
5485                         depForm->classid != RelationRelationId ||
5486                         depForm->objsubid != 0 ||
5487                         depForm->deptype != DEPENDENCY_INTERNAL)
5488                         continue;
5489
5490                 /* Use relation_open just in case it's an index */
5491                 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
5492
5493                 /* skip non-sequence relations */
5494                 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
5495                 {
5496                         /* No need to keep the lock */
5497                         relation_close(seqRel, AccessExclusiveLock);
5498                         continue;
5499                 }
5500
5501                 /* We don't need to close the sequence while we alter it. */
5502                 ATExecChangeOwner(depForm->objid, newOwnerId, false);
5503
5504                 /* Now we can close it.  Keep the lock till end of transaction. */
5505                 relation_close(seqRel, NoLock);
5506         }
5507
5508         systable_endscan(scan);
5509
             /* pg_depend was only read, so its lock is released right away */
5510         relation_close(depRel, AccessShareLock);
5511 }
5512
5513 /*
5514  * ALTER TABLE CLUSTER ON
5515  *
5516  * The only thing we have to do is to change the indisclustered bits.
      *
      * indexName is looked up in the namespace that rel itself lives in, and an
      * ERROR is raised if nothing of that name is found there.  The index is
      * then vetted by check_index_is_clusterable and recorded for rel with
      * mark_index_clustered.
5517  */
5518 static void
5519 ATExecClusterOn(Relation rel, const char *indexName)
5520 {
5521         Oid                     indexOid;
5522
             /* Resolve the name within the table's own namespace */
5523         indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
5524
5525         if (!OidIsValid(indexOid))
5526                 ereport(ERROR,
5527                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5528                                  errmsg("index \"%s\" for table \"%s\" does not exist",
5529                                                 indexName, RelationGetRelationName(rel))));
5530
5531         /* Check index is valid to cluster on */
5532         check_index_is_clusterable(rel, indexOid, false);
5533
5534         /* And do the work */
5535         mark_index_clustered(rel, indexOid);
5536 }
5537
5538 /*
5539  * ALTER TABLE SET WITHOUT CLUSTER
5540  *
5541  * We have to find any indexes on the table that have indisclustered bit
5542  * set and turn it off.
      *
      * All of that is delegated to mark_index_clustered, which is called with
      * InvalidOid in place of an index OID.
5543  */
5544 static void
5545 ATExecDropCluster(Relation rel)
5546 {
5547         mark_index_clustered(rel, InvalidOid);
5548 }
5549
5550 /*
5551  * ALTER TABLE SET TABLESPACE
      *
      * Preparation step: validates the request and records the target
      * tablespace in tab->newTableSpace.  No data is moved here; the comment
      * near the end of the function leaves the real work to "Phase 3".
      *
      * The checks are made in this order, each raising an ERROR on failure:
      * rel must be a table or an index; the current user must own it; it must
      * not be a system catalog unless allowSystemTableMods is set; the named
      * tablespace must exist; the current user must have CREATE privilege on
      * that tablespace; and no earlier SET TABLESPACE subcommand may already
      * have filled in tab->newTableSpace.
5552  */
5553 static void
5554 ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
5555 {
5556         Oid                     tablespaceId;
5557         AclResult       aclresult;
5558
5559         /*
5560          * We do our own permission checking because we want to allow this on
5561          * indexes.
5562          */
5563         if (rel->rd_rel->relkind != RELKIND_RELATION &&
5564                 rel->rd_rel->relkind != RELKIND_INDEX)
5565                 ereport(ERROR,
5566                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
5567                                  errmsg("\"%s\" is not a table or index",
5568                                                 RelationGetRelationName(rel))));
5569
5570         /* Permissions checks */
5571         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
5572                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
5573                                            RelationGetRelationName(rel));
5574
5575         if (!allowSystemTableMods && IsSystemRelation(rel))
5576                 ereport(ERROR,
5577                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5578                                  errmsg("permission denied: \"%s\" is a system catalog",
5579                                                 RelationGetRelationName(rel))));
5580
5581         /* Check that the tablespace exists */
5582         tablespaceId = get_tablespace_oid(tablespacename);
5583         if (!OidIsValid(tablespaceId))
5584                 ereport(ERROR,
5585                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
5586                                  errmsg("tablespace \"%s\" does not exist", tablespacename)));
5587
5588         /* Check its permissions */
5589         aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
5590         if (aclresult != ACLCHECK_OK)
5591                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
5592
5593         /* Save info for Phase 3 to do the real work */
             /* (a valid OID already stored means an earlier subcommand set it) */
5594         if (OidIsValid(tab->newTableSpace))
5595                 ereport(ERROR,
5596                                 (errcode(ERRCODE_SYNTAX_ERROR),
5597                                  errmsg("cannot have multiple SET TABLESPACE subcommands")));
5598         tab->newTableSpace = tablespaceId;
5599 }
5600
5601 /*
5602  * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
5603  * rewriting to be done, so we just want to copy the data as fast as possible.
      *
      * tableOid is the relation to move and newTableSpace its destination.
      * Shared and nailed-in-cache relations and other sessions' temp tables
      * are rejected, and nothing happens if the relation is already in the
      * requested tablespace.  Otherwise a new storage file is created in the
      * new tablespace, the data is copied with copy_relation_data, the old
      * file is scheduled for unlinking, and reltablespace in the relation's
      * pg_class row is updated.  The function then calls itself for the
      * relation's toast table and toast index, if it has them.
      *
      * NOTE(review): the relation is opened with NoLock, so this presumably
      * relies on the caller already holding a suitable lock -- confirm.
5604  */
5605 static void
5606 ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
5607 {
5608         Relation        rel;
5609         Oid                     oldTableSpace;
5610         Oid                     reltoastrelid;
5611         Oid                     reltoastidxid;
5612         RelFileNode newrnode;
5613         SMgrRelation dstrel;
5614         Relation        pg_class;
5615         HeapTuple       tuple;
5616         Form_pg_class rd_rel;
5617
5618         rel = relation_open(tableOid, NoLock);
5619
5620         /*
5621          * We can never allow moving of shared or nailed-in-cache relations,
5622          * because we can't support changing their reltablespace values.
5623          */
5624         if (rel->rd_rel->relisshared || rel->rd_isnailed)
5625                 ereport(ERROR,
5626                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5627                                  errmsg("cannot move system relation \"%s\"",
5628                                                 RelationGetRelationName(rel))));
5629
5630         /*
5631          * Don't allow moving temp tables of other backends ... their local buffer
5632          * manager is not going to cope.
5633          */
5634         if (isOtherTempNamespace(RelationGetNamespace(rel)))
5635                 ereport(ERROR,
5636                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5637                                  errmsg("cannot move temporary tables of other sessions")));
5638
5639         /*
5640          * No work if no change in tablespace.
              *
              * A stored reltablespace of 0 is treated as the database's default
              * tablespace; that is also how the pg_class row is written below.
5641          */
5642         oldTableSpace = rel->rd_rel->reltablespace;
5643         if (newTableSpace == oldTableSpace ||
5644                 (newTableSpace == MyDatabaseTableSpace && oldTableSpace == 0))
5645         {
5646                 relation_close(rel, NoLock);
5647                 return;
5648         }
5649
             /* Remember these now; they are needed after rel has been closed */
5650         reltoastrelid = rel->rd_rel->reltoastrelid;
5651         reltoastidxid = rel->rd_rel->reltoastidxid;
5652
5653         /* Get a modifiable copy of the relation's pg_class row */
5654         pg_class = heap_open(RelationRelationId, RowExclusiveLock);
5655
5656         tuple = SearchSysCacheCopy(RELOID,
5657                                                            ObjectIdGetDatum(tableOid),
5658                                                            0, 0, 0);
5659         if (!HeapTupleIsValid(tuple))
5660                 elog(ERROR, "cache lookup failed for relation %u", tableOid);
5661         rd_rel = (Form_pg_class) GETSTRUCT(tuple);
5662
5663         /* Create the new storage file: same file node, new tablespace */
5664         /* NOTE: any conflict in relfilenode value will be caught here */
5665         newrnode = rel->rd_node;
5666         newrnode.spcNode = newTableSpace;
5667
5668         dstrel = smgropen(newrnode);
5669         smgrcreate(dstrel, rel->rd_istemp, false);
5670
5671         /* copy relation data to the new physical file */
5672         copy_relation_data(rel, dstrel);
5673
5674         /* schedule unlinking old physical file */
5675         RelationOpenSmgr(rel);
5676         smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
5677
5678         /*
5679          * Now drop smgr references.  The source was already dropped by
5680          * smgrscheduleunlink.
5681          */
5682         smgrclose(dstrel);
5683
5684         /* update the pg_class row */
             /* (the database's default tablespace is stored as InvalidOid) */
5685         rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
5686         simple_heap_update(pg_class, &tuple->t_self, tuple);
5687         CatalogUpdateIndexes(pg_class, tuple);
5688
5689         heap_freetuple(tuple);
5690
5691         heap_close(pg_class, RowExclusiveLock);
5692
5693         relation_close(rel, NoLock);
5694
5695         /* Make sure the reltablespace change is visible */
5696         CommandCounterIncrement();
5697
5698         /* Move associated toast relation and/or index, too */
5699         if (OidIsValid(reltoastrelid))
5700                 ATExecSetTableSpace(reltoastrelid, newTableSpace);
5701         if (OidIsValid(reltoastidxid))
5702                 ATExecSetTableSpace(reltoastidxid, newTableSpace);
5703 }
5704
5705 /*
5706  * Copy data, block by block
      *
      * Reads every block of rel through rel's own smgr relation and writes it
      * at the same block number in dst.  When WAL archiving is active and rel
      * is not a temp relation, each block is also WAL-logged as a new-page
      * record and stamped with that record's LSN and the current timeline
      * before it is written.  For a non-temp rel, dst is fsync'd before
      * returning.
5707  */
5708 static void
5709 copy_relation_data(Relation rel, SMgrRelation dst)
5710 {
5711         SMgrRelation src;
5712         bool            use_wal;
5713         BlockNumber nblocks;
5714         BlockNumber blkno;
5715         char       *buf;
5716         Page            page;
5717
             /*
              * Allocate the block buffer with palloc instead of declaring a local
              * char[] array.  A char array carries no alignment guarantee, but the
              * buffer is accessed through "page" as a page header by PageSetLSN and
              * PageSetTLI below, and is handed to smgrread/smgrwrite for I/O;
              * palloc'd memory is MAXALIGN'd, which is safe for both.
              */
             buf = (char *) palloc(BLCKSZ);
             page = (Page) buf;

5718         /*
5719          * Since we copy the file directly without looking at the shared buffers,
5720          * we'd better first flush out any pages of the source relation that are
5721          * in shared buffers.  We assume no new changes will be made while we are
5722          * holding exclusive lock on the rel.
5723          */
5724         FlushRelationBuffers(rel);
5725
5726         /*
5727          * We need to log the copied data in WAL iff WAL archiving is enabled AND
5728          * it's not a temp rel.
5729          */
5730         use_wal = XLogArchivingActive() && !rel->rd_istemp;
5731
5732         nblocks = RelationGetNumberOfBlocks(rel);
5733         /* RelationGetNumberOfBlocks will certainly have opened rd_smgr */
5734         src = rel->rd_smgr;
5735
5736         for (blkno = 0; blkno < nblocks; blkno++)
5737         {
5738                 smgrread(src, blkno, buf);
5739
5740                 /* XLOG stuff */
5741                 if (use_wal)
5742                 {
5743                         xl_heap_newpage xlrec;
5744                         XLogRecPtr      recptr;
5745                         XLogRecData rdata[2];
5746
5747                         /* NO ELOG(ERROR) from here till newpage op is logged */
5748                         START_CRIT_SECTION();
5749
                             /* the record names the destination file and block */
5750                         xlrec.node = dst->smgr_rnode;
5751                         xlrec.blkno = blkno;
5752
5753                         rdata[0].data = (char *) &xlrec;
5754                         rdata[0].len = SizeOfHeapNewpage;
5755                         rdata[0].buffer = InvalidBuffer;
5756                         rdata[0].next = &(rdata[1]);
5757
                             /* ... followed by the full page image */
5758                         rdata[1].data = (char *) page;
5759                         rdata[1].len = BLCKSZ;
5760                         rdata[1].buffer = InvalidBuffer;
5761                         rdata[1].next = NULL;
5762
5763                         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
5764
5765                         PageSetLSN(page, recptr);
5766                         PageSetTLI(page, ThisTimeLineID);
5767
5768                         END_CRIT_SECTION();
5769                 }
5770
5771                 /*
5772                  * Now write the page.  We say isTemp = true even if it's not a temp
5773                  * rel, because there's no need for smgr to schedule an fsync for this
5774                  * write; we'll do it ourselves below.
5775                  */
5776                 smgrwrite(dst, blkno, buf, true);
5777         }
5778
             /* The block buffer is no longer needed once every block is written */
             pfree(buf);

5779         /*
5780          * If the rel isn't temp, we must fsync it down to disk before it's safe
5781          * to commit the transaction.  (For a temp rel we don't care since the rel
5782          * will be uninteresting after a crash anyway.)
5783          *
5784          * It's obvious that we must do this when not WAL-logging the copy. It's
5785          * less obvious that we have to do it even if we did WAL-log the copied
5786          * pages. The reason is that since we're copying outside shared buffers, a
5787          * CHECKPOINT occurring during the copy has no way to flush the previously
5788          * written data to disk (indeed it won't know the new rel even exists).  A
5789          * crash later on would replay WAL from the checkpoint, therefore it
5790          * wouldn't replay our earlier WAL entries. If we do not fsync those pages
5791          * here, they might still not be on disk when the crash occurs.
5792          */
5793         if (!rel->rd_istemp)
5794                 smgrimmedsync(dst);
5795 }
5796
5797 /*
5798  * ALTER TABLE ENABLE/DISABLE TRIGGER
5799  *
5800  * We just pass this off to trigger.c.
      *
      * rel, trigname, enable and skip_system are forwarded unchanged to
      * EnableDisableTrigger; this function adds no checks of its own.
5801  */
5802 static void
5803 ATExecEnableDisableTrigger(Relation rel, char *trigname,
5804                                                    bool enable, bool skip_system)
5805 {
5806         EnableDisableTrigger(rel, trigname, enable, skip_system);
5807 }
5808
5809 /*
5810  * ALTER TABLE CREATE TOAST TABLE
5811  *
5812  * Note: this is also invoked from outside this module; in such cases we
5813  * expect the caller to have verified that the relation is a table and we
5814  * have all the right permissions.  Callers expect this function
5815  * to end with CommandCounterIncrement if it makes any changes.
5816  */
5817 void
5818 AlterTableCreateToastTable(Oid relOid, bool silent)
5819 {
5820         Relation        rel;
5821         HeapTuple       reltup;
5822         TupleDesc       tupdesc;
5823         bool            shared_relation;
5824         Relation        class_rel;
5825         Oid                     toast_relid;
5826         Oid                     toast_idxid;
5827         char            toast_relname[NAMEDATALEN];
5828         char            toast_idxname[NAMEDATALEN];
5829         IndexInfo  *indexInfo;
5830         Oid                     classObjectId[2];
5831         ObjectAddress baseobject,
5832                                 toastobject;
5833
5834         /*
5835          * Grab an exclusive lock on the target table, which we will NOT release
5836          * until end of transaction.  (This is probably redundant in all present
5837          * uses...)
5838          */
5839         rel = heap_open(relOid, AccessExclusiveLock);
5840
5841         /*
5842          * Toast table is shared if and only if its parent is.
5843          *
5844          * We cannot allow toasting a shared relation after initdb (because
5845          * there's no way to mark it toasted in other databases' pg_class).
5846          * Unfortunately we can't distinguish initdb from a manually started
5847          * standalone backend (toasting happens after the bootstrap phase, so
5848          * checking IsBootstrapProcessingMode() won't work).  However, we can at
5849          * least prevent this mistake under normal multi-user operation.
5850          */
5851         shared_relation = rel->rd_rel->relisshared;
5852         if (shared_relation && IsUnderPostmaster)
5853                 ereport(ERROR,
5854                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5855                                  errmsg("shared tables cannot be toasted after initdb")));
5856
5857         /*
5858          * Is it already toasted?
5859          */
5860         if (rel->rd_rel->reltoastrelid != InvalidOid)
5861         {
5862                 if (silent)
5863                 {
5864                         heap_close(rel, NoLock);
5865                         return;
5866                 }
5867
5868                 ereport(ERROR,
5869                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5870                                  errmsg("table \"%s\" already has a TOAST table",
5871                                                 RelationGetRelationName(rel))));
5872         }
5873
5874         /*
5875          * Check to see whether the table actually needs a TOAST table.
5876          */
5877         if (!needs_toast_table(rel))
5878         {
5879                 if (silent)
5880                 {
5881                         heap_close(rel, NoLock);
5882                         return;
5883                 }
5884
5885                 ereport(ERROR,
5886                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5887                                  errmsg("table \"%s\" does not need a TOAST table",
5888                                                 RelationGetRelationName(rel))));
5889         }
5890
5891         /*
5892          * Create the toast table and its index
5893          */
5894         snprintf(toast_relname, sizeof(toast_relname),
5895                          "pg_toast_%u", relOid);
5896         snprintf(toast_idxname, sizeof(toast_idxname),
5897                          "pg_toast_%u_index", relOid);
5898
5899         /* this is pretty painful...  need a tuple descriptor */
5900         tupdesc = CreateTemplateTupleDesc(3, false);
5901         TupleDescInitEntry(tupdesc, (AttrNumber) 1,
5902                                            "chunk_id",
5903                                            OIDOID,
5904                                            -1, 0);
5905         TupleDescInitEntry(tupdesc, (AttrNumber) 2,
5906                                            "chunk_seq",
5907                                            INT4OID,
5908                                            -1, 0);
5909         TupleDescInitEntry(tupdesc, (AttrNumber) 3,
5910                                            "chunk_data",
5911                                            BYTEAOID,
5912                                            -1, 0);
5913
5914         /*
5915          * Ensure that the toast table doesn't itself get toasted, or we'll be
5916          * toast :-(.  This is essential for chunk_data because type bytea is
5917          * toastable; hit the other two just to be sure.
5918          */
5919         tupdesc->attrs[0]->attstorage = 'p';
5920         tupdesc->attrs[1]->attstorage = 'p';
5921         tupdesc->attrs[2]->attstorage = 'p';
5922
5923         /*
5924          * Note: the toast relation is placed in the regular pg_toast namespace
5925          * even if its master relation is a temp table.  There cannot be any
5926          * naming collision, and the toast rel will be destroyed when its master
5927          * is, so there's no need to handle the toast rel as temp.
5928          */
5929         toast_relid = heap_create_with_catalog(toast_relname,
5930                                                                                    PG_TOAST_NAMESPACE,
5931                                                                                    rel->rd_rel->reltablespace,
5932                                                                                    InvalidOid,
5933                                                                                    rel->rd_rel->relowner,
5934                                                                                    tupdesc,
5935                                                                                    RELKIND_TOASTVALUE,
5936                                                                                    shared_relation,
5937                                                                                    true,
5938                                                                                    0,
5939                                                                                    ONCOMMIT_NOOP,
5940                                                                                    true);
5941
5942         /* make the toast relation visible, else index creation will fail */
5943         CommandCounterIncrement();
5944
5945         /*
5946          * Create unique index on chunk_id, chunk_seq.
5947          *
5948          * NOTE: the normal TOAST access routines could actually function with a
5949          * single-column index on chunk_id only. However, the slice access
5950          * routines use both columns for faster access to an individual chunk. In
5951          * addition, we want it to be unique as a check against the possibility of
5952          * duplicate TOAST chunk OIDs. The index might also be a little more
5953          * efficient this way, since btree isn't all that happy with large numbers
5954          * of equal keys.
5955          */
5956
5957         indexInfo = makeNode(IndexInfo);
5958         indexInfo->ii_NumIndexAttrs = 2;
5959         indexInfo->ii_KeyAttrNumbers[0] = 1;
5960         indexInfo->ii_KeyAttrNumbers[1] = 2;
5961         indexInfo->ii_Expressions = NIL;
5962         indexInfo->ii_ExpressionsState = NIL;
5963         indexInfo->ii_Predicate = NIL;
5964         indexInfo->ii_PredicateState = NIL;
5965         indexInfo->ii_Unique = true;
5966
5967         classObjectId[0] = OID_BTREE_OPS_OID;
5968         classObjectId[1] = INT4_BTREE_OPS_OID;
5969
5970         toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
5971                                                            indexInfo,
5972                                                            BTREE_AM_OID,
5973                                                            rel->rd_rel->reltablespace,
5974                                                            classObjectId,
5975                                                            true, false, true, false);
5976
5977         /*
5978          * Update toast rel's pg_class entry to show that it has an index. The
5979          * index OID is stored into the reltoastidxid field for easy access by the
5980          * tuple toaster.
5981          */
5982         setRelhasindex(toast_relid, true, true, toast_idxid);
5983
5984         /*
5985          * Store the toast table's OID in the parent relation's pg_class row
5986          */
5987         class_rel = heap_open(RelationRelationId, RowExclusiveLock);
5988
5989         reltup = SearchSysCacheCopy(RELOID,
5990                                                                 ObjectIdGetDatum(relOid),
5991                                                                 0, 0, 0);
5992         if (!HeapTupleIsValid(reltup))
5993                 elog(ERROR, "cache lookup failed for relation %u", relOid);
5994
5995         ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
5996
5997         simple_heap_update(class_rel, &reltup->t_self, reltup);
5998
5999         /* Keep catalog indexes current */
6000         CatalogUpdateIndexes(class_rel, reltup);
6001
6002         heap_freetuple(reltup);
6003
6004         heap_close(class_rel, RowExclusiveLock);
6005
6006         /*
6007          * Register dependency from the toast table to the master, so that the
6008          * toast table will be deleted if the master is.
6009          */
6010         baseobject.classId = RelationRelationId;
6011         baseobject.objectId = relOid;
6012         baseobject.objectSubId = 0;
6013         toastobject.classId = RelationRelationId;
6014         toastobject.objectId = toast_relid;
6015         toastobject.objectSubId = 0;
6016
6017         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
6018
6019         /*
6020          * Clean up and make changes visible
6021          */
6022         heap_close(rel, NoLock);
6023
6024         CommandCounterIncrement();
6025 }
6026
6027 /*
6028  * Check to see whether the table needs a TOAST table.  It does only if
6029  * (1) there are any toastable attributes, and (2) the maximum length
6030  * of a tuple could exceed TOAST_TUPLE_THRESHOLD.  (We don't want to
6031  * create a toast table for something like "f1 varchar(20)".)
6032  */
6033 static bool
6034 needs_toast_table(Relation rel)
6035 {
6036         int32           data_length = 0;
6037         bool            maxlength_unknown = false;
6038         bool            has_toastable_attrs = false;
6039         TupleDesc       tupdesc;
6040         Form_pg_attribute *att;
6041         int32           tuple_length;
6042         int                     i;
6043
6044         tupdesc = rel->rd_att;
6045         att = tupdesc->attrs;
6046
6047         for (i = 0; i < tupdesc->natts; i++)
6048         {
6049                 if (att[i]->attisdropped)
6050                         continue;
6051                 data_length = att_align(data_length, att[i]->attalign);
6052                 if (att[i]->attlen > 0)
6053                 {
6054                         /* Fixed-length types are never toastable */
6055                         data_length += att[i]->attlen;
6056                 }
6057                 else
6058                 {
6059                         int32           maxlen = type_maximum_size(att[i]->atttypid,
6060                                                                                                    att[i]->atttypmod);
6061
6062                         if (maxlen < 0)
6063                                 maxlength_unknown = true;
6064                         else
6065                                 data_length += maxlen;
6066                         if (att[i]->attstorage != 'p')
6067                                 has_toastable_attrs = true;
6068                 }
6069         }
6070         if (!has_toastable_attrs)
6071                 return false;                   /* nothing to toast? */
6072         if (maxlength_unknown)
6073                 return true;                    /* any unlimited-length attrs? */
6074         tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
6075                                                         BITMAPLEN(tupdesc->natts)) +
6076                 MAXALIGN(data_length);
6077         return (tuple_length > TOAST_TUPLE_THRESHOLD);
6078 }
6079
6080
6081 /*
6082  * Execute ALTER TABLE SET SCHEMA
6083  *
6084  * Note: caller must have checked ownership of the relation already
6085  */
6086 void
6087 AlterTableNamespace(RangeVar *relation, const char *newschema)
6088 {
6089         Relation        rel;
6090         Oid                     relid;
6091         Oid                     oldNspOid;
6092         Oid                     nspOid;
6093         Relation        classRel;
6094
6095         rel = heap_openrv(relation, AccessExclusiveLock);
6096
6097         /* heap_openrv allows TOAST, but we don't want to */
6098         if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
6099                 ereport(ERROR,
6100                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
6101                                  errmsg("\"%s\" is a TOAST relation",
6102                                                 RelationGetRelationName(rel))));
6103
6104         relid = RelationGetRelid(rel);
6105         oldNspOid = RelationGetNamespace(rel);
6106
6107         /* get schema OID and check its permissions */
6108         nspOid = LookupCreationNamespace(newschema);
6109
6110         if (oldNspOid == nspOid)
6111                 ereport(ERROR,
6112                                 (errcode(ERRCODE_DUPLICATE_TABLE),
6113                                  errmsg("relation \"%s\" is already in schema \"%s\"",
6114                                                 RelationGetRelationName(rel),
6115                                                 newschema)));
6116
6117         /* disallow renaming into or out of temp schemas */
6118         if (isAnyTempNamespace(nspOid) || isAnyTempNamespace(oldNspOid))
6119                 ereport(ERROR,
6120                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6121                         errmsg("cannot move objects into or out of temporary schemas")));
6122
6123         /* same for TOAST schema */
6124         if (nspOid == PG_TOAST_NAMESPACE || oldNspOid == PG_TOAST_NAMESPACE)
6125                 ereport(ERROR,
6126                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6127                                  errmsg("cannot move objects into or out of TOAST schema")));
6128
6129         /* OK, modify the pg_class row and pg_depend entry */
6130         classRel = heap_open(RelationRelationId, RowExclusiveLock);
6131
6132         AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
6133
6134         /* Fix the table's rowtype too */
6135         AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
6136
6137         /* Fix other dependent stuff */
6138         if (rel->rd_rel->relkind == RELKIND_RELATION)
6139         {
6140                 AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
6141                 AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
6142                 AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
6143         }
6144
6145         heap_close(classRel, RowExclusiveLock);
6146
6147         /* close rel, but keep lock until commit */
6148         relation_close(rel, NoLock);
6149 }
6150
6151 /*
6152  * The guts of relocating a relation to another namespace: fix the pg_class
6153  * entry, and the pg_depend entry if any.  Caller must already have
6154  * opened and write-locked pg_class.
6155  */
6156 void
6157 AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
6158                                                            Oid oldNspOid, Oid newNspOid,
6159                                                            bool hasDependEntry)
6160 {
6161         HeapTuple       classTup;
6162         Form_pg_class classForm;
6163
6164         classTup = SearchSysCacheCopy(RELOID,
6165                                                                   ObjectIdGetDatum(relOid),
6166                                                                   0, 0, 0);
6167         if (!HeapTupleIsValid(classTup))
6168                 elog(ERROR, "cache lookup failed for relation %u", relOid);
6169         classForm = (Form_pg_class) GETSTRUCT(classTup);
6170
6171         Assert(classForm->relnamespace == oldNspOid);
6172
6173         /* check for duplicate name (more friendly than unique-index failure) */
6174         if (get_relname_relid(NameStr(classForm->relname),
6175                                                   newNspOid) != InvalidOid)
6176                 ereport(ERROR,
6177                                 (errcode(ERRCODE_DUPLICATE_TABLE),
6178                                  errmsg("relation \"%s\" already exists in schema \"%s\"",
6179                                                 NameStr(classForm->relname),
6180                                                 get_namespace_name(newNspOid))));
6181
6182         /* classTup is a copy, so OK to scribble on */
6183         classForm->relnamespace = newNspOid;
6184
6185         simple_heap_update(classRel, &classTup->t_self, classTup);
6186         CatalogUpdateIndexes(classRel, classTup);
6187
6188         /* Update dependency on schema if caller said so */
6189         if (hasDependEntry &&
6190                 changeDependencyFor(RelationRelationId, relOid,
6191                                                         NamespaceRelationId, oldNspOid, newNspOid) != 1)
6192                 elog(ERROR, "failed to change schema dependency for relation \"%s\"",
6193                          NameStr(classForm->relname));
6194
6195         heap_freetuple(classTup);
6196 }
6197
6198 /*
6199  * Move all indexes for the specified relation to another namespace.
6200  *
6201  * Note: we assume adequate permission checking was done by the caller,
6202  * and that the caller has a suitable lock on the owning relation.
6203  */
6204 static void
6205 AlterIndexNamespaces(Relation classRel, Relation rel,
6206                                          Oid oldNspOid, Oid newNspOid)
6207 {
6208         List       *indexList;
6209         ListCell   *l;
6210
6211         indexList = RelationGetIndexList(rel);
6212
6213         foreach(l, indexList)
6214         {
6215                 Oid                     indexOid = lfirst_oid(l);
6216
6217                 /*
6218                  * Note: currently, the index will not have its own dependency on the
6219                  * namespace, so we don't need to do changeDependencyFor(). There's no
6220                  * rowtype in pg_type, either.
6221                  */
6222                 AlterRelationNamespaceInternal(classRel, indexOid,
6223                                                                            oldNspOid, newNspOid,
6224                                                                            false);
6225         }
6226
6227         list_free(indexList);
6228 }
6229
6230 /*
6231  * Move all SERIAL-column sequences of the specified relation to another
6232  * namespace.
6233  *
6234  * Note: we assume adequate permission checking was done by the caller,
6235  * and that the caller has a suitable lock on the owning relation.
6236  */
6237 static void
6238 AlterSeqNamespaces(Relation classRel, Relation rel,
6239                                    Oid oldNspOid, Oid newNspOid, const char *newNspName)
6240 {
6241         Relation        depRel;
6242         SysScanDesc scan;
6243         ScanKeyData key[2];
6244         HeapTuple       tup;
6245
6246         /*
6247          * SERIAL sequences are those having an internal dependency on one of the
6248          * table's columns (we don't care *which* column, exactly).
6249          */
6250         depRel = heap_open(DependRelationId, AccessShareLock);
6251
6252         ScanKeyInit(&key[0],
6253                                 Anum_pg_depend_refclassid,
6254                                 BTEqualStrategyNumber, F_OIDEQ,
6255                                 ObjectIdGetDatum(RelationRelationId));
6256         ScanKeyInit(&key[1],
6257                                 Anum_pg_depend_refobjid,
6258                                 BTEqualStrategyNumber, F_OIDEQ,
6259                                 ObjectIdGetDatum(RelationGetRelid(rel)));
6260         /* we leave refobjsubid unspecified */
6261
6262         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
6263                                                           SnapshotNow, 2, key);
6264
6265         while (HeapTupleIsValid(tup = systable_getnext(scan)))
6266         {
6267                 Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
6268                 Relation        seqRel;
6269
6270                 /* skip dependencies other than internal dependencies on columns */
6271                 if (depForm->refobjsubid == 0 ||
6272                         depForm->classid != RelationRelationId ||
6273                         depForm->objsubid != 0 ||
6274                         depForm->deptype != DEPENDENCY_INTERNAL)
6275                         continue;
6276
6277                 /* Use relation_open just in case it's an index */
6278                 seqRel = relation_open(depForm->objid, AccessExclusiveLock);
6279
6280                 /* skip non-sequence relations */
6281                 if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
6282                 {
6283                         /* No need to keep the lock */
6284                         relation_close(seqRel, AccessExclusiveLock);
6285                         continue;
6286                 }
6287
6288                 /* Fix the pg_class and pg_depend entries */
6289                 AlterRelationNamespaceInternal(classRel, depForm->objid,
6290                                                                            oldNspOid, newNspOid,
6291                                                                            true);
6292
6293                 /*
6294                  * Sequences have entries in pg_type. We need to be careful to move
6295                  * them to the new namespace, too.
6296                  */
6297                 AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
6298                                                                    newNspOid, false);
6299
6300                 /* Now we can close it.  Keep the lock till end of transaction. */
6301                 relation_close(seqRel, NoLock);
6302         }
6303
6304         systable_endscan(scan);
6305
6306         relation_close(depRel, AccessShareLock);
6307 }
6308
6309
6310 /*
6311  * This code supports
6312  *      CREATE TEMP TABLE ... ON COMMIT { DROP | PRESERVE ROWS | DELETE ROWS }
6313  *
6314  * Because we only support this for TEMP tables, it's sufficient to remember
6315  * the state in a backend-local data structure.
6316  */
6317
6318 /*
6319  * Register a newly-created relation's ON COMMIT action.
6320  */
6321 void
6322 register_on_commit_action(Oid relid, OnCommitAction action)
6323 {
6324         OnCommitItem *oc;
6325         MemoryContext oldcxt;
6326
6327         /*
6328          * We needn't bother registering the relation unless there is an ON COMMIT
6329          * action we need to take.
6330          */
6331         if (action == ONCOMMIT_NOOP || action == ONCOMMIT_PRESERVE_ROWS)
6332                 return;
6333
6334         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
6335
6336         oc = (OnCommitItem *) palloc(sizeof(OnCommitItem));
6337         oc->relid = relid;
6338         oc->oncommit = action;
6339         oc->creating_subid = GetCurrentSubTransactionId();
6340         oc->deleting_subid = InvalidSubTransactionId;
6341
6342         on_commits = lcons(oc, on_commits);
6343
6344         MemoryContextSwitchTo(oldcxt);
6345 }
6346
6347 /*
6348  * Unregister any ON COMMIT action when a relation is deleted.
6349  *
6350  * Actually, we only mark the OnCommitItem entry as to be deleted after commit.
6351  */
6352 void
6353 remove_on_commit_action(Oid relid)
6354 {
6355         ListCell   *l;
6356
6357         foreach(l, on_commits)
6358         {
6359                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6360
6361                 if (oc->relid == relid)
6362                 {
6363                         oc->deleting_subid = GetCurrentSubTransactionId();
6364                         break;
6365                 }
6366         }
6367 }
6368
6369 /*
6370  * Perform ON COMMIT actions.
6371  *
6372  * This is invoked just before actually committing, since it's possible
6373  * to encounter errors.
6374  */
6375 void
6376 PreCommit_on_commit_actions(void)
6377 {
6378         ListCell   *l;
6379         List       *oids_to_truncate = NIL;
6380
6381         foreach(l, on_commits)
6382         {
6383                 OnCommitItem *oc = (OnCommitItem *) lfirst(l);
6384
6385                 /* Ignore entry if already dropped in this xact */
6386                 if (oc->deleting_subid != InvalidSubTransactionId)
6387                         continue;
6388
6389                 switch (oc->oncommit)
6390                 {
6391                         case ONCOMMIT_NOOP:
6392                         case ONCOMMIT_PRESERVE_ROWS:
6393                                 /* Do nothing (there shouldn't be such entries, actually) */
6394                                 break;
6395                         case ONCOMMIT_DELETE_ROWS:
6396                                 oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
6397                                 break;
6398                         case ONCOMMIT_DROP:
6399                                 {
6400                                         ObjectAddress object;
6401
6402                                         object.classId = RelationRelationId;
6403                                         object.objectId = oc->relid;
6404                                         object.objectSubId = 0;
6405                                         performDeletion(&object, DROP_CASCADE);
6406
6407                                         /*
6408                                          * Note that table deletion will call
6409                                          * remove_on_commit_action, so the entry should get marked
6410                                          * as deleted.
6411                                          */
6412                                         Assert(oc->deleting_subid != InvalidSubTransactionId);
6413                                         break;
6414                                 }
6415                 }
6416         }
6417         if (oids_to_truncate != NIL)
6418         {
6419                 heap_truncate(oids_to_truncate);
6420                 CommandCounterIncrement();              /* XXX needed? */
6421         }
6422 }
6423
6424 /*
6425  * Post-commit or post-abort cleanup for ON COMMIT management.
6426  *
6427  * All we do here is remove no-longer-needed OnCommitItem entries.
6428  *
6429  * During commit, remove entries that were deleted during this transaction;
6430  * during abort, remove those created during this transaction.
6431  */
6432 void
6433 AtEOXact_on_commit_actions(bool isCommit)
6434 {
6435         ListCell   *cur_item;
6436         ListCell   *prev_item;
6437
6438         prev_item = NULL;
6439         cur_item = list_head(on_commits);
6440
6441         while (cur_item != NULL)
6442         {
6443                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6444
6445                 if (isCommit ? oc->deleting_subid != InvalidSubTransactionId :
6446                         oc->creating_subid != InvalidSubTransactionId)
6447                 {
6448                         /* cur_item must be removed */
6449                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6450                         pfree(oc);
6451                         if (prev_item)
6452                                 cur_item = lnext(prev_item);
6453                         else
6454                                 cur_item = list_head(on_commits);
6455                 }
6456                 else
6457                 {
6458                         /* cur_item must be preserved */
6459                         oc->creating_subid = InvalidSubTransactionId;
6460                         oc->deleting_subid = InvalidSubTransactionId;
6461                         prev_item = cur_item;
6462                         cur_item = lnext(prev_item);
6463                 }
6464         }
6465 }
6466
6467 /*
6468  * Post-subcommit or post-subabort cleanup for ON COMMIT management.
6469  *
6470  * During subabort, we can immediately remove entries created during this
6471  * subtransaction.      During subcommit, just relabel entries marked during
6472  * this subtransaction as being the parent's responsibility.
6473  */
6474 void
6475 AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
6476                                                           SubTransactionId parentSubid)
6477 {
6478         ListCell   *cur_item;
6479         ListCell   *prev_item;
6480
6481         prev_item = NULL;
6482         cur_item = list_head(on_commits);
6483
6484         while (cur_item != NULL)
6485         {
6486                 OnCommitItem *oc = (OnCommitItem *) lfirst(cur_item);
6487
6488                 if (!isCommit && oc->creating_subid == mySubid)
6489                 {
6490                         /* cur_item must be removed */
6491                         on_commits = list_delete_cell(on_commits, cur_item, prev_item);
6492                         pfree(oc);
6493                         if (prev_item)
6494                                 cur_item = lnext(prev_item);
6495                         else
6496                                 cur_item = list_head(on_commits);
6497                 }
6498                 else
6499                 {
6500                         /* cur_item must be preserved */
6501                         if (oc->creating_subid == mySubid)
6502                                 oc->creating_subid = parentSubid;
6503                         if (oc->deleting_subid == mySubid)
6504                                 oc->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId;
6505                         prev_item = cur_item;
6506                         cur_item = lnext(prev_item);
6507                 }
6508         }
6509 }