*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.239 2008/01/02 23:34:42 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.256 2008/06/14 18:04:33 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "access/genam.h"
#include "access/heapam.h"
#include "access/reloptions.h"
+#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_type_fn.h"
#include "catalog/toasting.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
+#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "parser/parser.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
+#include "utils/snapmgr.h"
#include "utils/syscache.h"
+#include "utils/tqual.h"
/*
ExprState *exprstate; /* execution state */
} NewColumnValue;
+/*
+ * Error-reporting support for RemoveRelations
+ */
+struct dropmsgstrings
+{
+ char kind;
+ int nonexistent_code;
+ const char *nonexistent_msg;
+ const char *skipping_msg;
+ const char *nota_msg;
+ const char *drophint_msg;
+};
+
+static const struct dropmsgstrings dropmsgstringarray[] = {
+ {RELKIND_RELATION,
+ ERRCODE_UNDEFINED_TABLE,
+ gettext_noop("table \"%s\" does not exist"),
+ gettext_noop("table \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a table"),
+ gettext_noop("Use DROP TABLE to remove a table.")},
+ {RELKIND_SEQUENCE,
+ ERRCODE_UNDEFINED_TABLE,
+ gettext_noop("sequence \"%s\" does not exist"),
+ gettext_noop("sequence \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a sequence"),
+ gettext_noop("Use DROP SEQUENCE to remove a sequence.")},
+ {RELKIND_VIEW,
+ ERRCODE_UNDEFINED_TABLE,
+ gettext_noop("view \"%s\" does not exist"),
+ gettext_noop("view \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a view"),
+ gettext_noop("Use DROP VIEW to remove a view.")},
+ {RELKIND_INDEX,
+ ERRCODE_UNDEFINED_OBJECT,
+ gettext_noop("index \"%s\" does not exist"),
+ gettext_noop("index \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not an index"),
+ gettext_noop("Use DROP INDEX to remove an index.")},
+ {RELKIND_COMPOSITE_TYPE,
+ ERRCODE_UNDEFINED_OBJECT,
+ gettext_noop("type \"%s\" does not exist"),
+ gettext_noop("type \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a type"),
+ gettext_noop("Use DROP TYPE to remove a type.")},
+ {'\0', 0, NULL, NULL, NULL, NULL}
+};
+
static void truncate_check_rel(Relation rel);
static List *MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr, int *supOidCount);
-static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
-static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
-static void add_nonduplicate_constraint(Constraint *cdef,
- ConstrCheck *check, int *ncheck);
+static bool MergeCheckConstraint(List *constraints, char *name, Node *expr);
static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
+static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
+static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
static void StoreCatalogInheritance(Oid relationId, List *supers);
static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
int16 seqNumber, Relation inhRelation);
Relation rel, Relation pkrel, Oid constraintOid);
static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
Oid constraintOid);
-static void CheckTableNotInUse(Relation rel);
static void ATController(Relation rel, List *cmds, bool recurse);
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
bool recurse, bool recursing);
static void ATRewriteCatalogs(List **wqueue);
-static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
+static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ AlterTableCmd *cmd);
static void ATRewriteTables(List **wqueue);
static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
bool recurse, bool recursing);
static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, bool is_rebuild);
-static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
- Node *newConstraint);
+static void ATExecAddConstraint(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ Node *newConstraint, bool recurse);
+static void ATAddCheckConstraint(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ Constraint *constr,
+ bool recurse, bool recursing);
static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
FkConstraint *fkconstraint);
-static void ATPrepDropConstraint(List **wqueue, Relation rel,
- bool recurse, AlterTableCmd *cmd);
static void ATExecDropConstraint(Relation rel, const char *constrName,
- DropBehavior behavior, bool quiet);
+ DropBehavior behavior,
+ bool recurse, bool recursing);
static void ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing,
bool localHasOids;
int parentOidCount;
List *rawDefaults;
+ List *cookedDefaults;
Datum reloptions;
ListCell *listptr;
AttrNumber attnum;
}
/* Check permissions except when using database's default */
- if (OidIsValid(tablespaceId))
+ if (OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
{
AclResult aclresult;
&inheritOids, &old_constraints, &parentOidCount);
/*
- * Create a relation descriptor from the relation schema and create the
- * relation. Note that in this stage only inherited (pre-cooked) defaults
- * and constraints will be included into the new relation.
- * (BuildDescForRelation takes care of the inherited defaults, but we have
- * to copy inherited constraints here.)
+ * Create a tuple descriptor from the relation schema. Note that this
+ * deals with column names, types, and NOT NULL constraints, but not
+ * default values or CHECK constraints; we handle those below.
*/
descriptor = BuildDescForRelation(schema);
localHasOids = interpretOidsOption(stmt->options);
descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
- if (old_constraints || stmt->constraints)
+ /*
+ * Find columns with default values and prepare for insertion of the
+ * defaults. Pre-cooked (that is, inherited) defaults go into a list of
+ * CookedConstraint structs that we'll pass to heap_create_with_catalog,
+ * while raw defaults go into a list of RawColumnDefault structs that
+ * will be processed by AddRelationNewConstraints. (We can't deal with
+ * raw expressions until we can do transformExpr.)
+ *
+ * We can set the atthasdef flags now in the tuple descriptor; this just
+ * saves StoreAttrDefault from having to do an immediate update of the
+ * pg_attribute rows.
+ */
+ rawDefaults = NIL;
+ cookedDefaults = NIL;
+ attnum = 0;
+
+ foreach(listptr, schema)
{
- ConstrCheck *check;
- int ncheck = 0;
-
- /* make array that's certainly big enough */
- check = (ConstrCheck *)
- palloc((list_length(old_constraints) +
- list_length(stmt->constraints)) * sizeof(ConstrCheck));
- /* deal with constraints from MergeAttributes */
- foreach(listptr, old_constraints)
- {
- Constraint *cdef = (Constraint *) lfirst(listptr);
+ ColumnDef *colDef = lfirst(listptr);
- if (cdef->contype == CONSTR_CHECK)
- add_nonduplicate_constraint(cdef, check, &ncheck);
- }
+ attnum++;
- /*
- * parse_utilcmd.c might have passed some precooked constraints too,
- * due to LIKE tab INCLUDING CONSTRAINTS
- */
- foreach(listptr, stmt->constraints)
+ if (colDef->raw_default != NULL)
{
- Constraint *cdef = (Constraint *) lfirst(listptr);
+ RawColumnDefault *rawEnt;
+
+ Assert(colDef->cooked_default == NULL);
- if (cdef->contype == CONSTR_CHECK && cdef->cooked_expr != NULL)
- add_nonduplicate_constraint(cdef, check, &ncheck);
+ rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
+ rawEnt->attnum = attnum;
+ rawEnt->raw_default = colDef->raw_default;
+ rawDefaults = lappend(rawDefaults, rawEnt);
+ descriptor->attrs[attnum - 1]->atthasdef = true;
}
- /* if we found any, insert 'em into the descriptor */
- if (ncheck > 0)
+ else if (colDef->cooked_default != NULL)
{
- if (descriptor->constr == NULL)
- {
- descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
- descriptor->constr->defval = NULL;
- descriptor->constr->num_defval = 0;
- descriptor->constr->has_not_null = false;
- }
- descriptor->constr->num_check = ncheck;
- descriptor->constr->check = check;
+ CookedConstraint *cooked;
+
+ cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
+ cooked->contype = CONSTR_DEFAULT;
+ cooked->name = NULL;
+ cooked->attnum = attnum;
+ cooked->expr = stringToNode(colDef->cooked_default);
+ cooked->is_local = true; /* not used for defaults */
+ cooked->inhcount = 0; /* ditto */
+ cookedDefaults = lappend(cookedDefaults, cooked);
+ descriptor->attrs[attnum - 1]->atthasdef = true;
}
}
+ /*
+ * Create the relation. Inherited defaults and constraints are passed
+ * in for immediate handling --- since they don't need parsing, they
+ * can be stored immediately.
+ */
relationId = heap_create_with_catalog(relname,
namespaceId,
tablespaceId,
InvalidOid,
GetUserId(),
descriptor,
+ list_concat(cookedDefaults,
+ old_constraints),
relkind,
false,
localHasOids,
* apply the parser's transformExpr routine, but transformExpr doesn't
* work unless we have a pre-existing relation. So, the transformation has
* to be postponed to this final step of CREATE TABLE.
- *
- * First, scan schema to find new column defaults.
- */
- rawDefaults = NIL;
- attnum = 0;
-
- foreach(listptr, schema)
- {
- ColumnDef *colDef = lfirst(listptr);
-
- attnum++;
-
- if (colDef->raw_default != NULL)
- {
- RawColumnDefault *rawEnt;
-
- Assert(colDef->cooked_default == NULL);
-
- rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
- rawEnt->attnum = attnum;
- rawEnt->raw_default = colDef->raw_default;
- rawDefaults = lappend(rawDefaults, rawEnt);
- }
- }
-
- /*
- * Parse and add the defaults/constraints, if any.
*/
if (rawDefaults || stmt->constraints)
- AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
+ AddRelationNewConstraints(rel, rawDefaults, stmt->constraints,
+ true, true);
/*
* Clean up. We keep lock on new relation (although it shouldn't be
}
/*
- * RemoveRelation
- * Deletes a relation.
+ * Emit the right error or warning message for a "DROP" command issued on a
+ * non-existent relation
+ */
+static void
+DropErrorMsgNonExistent(const char *relname, char rightkind, bool missing_ok)
+{
+ const struct dropmsgstrings *rentry;
+
+ for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++)
+ {
+ if (rentry->kind == rightkind)
+ {
+ if (!missing_ok)
+ {
+ ereport(ERROR,
+ (errcode(rentry->nonexistent_code),
+ errmsg(rentry->nonexistent_msg, relname)));
+ }
+ else
+ {
+ ereport(NOTICE, (errmsg(rentry->skipping_msg, relname)));
+ break;
+ }
+ }
+ }
+
+ Assert(rentry->kind != '\0'); /* Should be impossible */
+}
+
+/*
+ * Emit the right error message for a "DROP" command issued on a
+ * relation of the wrong type
+ */
+static void
+DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind)
+{
+ const struct dropmsgstrings *rentry;
+ const struct dropmsgstrings *wentry;
+
+ for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++)
+ if (rentry->kind == rightkind)
+ break;
+ Assert(rentry->kind != '\0');
+
+ for (wentry = dropmsgstringarray; wentry->kind != '\0'; wentry++)
+ if (wentry->kind == wrongkind)
+ break;
+ /* wrongkind could be something we don't have in our table... */
+
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg(rentry->nota_msg, relname),
+ (wentry->kind != '\0') ? errhint(wentry->drophint_msg) : 0));
+}
+
+/*
+ * RemoveRelations
+ * Implements DROP TABLE, DROP INDEX, DROP SEQUENCE, DROP VIEW
*/
void
-RemoveRelation(const RangeVar *relation, DropBehavior behavior)
+RemoveRelations(DropStmt *drop)
{
- Oid relOid;
- ObjectAddress object;
+ ObjectAddresses *objects;
+ char relkind;
+ ListCell *cell;
- relOid = RangeVarGetRelid(relation, false);
+ /*
+ * First we identify all the relations, then we delete them in a single
+ * performMultipleDeletions() call. This is to avoid unwanted
+ * DROP RESTRICT errors if one of the relations depends on another.
+ */
- object.classId = RelationRelationId;
- object.objectId = relOid;
- object.objectSubId = 0;
+ /* Determine required relkind */
+ switch (drop->removeType)
+ {
+ case OBJECT_TABLE:
+ relkind = RELKIND_RELATION;
+ break;
- performDeletion(&object, behavior);
+ case OBJECT_INDEX:
+ relkind = RELKIND_INDEX;
+ break;
+
+ case OBJECT_SEQUENCE:
+ relkind = RELKIND_SEQUENCE;
+ break;
+
+ case OBJECT_VIEW:
+ relkind = RELKIND_VIEW;
+ break;
+
+ default:
+ elog(ERROR, "unrecognized drop object type: %d",
+ (int) drop->removeType);
+ relkind = 0; /* keep compiler quiet */
+ break;
+ }
+
+ /* Lock and validate each relation; build a list of object addresses */
+ objects = new_object_addresses();
+
+ foreach(cell, drop->objects)
+ {
+ RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
+ Oid relOid;
+ HeapTuple tuple;
+ Form_pg_class classform;
+ ObjectAddress obj;
+
+ /*
+ * These next few steps are a great deal like relation_openrv, but we
+ * don't bother building a relcache entry since we don't need it.
+ *
+ * Check for shared-cache-inval messages before trying to access the
+ * relation. This is needed to cover the case where the name
+ * identifies a rel that has been dropped and recreated since the
+ * start of our transaction: if we don't flush the old syscache entry,
+ * then we'll latch onto that entry and suffer an error later.
+ */
+ AcceptInvalidationMessages();
+
+ /* Look up the appropriate relation using namespace search */
+ relOid = RangeVarGetRelid(rel, true);
+
+ /* Not there? */
+ if (!OidIsValid(relOid))
+ {
+ DropErrorMsgNonExistent(rel->relname, relkind, drop->missing_ok);
+ continue;
+ }
+
+ /* Get the lock before trying to fetch the syscache entry */
+ LockRelationOid(relOid, AccessExclusiveLock);
+
+ tuple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(relOid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for relation %u", relOid);
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+
+ if (classform->relkind != relkind)
+ DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
+
+ /* Allow DROP to either table owner or schema owner */
+ if (!pg_class_ownercheck(relOid, GetUserId()) &&
+ !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ rel->relname);
+
+ if (!allowSystemTableMods && IsSystemClass(classform))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ rel->relname)));
+
+ /* OK, we're ready to delete this one */
+ obj.classId = RelationRelationId;
+ obj.objectId = relOid;
+ obj.objectSubId = 0;
+
+ add_exact_object_address(&obj, objects);
+
+ ReleaseSysCache(tuple);
+ }
+
+ performMultipleDeletions(objects, drop->behavior);
+
+ free_object_addresses(objects);
}
/*
{
List *rels = NIL;
List *relids = NIL;
+ List *seq_relids = NIL;
+ EState *estate;
+ ResultRelInfo *resultRelInfos;
+ ResultRelInfo *resultRelInfo;
ListCell *cell;
/*
#endif
/*
+ * If we are asked to restart sequences, find all the sequences,
+ * lock them (we only need AccessShareLock because that's all that
+ * ALTER SEQUENCE takes), and check permissions. We want to do this
+ * early since it's pointless to do all the truncation work only to fail
+ * on sequence permissions.
+ */
+ if (stmt->restart_seqs)
+ {
+ foreach(cell, rels)
+ {
+ Relation rel = (Relation) lfirst(cell);
+ List *seqlist = getOwnedSequences(RelationGetRelid(rel));
+ ListCell *seqcell;
+
+ foreach(seqcell, seqlist)
+ {
+ Oid seq_relid = lfirst_oid(seqcell);
+ Relation seq_rel;
+
+ seq_rel = relation_open(seq_relid, AccessShareLock);
+
+ /* This check must match AlterSequence! */
+ if (!pg_class_ownercheck(seq_relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ RelationGetRelationName(seq_rel));
+
+ seq_relids = lappend_oid(seq_relids, seq_relid);
+
+ relation_close(seq_rel, NoLock);
+ }
+ }
+ }
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
+ /*
+ * To fire triggers, we'll need an EState as well as a ResultRelInfo
+ * for each relation.
+ */
+ estate = CreateExecutorState();
+ resultRelInfos = (ResultRelInfo *)
+ palloc(list_length(rels) * sizeof(ResultRelInfo));
+ resultRelInfo = resultRelInfos;
+ foreach(cell, rels)
+ {
+ Relation rel = (Relation) lfirst(cell);
+
+ InitResultRelInfo(resultRelInfo,
+ rel,
+ 0, /* dummy rangetable index */
+ CMD_DELETE, /* don't need any index info */
+ false);
+ resultRelInfo++;
+ }
+ estate->es_result_relations = resultRelInfos;
+ estate->es_num_result_relations = list_length(rels);
+
+ /*
+ * Process all BEFORE STATEMENT TRUNCATE triggers before we begin
+ * truncating (this is because one of them might throw an error).
+ * Also, if we were to allow them to prevent statement execution,
+ * that would need to be handled here.
+ */
+ resultRelInfo = resultRelInfos;
+ foreach(cell, rels)
+ {
+ estate->es_result_relation_info = resultRelInfo;
+ ExecBSTruncateTriggers(estate, resultRelInfo);
+ resultRelInfo++;
+ }
+
+ /*
* OK, truncate each table.
*/
foreach(cell, rels)
heap_relid = RelationGetRelid(rel);
toast_relid = rel->rd_rel->reltoastrelid;
- heap_close(rel, NoLock);
-
/*
* The same for the toast table, if any.
*/
*/
reindex_relation(heap_relid, true);
}
+
+ /*
+ * Process all AFTER STATEMENT TRUNCATE triggers.
+ */
+ resultRelInfo = resultRelInfos;
+ foreach(cell, rels)
+ {
+ estate->es_result_relation_info = resultRelInfo;
+ ExecASTruncateTriggers(estate, resultRelInfo);
+ resultRelInfo++;
+ }
+
+ /* Handle queued AFTER triggers */
+ AfterTriggerEndQuery(estate);
+
+ /* We can clean up the EState now */
+ FreeExecutorState(estate);
+
+ /* And close the rels (can't do this while EState still holds refs) */
+ foreach(cell, rels)
+ {
+ Relation rel = (Relation) lfirst(cell);
+
+ heap_close(rel, NoLock);
+ }
+
+ /*
+ * Lastly, restart any owned sequences if we were asked to. This is done
+ * last because it's nontransactional: restarts will not roll back if
+ * we abort later. Hence it's important to postpone them as long as
+ * possible. (This is also a big reason why we locked and
+ * permission-checked the sequences beforehand.)
+ */
+ if (stmt->restart_seqs)
+ {
+ List *options = list_make1(makeDefElem("restart", NULL));
+
+ foreach(cell, seq_relids)
+ {
+ Oid seq_relid = lfirst_oid(cell);
+
+ AlterSequenceInternal(seq_relid, options);
+ }
+ }
}
/*
errmsg("cannot truncate temporary tables of other sessions")));
/*
- * Also check for pending AFTER trigger events on the relation. We can't
- * just leave those be, since they will try to fetch tuples that the
- * TRUNCATE removes.
+ * Also check for active uses of the relation in the current transaction,
+ * including open scans and pending AFTER trigger events.
*/
- if (AfterTriggerPendingOnRel(RelationGetRelid(rel)))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("cannot truncate table \"%s\" because it has pending trigger events",
- RelationGetRelationName(rel))));
+ CheckTableNotInUse(rel, "TRUNCATE");
}
/*----------
}
/*
- * Now copy the constraints of this parent, adjusting attnos using the
- * completed newattno[] map
+ * Now copy the CHECK constraints of this parent, adjusting attnos
+ * using the completed newattno[] map. Identically named constraints
+ * are merged if possible, else we throw error.
*/
if (constr && constr->num_check > 0)
{
for (i = 0; i < constr->num_check; i++)
{
- Constraint *cdef = makeNode(Constraint);
+ char *name = check[i].ccname;
Node *expr;
- cdef->contype = CONSTR_CHECK;
- cdef->name = pstrdup(check[i].ccname);
- cdef->raw_expr = NULL;
/* adjust varattnos of ccbin here */
expr = stringToNode(check[i].ccbin);
change_varattnos_of_a_node(expr, newattno);
- cdef->cooked_expr = nodeToString(expr);
- constraints = lappend(constraints, cdef);
+
+ /* check for duplicate */
+ if (!MergeCheckConstraint(constraints, name, expr))
+ {
+ /* nope, this is a new one */
+ CookedConstraint *cooked;
+
+ cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
+ cooked->contype = CONSTR_CHECK;
+ cooked->name = pstrdup(name);
+ cooked->attnum = 0; /* not used for constraints */
+ cooked->expr = expr;
+ cooked->is_local = false;
+ cooked->inhcount = 1;
+ constraints = lappend(constraints, cooked);
+ }
}
}
/*
- * In multiple-inheritance situations, it's possible to inherit
- * the same grandparent constraint through multiple parents.
- * Hence, we want to discard inherited constraints that match as to
- * both name and expression. Otherwise, gripe if there are conflicting
- * names. Nonconflicting constraints are added to the array check[]
- * of length *ncheck ... caller must ensure there is room!
+ * MergeCheckConstraint
+ * Try to merge an inherited CHECK constraint with previous ones
+ *
+ * If we inherit identically-named constraints from multiple parents, we must
+ * merge them, or throw an error if they don't have identical definitions.
+ *
+ * constraints is a list of CookedConstraint structs for previous constraints.
+ *
+ * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
+ * got a so-far-unique name, or throws error if conflict.
*/
-static void
-add_nonduplicate_constraint(Constraint *cdef, ConstrCheck *check, int *ncheck)
+static bool
+MergeCheckConstraint(List *constraints, char *name, Node *expr)
{
- int i;
+ ListCell *lc;
- /* Should only see precooked constraints here */
- Assert(cdef->contype == CONSTR_CHECK);
- Assert(cdef->name != NULL);
- Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
-
- for (i = 0; i < *ncheck; i++)
+ foreach(lc, constraints)
{
- if (strcmp(check[i].ccname, cdef->name) != 0)
+ CookedConstraint *ccon = (CookedConstraint *) lfirst(lc);
+
+ Assert(ccon->contype == CONSTR_CHECK);
+
+ /* Non-matching names never conflict */
+ if (strcmp(ccon->name, name) != 0)
continue;
- if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
- return; /* duplicate constraint, so ignore it */
+
+ if (equal(expr, ccon->expr))
+ {
+ /* OK to merge */
+ ccon->inhcount++;
+ return true;
+ }
+
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("check constraint name \"%s\" appears multiple times but with different expressions",
- cdef->name)));
+ name)));
}
- /* No match on name, so add it to array */
- check[*ncheck].ccname = cdef->name;
- check[*ncheck].ccbin = pstrdup(cdef->cooked_expr);
- (*ncheck)++;
+
+ return false;
}
* currently.
*/
Assert(newattno[var->varattno - 1] > 0);
- var->varattno = newattno[var->varattno - 1];
+ var->varattno = var->varoattno = newattno[var->varattno - 1];
}
return false;
}
relation_close(targetrelation, NoLock); /* close rel but keep lock */
}
+
/*
- * renamerel - change the name of a relation
+ * Execute ALTER TABLE/INDEX/SEQUENCE/VIEW RENAME
*
- * XXX - When renaming sequences, we don't bother to modify the
- * sequence name that is stored within the sequence itself
- * (this would cause problems with MVCC). In the future,
- * the sequence name should probably be removed from the
- * sequence, AFAIK there's no need for it to be there.
+ * Caller has already done permissions checks.
*/
void
-renamerel(Oid myrelid, const char *newrelname, ObjectType reltype)
+RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
{
Relation targetrelation;
- Relation relrelation; /* for RELATION relation */
- HeapTuple reltup;
- Form_pg_class relform;
Oid namespaceId;
- char *oldrelname;
char relkind;
- bool relhastriggers;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
- oldrelname = pstrdup(RelationGetRelationName(targetrelation));
namespaceId = RelationGetNamespace(targetrelation);
-
- if (!allowSystemTableMods && IsSystemRelation(targetrelation))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(targetrelation))));
+ relkind = targetrelation->rd_rel->relkind;
/*
* For compatibility with prior releases, we don't complain if ALTER TABLE
* or ALTER INDEX is used to rename a sequence or view.
*/
- relkind = targetrelation->rd_rel->relkind;
- if (reltype == OBJECT_SEQUENCE && relkind != 'S')
+ if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a sequence",
RelationGetRelationName(targetrelation))));
- if (reltype == OBJECT_VIEW && relkind != 'v')
+ if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view",
RelationGetRelationName(targetrelation))));
- relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
+ /*
+ * Don't allow ALTER TABLE on composite types.
+ * We want people to use ALTER TYPE for that.
+ */
+ if (relkind == RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ RelationGetRelationName(targetrelation)),
+ errhint("Use ALTER TYPE instead.")));
+
+ /* Do the work */
+ RenameRelationInternal(myrelid, newrelname, namespaceId);
+
+ /*
+ * Close rel, but keep exclusive lock!
+ */
+ relation_close(targetrelation, NoLock);
+}
+
+/*
+ * RenameRelationInternal - change the name of a relation
+ *
+ * XXX - When renaming sequences, we don't bother to modify the
+ * sequence name that is stored within the sequence itself
+ * (this would cause problems with MVCC). In the future,
+ * the sequence name should probably be removed from the
+ * sequence, AFAIK there's no need for it to be there.
+ */
+void
+RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId)
+{
+ Relation targetrelation;
+ Relation relrelation; /* for RELATION relation */
+ HeapTuple reltup;
+ Form_pg_class relform;
+
+ /*
+ * Grab an exclusive lock on the target table, index, sequence or
+ * view, which we will NOT release until end of transaction.
+ */
+ targetrelation = relation_open(myrelid, AccessExclusiveLock);
/*
* Find relation's pg_class tuple, and make sure newrelname isn't in use.
* Also rename the associated type, if any.
*/
if (OidIsValid(targetrelation->rd_rel->reltype))
- TypeRename(targetrelation->rd_rel->reltype, newrelname, namespaceId);
+ RenameTypeInternal(targetrelation->rd_rel->reltype,
+ newrelname, namespaceId);
+
+ /*
+ * Also rename the associated constraint, if any.
+ */
+ if (targetrelation->rd_rel->relkind == RELKIND_INDEX)
+ {
+ Oid constraintId = get_index_constraint(myrelid);
+
+ if (OidIsValid(constraintId))
+ RenameConstraintById(constraintId, newrelname);
+ }
/*
* Close rel, but keep exclusive lock!
}
/*
+ * Disallow ALTER TABLE (and similar commands) when the current backend has
+ * any open reference to the target table besides the one just acquired by
+ * the calling command; this implies there's an open cursor or active plan.
+ * We need this check because our AccessExclusiveLock doesn't protect us
+ * against stomping on our own foot, only other people's feet!
+ *
+ * For ALTER TABLE, the only case known to cause serious trouble is ALTER
+ * COLUMN TYPE, and some changes are obviously pretty benign, so this could
+ * possibly be relaxed to only error out for certain types of alterations.
+ * But the use-case for allowing any of these things is not obvious, so we
+ * won't work hard at it for now.
+ *
+ * We also reject these commands if there are any pending AFTER trigger events
+ * for the rel. This is certainly necessary for the rewriting variants of
+ * ALTER TABLE, because they don't preserve tuple TIDs and so the pending
+ * events would try to fetch the wrong tuples. It might be overly cautious
+ * in other cases, but again it seems better to err on the side of paranoia.
+ *
+ * REINDEX calls this with "rel" referencing the index to be rebuilt; here
+ * we are worried about active indexscans on the index. The trigger-event
+ * check can be skipped, since we are doing no damage to the parent table.
+ *
+ * The statement name (eg, "ALTER TABLE") is passed for use in error messages.
+ */
+void
+CheckTableNotInUse(Relation rel, const char *stmt)
+{
+ int expected_refcnt;
+
+ expected_refcnt = rel->rd_isnailed ? 2 : 1;
+ if (rel->rd_refcnt != expected_refcnt)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ /* translator: first %s is a SQL command, eg ALTER TABLE */
+ errmsg("cannot %s \"%s\" because "
+ "it is being used by active queries in this session",
+ stmt, RelationGetRelationName(rel))));
+
+ if (rel->rd_rel->relkind != RELKIND_INDEX &&
+ AfterTriggerPendingOnRel(RelationGetRelid(rel)))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ /* translator: first %s is a SQL command, eg ALTER TABLE */
+ errmsg("cannot %s \"%s\" because "
+ "it has pending trigger events",
+ stmt, RelationGetRelationName(rel))));
+}
+
+/*
* AlterTable
* Execute ALTER TABLE, which can be a list of subcommands
*
* expressions that need to be evaluated with respect to the old table
* schema.
*
- * ATRewriteCatalogs performs phase 2 for each affected table (note that
- * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
+ * ATRewriteCatalogs performs phase 2 for each affected table. (Note that
+ * phases 2 and 3 normally do no explicit recursion, since phase 1 already
+ * did it --- although some subcommands have to recurse in phase 2 instead.)
* Certain subcommands need to be performed before others to avoid
* unnecessary conflicts; for example, DROP COLUMN should come before
* ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
{
Relation rel = relation_openrv(stmt->relation, AccessExclusiveLock);
- CheckTableNotInUse(rel);
+ CheckTableNotInUse(rel, "ALTER TABLE");
ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt));
}
/*
- * Disallow ALTER TABLE when the current backend has any open reference to
- * it besides the one we just got (such as an open cursor or active plan);
- * our AccessExclusiveLock doesn't protect us against stomping on our own
- * foot, only other people's feet!
- *
- * Note: the only case known to cause serious trouble is ALTER COLUMN TYPE,
- * and some changes are obviously pretty benign, so this could possibly be
- * relaxed to only error out for certain types of alterations. But the
- * use-case for allowing any of these things is not obvious, so we won't
- * work hard at it for now.
- *
- * We also reject ALTER TABLE if there are any pending AFTER trigger events
- * for the rel. This is certainly necessary for the rewriting variants of
- * ALTER TABLE, because they don't preserve tuple TIDs and so the pending
- * events would try to fetch the wrong tuples. It might be overly cautious
- * in other cases, but again it seems better to err on the side of paranoia.
- */
-static void
-CheckTableNotInUse(Relation rel)
-{
- int expected_refcnt;
-
- expected_refcnt = rel->rd_isnailed ? 2 : 1;
- if (rel->rd_refcnt != expected_refcnt)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("relation \"%s\" is being used by active queries in this session",
- RelationGetRelationName(rel))));
-
- if (AfterTriggerPendingOnRel(RelationGetRelid(rel)))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("cannot alter table \"%s\" because it has pending trigger events",
- RelationGetRelationName(rel))));
-}
-
-/*
* AlterTableInternal
*
* ALTER TABLE with target specified by OID
break;
case AT_AddConstraint: /* ADD CONSTRAINT */
ATSimplePermissions(rel, false);
-
- /*
- * Currently we recurse only for CHECK constraints, never for
- * foreign-key constraints. UNIQUE/PKEY constraints won't be seen
- * here.
- */
- if (IsA(cmd->def, Constraint))
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
- /* No command-specific prep needed */
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_AddConstraintRecurse;
pass = AT_PASS_ADD_CONSTR;
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATSimplePermissions(rel, false);
- /* Performs own recursion */
- ATPrepDropConstraint(wqueue, rel, recurse, cmd);
- pass = AT_PASS_DROP;
- break;
- case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
- ATSimplePermissions(rel, false);
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
- /* No command-specific prep needed */
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_DropConstraintRecurse;
pass = AT_PASS_DROP;
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
rel = relation_open(tab->relid, NoLock);
foreach(lcmd, subcmds)
- ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
+ ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd));
/*
* After the ALTER TYPE pass, do cleanup work (this is not done in
* ATExecCmd: dispatch a subcommand to appropriate execution routine
*/
static void
-ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
+ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ AlterTableCmd *cmd)
{
switch (cmd->subtype)
{
ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
break;
case AT_AddConstraint: /* ADD CONSTRAINT */
- ATExecAddConstraint(tab, rel, cmd->def);
+ ATExecAddConstraint(wqueue, tab, rel, cmd->def, false);
+ break;
+ case AT_AddConstraintRecurse: /* ADD CONSTRAINT with recursion */
+ ATExecAddConstraint(wqueue, tab, rel, cmd->def, true);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
- ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
+ ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
break;
- case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
- ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
+ case AT_DropConstraintRecurse: /* DROP CONSTRAINT with recursion */
+ ATExecDropConstraint(rel, cmd->name, cmd->behavior, true, false);
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
if (childrelid == relid)
continue;
childrel = relation_open(childrelid, AccessExclusiveLock);
- CheckTableNotInUse(childrel);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
ATPrepCmd(wqueue, childrel, cmd, false, true);
relation_close(childrel, NoLock);
}
Relation childrel;
childrel = relation_open(childrelid, AccessExclusiveLock);
- CheckTableNotInUse(childrel);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
ATPrepCmd(wqueue, childrel, cmd, true, true);
relation_close(childrel, NoLock);
}
* This function is intended for CREATE TABLE, so it processes a
* _list_ of defaults, but we just do one.
*/
- AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
+ AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true);
/* Make the additional catalog changes visible */
CommandCounterIncrement();
* the constraints more directly.)
*
* Note: we use build_column_default, and not just the cooked default
- * returned by AddRelationRawConstraints, so that the right thing happens
+ * returned by AddRelationNewConstraints, so that the right thing happens
* when a datatype's default applies.
*/
defval = (Expr *) build_column_default(rel, attribute->attnum);
}
/*
+ * If the new column is NOT NULL, tell Phase 3 it needs to test that.
+ */
+ tab->new_notnull |= colDef->is_not_null;
+
+ /*
* Add needed dependency entries for the new column.
*/
add_column_datatype_dependency(myrelid, i, attribute->atttypid);
* This function is intended for CREATE TABLE, so it processes a
* _list_ of defaults, but we just do one.
*/
- AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
+ AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true);
}
}
Form_pg_attribute childatt;
childrel = heap_open(childrelid, AccessExclusiveLock);
- CheckTableNotInUse(childrel);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
tuple = SearchSysCacheCopyAttName(childrelid, colName);
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
{
/*
* If we were told to drop ONLY in this table (no recursion),
- * we need to mark the inheritors' attribute as locally
+ * we need to mark the inheritors' attributes as locally
* defined rather than inherited.
*/
childatt->attinhcount--;
* ALTER TABLE ADD CONSTRAINT
*/
static void
-ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
+ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ Node *newConstraint, bool recurse)
{
switch (nodeTag(newConstraint))
{
switch (constr->contype)
{
case CONSTR_CHECK:
- {
- List *newcons;
- ListCell *lcon;
-
- /*
- * Call AddRelationRawConstraints to do the work.
- * It returns a list of cooked constraints.
- */
- newcons = AddRelationRawConstraints(rel, NIL,
- list_make1(constr));
- /* Add each constraint to Phase 3's queue */
- foreach(lcon, newcons)
- {
- CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- NewConstraint *newcon;
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = ccon->name;
- newcon->contype = ccon->contype;
- /* ExecQual wants implicit-AND format */
- newcon->qual = (Node *)
- make_ands_implicit((Expr *) ccon->expr);
-
- tab->constraints = lappend(tab->constraints,
- newcon);
- }
- break;
- }
+ ATAddCheckConstraint(wqueue, tab, rel,
+ constr, recurse, false);
+ break;
default:
elog(ERROR, "unrecognized constraint type: %d",
(int) constr->contype);
{
FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
- /*
- * Assign or validate constraint name
- */
- if (fkconstraint->constr_name)
- {
- if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
- RelationGetRelid(rel),
- RelationGetNamespace(rel),
- fkconstraint->constr_name))
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("constraint \"%s\" for relation \"%s\" already exists",
- fkconstraint->constr_name,
- RelationGetRelationName(rel))));
- }
- else
- fkconstraint->constr_name =
- ChooseConstraintName(RelationGetRelationName(rel),
- strVal(linitial(fkconstraint->fk_attrs)),
- "fkey",
- RelationGetNamespace(rel),
- NIL);
+ /*
+ * Note that we currently never recurse for FK constraints,
+ * so the "recurse" flag is silently ignored.
+ *
+ * Assign or validate constraint name
+ */
+ if (fkconstraint->constr_name)
+ {
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(rel),
+ RelationGetNamespace(rel),
+ fkconstraint->constr_name))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("constraint \"%s\" for relation \"%s\" already exists",
+ fkconstraint->constr_name,
+ RelationGetRelationName(rel))));
+ }
+ else
+ fkconstraint->constr_name =
+ ChooseConstraintName(RelationGetRelationName(rel),
+ strVal(linitial(fkconstraint->fk_attrs)),
+ "fkey",
+ RelationGetNamespace(rel),
+ NIL);
+
+ ATAddForeignKeyConstraint(tab, rel, fkconstraint);
+
+ break;
+ }
+ default:
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(newConstraint));
+ }
+}
+
+/*
+ * Add a check constraint to a single table and its children
+ *
+ * Subroutine for ATExecAddConstraint.
+ *
+ * We must recurse to child tables during execution, rather than using
+ * ALTER TABLE's normal prep-time recursion. The reason is that all the
+ * constraints *must* be given the same name, else they won't be seen as
+ * related later. If the user didn't explicitly specify a name, then
+ * AddRelationNewConstraints would normally assign different names to the
+ * child constraints. To fix that, we must capture the name assigned at
+ * the parent table and pass that down.
+ */
+static void
+ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ Constraint *constr, bool recurse, bool recursing)
+{
+ List *newcons;
+ ListCell *lcon;
+ List *children;
+ ListCell *child;
+
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ ATSimplePermissions(rel, false);
+
+ /*
+ * Call AddRelationNewConstraints to do the work, making sure it works on
+ * a copy of the Constraint so transformExpr can't modify the original.
+ * It returns a list of cooked constraints.
+ *
+ * If the constraint ends up getting merged with a pre-existing one, it's
+ * omitted from the returned list, which is what we want: we do not need
+ * to do any validation work. That can only happen at child tables,
+ * though, since we disallow merging at the top level.
+ */
+ newcons = AddRelationNewConstraints(rel, NIL,
+ list_make1(copyObject(constr)),
+ recursing, !recursing);
+
+ /* Add each constraint to Phase 3's queue */
+ foreach(lcon, newcons)
+ {
+ CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
+ NewConstraint *newcon;
+
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = ccon->name;
+ newcon->contype = ccon->contype;
+ /* ExecQual wants implicit-AND format */
+ newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+
+ tab->constraints = lappend(tab->constraints, newcon);
+
+ /* Save the actually assigned name if it was defaulted */
+ if (constr->name == NULL)
+ constr->name = ccon->name;
+ }
+
+ /* At this point we must have a locked-down name to use */
+ Assert(constr->name != NULL);
+
+ /* Advance command counter in case same table is visited multiple times */
+ CommandCounterIncrement();
+
+ /*
+ * Propagate to children as appropriate. Unlike most other ALTER
+ * routines, we have to do this one level of recursion at a time; we can't
+ * use find_all_inheritors to do it in one pass.
+ */
+ children = find_inheritance_children(RelationGetRelid(rel));
- ATAddForeignKeyConstraint(tab, rel, fkconstraint);
+ /*
+ * If we are told not to recurse, there had better not be any child
+ * tables; else the addition would put them out of step.
+ */
+ if (children && !recurse)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be added to child tables too")));
- break;
- }
- default:
- elog(ERROR, "unrecognized node type: %d",
- (int) nodeTag(newConstraint));
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+ AlteredTableInfo *childtab;
+
+ childrel = heap_open(childrelid, AccessExclusiveLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
+
+ /* Find or create work queue entry for this table */
+ childtab = ATGetQueueEntry(wqueue, childrel);
+
+ /* Recurse to child */
+ ATAddCheckConstraint(wqueue, childtab, childrel,
+ constr, recurse, true);
+
+ heap_close(childrel, NoLock);
}
}
/*
* Otherwise, look for an implicit cast from the FK type to the
* opcintype, and if found, use the primary equality operator.
- * This is a bit tricky because opcintype might be a generic type
- * such as ANYARRAY, and so what we have to test is whether the
- * two actual column types can be concurrently cast to that type.
- * (Otherwise, we'd fail to reject combinations such as int[] and
- * point[].)
+ * This is a bit tricky because opcintype might be a polymorphic
+ * type such as ANYARRAY or ANYENUM; so what we have to test is
+ * whether the two actual column types can be concurrently cast to
+ * that type. (Otherwise, we'd fail to reject combinations such
+ * as int[] and point[].)
*/
Oid input_typeids[2];
Oid target_typeids[2];
indexOid,
NULL, /* no check constraint */
NULL,
- NULL);
+ NULL,
+ true, /* islocal */
+ 0); /* inhcount */
/*
* Create the triggers that will enforce the constraint.
/*
* ALTER TABLE DROP CONSTRAINT
+ *
+ * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism.
*/
static void
-ATPrepDropConstraint(List **wqueue, Relation rel,
- bool recurse, AlterTableCmd *cmd)
+ATExecDropConstraint(Relation rel, const char *constrName,
+ DropBehavior behavior,
+ bool recurse, bool recursing)
{
+ List *children;
+ ListCell *child;
+ Relation conrel;
+ Form_pg_constraint con;
+ SysScanDesc scan;
+ ScanKeyData key;
+ HeapTuple tuple;
+ bool found = false;
+ bool is_check_constraint = false;
+
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ ATSimplePermissions(rel, false);
+
+ conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
+
/*
- * We don't want errors or noise from child tables, so we have to pass
- * down a modified command.
+ * Find and drop the target constraint
*/
- if (recurse)
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(conrel, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &key);
+
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
- AlterTableCmd *childCmd = copyObject(cmd);
+ ObjectAddress conobj;
+
+ con = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ if (strcmp(NameStr(con->conname), constrName) != 0)
+ continue;
+
+ /* Don't drop inherited constraints */
+ if (con->coninhcount > 0 && !recursing)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
+ constrName, RelationGetRelationName(rel))));
+
+ /* Right now only CHECK constraints can be inherited */
+ if (con->contype == CONSTRAINT_CHECK)
+ is_check_constraint = true;
+
+ /*
+ * Perform the actual constraint deletion
+ */
+ conobj.classId = ConstraintRelationId;
+ conobj.objectId = HeapTupleGetOid(tuple);
+ conobj.objectSubId = 0;
- childCmd->subtype = AT_DropConstraintQuietly;
- ATSimpleRecursion(wqueue, rel, childCmd, recurse);
+ performDeletion(&conobj, behavior);
+
+ found = true;
}
-}
-static void
-ATExecDropConstraint(Relation rel, const char *constrName,
- DropBehavior behavior, bool quiet)
-{
- int deleted;
+ systable_endscan(scan);
+
+ if (!found)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel))));
- deleted = RemoveRelConstraints(rel, constrName, behavior);
+ /*
+ * Propagate to children as appropriate. Unlike most other ALTER
+ * routines, we have to do this one level of recursion at a time; we can't
+ * use find_all_inheritors to do it in one pass.
+ */
+ if (is_check_constraint)
+ children = find_inheritance_children(RelationGetRelid(rel));
+ else
+ children = NIL;
- if (!quiet)
+ foreach(child, children)
{
- /* If zero constraints deleted, complain */
- if (deleted == 0)
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+
+ childrel = heap_open(childrelid, AccessExclusiveLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
+
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(childrelid));
+ scan = systable_beginscan(conrel, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &key);
+
+ found = false;
+
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ HeapTuple copy_tuple;
+
+ con = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /* Right now only CHECK constraints can be inherited */
+ if (con->contype != CONSTRAINT_CHECK)
+ continue;
+
+ if (strcmp(NameStr(con->conname), constrName) != 0)
+ continue;
+
+ found = true;
+
+ if (con->coninhcount <= 0) /* shouldn't happen */
+ elog(ERROR, "relation %u has non-inherited constraint \"%s\"",
+ childrelid, constrName);
+
+ copy_tuple = heap_copytuple(tuple);
+ con = (Form_pg_constraint) GETSTRUCT(copy_tuple);
+
+ if (recurse)
+ {
+ /*
+ * If the child constraint has other definition sources,
+ * just decrement its inheritance count; if not, recurse
+ * to delete it.
+ */
+ if (con->coninhcount == 1 && !con->conislocal)
+ {
+ /* Time to delete this child constraint, too */
+ ATExecDropConstraint(childrel, constrName, behavior,
+ true, true);
+ }
+ else
+ {
+ /* Child constraint must survive my deletion */
+ con->coninhcount--;
+ simple_heap_update(conrel, ©_tuple->t_self, copy_tuple);
+ CatalogUpdateIndexes(conrel, copy_tuple);
+
+ /* Make update visible */
+ CommandCounterIncrement();
+ }
+ }
+ else
+ {
+ /*
+ * If we were told to drop ONLY in this table (no
+ * recursion), we need to mark the inheritors' constraints
+ * as locally defined rather than inherited.
+ */
+ con->coninhcount--;
+ con->conislocal = true;
+
+ simple_heap_update(conrel, ©_tuple->t_self, copy_tuple);
+ CatalogUpdateIndexes(conrel, copy_tuple);
+
+ /* Make update visible */
+ CommandCounterIncrement();
+ }
+
+ heap_freetuple(copy_tuple);
+ }
+
+ systable_endscan(scan);
+
+ if (!found)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" does not exist",
- constrName)));
- /* Otherwise if more than one constraint deleted, notify */
- else if (deleted > 1)
- ereport(NOTICE,
- (errmsg("multiple constraints named \"%s\" were dropped",
- constrName)));
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName,
+ RelationGetRelationName(childrel))));
+
+ heap_close(childrel, NoLock);
}
+
+ heap_close(conrel, RowExclusiveLock);
}
/*
*/
RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
- StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
+ StoreAttrDefault(rel, attnum, defaultexpr);
}
/* Cleanup */
RelationGetRelationName(child_rel),
RelationGetRelationName(parent_rel))));
- /* Match up the columns and bump attinhcount and attislocal */
+ /* Match up the columns and bump attinhcount as needed */
MergeAttributesIntoExisting(child_rel, parent_rel);
- /* Match up the constraints and make sure they're present in child */
+ /* Match up the constraints and bump coninhcount as needed */
MergeConstraintsIntoExisting(child_rel, parent_rel);
/*
expr = DirectFunctionCall2(pg_get_expr, attr,
ObjectIdGetDatum(con->conrelid));
- return DatumGetCString(DirectFunctionCall1(textout, expr));
+ return TextDatumGetCString(expr);
+}
+
+/*
+ * Determine whether two check constraints are functionally equivalent
+ *
+ * The test we apply is to see whether they reverse-compile to the same
+ * source string. This insulates us from issues like whether attributes
+ * have the same physical column numbers in parent and child relations.
+ */
+static bool
+constraints_equivalent(HeapTuple a, HeapTuple b, TupleDesc tupleDesc)
+{
+ Form_pg_constraint acon = (Form_pg_constraint) GETSTRUCT(a);
+ Form_pg_constraint bcon = (Form_pg_constraint) GETSTRUCT(b);
+
+ if (acon->condeferrable != bcon->condeferrable ||
+ acon->condeferred != bcon->condeferred ||
+ strcmp(decompile_conbin(a, tupleDesc),
+ decompile_conbin(b, tupleDesc)) != 0)
+ return false;
+ else
+ return true;
}
/*
}
/*
- * Check constraints in child table match up with constraints in parent
+ * Check constraints in child table match up with constraints in parent,
+ * and increment their coninhcount.
*
* Called by ATExecAddInherit
*
static void
MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
{
- Relation catalogRelation;
- TupleDesc tupleDesc;
- SysScanDesc scan;
- ScanKeyData key;
- HeapTuple constraintTuple;
- ListCell *elem;
- List *constraints;
+ Relation catalog_relation;
+ TupleDesc tuple_desc;
+ SysScanDesc parent_scan;
+ ScanKeyData parent_key;
+ HeapTuple parent_tuple;
- /* First gather up the child's constraint definitions */
- catalogRelation = heap_open(ConstraintRelationId, AccessShareLock);
- tupleDesc = RelationGetDescr(catalogRelation);
+ catalog_relation = heap_open(ConstraintRelationId, RowExclusiveLock);
+ tuple_desc = RelationGetDescr(catalog_relation);
- ScanKeyInit(&key,
+ /* Outer loop scans through the parent's constraint definitions */
+ ScanKeyInit(&parent_key,
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(child_rel)));
- scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
- true, SnapshotNow, 1, &key);
+ ObjectIdGetDatum(RelationGetRelid(parent_rel)));
+ parent_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &parent_key);
- constraints = NIL;
- while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
+ while (HeapTupleIsValid(parent_tuple = systable_getnext(parent_scan)))
{
- Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
+ Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(parent_tuple);
+ SysScanDesc child_scan;
+ ScanKeyData child_key;
+ HeapTuple child_tuple;
+ bool found = false;
- if (con->contype != CONSTRAINT_CHECK)
+ if (parent_con->contype != CONSTRAINT_CHECK)
continue;
- constraints = lappend(constraints, heap_copytuple(constraintTuple));
- }
+ /* Search for a child constraint matching this one */
+ ScanKeyInit(&child_key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(child_rel)));
+ child_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &child_key);
- systable_endscan(scan);
+ while (HeapTupleIsValid(child_tuple = systable_getnext(child_scan)))
+ {
+ Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple);
+ HeapTuple child_copy;
- /* Then scan through the parent's constraints looking for matches */
- ScanKeyInit(&key,
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(parent_rel)));
- scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, true,
- SnapshotNow, 1, &key);
+ if (child_con->contype != CONSTRAINT_CHECK)
+ continue;
- while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
- {
- Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
- bool found = false;
- Form_pg_constraint child_con = NULL;
- HeapTuple child_contuple = NULL;
+ if (strcmp(NameStr(parent_con->conname),
+ NameStr(child_con->conname)) != 0)
+ continue;
- if (parent_con->contype != CONSTRAINT_CHECK)
- continue;
+ if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("child table \"%s\" has different definition for check constraint \"%s\"",
+ RelationGetRelationName(child_rel),
+ NameStr(parent_con->conname))));
- foreach(elem, constraints)
- {
- child_contuple = (HeapTuple) lfirst(elem);
- child_con = (Form_pg_constraint) GETSTRUCT(child_contuple);
- if (strcmp(NameStr(parent_con->conname),
- NameStr(child_con->conname)) == 0)
- {
- found = true;
- break;
- }
+ /*
+ * OK, bump the child constraint's inheritance count. (If we fail
+ * later on, this change will just roll back.)
+ */
+ child_copy = heap_copytuple(child_tuple);
+ child_con = (Form_pg_constraint) GETSTRUCT(child_copy);
+ child_con->coninhcount++;
+ simple_heap_update(catalog_relation, &child_copy->t_self, child_copy);
+ CatalogUpdateIndexes(catalog_relation, child_copy);
+ heap_freetuple(child_copy);
+
+ found = true;
+ break;
}
+ systable_endscan(child_scan);
+
if (!found)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table is missing constraint \"%s\"",
NameStr(parent_con->conname))));
-
- if (parent_con->condeferrable != child_con->condeferrable ||
- parent_con->condeferred != child_con->condeferred ||
- strcmp(decompile_conbin(constraintTuple, tupleDesc),
- decompile_conbin(child_contuple, tupleDesc)) != 0)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("constraint definition for check constraint \"%s\" does not match",
- NameStr(parent_con->conname))));
-
- /*
- * TODO: add conislocal,coninhcount to constraints. This is where we
- * would have to bump them just like attributes
- */
}
- systable_endscan(scan);
- heap_close(catalogRelation, AccessShareLock);
+ systable_endscan(parent_scan);
+ heap_close(catalog_relation, RowExclusiveLock);
}
/*
* parent then its columns will never be automatically dropped which may
* surprise. But at least we'll never surprise by dropping columns someone
* isn't expecting to be dropped which would actually mean data loss.
+ *
+ * coninhcount and conislocal for inherited constraints are adjusted in
+ * exactly the same way.
*/
static void
ATExecDropInherit(Relation rel, RangeVar *parent)
ScanKeyData key[3];
HeapTuple inheritsTuple,
attributeTuple,
+ constraintTuple,
depTuple;
+ List *connames;
bool found = false;
/*
heap_close(catalogRelation, RowExclusiveLock);
/*
+ * Likewise, find inherited check constraints and disinherit them.
+ * To do this, we first need a list of the names of the parent's check
+ * constraints. (We cheat a bit by only checking for name matches,
+ * assuming that the expressions will match.)
+ */
+ catalogRelation = heap_open(ConstraintRelationId, RowExclusiveLock);
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(parent_rel)));
+ scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, key);
+
+ connames = NIL;
+
+ while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
+ {
+ Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
+
+ if (con->contype == CONSTRAINT_CHECK)
+ connames = lappend(connames, pstrdup(NameStr(con->conname)));
+ }
+
+ systable_endscan(scan);
+
+ /* Now scan the child's constraints */
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, key);
+
+ while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
+ {
+ Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
+ bool match;
+ ListCell *lc;
+
+ if (con->contype != CONSTRAINT_CHECK)
+ continue;
+
+ match = false;
+ foreach (lc, connames)
+ {
+ if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0)
+ {
+ match = true;
+ break;
+ }
+ }
+
+ if (match)
+ {
+ /* Decrement inhcount and possibly set islocal to true */
+ HeapTuple copyTuple = heap_copytuple(constraintTuple);
+ Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
+ if (copy_con->coninhcount <= 0) /* shouldn't happen */
+ elog(ERROR, "relation %u has non-inherited constraint \"%s\"",
+ RelationGetRelid(rel), NameStr(copy_con->conname));
+
+ copy_con->coninhcount--;
+ if (copy_con->coninhcount == 0)
+ copy_con->conislocal = true;
+
+ simple_heap_update(catalogRelation, ©Tuple->t_self, copyTuple);
+ CatalogUpdateIndexes(catalogRelation, copyTuple);
+ heap_freetuple(copyTuple);
+ }
+ }
+
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
+
+ /*
* Drop the dependency
*
* There's no convenient way to do this, so go trawling through pg_depend