* tablecmds.c
* Commands for creating and altering table structures and settings
*
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.177 2006/01/30 16:18:58 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.256 2008/06/14 18:04:33 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
-#include "access/tuptoaster.h"
+#include "access/heapam.h"
+#include "access/reloptions.h"
+#include "access/sysattr.h"
+#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
+#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_type_fn.h"
+#include "catalog/toasting.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
+#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "commands/typecmds.h"
#include "executor/executor.h"
-#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
+#include "nodes/parsenodes.h"
#include "optimizer/clauses.h"
#include "optimizer/plancat.h"
#include "optimizer/prep.h"
-#include "parser/analyze.h"
#include "parser/gramparse.h"
-#include "parser/parser.h"
#include "parser/parse_clause.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
+#include "parser/parse_utilcmd.h"
+#include "parser/parser.h"
+#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
+#include "utils/snapmgr.h"
#include "utils/syscache.h"
+#include "utils/tqual.h"
/*
/* Information saved by Phases 1/2 for Phase 3: */
List *constraints; /* List of NewConstraint */
List *newvals; /* List of NewColumnValue */
+ bool new_notnull; /* T if we added new NOT NULL constraints */
Oid newTableSpace; /* new tablespace; 0 means no change */
/* Objects to rebuild after completing ALTER TYPE operations */
List *changedConstraintOids; /* OIDs of constraints to rebuild */
} AlteredTableInfo;
/* Struct describing one new constraint to check in Phase 3 scan */
+/* Note: new NOT NULL constraints are handled elsewhere */
typedef struct NewConstraint
{
char *name; /* Constraint name, or NULL if none */
- ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */
- AttrNumber attnum; /* only relevant for NOT_NULL */
+ ConstrType contype; /* CHECK or FOREIGN */
Oid refrelid; /* PK rel, if FOREIGN */
+ Oid conid; /* OID of pg_constraint entry, if FOREIGN */
Node *qual; /* Check expr or FkConstraint struct */
List *qualstate; /* Execution state for CHECK */
} NewConstraint;
ExprState *exprstate; /* execution state */
} NewColumnValue;
-
+/*
+ * Error-reporting support for RemoveRelations
+ */
+struct dropmsgstrings
+{
+ char kind;
+ int nonexistent_code;
+ const char *nonexistent_msg;
+ const char *skipping_msg;
+ const char *nota_msg;
+ const char *drophint_msg;
+};
+
+static const struct dropmsgstrings dropmsgstringarray[] = {
+ {RELKIND_RELATION,
+ ERRCODE_UNDEFINED_TABLE,
+ gettext_noop("table \"%s\" does not exist"),
+ gettext_noop("table \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a table"),
+ gettext_noop("Use DROP TABLE to remove a table.")},
+ {RELKIND_SEQUENCE,
+ ERRCODE_UNDEFINED_TABLE,
+ gettext_noop("sequence \"%s\" does not exist"),
+ gettext_noop("sequence \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a sequence"),
+ gettext_noop("Use DROP SEQUENCE to remove a sequence.")},
+ {RELKIND_VIEW,
+ ERRCODE_UNDEFINED_TABLE,
+ gettext_noop("view \"%s\" does not exist"),
+ gettext_noop("view \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a view"),
+ gettext_noop("Use DROP VIEW to remove a view.")},
+ {RELKIND_INDEX,
+ ERRCODE_UNDEFINED_OBJECT,
+ gettext_noop("index \"%s\" does not exist"),
+ gettext_noop("index \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not an index"),
+ gettext_noop("Use DROP INDEX to remove an index.")},
+ {RELKIND_COMPOSITE_TYPE,
+ ERRCODE_UNDEFINED_OBJECT,
+ gettext_noop("type \"%s\" does not exist"),
+ gettext_noop("type \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not a type"),
+ gettext_noop("Use DROP TYPE to remove a type.")},
+ {'\0', 0, NULL, NULL, NULL, NULL}
+};
+
+
+static void truncate_check_rel(Relation rel);
static List *MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr, int *supOidCount);
-static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
+static bool MergeCheckConstraint(List *constraints, char *name, Node *expr);
+static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
+static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
+static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
static void StoreCatalogInheritance(Oid relationId, List *supers);
+static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
+ int16 seqNumber, Relation inhRelation);
static int findAttrByName(const char *attributeName, List *schema);
static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
-static bool needs_toast_table(Relation rel);
static void AlterIndexNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid);
static void AlterSeqNamespaces(Relation classRel, Relation rel,
int numattrs, int16 *attnums,
Oid *opclasses);
static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
- Relation rel, Relation pkrel);
+ Relation rel, Relation pkrel, Oid constraintOid);
static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
- Oid constrOid);
-static char *fkMatchTypeToString(char match_type);
+ Oid constraintOid);
static void ATController(Relation rel, List *cmds, bool recurse);
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
bool recurse, bool recursing);
static void ATRewriteCatalogs(List **wqueue);
-static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
+static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ AlterTableCmd *cmd);
static void ATRewriteTables(List **wqueue);
static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
static void ATSimplePermissions(Relation rel, bool allowView);
+static void ATSimplePermissionsRelationOrIndex(Relation rel);
static void ATSimpleRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, bool recurse);
static void ATOneLevelRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd);
-static void find_composite_type_dependencies(Oid typeOid,
- const char *origTblName);
static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
AlterTableCmd *cmd);
static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
ColumnDef *colDef);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
-static void add_column_support_dependency(Oid relid, int32 attnum,
- RangeVar *support);
static void ATExecDropNotNull(Relation rel, const char *colName);
static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
const char *colName);
bool recurse, bool recursing);
static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, bool is_rebuild);
-static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
- Node *newConstraint);
+static void ATExecAddConstraint(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ Node *newConstraint, bool recurse);
+static void ATAddCheckConstraint(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ Constraint *constr,
+ bool recurse, bool recursing);
static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
FkConstraint *fkconstraint);
-static void ATPrepDropConstraint(List **wqueue, Relation rel,
- bool recurse, AlterTableCmd *cmd);
static void ATExecDropConstraint(Relation rel, const char *constrName,
- DropBehavior behavior, bool quiet);
+ DropBehavior behavior,
+ bool recurse, bool recursing);
static void ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing,
static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
char *tablespacename);
static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
+static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset);
static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
- bool enable, bool skip_system);
+ char fires_when, bool skip_system);
+static void ATExecEnableDisableRule(Relation rel, char *rulename,
+ char fires_when);
+static void ATExecAddInherit(Relation rel, RangeVar *parent);
+static void ATExecDropInherit(Relation rel, RangeVar *parent);
static void copy_relation_data(Relation rel, SMgrRelation dst);
-static void update_ri_trigger_args(Oid relid,
- const char *oldname,
- const char *newname,
- bool fk_scan,
- bool update_relname);
/* ----------------------------------------------------------------
bool localHasOids;
int parentOidCount;
List *rawDefaults;
+ List *cookedDefaults;
+ Datum reloptions;
ListCell *listptr;
- int i;
AttrNumber attnum;
/*
}
/*
- * Select tablespace to use. If not specified, use default_tablespace
+ * Select tablespace to use. If not specified, use default tablespace
* (which may in turn default to database's default).
*/
if (stmt->tablespacename)
}
else
{
- tablespaceId = GetDefaultTablespace();
+ tablespaceId = GetDefaultTablespace(stmt->relation->istemp);
/* note InvalidOid is OK in this case */
}
/* Check permissions except when using database's default */
- if (OidIsValid(tablespaceId))
+ if (OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
{
AclResult aclresult;
}
/*
+ * Parse and validate reloptions, if any.
+ */
+ reloptions = transformRelOptions((Datum) 0, stmt->options, true, false);
+
+ (void) heap_reloptions(relkind, reloptions, true);
+
+ /*
* Look up inheritance ancestors and generate relation schema, including
* inherited attributes.
*/
&inheritOids, &old_constraints, &parentOidCount);
/*
- * Create a relation descriptor from the relation schema and create the
- * relation. Note that in this stage only inherited (pre-cooked) defaults
- * and constraints will be included into the new relation.
- * (BuildDescForRelation takes care of the inherited defaults, but we have
- * to copy inherited constraints here.)
+ * Create a tuple descriptor from the relation schema. Note that this
+ * deals with column names, types, and NOT NULL constraints, but not
+ * default values or CHECK constraints; we handle those below.
*/
descriptor = BuildDescForRelation(schema);
- localHasOids = interpretOidsOption(stmt->hasoids);
+ localHasOids = interpretOidsOption(stmt->options);
descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
- if (old_constraints != NIL)
+ /*
+ * Find columns with default values and prepare for insertion of the
+ * defaults. Pre-cooked (that is, inherited) defaults go into a list of
+ * CookedConstraint structs that we'll pass to heap_create_with_catalog,
+ * while raw defaults go into a list of RawColumnDefault structs that
+ * will be processed by AddRelationNewConstraints. (We can't deal with
+ * raw expressions until we can do transformExpr.)
+ *
+ * We can set the atthasdef flags now in the tuple descriptor; this just
+ * saves StoreAttrDefault from having to do an immediate update of the
+ * pg_attribute rows.
+ */
+ rawDefaults = NIL;
+ cookedDefaults = NIL;
+ attnum = 0;
+
+ foreach(listptr, schema)
{
- ConstrCheck *check = (ConstrCheck *)
- palloc0(list_length(old_constraints) * sizeof(ConstrCheck));
- int ncheck = 0;
+ ColumnDef *colDef = lfirst(listptr);
- foreach(listptr, old_constraints)
+ attnum++;
+
+ if (colDef->raw_default != NULL)
{
- Constraint *cdef = (Constraint *) lfirst(listptr);
- bool dup = false;
+ RawColumnDefault *rawEnt;
- if (cdef->contype != CONSTR_CHECK)
- continue;
- Assert(cdef->name != NULL);
- Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL);
+ Assert(colDef->cooked_default == NULL);
- /*
- * In multiple-inheritance situations, it's possible to inherit
- * the same grandparent constraint through multiple parents.
- * Hence, discard inherited constraints that match as to both name
- * and expression. Otherwise, gripe if the names conflict.
- */
- for (i = 0; i < ncheck; i++)
- {
- if (strcmp(check[i].ccname, cdef->name) != 0)
- continue;
- if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0)
- {
- dup = true;
- break;
- }
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("duplicate check constraint name \"%s\"",
- cdef->name)));
- }
- if (!dup)
- {
- check[ncheck].ccname = cdef->name;
- check[ncheck].ccbin = pstrdup(cdef->cooked_expr);
- ncheck++;
- }
+ rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
+ rawEnt->attnum = attnum;
+ rawEnt->raw_default = colDef->raw_default;
+ rawDefaults = lappend(rawDefaults, rawEnt);
+ descriptor->attrs[attnum - 1]->atthasdef = true;
}
- if (ncheck > 0)
+ else if (colDef->cooked_default != NULL)
{
- if (descriptor->constr == NULL)
- {
- descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr));
- descriptor->constr->defval = NULL;
- descriptor->constr->num_defval = 0;
- descriptor->constr->has_not_null = false;
- }
- descriptor->constr->num_check = ncheck;
- descriptor->constr->check = check;
+ CookedConstraint *cooked;
+
+ cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
+ cooked->contype = CONSTR_DEFAULT;
+ cooked->name = NULL;
+ cooked->attnum = attnum;
+ cooked->expr = stringToNode(colDef->cooked_default);
+ cooked->is_local = true; /* not used for defaults */
+ cooked->inhcount = 0; /* ditto */
+ cookedDefaults = lappend(cookedDefaults, cooked);
+ descriptor->attrs[attnum - 1]->atthasdef = true;
}
}
+ /*
+ * Create the relation. Inherited defaults and constraints are passed
+ * in for immediate handling --- since they don't need parsing, they
+ * can be stored immediately.
+ */
relationId = heap_create_with_catalog(relname,
namespaceId,
tablespaceId,
InvalidOid,
GetUserId(),
descriptor,
+ list_concat(cookedDefaults,
+ old_constraints),
relkind,
false,
localHasOids,
parentOidCount,
stmt->oncommit,
+ reloptions,
allowSystemTableMods);
StoreCatalogInheritance(relationId, inheritOids);
* apply the parser's transformExpr routine, but transformExpr doesn't
* work unless we have a pre-existing relation. So, the transformation has
* to be postponed to this final step of CREATE TABLE.
- *
- * Another task that's conveniently done at this step is to add dependency
- * links between columns and supporting relations (such as SERIAL
- * sequences).
- *
- * First, scan schema to find new column defaults.
- */
- rawDefaults = NIL;
- attnum = 0;
-
- foreach(listptr, schema)
- {
- ColumnDef *colDef = lfirst(listptr);
-
- attnum++;
-
- if (colDef->raw_default != NULL)
- {
- RawColumnDefault *rawEnt;
-
- Assert(colDef->cooked_default == NULL);
-
- rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
- rawEnt->attnum = attnum;
- rawEnt->raw_default = colDef->raw_default;
- rawDefaults = lappend(rawDefaults, rawEnt);
- }
-
- /* Create dependency for supporting relation for this column */
- if (colDef->support != NULL)
- add_column_support_dependency(relationId, attnum, colDef->support);
- }
-
- /*
- * Parse and add the defaults/constraints, if any.
*/
if (rawDefaults || stmt->constraints)
- AddRelationRawConstraints(rel, rawDefaults, stmt->constraints);
+ AddRelationNewConstraints(rel, rawDefaults, stmt->constraints,
+ true, true);
/*
* Clean up. We keep lock on new relation (although it shouldn't be
}
/*
- * RemoveRelation
- * Deletes a relation.
+ * Emit the right error or warning message for a "DROP" command issued on a
+ * non-existent relation
+ */
+static void
+DropErrorMsgNonExistent(const char *relname, char rightkind, bool missing_ok)
+{
+ const struct dropmsgstrings *rentry;
+
+ for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++)
+ {
+ if (rentry->kind == rightkind)
+ {
+ if (!missing_ok)
+ {
+ ereport(ERROR,
+ (errcode(rentry->nonexistent_code),
+ errmsg(rentry->nonexistent_msg, relname)));
+ }
+ else
+ {
+ ereport(NOTICE, (errmsg(rentry->skipping_msg, relname)));
+ break;
+ }
+ }
+ }
+
+ Assert(rentry->kind != '\0'); /* Should be impossible */
+}
+
+/*
+ * Emit the right error message for a "DROP" command issued on a
+ * relation of the wrong type
+ */
+static void
+DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind)
+{
+ const struct dropmsgstrings *rentry;
+ const struct dropmsgstrings *wentry;
+
+ for (rentry = dropmsgstringarray; rentry->kind != '\0'; rentry++)
+ if (rentry->kind == rightkind)
+ break;
+ Assert(rentry->kind != '\0');
+
+ for (wentry = dropmsgstringarray; wentry->kind != '\0'; wentry++)
+ if (wentry->kind == wrongkind)
+ break;
+ /* wrongkind could be something we don't have in our table... */
+
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg(rentry->nota_msg, relname),
+ (wentry->kind != '\0') ? errhint(wentry->drophint_msg) : 0));
+}
+
+/*
+ * RemoveRelations
+ * Implements DROP TABLE, DROP INDEX, DROP SEQUENCE, DROP VIEW
*/
void
-RemoveRelation(const RangeVar *relation, DropBehavior behavior)
+RemoveRelations(DropStmt *drop)
{
- Oid relOid;
- ObjectAddress object;
+ ObjectAddresses *objects;
+ char relkind;
+ ListCell *cell;
- relOid = RangeVarGetRelid(relation, false);
+ /*
+ * First we identify all the relations, then we delete them in a single
+ * performMultipleDeletions() call. This is to avoid unwanted
+ * DROP RESTRICT errors if one of the relations depends on another.
+ */
- object.classId = RelationRelationId;
- object.objectId = relOid;
- object.objectSubId = 0;
+ /* Determine required relkind */
+ switch (drop->removeType)
+ {
+ case OBJECT_TABLE:
+ relkind = RELKIND_RELATION;
+ break;
- performDeletion(&object, behavior);
+ case OBJECT_INDEX:
+ relkind = RELKIND_INDEX;
+ break;
+
+ case OBJECT_SEQUENCE:
+ relkind = RELKIND_SEQUENCE;
+ break;
+
+ case OBJECT_VIEW:
+ relkind = RELKIND_VIEW;
+ break;
+
+ default:
+ elog(ERROR, "unrecognized drop object type: %d",
+ (int) drop->removeType);
+ relkind = 0; /* keep compiler quiet */
+ break;
+ }
+
+ /* Lock and validate each relation; build a list of object addresses */
+ objects = new_object_addresses();
+
+ foreach(cell, drop->objects)
+ {
+ RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
+ Oid relOid;
+ HeapTuple tuple;
+ Form_pg_class classform;
+ ObjectAddress obj;
+
+ /*
+ * These next few steps are a great deal like relation_openrv, but we
+ * don't bother building a relcache entry since we don't need it.
+ *
+ * Check for shared-cache-inval messages before trying to access the
+ * relation. This is needed to cover the case where the name
+ * identifies a rel that has been dropped and recreated since the
+ * start of our transaction: if we don't flush the old syscache entry,
+ * then we'll latch onto that entry and suffer an error later.
+ */
+ AcceptInvalidationMessages();
+
+ /* Look up the appropriate relation using namespace search */
+ relOid = RangeVarGetRelid(rel, true);
+
+ /* Not there? */
+ if (!OidIsValid(relOid))
+ {
+ DropErrorMsgNonExistent(rel->relname, relkind, drop->missing_ok);
+ continue;
+ }
+
+ /* Get the lock before trying to fetch the syscache entry */
+ LockRelationOid(relOid, AccessExclusiveLock);
+
+ tuple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(relOid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for relation %u", relOid);
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+
+ if (classform->relkind != relkind)
+ DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
+
+ /* Allow DROP to either table owner or schema owner */
+ if (!pg_class_ownercheck(relOid, GetUserId()) &&
+ !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ rel->relname);
+
+ if (!allowSystemTableMods && IsSystemClass(classform))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ rel->relname)));
+
+ /* OK, we're ready to delete this one */
+ obj.classId = RelationRelationId;
+ obj.objectId = relOid;
+ obj.objectSubId = 0;
+
+ add_exact_object_address(&obj, objects);
+
+ ReleaseSysCache(tuple);
+ }
+
+ performMultipleDeletions(objects, drop->behavior);
+
+ free_object_addresses(objects);
}
/*
* ExecuteTruncate
* Executes a TRUNCATE command.
*
- * This is a multi-relation truncate. It first opens and grabs exclusive
- * locks on all relations involved, checking permissions and otherwise
- * verifying that the relation is OK for truncation. When they are all
- * open, it checks foreign key references on them, namely that FK references
- * are all internal to the group that's being truncated. Finally all
- * relations are truncated and reindexed.
+ * This is a multi-relation truncate. We first open and grab exclusive
+ * lock on all relations involved, checking permissions and otherwise
+ * verifying that the relation is OK for truncation. In CASCADE mode,
+ * relations having FK references to the targeted relations are automatically
+ * added to the group; in RESTRICT mode, we check that all FK references are
+ * internal to the group that's being truncated. Finally all the relations
+ * are truncated and reindexed.
*/
void
-ExecuteTruncate(List *relations)
+ExecuteTruncate(TruncateStmt *stmt)
{
List *rels = NIL;
+ List *relids = NIL;
+ List *seq_relids = NIL;
+ EState *estate;
+ ResultRelInfo *resultRelInfos;
+ ResultRelInfo *resultRelInfo;
ListCell *cell;
- foreach(cell, relations)
+ /*
+ * Open, exclusive-lock, and check all the explicitly-specified relations
+ */
+ foreach(cell, stmt->relations)
{
RangeVar *rv = lfirst(cell);
Relation rel;
- /* Grab exclusive lock in preparation for truncate */
rel = heap_openrv(rv, AccessExclusiveLock);
+ truncate_check_rel(rel);
+ rels = lappend(rels, rel);
+ relids = lappend_oid(relids, RelationGetRelid(rel));
+ }
- /* Only allow truncate on regular tables */
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
+ /*
+ * In CASCADE mode, suck in all referencing relations as well. This
+ * requires multiple iterations to find indirectly-dependent relations. At
+ * each phase, we need to exclusive-lock new rels before looking for their
+ * dependencies, else we might miss something. Also, we check each rel as
+ * soon as we open it, to avoid a faux pas such as holding lock for a long
+ * time on a rel we have no permissions for.
+ */
+ if (stmt->behavior == DROP_CASCADE)
+ {
+ for (;;)
+ {
+ List *newrelids;
- /* Permissions checks */
- if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
+ newrelids = heap_truncate_find_FKs(relids);
+ if (newrelids == NIL)
+ break; /* nothing else to add */
- if (!allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
+ foreach(cell, newrelids)
+ {
+ Oid relid = lfirst_oid(cell);
+ Relation rel;
- /*
- * We can never allow truncation of shared or nailed-in-cache
- * relations, because we can't support changing their relfilenode
- * values.
- */
- if (rel->rd_rel->relisshared || rel->rd_isnailed)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot truncate system relation \"%s\"",
- RelationGetRelationName(rel))));
+ rel = heap_open(relid, AccessExclusiveLock);
+ ereport(NOTICE,
+ (errmsg("truncate cascades to table \"%s\"",
+ RelationGetRelationName(rel))));
+ truncate_check_rel(rel);
+ rels = lappend(rels, rel);
+ relids = lappend_oid(relids, relid);
+ }
+ }
+ }
- /*
- * Don't allow truncate on temp tables of other backends ... their
- * local buffer manager is not going to cope.
- */
- if (isOtherTempNamespace(RelationGetNamespace(rel)))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot truncate temporary tables of other sessions")));
+ /*
+ * Check foreign key references. In CASCADE mode, this should be
+ * unnecessary since we just pulled in all the references; but as a
+ * cross-check, do it anyway if in an Assert-enabled build.
+ */
+#ifdef USE_ASSERT_CHECKING
+ heap_truncate_check_FKs(rels, false);
+#else
+ if (stmt->behavior == DROP_RESTRICT)
+ heap_truncate_check_FKs(rels, false);
+#endif
- /* Save it into the list of rels to truncate */
- rels = lappend(rels, rel);
+ /*
+ * If we are asked to restart sequences, find all the sequences,
+ * lock them (we only need AccessShareLock because that's all that
+ * ALTER SEQUENCE takes), and check permissions. We want to do this
+ * early since it's pointless to do all the truncation work only to fail
+ * on sequence permissions.
+ */
+ if (stmt->restart_seqs)
+ {
+ foreach(cell, rels)
+ {
+ Relation rel = (Relation) lfirst(cell);
+ List *seqlist = getOwnedSequences(RelationGetRelid(rel));
+ ListCell *seqcell;
+
+ foreach(seqcell, seqlist)
+ {
+ Oid seq_relid = lfirst_oid(seqcell);
+ Relation seq_rel;
+
+ seq_rel = relation_open(seq_relid, AccessShareLock);
+
+ /* This check must match AlterSequence! */
+ if (!pg_class_ownercheck(seq_relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ RelationGetRelationName(seq_rel));
+
+ seq_relids = lappend_oid(seq_relids, seq_relid);
+
+ relation_close(seq_rel, NoLock);
+ }
+ }
}
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
/*
- * Check foreign key references.
+ * To fire triggers, we'll need an EState as well as a ResultRelInfo
+ * for each relation.
*/
- heap_truncate_check_FKs(rels, false);
+ estate = CreateExecutorState();
+ resultRelInfos = (ResultRelInfo *)
+ palloc(list_length(rels) * sizeof(ResultRelInfo));
+ resultRelInfo = resultRelInfos;
+ foreach(cell, rels)
+ {
+ Relation rel = (Relation) lfirst(cell);
+
+ InitResultRelInfo(resultRelInfo,
+ rel,
+ 0, /* dummy rangetable index */
+ CMD_DELETE, /* don't need any index info */
+ false);
+ resultRelInfo++;
+ }
+ estate->es_result_relations = resultRelInfos;
+ estate->es_num_result_relations = list_length(rels);
+
+ /*
+ * Process all BEFORE STATEMENT TRUNCATE triggers before we begin
+ * truncating (this is because one of them might throw an error).
+ * Also, if we were to allow them to prevent statement execution,
+ * that would need to be handled here.
+ */
+ resultRelInfo = resultRelInfos;
+ foreach(cell, rels)
+ {
+ estate->es_result_relation_info = resultRelInfo;
+ ExecBSTruncateTriggers(estate, resultRelInfo);
+ resultRelInfo++;
+ }
/*
* OK, truncate each table.
*/
foreach(cell, rels)
{
- Relation rel = lfirst(cell);
+ Relation rel = (Relation) lfirst(cell);
Oid heap_relid;
Oid toast_relid;
* the relfilenode value. The old storage file is scheduled for
* deletion at commit.
*/
- setNewRelfilenode(rel);
+ setNewRelfilenode(rel, RecentXmin);
heap_relid = RelationGetRelid(rel);
toast_relid = rel->rd_rel->reltoastrelid;
- heap_close(rel, NoLock);
-
/*
* The same for the toast table, if any.
*/
if (OidIsValid(toast_relid))
{
rel = relation_open(toast_relid, AccessExclusiveLock);
- setNewRelfilenode(rel);
+ setNewRelfilenode(rel, RecentXmin);
heap_close(rel, NoLock);
}
*/
reindex_relation(heap_relid, true);
}
+
+ /*
+ * Process all AFTER STATEMENT TRUNCATE triggers.
+ */
+ resultRelInfo = resultRelInfos;
+ foreach(cell, rels)
+ {
+ estate->es_result_relation_info = resultRelInfo;
+ ExecASTruncateTriggers(estate, resultRelInfo);
+ resultRelInfo++;
+ }
+
+ /* Handle queued AFTER triggers */
+ AfterTriggerEndQuery(estate);
+
+ /* We can clean up the EState now */
+ FreeExecutorState(estate);
+
+ /* And close the rels (can't do this while EState still holds refs) */
+ foreach(cell, rels)
+ {
+ Relation rel = (Relation) lfirst(cell);
+
+ heap_close(rel, NoLock);
+ }
+
+ /*
+ * Lastly, restart any owned sequences if we were asked to. This is done
+ * last because it's nontransactional: restarts will not roll back if
+ * we abort later. Hence it's important to postpone them as long as
+ * possible. (This is also a big reason why we locked and
+ * permission-checked the sequences beforehand.)
+ */
+ if (stmt->restart_seqs)
+ {
+ List *options = list_make1(makeDefElem("restart", NULL));
+
+ foreach(cell, seq_relids)
+ {
+ Oid seq_relid = lfirst_oid(cell);
+
+ AlterSequenceInternal(seq_relid, options);
+ }
+ }
}
-/*----------
- * MergeAttributes
- * Returns new schema given initial schema and superclasses.
- *
- * Input arguments:
- * 'schema' is the column/attribute definition for the table. (It's a list
- * of ColumnDef's.) It is destructively changed.
- * 'supers' is a list of names (as RangeVar nodes) of parent relations.
- * 'istemp' is TRUE if we are creating a temp relation.
- *
- * Output arguments:
- * 'supOids' receives a list of the OIDs of the parent relations.
- * 'supconstr' receives a list of constraints belonging to the parents,
- * updated as necessary to be valid for the child.
- * 'supOidCount' is set to the number of parents that have OID columns.
- *
+/*
+ * Check that a given rel is safe to truncate. Subroutine for ExecuteTruncate
+ */
+static void
+truncate_check_rel(Relation rel)
+{
+ /* Only allow truncate on regular tables */
+ if (rel->rd_rel->relkind != RELKIND_RELATION)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table",
+ RelationGetRelationName(rel))));
+
+ /* Permissions checks */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ RelationGetRelationName(rel));
+
+ if (!allowSystemTableMods && IsSystemRelation(rel))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ RelationGetRelationName(rel))));
+
+ /*
+ * We can never allow truncation of shared or nailed-in-cache relations,
+ * because we can't support changing their relfilenode values.
+ */
+ if (rel->rd_rel->relisshared || rel->rd_isnailed)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot truncate system relation \"%s\"",
+ RelationGetRelationName(rel))));
+
+ /*
+ * Don't allow truncate on temp tables of other backends ... their local
+ * buffer manager is not going to cope.
+ */
+ if (isOtherTempNamespace(RelationGetNamespace(rel)))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot truncate temporary tables of other sessions")));
+
+ /*
+ * Also check for active uses of the relation in the current transaction,
+ * including open scans and pending AFTER trigger events.
+ */
+ CheckTableNotInUse(rel, "TRUNCATE");
+}
+
+/*----------
+ * MergeAttributes
+ * Returns new schema given initial schema and superclasses.
+ *
+ * Input arguments:
+ * 'schema' is the column/attribute definition for the table. (It's a list
+ * of ColumnDef's.) It is destructively changed.
+ * 'supers' is a list of names (as RangeVar nodes) of parent relations.
+ * 'istemp' is TRUE if we are creating a temp relation.
+ *
+ * Output arguments:
+ * 'supOids' receives a list of the OIDs of the parent relations.
+ * 'supconstr' receives a list of constraints belonging to the parents,
+ * updated as necessary to be valid for the child.
+ * 'supOidCount' is set to the number of parents that have OID columns.
+ *
* Return value:
* Completed schema list.
*
if (strcmp(coldef->colname, restdef->colname) == 0)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
- errmsg("column \"%s\" duplicated",
+ errmsg("column \"%s\" specified more than once",
coldef->colname)));
}
}
if (list_member_oid(parentOids, RelationGetRelid(relation)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
- errmsg("inherited relation \"%s\" duplicated",
- parent->relname)));
+ errmsg("relation \"%s\" would be inherited from more than once",
+ parent->relname)));
parentOids = lappend_oid(parentOids, RelationGetRelid(relation));
char *attributeName = NameStr(attribute->attname);
int exist_attno;
ColumnDef *def;
- TypeName *typename;
/*
* Ignore dropped columns in the parent.
exist_attno = findAttrByName(attributeName, inhSchema);
if (exist_attno > 0)
{
+ Oid defTypeId;
+ int32 deftypmod;
+
/*
* Yes, try to merge the two column definitions. They must
* have the same type and typmod.
(errmsg("merging multiple inherited definitions of column \"%s\"",
attributeName)));
def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
- if (typenameTypeId(def->typename) != attribute->atttypid ||
- def->typename->typmod != attribute->atttypmod)
+ defTypeId = typenameTypeId(NULL, def->typename, &deftypmod);
+ if (defTypeId != attribute->atttypid ||
+ deftypmod != attribute->atttypmod)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("inherited column \"%s\" has a type conflict",
*/
def = makeNode(ColumnDef);
def->colname = pstrdup(attributeName);
- typename = makeNode(TypeName);
- typename->typeid = attribute->atttypid;
- typename->typmod = attribute->atttypmod;
- def->typename = typename;
+ def->typename = makeTypeNameFromOid(attribute->atttypid,
+ attribute->atttypmod);
def->inhcount = 1;
def->is_local = false;
def->is_not_null = attribute->attnotnull;
def->raw_default = NULL;
def->cooked_default = NULL;
def->constraints = NIL;
- def->support = NULL;
inhSchema = lappend(inhSchema, def);
newattno[parent_attno - 1] = ++child_attno;
}
}
/*
- * Now copy the constraints of this parent, adjusting attnos using the
- * completed newattno[] map
+ * Now copy the CHECK constraints of this parent, adjusting attnos
+ * using the completed newattno[] map. Identically named constraints
+ * are merged if possible, else we throw error.
*/
if (constr && constr->num_check > 0)
{
for (i = 0; i < constr->num_check; i++)
{
- Constraint *cdef = makeNode(Constraint);
+ char *name = check[i].ccname;
Node *expr;
- cdef->contype = CONSTR_CHECK;
- cdef->name = pstrdup(check[i].ccname);
- cdef->raw_expr = NULL;
/* adjust varattnos of ccbin here */
expr = stringToNode(check[i].ccbin);
change_varattnos_of_a_node(expr, newattno);
- cdef->cooked_expr = nodeToString(expr);
- constraints = lappend(constraints, cdef);
+
+ /* check for duplicate */
+ if (!MergeCheckConstraint(constraints, name, expr))
+ {
+ /* nope, this is a new one */
+ CookedConstraint *cooked;
+
+ cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
+ cooked->contype = CONSTR_CHECK;
+ cooked->name = pstrdup(name);
+ cooked->attnum = 0; /* not used for constraints */
+ cooked->expr = expr;
+ cooked->is_local = false;
+ cooked->inhcount = 1;
+ constraints = lappend(constraints, cooked);
+ }
}
}
if (exist_attno > 0)
{
ColumnDef *def;
+ Oid defTypeId,
+ newTypeId;
+ int32 deftypmod,
+ newtypmod;
/*
* Yes, try to merge the two column definitions. They must
(errmsg("merging column \"%s\" with inherited definition",
attributeName)));
def = (ColumnDef *) list_nth(inhSchema, exist_attno - 1);
- if (typenameTypeId(def->typename) != typenameTypeId(newdef->typename) ||
- def->typename->typmod != newdef->typename->typmod)
+ defTypeId = typenameTypeId(NULL, def->typename, &deftypmod);
+ newTypeId = typenameTypeId(NULL, newdef->typename, &newtypmod);
+ if (defTypeId != newTypeId || deftypmod != newtypmod)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("column \"%s\" has a type conflict",
return schema;
}
+
/*
- * complementary static functions for MergeAttributes().
+ * MergeCheckConstraint
+ * Try to merge an inherited CHECK constraint with previous ones
*
- * Varattnos of pg_constraint.conbin must be rewritten when subclasses inherit
- * constraints from parent classes, since the inherited attributes could
- * be given different column numbers in multiple-inheritance cases.
+ * If we inherit identically-named constraints from multiple parents, we must
+ * merge them, or throw an error if they don't have identical definitions.
+ *
+ * constraints is a list of CookedConstraint structs for previous constraints.
+ *
+ * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
+ * got a so-far-unique name, or throws error if conflict.
+ */
+static bool
+MergeCheckConstraint(List *constraints, char *name, Node *expr)
+{
+ ListCell *lc;
+
+ foreach(lc, constraints)
+ {
+ CookedConstraint *ccon = (CookedConstraint *) lfirst(lc);
+
+ Assert(ccon->contype == CONSTR_CHECK);
+
+ /* Non-matching names never conflict */
+ if (strcmp(ccon->name, name) != 0)
+ continue;
+
+ if (equal(expr, ccon->expr))
+ {
+ /* OK to merge */
+ ccon->inhcount++;
+ return true;
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("check constraint name \"%s\" appears multiple times but with different expressions",
+ name)));
+ }
+
+ return false;
+}
+
+
+/*
+ * Replace varattno values in an expression tree according to the given
+ * map array, that is, varattno N is replaced by newattno[N-1]. It is
+ * caller's responsibility to ensure that the array is long enough to
+ * define values for all user varattnos present in the tree. System column
+ * attnos remain unchanged.
*
- * Note that the passed node tree is modified in place!
+ * Note that the passed node tree is modified in-place!
*/
+void
+change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
+{
+ /* no setup needed, so away we go */
+ (void) change_varattnos_walker(node, newattno);
+}
+
static bool
change_varattnos_walker(Node *node, const AttrNumber *newattno)
{
* currently.
*/
Assert(newattno[var->varattno - 1] > 0);
- var->varattno = newattno[var->varattno - 1];
+ var->varattno = var->varoattno = newattno[var->varattno - 1];
}
return false;
}
(void *) newattno);
}
-static bool
-change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
+/*
+ * Generate a map for change_varattnos_of_a_node from old and new TupleDesc's,
+ * matching according to column name.
+ */
+AttrNumber *
+varattnos_map(TupleDesc old, TupleDesc new)
+{
+ AttrNumber *attmap;
+ int i,
+ j;
+
+ attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts);
+ for (i = 1; i <= old->natts; i++)
+ {
+ if (old->attrs[i - 1]->attisdropped)
+ continue; /* leave the entry as zero */
+
+ for (j = 1; j <= new->natts; j++)
+ {
+ if (strcmp(NameStr(old->attrs[i - 1]->attname),
+ NameStr(new->attrs[j - 1]->attname)) == 0)
+ {
+ attmap[i - 1] = j;
+ break;
+ }
+ }
+ }
+ return attmap;
+}
+
+/*
+ * Generate a map for change_varattnos_of_a_node from a TupleDesc and a list
+ * of ColumnDefs
+ */
+AttrNumber *
+varattnos_map_schema(TupleDesc old, List *schema)
{
- return change_varattnos_walker(node, newattno);
+ AttrNumber *attmap;
+ int i;
+
+ attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts);
+ for (i = 1; i <= old->natts; i++)
+ {
+ if (old->attrs[i - 1]->attisdropped)
+ continue; /* leave the entry as zero */
+
+ attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname),
+ schema);
+ }
+ return attmap;
}
+
/*
* StoreCatalogInheritance
* Updates the system catalogs with proper inheritance information.
StoreCatalogInheritance(Oid relationId, List *supers)
{
Relation relation;
- TupleDesc desc;
int16 seqNumber;
ListCell *entry;
- HeapTuple tuple;
/*
* sanity checks
* anymore, there's no need to look for indirect ancestors.)
*/
relation = heap_open(InheritsRelationId, RowExclusiveLock);
- desc = RelationGetDescr(relation);
seqNumber = 1;
foreach(entry, supers)
{
Oid parentOid = lfirst_oid(entry);
- Datum datum[Natts_pg_inherits];
- char nullarr[Natts_pg_inherits];
- ObjectAddress childobject,
- parentobject;
- datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
- datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
- datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
+ StoreCatalogInheritance1(relationId, parentOid, seqNumber, relation);
+ seqNumber++;
+ }
- nullarr[0] = ' ';
- nullarr[1] = ' ';
- nullarr[2] = ' ';
+ heap_close(relation, RowExclusiveLock);
+}
- tuple = heap_formtuple(desc, datum, nullarr);
+/*
+ * Make catalog entries showing relationId as being an inheritance child
+ * of parentOid. inhRelation is the already-opened pg_inherits catalog.
+ */
+static void
+StoreCatalogInheritance1(Oid relationId, Oid parentOid,
+ int16 seqNumber, Relation inhRelation)
+{
+ TupleDesc desc = RelationGetDescr(inhRelation);
+ Datum datum[Natts_pg_inherits];
+ char nullarr[Natts_pg_inherits];
+ ObjectAddress childobject,
+ parentobject;
+ HeapTuple tuple;
- simple_heap_insert(relation, tuple);
+ /*
+ * Make the pg_inherits entry
+ */
+ datum[0] = ObjectIdGetDatum(relationId); /* inhrelid */
+ datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
+ datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
- CatalogUpdateIndexes(relation, tuple);
+ nullarr[0] = ' ';
+ nullarr[1] = ' ';
+ nullarr[2] = ' ';
- heap_freetuple(tuple);
+ tuple = heap_formtuple(desc, datum, nullarr);
- /*
- * Store a dependency too
- */
- parentobject.classId = RelationRelationId;
- parentobject.objectId = parentOid;
- parentobject.objectSubId = 0;
- childobject.classId = RelationRelationId;
- childobject.objectId = relationId;
- childobject.objectSubId = 0;
+ simple_heap_insert(inhRelation, tuple);
- recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
+ CatalogUpdateIndexes(inhRelation, tuple);
- /*
- * Mark the parent as having subclasses.
- */
- setRelhassubclassInRelation(parentOid, true);
+ heap_freetuple(tuple);
- seqNumber += 1;
- }
+ /*
+ * Store a dependency too
+ */
+ parentobject.classId = RelationRelationId;
+ parentobject.objectId = parentOid;
+ parentobject.objectSubId = 0;
+ childobject.classId = RelationRelationId;
+ childobject.objectId = relationId;
+ childobject.objectSubId = 0;
- heap_close(relation, RowExclusiveLock);
+ recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
+
+ /*
+ * Mark the parent as having subclasses.
+ */
+ setRelhassubclassInRelation(parentOid, true);
}
/*
heap_close(attrelation, RowExclusiveLock);
+ relation_close(targetrelation, NoLock); /* close rel but keep lock */
+}
+
+
+/*
+ * Execute ALTER TABLE/INDEX/SEQUENCE/VIEW RENAME
+ *
+ * Caller has already done permissions checks.
+ */
+void
+RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
+{
+ Relation targetrelation;
+ Oid namespaceId;
+ char relkind;
+
/*
- * Update att name in any RI triggers associated with the relation.
+ * Grab an exclusive lock on the target table, index, sequence or view,
+ * which we will NOT release until end of transaction.
*/
- if (targetrelation->rd_rel->reltriggers > 0)
- {
- /* update tgargs column reference where att is primary key */
- update_ri_trigger_args(RelationGetRelid(targetrelation),
- oldattname, newattname,
- false, false);
- /* update tgargs column reference where att is foreign key */
- update_ri_trigger_args(RelationGetRelid(targetrelation),
- oldattname, newattname,
- true, false);
- }
+ targetrelation = relation_open(myrelid, AccessExclusiveLock);
- relation_close(targetrelation, NoLock); /* close rel but keep lock */
+ namespaceId = RelationGetNamespace(targetrelation);
+ relkind = targetrelation->rd_rel->relkind;
+
+ /*
+ * For compatibility with prior releases, we don't complain if ALTER TABLE
+ * or ALTER INDEX is used to rename a sequence or view.
+ */
+ if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a sequence",
+ RelationGetRelationName(targetrelation))));
+
+ if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a view",
+ RelationGetRelationName(targetrelation))));
+
+ /*
+ * Don't allow ALTER TABLE on composite types.
+ * We want people to use ALTER TYPE for that.
+ */
+ if (relkind == RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ RelationGetRelationName(targetrelation)),
+ errhint("Use ALTER TYPE instead.")));
+
+ /* Do the work */
+ RenameRelationInternal(myrelid, newrelname, namespaceId);
+
+ /*
+ * Close rel, but keep exclusive lock!
+ */
+ relation_close(targetrelation, NoLock);
}
/*
- * renamerel - change the name of a relation
+ * RenameRelationInternal - change the name of a relation
*
* XXX - When renaming sequences, we don't bother to modify the
* sequence name that is stored within the sequence itself
* sequence, AFAIK there's no need for it to be there.
*/
void
-renamerel(Oid myrelid, const char *newrelname)
+RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId)
{
Relation targetrelation;
Relation relrelation; /* for RELATION relation */
HeapTuple reltup;
- Oid namespaceId;
- char *oldrelname;
- char relkind;
- bool relhastriggers;
+ Form_pg_class relform;
/*
- * Grab an exclusive lock on the target table or index, which we will NOT
- * release until end of transaction.
+ * Grab an exclusive lock on the target table, index, sequence or
+ * view, which we will NOT release until end of transaction.
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
- oldrelname = pstrdup(RelationGetRelationName(targetrelation));
- namespaceId = RelationGetNamespace(targetrelation);
-
- if (!allowSystemTableMods && IsSystemRelation(targetrelation))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(targetrelation))));
-
- relkind = targetrelation->rd_rel->relkind;
- relhastriggers = (targetrelation->rd_rel->reltriggers > 0);
-
/*
* Find relation's pg_class tuple, and make sure newrelname isn't in use.
*/
relrelation = heap_open(RelationRelationId, RowExclusiveLock);
reltup = SearchSysCacheCopy(RELOID,
- PointerGetDatum(myrelid),
+ ObjectIdGetDatum(myrelid),
0, 0, 0);
if (!HeapTupleIsValid(reltup)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for relation %u", myrelid);
+ relform = (Form_pg_class) GETSTRUCT(reltup);
if (get_relname_relid(newrelname, namespaceId) != InvalidOid)
ereport(ERROR,
* Update pg_class tuple with new relname. (Scribbling on reltup is OK
* because it's a copy...)
*/
- namestrcpy(&(((Form_pg_class) GETSTRUCT(reltup))->relname), newrelname);
+ namestrcpy(&(relform->relname), newrelname);
simple_heap_update(relrelation, &reltup->t_self, reltup);
/*
* Also rename the associated type, if any.
*/
- if (relkind != RELKIND_INDEX)
- TypeRename(oldrelname, namespaceId, newrelname);
+ if (OidIsValid(targetrelation->rd_rel->reltype))
+ RenameTypeInternal(targetrelation->rd_rel->reltype,
+ newrelname, namespaceId);
/*
- * Update rel name in any RI triggers associated with the relation.
+ * Also rename the associated constraint, if any.
*/
- if (relhastriggers)
+ if (targetrelation->rd_rel->relkind == RELKIND_INDEX)
{
- /* update tgargs where relname is primary key */
- update_ri_trigger_args(myrelid,
- oldrelname,
- newrelname,
- false, true);
- /* update tgargs where relname is foreign key */
- update_ri_trigger_args(myrelid,
- oldrelname,
- newrelname,
- true, true);
+ Oid constraintId = get_index_constraint(myrelid);
+
+ if (OidIsValid(constraintId))
+ RenameConstraintById(constraintId, newrelname);
}
/*
}
/*
- * Scan pg_trigger for RI triggers that are on the specified relation
- * (if fk_scan is false) or have it as the tgconstrrel (if fk_scan
- * is true). Update RI trigger args fields matching oldname to contain
- * newname instead. If update_relname is true, examine the relname
- * fields; otherwise examine the attname fields.
+ * Disallow ALTER TABLE (and similar commands) when the current backend has
+ * any open reference to the target table besides the one just acquired by
+ * the calling command; this implies there's an open cursor or active plan.
+ * We need this check because our AccessExclusiveLock doesn't protect us
+ * against stomping on our own foot, only other people's feet!
+ *
+ * For ALTER TABLE, the only case known to cause serious trouble is ALTER
+ * COLUMN TYPE, and some changes are obviously pretty benign, so this could
+ * possibly be relaxed to only error out for certain types of alterations.
+ * But the use-case for allowing any of these things is not obvious, so we
+ * won't work hard at it for now.
+ *
+ * We also reject these commands if there are any pending AFTER trigger events
+ * for the rel. This is certainly necessary for the rewriting variants of
+ * ALTER TABLE, because they don't preserve tuple TIDs and so the pending
+ * events would try to fetch the wrong tuples. It might be overly cautious
+ * in other cases, but again it seems better to err on the side of paranoia.
+ *
+ * REINDEX calls this with "rel" referencing the index to be rebuilt; here
+ * we are worried about active indexscans on the index. The trigger-event
+ * check can be skipped, since we are doing no damage to the parent table.
+ *
+ * The statement name (eg, "ALTER TABLE") is passed for use in error messages.
*/
-static void
-update_ri_trigger_args(Oid relid,
- const char *oldname,
- const char *newname,
- bool fk_scan,
- bool update_relname)
+void
+CheckTableNotInUse(Relation rel, const char *stmt)
{
- Relation tgrel;
- ScanKeyData skey[1];
- SysScanDesc trigscan;
- HeapTuple tuple;
- Datum values[Natts_pg_trigger];
- char nulls[Natts_pg_trigger];
- char replaces[Natts_pg_trigger];
-
- tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
- if (fk_scan)
- {
- ScanKeyInit(&skey[0],
- Anum_pg_trigger_tgconstrrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(relid));
- trigscan = systable_beginscan(tgrel, TriggerConstrRelidIndexId,
- true, SnapshotNow,
- 1, skey);
- }
- else
- {
- ScanKeyInit(&skey[0],
- Anum_pg_trigger_tgrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(relid));
- trigscan = systable_beginscan(tgrel, TriggerRelidNameIndexId,
- true, SnapshotNow,
- 1, skey);
- }
+ int expected_refcnt;
- while ((tuple = systable_getnext(trigscan)) != NULL)
- {
- Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
- bytea *val;
- bytea *newtgargs;
- bool isnull;
- int tg_type;
- bool examine_pk;
- bool changed;
- int tgnargs;
- int i;
- int newlen;
- const char *arga[RI_MAX_ARGUMENTS];
- const char *argp;
+ expected_refcnt = rel->rd_isnailed ? 2 : 1;
+ if (rel->rd_refcnt != expected_refcnt)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ /* translator: first %s is a SQL command, eg ALTER TABLE */
+ errmsg("cannot %s \"%s\" because "
+ "it is being used by active queries in this session",
+ stmt, RelationGetRelationName(rel))));
+
+ if (rel->rd_rel->relkind != RELKIND_INDEX &&
+ AfterTriggerPendingOnRel(RelationGetRelid(rel)))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ /* translator: first %s is a SQL command, eg ALTER TABLE */
+ errmsg("cannot %s \"%s\" because "
+ "it has pending trigger events",
+ stmt, RelationGetRelationName(rel))));
+}
- tg_type = RI_FKey_trigger_type(pg_trigger->tgfoid);
- if (tg_type == RI_TRIGGER_NONE)
- {
- /* Not an RI trigger, forget it */
- continue;
- }
+/*
+ * AlterTable
+ * Execute ALTER TABLE, which can be a list of subcommands
+ *
+ * ALTER TABLE is performed in three phases:
+ * 1. Examine subcommands and perform pre-transformation checking.
+ * 2. Update system catalogs.
+ * 3. Scan table(s) to check new constraints, and optionally recopy
+ * the data into new table(s).
+ * Phase 3 is not performed unless one or more of the subcommands requires
+ * it. The intention of this design is to allow multiple independent
+ * updates of the table schema to be performed with only one pass over the
+ * data.
+ *
+ * ATPrepCmd performs phase 1. A "work queue" entry is created for
+ * each table to be affected (there may be multiple affected tables if the
+ * commands traverse a table inheritance hierarchy). Also we do preliminary
+ * validation of the subcommands, including parse transformation of those
+ * expressions that need to be evaluated with respect to the old table
+ * schema.
+ *
+ * ATRewriteCatalogs performs phase 2 for each affected table. (Note that
+ * phases 2 and 3 normally do no explicit recursion, since phase 1 already
+ * did it --- although some subcommands have to recurse in phase 2 instead.)
+ * Certain subcommands need to be performed before others to avoid
+ * unnecessary conflicts; for example, DROP COLUMN should come before
+ * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
+ * lists, one for each logical "pass" of phase 2.
+ *
+ * ATRewriteTables performs phase 3 for those tables that need it.
+ *
+ * Thanks to the magic of MVCC, an error anywhere along the way rolls back
+ * the whole operation; we don't have to do anything special to clean up.
+ */
+void
+AlterTable(AlterTableStmt *stmt)
+{
+ Relation rel = relation_openrv(stmt->relation, AccessExclusiveLock);
- /*
- * It is an RI trigger, so parse the tgargs bytea.
- *
- * NB: we assume the field will never be compressed or moved out of
- * line; so does trigger.c ...
- */
- tgnargs = pg_trigger->tgnargs;
- val = (bytea *)
- DatumGetPointer(fastgetattr(tuple,
- Anum_pg_trigger_tgargs,
- tgrel->rd_att, &isnull));
- if (isnull || tgnargs < RI_FIRST_ATTNAME_ARGNO ||
- tgnargs > RI_MAX_ARGUMENTS)
- {
- /* This probably shouldn't happen, but ignore busted triggers */
- continue;
- }
- argp = (const char *) VARDATA(val);
- for (i = 0; i < tgnargs; i++)
- {
- arga[i] = argp;
- argp += strlen(argp) + 1;
- }
+ CheckTableNotInUse(rel, "ALTER TABLE");
- /*
- * Figure out which item(s) to look at. If the trigger is primary-key
- * type and attached to my rel, I should look at the PK fields; if it
- * is foreign-key type and attached to my rel, I should look at the FK
- * fields. But the opposite rule holds when examining triggers found
- * by tgconstrrel search.
- */
- examine_pk = (tg_type == RI_TRIGGER_PK) == (!fk_scan);
-
- changed = false;
- if (update_relname)
- {
- /* Change the relname if needed */
- i = examine_pk ? RI_PK_RELNAME_ARGNO : RI_FK_RELNAME_ARGNO;
- if (strcmp(arga[i], oldname) == 0)
- {
- arga[i] = newname;
- changed = true;
- }
- }
- else
- {
- /* Change attname(s) if needed */
- i = examine_pk ? RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX :
- RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_FK_IDX;
- for (; i < tgnargs; i += 2)
- {
- if (strcmp(arga[i], oldname) == 0)
- {
- arga[i] = newname;
- changed = true;
- }
- }
- }
-
- if (!changed)
- {
- /* Don't need to update this tuple */
- continue;
- }
-
- /*
- * Construct modified tgargs bytea.
- */
- newlen = VARHDRSZ;
- for (i = 0; i < tgnargs; i++)
- newlen += strlen(arga[i]) + 1;
- newtgargs = (bytea *) palloc(newlen);
- VARATT_SIZEP(newtgargs) = newlen;
- newlen = VARHDRSZ;
- for (i = 0; i < tgnargs; i++)
- {
- strcpy(((char *) newtgargs) + newlen, arga[i]);
- newlen += strlen(arga[i]) + 1;
- }
-
- /*
- * Build modified tuple.
- */
- for (i = 0; i < Natts_pg_trigger; i++)
- {
- values[i] = (Datum) 0;
- replaces[i] = ' ';
- nulls[i] = ' ';
- }
- values[Anum_pg_trigger_tgargs - 1] = PointerGetDatum(newtgargs);
- replaces[Anum_pg_trigger_tgargs - 1] = 'r';
-
- tuple = heap_modifytuple(tuple, RelationGetDescr(tgrel), values, nulls, replaces);
-
- /*
- * Update pg_trigger and its indexes
- */
- simple_heap_update(tgrel, &tuple->t_self, tuple);
-
- CatalogUpdateIndexes(tgrel, tuple);
-
- /*
- * Invalidate trigger's relation's relcache entry so that other
- * backends (and this one too!) are sent SI message to make them
- * rebuild relcache entries. (Ideally this should happen
- * automatically...)
- *
- * We can skip this for triggers on relid itself, since that relcache
- * flush will happen anyway due to the table or column rename. We
- * just need to catch the far ends of RI relationships.
- */
- pg_trigger = (Form_pg_trigger) GETSTRUCT(tuple);
- if (pg_trigger->tgrelid != relid)
- CacheInvalidateRelcacheByRelid(pg_trigger->tgrelid);
-
- /* free up our scratch memory */
- pfree(newtgargs);
- heap_freetuple(tuple);
- }
-
- systable_endscan(trigscan);
-
- heap_close(tgrel, RowExclusiveLock);
-
- /*
- * Increment cmd counter to make updates visible; this is needed in case
- * the same tuple has to be updated again by next pass (can happen in case
- * of a self-referential FK relationship).
- */
- CommandCounterIncrement();
-}
-
-/*
- * AlterTable
- * Execute ALTER TABLE, which can be a list of subcommands
- *
- * ALTER TABLE is performed in three phases:
- * 1. Examine subcommands and perform pre-transformation checking.
- * 2. Update system catalogs.
- * 3. Scan table(s) to check new constraints, and optionally recopy
- * the data into new table(s).
- * Phase 3 is not performed unless one or more of the subcommands requires
- * it. The intention of this design is to allow multiple independent
- * updates of the table schema to be performed with only one pass over the
- * data.
- *
- * ATPrepCmd performs phase 1. A "work queue" entry is created for
- * each table to be affected (there may be multiple affected tables if the
- * commands traverse a table inheritance hierarchy). Also we do preliminary
- * validation of the subcommands, including parse transformation of those
- * expressions that need to be evaluated with respect to the old table
- * schema.
- *
- * ATRewriteCatalogs performs phase 2 for each affected table (note that
- * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
- * Certain subcommands need to be performed before others to avoid
- * unnecessary conflicts; for example, DROP COLUMN should come before
- * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
- * lists, one for each logical "pass" of phase 2.
- *
- * ATRewriteTables performs phase 3 for those tables that need it.
- *
- * Thanks to the magic of MVCC, an error anywhere along the way rolls back
- * the whole operation; we don't have to do anything special to clean up.
- */
-void
-AlterTable(AlterTableStmt *stmt)
-{
- ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
- stmt->cmds,
- interpretInhOption(stmt->relation->inhOpt));
-}
+ ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt));
+}
/*
* AlterTableInternal
*
* ALTER TABLE with target specified by OID
+ *
+ * We do not reject if the relation is already open, because it's quite
+ * likely that one or more layers of caller have it open. That means it
+ * is unsafe to use this entry point for alterations that could break
+ * existing query plans. On the assumption it's not used for such, we
+ * don't have to reject pending AFTER triggers, either.
*/
void
AlterTableInternal(Oid relid, List *cmds, bool recurse)
{
- ATController(relation_open(relid, AccessExclusiveLock),
- cmds,
- recurse);
+ Relation rel = relation_open(relid, AccessExclusiveLock);
+
+ ATController(rel, cmds, recurse);
}
static void
ATSimplePermissions(rel, true);
ATSimpleRecursion(wqueue, rel, cmd, recurse);
/* No command-specific prep needed */
- pass = AT_PASS_ADD_CONSTR;
+ pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(rel, false);
break;
case AT_AddConstraint: /* ADD CONSTRAINT */
ATSimplePermissions(rel, false);
-
- /*
- * Currently we recurse only for CHECK constraints, never for
- * foreign-key constraints. UNIQUE/PKEY constraints won't be seen
- * here.
- */
- if (IsA(cmd->def, Constraint))
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
- /* No command-specific prep needed */
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_AddConstraintRecurse;
pass = AT_PASS_ADD_CONSTR;
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATSimplePermissions(rel, false);
- /* Performs own recursion */
- ATPrepDropConstraint(wqueue, rel, recurse, cmd);
- pass = AT_PASS_DROP;
- break;
- case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
- ATSimplePermissions(rel, false);
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
- /* No command-specific prep needed */
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_DropConstraintRecurse;
pass = AT_PASS_DROP;
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
pass = AT_PASS_ALTER_TYPE;
break;
- case AT_ToastTable: /* CREATE TOAST TABLE */
- ATSimplePermissions(rel, false);
- /* This command never recurses */
- /* No command-specific prep needed */
- pass = AT_PASS_MISC;
- break;
case AT_ChangeOwner: /* ALTER OWNER */
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_DROP;
break;
case AT_SetTableSpace: /* SET TABLESPACE */
+ ATSimplePermissionsRelationOrIndex(rel);
/* This command never recurses */
ATPrepSetTableSpace(tab, rel, cmd->name);
pass = AT_PASS_MISC; /* doesn't actually matter */
break;
+ case AT_SetRelOptions: /* SET (...) */
+ case AT_ResetRelOptions: /* RESET (...) */
+ ATSimplePermissionsRelationOrIndex(rel);
+ /* This command never recurses */
+ /* No command-specific prep needed */
+ pass = AT_PASS_MISC;
+ break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */
+ case AT_EnableAlwaysTrig:
+ case AT_EnableReplicaTrig:
case AT_EnableTrigAll:
case AT_EnableTrigUser:
case AT_DisableTrig: /* DISABLE TRIGGER variants */
case AT_DisableTrigAll:
case AT_DisableTrigUser:
+ case AT_EnableRule: /* ENABLE/DISABLE RULE variants */
+ case AT_EnableAlwaysRule:
+ case AT_EnableReplicaRule:
+ case AT_DisableRule:
+ case AT_AddInherit: /* INHERIT / NO INHERIT */
+ case AT_DropInherit:
ATSimplePermissions(rel, false);
/* These commands never recurse */
/* No command-specific prep needed */
rel = relation_open(tab->relid, NoLock);
foreach(lcmd, subcmds)
- ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
+ ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd));
/*
* After the ALTER TYPE pass, do cleanup work (this is not done in
}
/*
- * Do an implicit CREATE TOAST TABLE if we executed any subcommands that
- * might have added a column or changed column storage.
+ * Check to see if a toast table must be added, if we executed any
+ * subcommands that might have added a column or changed column storage.
*/
foreach(ltab, *wqueue)
{
(tab->subcmds[AT_PASS_ADD_COL] ||
tab->subcmds[AT_PASS_ALTER_TYPE] ||
tab->subcmds[AT_PASS_COL_ATTRS]))
- AlterTableCreateToastTable(tab->relid, true);
+ AlterTableCreateToastTable(tab->relid);
}
}
* ATExecCmd: dispatch a subcommand to appropriate execution routine
*/
static void
-ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
+ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ AlterTableCmd *cmd)
{
switch (cmd->subtype)
{
ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
break;
case AT_AddConstraint: /* ADD CONSTRAINT */
- ATExecAddConstraint(tab, rel, cmd->def);
+ ATExecAddConstraint(wqueue, tab, rel, cmd->def, false);
+ break;
+ case AT_AddConstraintRecurse: /* ADD CONSTRAINT with recursion */
+ ATExecAddConstraint(wqueue, tab, rel, cmd->def, true);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
- ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
+ ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
break;
- case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
- ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
+ case AT_DropConstraintRecurse: /* DROP CONSTRAINT with recursion */
+ ATExecDropConstraint(rel, cmd->name, cmd->behavior, true, false);
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
break;
- case AT_ToastTable: /* CREATE TOAST TABLE */
- AlterTableCreateToastTable(RelationGetRelid(rel), false);
- break;
case AT_ChangeOwner: /* ALTER OWNER */
ATExecChangeOwner(RelationGetRelid(rel),
get_roleid_checked(cmd->name),
* Nothing to do here; Phase 3 does the work
*/
break;
+ case AT_SetRelOptions: /* SET (...) */
+ ATExecSetRelOptions(rel, (List *) cmd->def, false);
+ break;
+ case AT_ResetRelOptions: /* RESET (...) */
+ ATExecSetRelOptions(rel, (List *) cmd->def, true);
+ break;
+
case AT_EnableTrig: /* ENABLE TRIGGER name */
- ATExecEnableDisableTrigger(rel, cmd->name, true, false);
+ ATExecEnableDisableTrigger(rel, cmd->name,
+ TRIGGER_FIRES_ON_ORIGIN, false);
+ break;
+ case AT_EnableAlwaysTrig: /* ENABLE ALWAYS TRIGGER name */
+ ATExecEnableDisableTrigger(rel, cmd->name,
+ TRIGGER_FIRES_ALWAYS, false);
+ break;
+ case AT_EnableReplicaTrig: /* ENABLE REPLICA TRIGGER name */
+ ATExecEnableDisableTrigger(rel, cmd->name,
+ TRIGGER_FIRES_ON_REPLICA, false);
break;
case AT_DisableTrig: /* DISABLE TRIGGER name */
- ATExecEnableDisableTrigger(rel, cmd->name, false, false);
+ ATExecEnableDisableTrigger(rel, cmd->name,
+ TRIGGER_DISABLED, false);
break;
case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
- ATExecEnableDisableTrigger(rel, NULL, true, false);
+ ATExecEnableDisableTrigger(rel, NULL,
+ TRIGGER_FIRES_ON_ORIGIN, false);
break;
case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
- ATExecEnableDisableTrigger(rel, NULL, false, false);
+ ATExecEnableDisableTrigger(rel, NULL,
+ TRIGGER_DISABLED, false);
break;
case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
- ATExecEnableDisableTrigger(rel, NULL, true, true);
+ ATExecEnableDisableTrigger(rel, NULL,
+ TRIGGER_FIRES_ON_ORIGIN, true);
break;
case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
- ATExecEnableDisableTrigger(rel, NULL, false, true);
+ ATExecEnableDisableTrigger(rel, NULL,
+ TRIGGER_DISABLED, true);
+ break;
+
+ case AT_EnableRule: /* ENABLE RULE name */
+ ATExecEnableDisableRule(rel, cmd->name,
+ RULE_FIRES_ON_ORIGIN);
+ break;
+ case AT_EnableAlwaysRule: /* ENABLE ALWAYS RULE name */
+ ATExecEnableDisableRule(rel, cmd->name,
+ RULE_FIRES_ALWAYS);
+ break;
+ case AT_EnableReplicaRule: /* ENABLE REPLICA RULE name */
+ ATExecEnableDisableRule(rel, cmd->name,
+ RULE_FIRES_ON_REPLICA);
+ break;
+ case AT_DisableRule: /* DISABLE RULE name */
+ ATExecEnableDisableRule(rel, cmd->name,
+ RULE_DISABLED);
+ break;
+
+ case AT_AddInherit:
+ ATExecAddInherit(rel, (RangeVar *) cmd->def);
+ break;
+ case AT_DropInherit:
+ ATExecDropInherit(rel, (RangeVar *) cmd->def);
break;
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
*/
ATRewriteTable(tab, OIDNewHeap);
- /* Swap the physical files of the old and new heaps. */
- swap_relation_files(tab->relid, OIDNewHeap);
+ /*
+ * Swap the physical files of the old and new heaps. Since we are
+ * generating a new heap, we can use RecentXmin for the table's
+ * new relfrozenxid because we rewrote all the tuples on
+ * ATRewriteTable, so no older Xid remains on the table.
+ */
+ swap_relation_files(tab->relid, OIDNewHeap, RecentXmin);
CommandCounterIncrement();
* Test the current data within the table against new constraints
* generated by ALTER TABLE commands, but don't rebuild data.
*/
- if (tab->constraints != NIL)
+ if (tab->constraints != NIL || tab->new_notnull)
ATRewriteTable(tab, InvalidOid);
/*
refrel = heap_open(con->refrelid, RowShareLock);
- validateForeignKeyConstraint(fkconstraint, rel, refrel);
+ validateForeignKeyConstraint(fkconstraint, rel, refrel,
+ con->conid);
heap_close(refrel, NoLock);
}
TupleDesc oldTupDesc;
TupleDesc newTupDesc;
bool needscan = false;
+ List *notnull_attrs;
int i;
ListCell *l;
EState *estate;
*/
if (newrel)
find_composite_type_dependencies(oldrel->rd_rel->reltype,
- RelationGetRelationName(oldrel));
+ RelationGetRelationName(oldrel),
+ NULL);
/*
* Generate the constraint and default execution states
case CONSTR_FOREIGN:
/* Nothing to do here */
break;
- case CONSTR_NOTNULL:
- needscan = true;
- break;
default:
elog(ERROR, "unrecognized constraint type: %d",
(int) con->contype);
ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
}
+ notnull_attrs = NIL;
+ if (newrel || tab->new_notnull)
+ {
+ /*
+ * If we are rebuilding the tuples OR if we added any new NOT NULL
+ * constraints, check all not-null constraints. This is a bit of
+ * overkill but it minimizes risk of bugs, and heap_attisnull is a
+ * pretty cheap test anyway.
+ */
+ for (i = 0; i < newTupDesc->natts; i++)
+ {
+ if (newTupDesc->attrs[i]->attnotnull &&
+ !newTupDesc->attrs[i]->attisdropped)
+ notnull_attrs = lappend_int(notnull_attrs, i);
+ }
+ if (notnull_attrs)
+ needscan = true;
+ }
+
if (needscan)
{
ExprContext *econtext;
ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
econtext->ecxt_scantuple = newslot;
+ foreach(l, notnull_attrs)
+ {
+ int attn = lfirst_int(l);
+
+ if (heap_attisnull(tuple, attn + 1))
+ ereport(ERROR,
+ (errcode(ERRCODE_NOT_NULL_VIOLATION),
+ errmsg("column \"%s\" contains null values",
+ NameStr(newTupDesc->attrs[attn]->attname))));
+ }
+
foreach(l, tab->constraints)
{
NewConstraint *con = lfirst(l);
errmsg("check constraint \"%s\" is violated by some row",
con->name)));
break;
- case CONSTR_NOTNULL:
- {
- Datum d;
- bool isnull;
-
- d = heap_getattr(tuple, con->attnum, newTupDesc,
- &isnull);
- if (isnull)
- ereport(ERROR,
- (errcode(ERRCODE_NOT_NULL_VIOLATION),
- errmsg("column \"%s\" contains null values",
- get_attname(tab->relid,
- con->attnum))));
- }
- break;
case CONSTR_FOREIGN:
/* Nothing to do here */
break;
MemoryContextSwitchTo(oldCxt);
heap_endscan(scan);
+
+ ExecDropSingleTupleTableSlot(oldslot);
+ ExecDropSingleTupleTableSlot(newslot);
}
FreeExecutorState(estate);
}
/*
+ * ATSimplePermissionsRelationOrIndex
+ *
+ * - Ensure that it is a relation or an index
+ * - Ensure this user is the owner
+ * - Ensure that it is not a system table
+ */
+static void
+ATSimplePermissionsRelationOrIndex(Relation rel)
+{
+ if (rel->rd_rel->relkind != RELKIND_RELATION &&
+ rel->rd_rel->relkind != RELKIND_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table or index",
+ RelationGetRelationName(rel))));
+
+ /* Permissions checks */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ RelationGetRelationName(rel));
+
+ if (!allowSystemTableMods && IsSystemRelation(rel))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ RelationGetRelationName(rel))));
+}
+
+/*
* ATSimpleRecursion
*
* Simple table recursion sufficient for most ALTER TABLE operations.
if (childrelid == relid)
continue;
childrel = relation_open(childrelid, AccessExclusiveLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
ATPrepCmd(wqueue, childrel, cmd, false, true);
relation_close(childrel, NoLock);
}
Relation childrel;
childrel = relation_open(childrelid, AccessExclusiveLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
ATPrepCmd(wqueue, childrel, cmd, true, true);
relation_close(childrel, NoLock);
}
/*
* find_composite_type_dependencies
*
- * Check to see if a table's rowtype is being used as a column in some
+ * Check to see if a composite type is being used as a column in some
* other table (possibly nested several levels deep in composite types!).
* Eventually, we'd like to propagate the check or rewrite operation
* into other such tables, but for now, just error out if we find any.
*
+ * Caller should provide either a table name or a type name (not both) to
+ * report in the error message, if any.
+ *
* We assume that functions and views depending on the type are not reasons
* to reject the ALTER. (How safe is this really?)
*/
-static void
-find_composite_type_dependencies(Oid typeOid, const char *origTblName)
+void
+find_composite_type_dependencies(Oid typeOid,
+ const char *origTblName,
+ const char *origTypeName)
{
Relation depRel;
ScanKeyData key[2];
SysScanDesc depScan;
HeapTuple depTup;
+ Oid arrayOid;
/*
* We scan pg_depend to find those things that depend on the rowtype. (We
if (rel->rd_rel->relkind == RELKIND_RELATION)
{
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
- origTblName,
- RelationGetRelationName(rel),
- NameStr(att->attname))));
+ if (origTblName)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter table \"%s\" because column \"%s\".\"%s\" uses its rowtype",
+ origTblName,
+ RelationGetRelationName(rel),
+ NameStr(att->attname))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter type \"%s\" because column \"%s\".\"%s\" uses it",
+ origTypeName,
+ RelationGetRelationName(rel),
+ NameStr(att->attname))));
}
else if (OidIsValid(rel->rd_rel->reltype))
{
* recursively check for indirect dependencies via its rowtype.
*/
find_composite_type_dependencies(rel->rd_rel->reltype,
- origTblName);
+ origTblName, origTypeName);
}
relation_close(rel, AccessShareLock);
systable_endscan(depScan);
relation_close(depRel, AccessShareLock);
+
+ /*
+ * If there's an array type for the rowtype, must check for uses of it,
+ * too.
+ */
+ arrayOid = get_array_type(typeOid);
+ if (OidIsValid(arrayOid))
+ find_composite_type_dependencies(arrayOid, origTblName, origTypeName);
}
*
* Adds an additional attribute to a relation making the assumption that
* CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
- * AT_AddColumn AlterTableCmd by analyze.c and added as independent
+ * AT_AddColumn AlterTableCmd by parse_utilcmd.c and added as independent
* AlterTableCmd's.
*/
static void
/* Child should see column as singly inherited */
colDefChild->inhcount = 1;
colDefChild->is_local = false;
- /* and don't make a support dependency on the child */
- colDefChild->support = NULL;
ATOneLevelRecursion(wqueue, rel, childCmd);
}
maxatts;
HeapTuple typeTuple;
Oid typeOid;
+ int32 typmod;
Form_pg_type tform;
Expr *defval;
if (HeapTupleIsValid(tuple))
{
Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
+ Oid ctypeId;
+ int32 ctypmod;
/* Okay if child matches by type */
- if (typenameTypeId(colDef->typename) != childatt->atttypid ||
- colDef->typename->typmod != childatt->atttypmod)
+ ctypeId = typenameTypeId(NULL, colDef->typename, &ctypmod);
+ if (ctypeId != childatt->atttypid ||
+ ctypmod != childatt->atttypmod)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has different type for column \"%s\"",
MaxHeapAttributeNumber)));
i = minattnum + 1;
- typeTuple = typenameType(colDef->typename);
+ typeTuple = typenameType(NULL, colDef->typename, &typmod);
tform = (Form_pg_type) GETSTRUCT(typeTuple);
typeOid = HeapTupleGetOid(typeTuple);
attribute->attstattarget = -1;
attribute->attlen = tform->typlen;
attribute->attcacheoff = -1;
- attribute->atttypmod = colDef->typename->typmod;
+ attribute->atttypmod = typmod;
attribute->attnum = i;
attribute->attbyval = tform->typbyval;
attribute->attndims = list_length(colDef->typename->arrayBounds);
* This function is intended for CREATE TABLE, so it processes a
* _list_ of defaults, but we just do one.
*/
- AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
+ AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true);
/* Make the additional catalog changes visible */
CommandCounterIncrement();
* the constraints more directly.)
*
* Note: we use build_column_default, and not just the cooked default
- * returned by AddRelationRawConstraints, so that the right thing happens
+ * returned by AddRelationNewConstraints, so that the right thing happens
* when a datatype's default applies.
*/
defval = (Expr *) build_column_default(rel, attribute->attnum);
if (!defval && GetDomainConstraints(typeOid) != NIL)
{
- Oid basetype = getBaseType(typeOid);
+ Oid baseTypeId;
+ int32 baseTypeMod;
- defval = (Expr *) makeNullConst(basetype);
+ baseTypeMod = typmod;
+ baseTypeId = getBaseTypeAndTypmod(typeOid, &baseTypeMod);
+ defval = (Expr *) makeNullConst(baseTypeId, baseTypeMod);
defval = (Expr *) coerce_to_target_type(NULL,
(Node *) defval,
- basetype,
+ baseTypeId,
typeOid,
- colDef->typename->typmod,
+ typmod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST);
if (defval == NULL) /* should not happen */
}
/*
+ * If the new column is NOT NULL, tell Phase 3 it needs to test that.
+ */
+ tab->new_notnull |= colDef->is_not_null;
+
+ /*
* Add needed dependency entries for the new column.
*/
add_column_datatype_dependency(myrelid, i, attribute->atttypid);
- if (colDef->support != NULL)
- add_column_support_dependency(myrelid, i, colDef->support);
}
/*
}
/*
- * Install a dependency for a column's supporting relation (serial sequence).
- */
-static void
-add_column_support_dependency(Oid relid, int32 attnum, RangeVar *support)
-{
- ObjectAddress colobject,
- suppobject;
-
- colobject.classId = RelationRelationId;
- colobject.objectId = relid;
- colobject.objectSubId = attnum;
- suppobject.classId = RelationRelationId;
- suppobject.objectId = RangeVarGetRelid(support, false);
- suppobject.objectSubId = 0;
- recordDependencyOn(&suppobject, &colobject, DEPENDENCY_INTERNAL);
-}
-
-/*
* ALTER TABLE ALTER COLUMN DROP NOT NULL
*/
static void
HeapTuple tuple;
AttrNumber attnum;
Relation attr_rel;
- NewConstraint *newcon;
/*
* lookup the attribute
/* keep the system catalog indexes current */
CatalogUpdateIndexes(attr_rel, tuple);
- /* Tell Phase 3 to test the constraint */
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->contype = CONSTR_NOTNULL;
- newcon->attnum = attnum;
- newcon->name = "NOT NULL";
-
- tab->constraints = lappend(tab->constraints, newcon);
+ /* Tell Phase 3 it needs to test the constraint */
+ tab->new_notnull = true;
}
heap_close(attr_rel, RowExclusiveLock);
* This function is intended for CREATE TABLE, so it processes a
* _list_ of defaults, but we just do one.
*/
- AddRelationRawConstraints(rel, list_make1(rawEnt), NIL);
+ AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true);
}
}
Form_pg_attribute childatt;
childrel = heap_open(childrelid, AccessExclusiveLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
tuple = SearchSysCacheCopyAttName(childrelid, colName);
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
{
/*
* If we were told to drop ONLY in this table (no recursion),
- * we need to mark the inheritors' attribute as locally
+ * we need to mark the inheritors' attributes as locally
* defined rather than inherited.
*/
childatt->attinhcount--;
/*
* ALTER TABLE ADD INDEX
*
- * There is no such command in the grammar, but the parser converts UNIQUE
- * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us
- * schedule creation of the index at the appropriate time during ALTER.
+ * There is no such command in the grammar, but parse_utilcmd.c converts
+ * UNIQUE and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets
+ * us schedule creation of the index at the appropriate time during ALTER.
*/
static void
ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
/* suppress notices when rebuilding existing index */
quiet = is_rebuild;
+ /* The IndexStmt has already been through transformIndexStmt */
+
DefineIndex(stmt->relation, /* relation */
stmt->idxname, /* index name */
InvalidOid, /* no predefined OID */
stmt->tableSpace,
stmt->indexParams, /* parameters */
(Expr *) stmt->whereClause,
- stmt->rangetable,
+ stmt->options,
stmt->unique,
stmt->primary,
stmt->isconstraint,
true, /* is_alter_table */
check_rights,
skip_build,
- quiet);
+ quiet,
+ false);
}
/*
* ALTER TABLE ADD CONSTRAINT
*/
static void
-ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
+ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ Node *newConstraint, bool recurse)
{
switch (nodeTag(newConstraint))
{
/*
* Currently, we only expect to see CONSTR_CHECK nodes
* arriving here (see the preprocessing done in
- * parser/analyze.c). Use a switch anyway to make it easier
- * to add more code later.
+ * parse_utilcmd.c). Use a switch anyway to make it easier to
+ * add more code later.
*/
switch (constr->contype)
{
case CONSTR_CHECK:
- {
- List *newcons;
- ListCell *lcon;
-
- /*
- * Call AddRelationRawConstraints to do the work.
- * It returns a list of cooked constraints.
- */
- newcons = AddRelationRawConstraints(rel, NIL,
- list_make1(constr));
- /* Add each constraint to Phase 3's queue */
- foreach(lcon, newcons)
- {
- CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- NewConstraint *newcon;
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = ccon->name;
- newcon->contype = ccon->contype;
- newcon->attnum = ccon->attnum;
- /* ExecQual wants implicit-AND format */
- newcon->qual = (Node *)
- make_ands_implicit((Expr *) ccon->expr);
-
- tab->constraints = lappend(tab->constraints,
- newcon);
- }
- break;
- }
+ ATAddCheckConstraint(wqueue, tab, rel,
+ constr, recurse, false);
+ break;
default:
elog(ERROR, "unrecognized constraint type: %d",
(int) constr->contype);
FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
/*
+ * Note that we currently never recurse for FK constraints,
+ * so the "recurse" flag is silently ignored.
+ *
* Assign or validate constraint name
*/
if (fkconstraint->constr_name)
}
/*
+ * Add a check constraint to a single table and its children
+ *
+ * Subroutine for ATExecAddConstraint.
+ *
+ * We must recurse to child tables during execution, rather than using
+ * ALTER TABLE's normal prep-time recursion. The reason is that all the
+ * constraints *must* be given the same name, else they won't be seen as
+ * related later. If the user didn't explicitly specify a name, then
+ * AddRelationNewConstraints would normally assign different names to the
+ * child constraints. To fix that, we must capture the name assigned at
+ * the parent table and pass that down.
+ */
+static void
+ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ Constraint *constr, bool recurse, bool recursing)
+{
+ List *newcons;
+ ListCell *lcon;
+ List *children;
+ ListCell *child;
+
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ ATSimplePermissions(rel, false);
+
+ /*
+ * Call AddRelationNewConstraints to do the work, making sure it works on
+ * a copy of the Constraint so transformExpr can't modify the original.
+ * It returns a list of cooked constraints.
+ *
+ * If the constraint ends up getting merged with a pre-existing one, it's
+ * omitted from the returned list, which is what we want: we do not need
+ * to do any validation work. That can only happen at child tables,
+ * though, since we disallow merging at the top level.
+ */
+ newcons = AddRelationNewConstraints(rel, NIL,
+ list_make1(copyObject(constr)),
+ recursing, !recursing);
+
+ /* Add each constraint to Phase 3's queue */
+ foreach(lcon, newcons)
+ {
+ CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
+ NewConstraint *newcon;
+
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = ccon->name;
+ newcon->contype = ccon->contype;
+ /* ExecQual wants implicit-AND format */
+ newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+
+ tab->constraints = lappend(tab->constraints, newcon);
+
+ /* Save the actually assigned name if it was defaulted */
+ if (constr->name == NULL)
+ constr->name = ccon->name;
+ }
+
+ /* At this point we must have a locked-down name to use */
+ Assert(constr->name != NULL);
+
+ /* Advance command counter in case same table is visited multiple times */
+ CommandCounterIncrement();
+
+ /*
+ * Propagate to children as appropriate. Unlike most other ALTER
+ * routines, we have to do this one level of recursion at a time; we can't
+ * use find_all_inheritors to do it in one pass.
+ */
+ children = find_inheritance_children(RelationGetRelid(rel));
+
+ /*
+ * If we are told not to recurse, there had better not be any child
+ * tables; else the addition would put them out of step.
+ */
+ if (children && !recurse)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be added to child tables too")));
+
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+ AlteredTableInfo *childtab;
+
+ childrel = heap_open(childrelid, AccessExclusiveLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
+
+ /* Find or create work queue entry for this table */
+ childtab = ATGetQueueEntry(wqueue, childrel);
+
+ /* Recurse to child */
+ ATAddCheckConstraint(wqueue, childtab, childrel,
+ constr, recurse, true);
+
+ heap_close(childrel, NoLock);
+ }
+}
+
+/*
* Add a foreign-key constraint to a single table
*
* Subroutine for ATExecAddConstraint. Must already hold exclusive
Oid pktypoid[INDEX_MAX_KEYS];
Oid fktypoid[INDEX_MAX_KEYS];
Oid opclasses[INDEX_MAX_KEYS];
+ Oid pfeqoperators[INDEX_MAX_KEYS];
+ Oid ppeqoperators[INDEX_MAX_KEYS];
+ Oid ffeqoperators[INDEX_MAX_KEYS];
int i;
int numfks,
numpks;
MemSet(pktypoid, 0, sizeof(pktypoid));
MemSet(fktypoid, 0, sizeof(fktypoid));
MemSet(opclasses, 0, sizeof(opclasses));
+ MemSet(pfeqoperators, 0, sizeof(pfeqoperators));
+ MemSet(ppeqoperators, 0, sizeof(ppeqoperators));
+ MemSet(ffeqoperators, 0, sizeof(ffeqoperators));
numfks = transformColumnNameList(RelationGetRelid(rel),
fkconstraint->fk_attrs,
opclasses);
}
- /* Be sure referencing and referenced column types are comparable */
+ /*
+ * Look up the equality operators to use in the constraint.
+ *
+ * Note that we have to be careful about the difference between the actual
+ * PK column type and the opclass' declared input type, which might be
+ * only binary-compatible with it. The declared opcintype is the right
+ * thing to probe pg_amop with.
+ */
if (numfks != numpks)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FOREIGN_KEY),
for (i = 0; i < numpks; i++)
{
- /*
- * pktypoid[i] is the primary key table's i'th key's type fktypoid[i]
- * is the foreign key table's i'th key's type
- *
- * Note that we look for an operator with the PK type on the left;
- * when the types are different this is critical because the PK index
- * will need operators with the indexkey on the left. (Ordinarily both
- * commutator operators will exist if either does, but we won't get
- * the right answer from the test below on opclass membership unless
- * we select the proper operator.)
+ Oid pktype = pktypoid[i];
+ Oid fktype = fktypoid[i];
+ Oid fktyped;
+ HeapTuple cla_ht;
+ Form_pg_opclass cla_tup;
+ Oid amid;
+ Oid opfamily;
+ Oid opcintype;
+ Oid pfeqop;
+ Oid ppeqop;
+ Oid ffeqop;
+ int16 eqstrategy;
+
+ /* We need several fields out of the pg_opclass entry */
+ cla_ht = SearchSysCache(CLAOID,
+ ObjectIdGetDatum(opclasses[i]),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(cla_ht))
+ elog(ERROR, "cache lookup failed for opclass %u", opclasses[i]);
+ cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
+ amid = cla_tup->opcmethod;
+ opfamily = cla_tup->opcfamily;
+ opcintype = cla_tup->opcintype;
+ ReleaseSysCache(cla_ht);
+
+ /*
+ * Check it's a btree; currently this can never fail since no other
+ * index AMs support unique indexes. If we ever did have other types
+ * of unique indexes, we'd need a way to determine which operator
+ * strategy number is equality. (Is it reasonable to insist that
+ * every such index AM use btree's number for equality?)
+ */
+ if (amid != BTREE_AM_OID)
+ elog(ERROR, "only b-tree indexes are supported for foreign keys");
+ eqstrategy = BTEqualStrategyNumber;
+
+ /*
+ * There had better be a primary equality operator for the index.
+ * We'll use it for PK = PK comparisons.
+ */
+ ppeqop = get_opfamily_member(opfamily, opcintype, opcintype,
+ eqstrategy);
+
+ if (!OidIsValid(ppeqop))
+ elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
+ eqstrategy, opcintype, opcintype, opfamily);
+
+ /*
+ * Are there equality operators that take exactly the FK type? Assume
+ * we should look through any domain here.
*/
- Operator o = oper(list_make1(makeString("=")),
- pktypoid[i], fktypoid[i], true);
+ fktyped = getBaseType(fktype);
+
+ pfeqop = get_opfamily_member(opfamily, opcintype, fktyped,
+ eqstrategy);
+ if (OidIsValid(pfeqop))
+ ffeqop = get_opfamily_member(opfamily, fktyped, fktyped,
+ eqstrategy);
+ else
+ ffeqop = InvalidOid; /* keep compiler quiet */
+
+ if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
+ {
+ /*
+ * Otherwise, look for an implicit cast from the FK type to the
+ * opcintype, and if found, use the primary equality operator.
+ * This is a bit tricky because opcintype might be a polymorphic
+ * type such as ANYARRAY or ANYENUM; so what we have to test is
+ * whether the two actual column types can be concurrently cast to
+ * that type. (Otherwise, we'd fail to reject combinations such
+ * as int[] and point[].)
+ */
+ Oid input_typeids[2];
+ Oid target_typeids[2];
+
+ input_typeids[0] = pktype;
+ input_typeids[1] = fktype;
+ target_typeids[0] = opcintype;
+ target_typeids[1] = opcintype;
+ if (can_coerce_type(2, input_typeids, target_typeids,
+ COERCION_IMPLICIT))
+ pfeqop = ffeqop = ppeqop;
+ }
- if (o == NULL)
+ if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("foreign key constraint \"%s\" "
"cannot be implemented",
fkconstraint->constr_name),
"are of incompatible types: %s and %s.",
strVal(list_nth(fkconstraint->fk_attrs, i)),
strVal(list_nth(fkconstraint->pk_attrs, i)),
- format_type_be(fktypoid[i]),
- format_type_be(pktypoid[i]))));
-
- /*
- * Check that the found operator is compatible with the PK index, and
- * generate a warning if not, since otherwise costly seqscans will be
- * incurred to check FK validity.
- */
- if (!op_in_opclass(oprid(o), opclasses[i]))
- ereport(WARNING,
- (errmsg("foreign key constraint \"%s\" "
- "will require costly sequential scans",
- fkconstraint->constr_name),
- errdetail("Key columns \"%s\" and \"%s\" "
- "are of different types: %s and %s.",
- strVal(list_nth(fkconstraint->fk_attrs, i)),
- strVal(list_nth(fkconstraint->pk_attrs, i)),
- format_type_be(fktypoid[i]),
- format_type_be(pktypoid[i]))));
-
- ReleaseSysCache(o);
- }
-
- /*
- * Tell Phase 3 to check that the constraint is satisfied by existing rows
- * (we can skip this during table creation).
- */
- if (!fkconstraint->skip_validation)
- {
- NewConstraint *newcon;
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = fkconstraint->constr_name;
- newcon->contype = CONSTR_FOREIGN;
- newcon->refrelid = RelationGetRelid(pkrel);
- newcon->qual = (Node *) fkconstraint;
+ format_type_be(fktype),
+ format_type_be(pktype))));
- tab->constraints = lappend(tab->constraints, newcon);
+ pfeqoperators[i] = pfeqop;
+ ppeqoperators[i] = ppeqop;
+ ffeqoperators[i] = ffeqop;
}
/*
* constraint */
RelationGetRelid(pkrel),
pkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
numpks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
indexOid,
NULL, /* no check constraint */
NULL,
- NULL);
+ NULL,
+ true, /* islocal */
+ 0); /* inhcount */
/*
* Create the triggers that will enforce the constraint.
createForeignKeyTriggers(rel, fkconstraint, constrOid);
/*
+ * Tell Phase 3 to check that the constraint is satisfied by existing rows
+ * (we can skip this during table creation).
+ */
+ if (!fkconstraint->skip_validation)
+ {
+ NewConstraint *newcon;
+
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = fkconstraint->constr_name;
+ newcon->contype = CONSTR_FOREIGN;
+ newcon->refrelid = RelationGetRelid(pkrel);
+ newcon->conid = constrOid;
+ newcon->qual = (Node *) fkconstraint;
+
+ tab->constraints = lappend(tab->constraints, newcon);
+ }
+
+ /*
* Close pk table, but keep lock until we've committed.
*/
heap_close(pkrel, NoLock);
static void
validateForeignKeyConstraint(FkConstraint *fkconstraint,
Relation rel,
- Relation pkrel)
+ Relation pkrel,
+ Oid constraintOid)
{
HeapScanDesc scan;
HeapTuple tuple;
Trigger trig;
- ListCell *list;
- int count;
-
- /*
- * See if we can do it with a single LEFT JOIN query. A FALSE result
- * indicates we must proceed with the fire-the-trigger method.
- */
- if (RI_Initial_Check(fkconstraint, rel, pkrel))
- return;
/*
- * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
- * if that tuple had just been inserted. If any of those fail, it should
- * ereport(ERROR) and that's that.
+ * Build a trigger call structure; we'll need it either way.
*/
MemSet(&trig, 0, sizeof(trig));
trig.tgoid = InvalidOid;
trig.tgname = fkconstraint->constr_name;
- trig.tgenabled = TRUE;
+ trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
trig.tgisconstraint = TRUE;
trig.tgconstrrelid = RelationGetRelid(pkrel);
+ trig.tgconstraint = constraintOid;
trig.tgdeferrable = FALSE;
trig.tginitdeferred = FALSE;
+ /* we needn't fill in tgargs */
- trig.tgargs = (char **) palloc(sizeof(char *) *
- (4 + list_length(fkconstraint->fk_attrs)
- + list_length(fkconstraint->pk_attrs)));
-
- trig.tgargs[0] = trig.tgname;
- trig.tgargs[1] = RelationGetRelationName(rel);
- trig.tgargs[2] = RelationGetRelationName(pkrel);
- trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
- count = 4;
- foreach(list, fkconstraint->fk_attrs)
- {
- char *fk_at = strVal(lfirst(list));
-
- trig.tgargs[count] = fk_at;
- count += 2;
- }
- count = 5;
- foreach(list, fkconstraint->pk_attrs)
- {
- char *pk_at = strVal(lfirst(list));
-
- trig.tgargs[count] = pk_at;
- count += 2;
- }
- trig.tgnargs = count - 1;
+ /*
+ * See if we can do it with a single LEFT JOIN query. A FALSE result
+ * indicates we must proceed with the fire-the-trigger method.
+ */
+ if (RI_Initial_Check(&trig, rel, pkrel))
+ return;
+ /*
+ * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) as
+ * if that tuple had just been inserted. If any of those fail, it should
+ * ereport(ERROR) and that's that.
+ */
scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
}
heap_endscan(scan);
-
- pfree(trig.tgargs);
}
static void
CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
- ObjectAddress *constrobj, ObjectAddress *trigobj,
- bool on_insert)
+ Oid constraintOid, bool on_insert)
{
CreateTrigStmt *fk_trigger;
- ListCell *fk_attr;
- ListCell *pk_attr;
fk_trigger = makeNode(CreateTrigStmt);
fk_trigger->trigname = fkconstraint->constr_name;
fk_trigger->deferrable = fkconstraint->deferrable;
fk_trigger->initdeferred = fkconstraint->initdeferred;
fk_trigger->constrrel = fkconstraint->pktable;
-
fk_trigger->args = NIL;
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkconstraint->constr_name));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(myRel->relname));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkconstraint->pktable->relname));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
- if (list_length(fkconstraint->fk_attrs) != list_length(fkconstraint->pk_attrs))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_FOREIGN_KEY),
- errmsg("number of referencing and referenced columns for foreign key disagree")));
-
- forboth(fk_attr, fkconstraint->fk_attrs,
- pk_attr, fkconstraint->pk_attrs)
- {
- fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
- fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
- }
-
- trigobj->objectId = CreateTrigger(fk_trigger, true);
- /* Register dependency from trigger to constraint */
- recordDependencyOn(trigobj, constrobj, DEPENDENCY_INTERNAL);
+ (void) CreateTrigger(fk_trigger, constraintOid);
/* Make changes-so-far visible */
CommandCounterIncrement();
*/
static void
createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
- Oid constrOid)
+ Oid constraintOid)
{
RangeVar *myRel;
CreateTrigStmt *fk_trigger;
- ListCell *fk_attr;
- ListCell *pk_attr;
- ObjectAddress trigobj,
- constrobj;
/*
* Reconstruct a RangeVar for my relation (not passed in, unfortunately).
myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
pstrdup(RelationGetRelationName(rel)));
- /*
- * Preset objectAddress fields
- */
- constrobj.classId = ConstraintRelationId;
- constrobj.objectId = constrOid;
- constrobj.objectSubId = 0;
- trigobj.classId = TriggerRelationId;
- trigobj.objectSubId = 0;
-
/* Make changes-so-far visible */
CommandCounterIncrement();
* Build and execute a CREATE CONSTRAINT TRIGGER statement for the CHECK
* action for both INSERTs and UPDATEs on the referencing table.
*/
- CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, true);
- CreateFKCheckTrigger(myRel, fkconstraint, &constrobj, &trigobj, false);
+ CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, true);
+ CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, false);
/*
* Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
(int) fkconstraint->fk_del_action);
break;
}
-
fk_trigger->args = NIL;
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkconstraint->constr_name));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(myRel->relname));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkconstraint->pktable->relname));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
- forboth(fk_attr, fkconstraint->fk_attrs,
- pk_attr, fkconstraint->pk_attrs)
- {
- fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
- fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
- }
- trigobj.objectId = CreateTrigger(fk_trigger, true);
-
- /* Register dependency from trigger to constraint */
- recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
+ (void) CreateTrigger(fk_trigger, constraintOid);
/* Make changes-so-far visible */
CommandCounterIncrement();
(int) fkconstraint->fk_upd_action);
break;
}
-
fk_trigger->args = NIL;
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkconstraint->constr_name));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(myRel->relname));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkconstraint->pktable->relname));
- fk_trigger->args = lappend(fk_trigger->args,
- makeString(fkMatchTypeToString(fkconstraint->fk_matchtype)));
- forboth(fk_attr, fkconstraint->fk_attrs,
- pk_attr, fkconstraint->pk_attrs)
- {
- fk_trigger->args = lappend(fk_trigger->args, lfirst(fk_attr));
- fk_trigger->args = lappend(fk_trigger->args, lfirst(pk_attr));
- }
-
- trigobj.objectId = CreateTrigger(fk_trigger, true);
-
- /* Register dependency from trigger to constraint */
- recordDependencyOn(&trigobj, &constrobj, DEPENDENCY_INTERNAL);
-}
-/*
- * fkMatchTypeToString -
- * convert FKCONSTR_MATCH_xxx code to string to use in trigger args
- */
-static char *
-fkMatchTypeToString(char match_type)
-{
- switch (match_type)
- {
- case FKCONSTR_MATCH_FULL:
- return pstrdup("FULL");
- case FKCONSTR_MATCH_PARTIAL:
- return pstrdup("PARTIAL");
- case FKCONSTR_MATCH_UNSPECIFIED:
- return pstrdup("UNSPECIFIED");
- default:
- elog(ERROR, "unrecognized match type: %d",
- (int) match_type);
- }
- return NULL; /* can't get here */
+ (void) CreateTrigger(fk_trigger, constraintOid);
}
/*
* ALTER TABLE DROP CONSTRAINT
+ *
+ * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism.
*/
static void
-ATPrepDropConstraint(List **wqueue, Relation rel,
- bool recurse, AlterTableCmd *cmd)
+ATExecDropConstraint(Relation rel, const char *constrName,
+ DropBehavior behavior,
+ bool recurse, bool recursing)
{
+ List *children;
+ ListCell *child;
+ Relation conrel;
+ Form_pg_constraint con;
+ SysScanDesc scan;
+ ScanKeyData key;
+ HeapTuple tuple;
+ bool found = false;
+ bool is_check_constraint = false;
+
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ ATSimplePermissions(rel, false);
+
+ conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
+
/*
- * We don't want errors or noise from child tables, so we have to pass
- * down a modified command.
+ * Find and drop the target constraint
*/
- if (recurse)
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(conrel, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &key);
+
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
- AlterTableCmd *childCmd = copyObject(cmd);
+ ObjectAddress conobj;
+
+ con = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ if (strcmp(NameStr(con->conname), constrName) != 0)
+ continue;
+
+ /* Don't drop inherited constraints */
+ if (con->coninhcount > 0 && !recursing)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
+ constrName, RelationGetRelationName(rel))));
+
+ /* Right now only CHECK constraints can be inherited */
+ if (con->contype == CONSTRAINT_CHECK)
+ is_check_constraint = true;
+
+ /*
+ * Perform the actual constraint deletion
+ */
+ conobj.classId = ConstraintRelationId;
+ conobj.objectId = HeapTupleGetOid(tuple);
+ conobj.objectSubId = 0;
- childCmd->subtype = AT_DropConstraintQuietly;
- ATSimpleRecursion(wqueue, rel, childCmd, recurse);
+ performDeletion(&conobj, behavior);
+
+ found = true;
}
-}
-static void
-ATExecDropConstraint(Relation rel, const char *constrName,
- DropBehavior behavior, bool quiet)
-{
- int deleted;
+ systable_endscan(scan);
+
+ if (!found)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel))));
- deleted = RemoveRelConstraints(rel, constrName, behavior);
+ /*
+ * Propagate to children as appropriate. Unlike most other ALTER
+ * routines, we have to do this one level of recursion at a time; we can't
+ * use find_all_inheritors to do it in one pass.
+ */
+ if (is_check_constraint)
+ children = find_inheritance_children(RelationGetRelid(rel));
+ else
+ children = NIL;
- if (!quiet)
+ foreach(child, children)
{
- /* If zero constraints deleted, complain */
- if (deleted == 0)
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+
+ childrel = heap_open(childrelid, AccessExclusiveLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
+
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(childrelid));
+ scan = systable_beginscan(conrel, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &key);
+
+ found = false;
+
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ HeapTuple copy_tuple;
+
+ con = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /* Right now only CHECK constraints can be inherited */
+ if (con->contype != CONSTRAINT_CHECK)
+ continue;
+
+ if (strcmp(NameStr(con->conname), constrName) != 0)
+ continue;
+
+ found = true;
+
+ if (con->coninhcount <= 0) /* shouldn't happen */
+ elog(ERROR, "relation %u has non-inherited constraint \"%s\"",
+ childrelid, constrName);
+
+ copy_tuple = heap_copytuple(tuple);
+ con = (Form_pg_constraint) GETSTRUCT(copy_tuple);
+
+ if (recurse)
+ {
+ /*
+ * If the child constraint has other definition sources,
+ * just decrement its inheritance count; if not, recurse
+ * to delete it.
+ */
+ if (con->coninhcount == 1 && !con->conislocal)
+ {
+ /* Time to delete this child constraint, too */
+ ATExecDropConstraint(childrel, constrName, behavior,
+ true, true);
+ }
+ else
+ {
+ /* Child constraint must survive my deletion */
+ con->coninhcount--;
+ simple_heap_update(conrel, ©_tuple->t_self, copy_tuple);
+ CatalogUpdateIndexes(conrel, copy_tuple);
+
+ /* Make update visible */
+ CommandCounterIncrement();
+ }
+ }
+ else
+ {
+ /*
+ * If we were told to drop ONLY in this table (no
+ * recursion), we need to mark the inheritors' constraints
+ * as locally defined rather than inherited.
+ */
+ con->coninhcount--;
+ con->conislocal = true;
+
+ simple_heap_update(conrel, ©_tuple->t_self, copy_tuple);
+ CatalogUpdateIndexes(conrel, copy_tuple);
+
+ /* Make update visible */
+ CommandCounterIncrement();
+ }
+
+ heap_freetuple(copy_tuple);
+ }
+
+ systable_endscan(scan);
+
+ if (!found)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" does not exist",
- constrName)));
- /* Otherwise if more than one constraint deleted, notify */
- else if (deleted > 1)
- ereport(NOTICE,
- (errmsg("multiple constraints named \"%s\" were dropped",
- constrName)));
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName,
+ RelationGetRelationName(childrel))));
+
+ heap_close(childrel, NoLock);
}
+
+ heap_close(conrel, RowExclusiveLock);
}
/*
Form_pg_attribute attTup;
AttrNumber attnum;
Oid targettype;
+ int32 targettypmod;
Node *transform;
NewColumnValue *newval;
ParseState *pstate = make_parsestate(NULL);
colName)));
/* Look up the target type */
- targettype = LookupTypeName(typename);
- if (!OidIsValid(targettype))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("type \"%s\" does not exist",
- TypeNameToString(typename))));
+ targettype = typenameTypeId(NULL, typename, &targettypmod);
/* make sure datatype is legal for a column */
CheckAttributeType(colName, targettype);
transform = coerce_to_target_type(pstate,
transform, exprType(transform),
- targettype, typename->typmod,
+ targettype, targettypmod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST);
if (transform == NULL)
HeapTuple typeTuple;
Form_pg_type tform;
Oid targettype;
+ int32 targettypmod;
Node *defaultexpr;
Relation attrelation;
Relation depRel;
colName)));
/* Look up the target type (should not fail, since prep found it) */
- typeTuple = typenameType(typename);
+ typeTuple = typenameType(NULL, typename, &targettypmod);
tform = (Form_pg_type) GETSTRUCT(typeTuple);
targettype = HeapTupleGetOid(typeTuple);
defaultexpr = strip_implicit_coercions(defaultexpr);
defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */
defaultexpr, exprType(defaultexpr),
- targettype, typename->typmod,
+ targettype, targettypmod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST);
if (defaultexpr == NULL)
if (!list_member_oid(tab->changedConstraintOids,
foundObject.objectId))
{
- char *defstring = pg_get_constraintdef_string(foundObject.objectId);
+ char *defstring = pg_get_constraintdef_string(foundObject.objectId);
/*
* Put NORMAL dependencies at the front of the list and
case OCLASS_LANGUAGE:
case OCLASS_OPERATOR:
case OCLASS_OPCLASS:
+ case OCLASS_OPFAMILY:
case OCLASS_TRIGGER:
case OCLASS_SCHEMA:
+ case OCLASS_TSPARSER:
+ case OCLASS_TSDICT:
+ case OCLASS_TSTEMPLATE:
+ case OCLASS_TSCONFIG:
/*
* We don't expect any of these sorts of objects to depend on
* copy of the syscache entry, so okay to scribble on.)
*/
attTup->atttypid = targettype;
- attTup->atttypmod = typename->typmod;
+ attTup->atttypmod = targettypmod;
attTup->attndims = list_length(typename->arrayBounds);
attTup->attlen = tform->typlen;
attTup->attbyval = tform->typbyval;
*/
RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
- StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
+ StoreAttrDefault(rel, attnum, defaultexpr);
}
/* Cleanup */
/*
* Now we can drop the existing constraints and indexes --- constraints
* first, since some of them might depend on the indexes. In fact, we
- * have to delete FOREIGN KEY constraints before UNIQUE constraints,
- * but we already ordered the constraint list to ensure that would happen.
- * It should be okay to use DROP_RESTRICT here, since nothing else should
- * be depending on these objects.
+ * have to delete FOREIGN KEY constraints before UNIQUE constraints, but
+ * we already ordered the constraint list to ensure that would happen. It
+ * should be okay to use DROP_RESTRICT here, since nothing else should be
+ * depending on these objects.
*/
foreach(l, tab->changedConstraintOids)
{
ListCell *list_item;
/*
- * We expect that we only have to do raw parsing and parse analysis, not
- * any rule rewriting, since these will all be utility statements.
+ * We expect that we will get only ALTER TABLE and CREATE INDEX
+ * statements. Hence, there is no need to pass them through
+ * parse_analyze() or the rewriter, but instead we need to pass them
+ * through parse_utilcmd.c to make them ready for execution.
*/
raw_parsetree_list = raw_parser(cmd);
querytree_list = NIL;
foreach(list_item, raw_parsetree_list)
{
- Node *parsetree = (Node *) lfirst(list_item);
-
- querytree_list = list_concat(querytree_list,
- parse_analyze(parsetree, NULL, 0));
+ Node *stmt = (Node *) lfirst(list_item);
+
+ if (IsA(stmt, IndexStmt))
+ querytree_list = lappend(querytree_list,
+ transformIndexStmt((IndexStmt *) stmt,
+ cmd));
+ else if (IsA(stmt, AlterTableStmt))
+ querytree_list = list_concat(querytree_list,
+ transformAlterTableStmt((AlterTableStmt *) stmt,
+ cmd));
+ else
+ querytree_list = lappend(querytree_list, stmt);
}
/*
*/
foreach(list_item, querytree_list)
{
- Query *query = (Query *) lfirst(list_item);
+ Node *stm = (Node *) lfirst(list_item);
Relation rel;
AlteredTableInfo *tab;
- Assert(IsA(query, Query));
- Assert(query->commandType == CMD_UTILITY);
- switch (nodeTag(query->utilityStmt))
+ switch (nodeTag(stm))
{
case T_IndexStmt:
{
- IndexStmt *stmt = (IndexStmt *) query->utilityStmt;
+ IndexStmt *stmt = (IndexStmt *) stm;
AlterTableCmd *newcmd;
rel = relation_openrv(stmt->relation, AccessExclusiveLock);
}
case T_AlterTableStmt:
{
- AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
+ AlterTableStmt *stmt = (AlterTableStmt *) stm;
ListCell *lcmd;
rel = relation_openrv(stmt->relation, AccessExclusiveLock);
}
default:
elog(ERROR, "unexpected statement type: %d",
- (int) nodeTag(query->utilityStmt));
+ (int) nodeTag(stm));
}
}
}
/*
* ALTER TABLE OWNER
*
- * recursing is true if we are recursing from a table to its indexes or
- * toast table. We don't allow the ownership of those things to be
- * changed separately from the parent table. Also, we can skip permission
+ * recursing is true if we are recursing from a table to its indexes,
+ * sequences, or toast table. We don't allow the ownership of those things to
+ * be changed separately from the parent table. Also, we can skip permission
* checks (this is necessary not just an optimization, else we'd fail to
* handle toast tables properly).
+ *
+ * recursing is also true if ALTER TYPE OWNER is calling us to fix up a
+ * free-standing composite type.
*/
void
ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
{
case RELKIND_RELATION:
case RELKIND_VIEW:
- case RELKIND_SEQUENCE:
/* ok to change owner */
break;
case RELKIND_INDEX:
newOwnerId = tuple_class->relowner;
}
break;
+ case RELKIND_SEQUENCE:
+ if (!recursing &&
+ tuple_class->relowner != newOwnerId)
+ {
+ /* if it's an owned sequence, disallow changing it by itself */
+ Oid tableId;
+ int32 colId;
+
+ if (sequenceIsOwned(relationOid, &tableId, &colId))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot change owner of sequence \"%s\"",
+ NameStr(tuple_class->relname)),
+ errdetail("Sequence \"%s\" is linked to table \"%s\".",
+ NameStr(tuple_class->relname),
+ get_rel_name(tableId))));
+ }
+ break;
+ case RELKIND_COMPOSITE_TYPE:
+ if (recursing)
+ break;
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ NameStr(tuple_class->relname)),
+ errhint("Use ALTER TYPE instead.")));
+ break;
case RELKIND_TOASTVALUE:
if (recursing)
break;
heap_freetuple(newtuple);
- /* Update owner dependency reference */
- changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId);
+ /*
+ * Update owner dependency reference, if any. A composite type has
+ * none, because it's tracked for the pg_type entry instead of here;
+ * indexes and TOAST tables don't have their own entries either.
+ */
+ if (tuple_class->relkind != RELKIND_COMPOSITE_TYPE &&
+ tuple_class->relkind != RELKIND_INDEX &&
+ tuple_class->relkind != RELKIND_TOASTVALUE)
+ changeDependencyOnOwner(RelationRelationId, relationOid,
+ newOwnerId);
/*
* Also change the ownership of the table's rowtype, if it has one
*/
if (tuple_class->relkind != RELKIND_INDEX)
- AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
+ AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId,
+ tuple_class->relkind == RELKIND_COMPOSITE_TYPE);
/*
* If we are operating on a table, also change the ownership of any
HeapTuple tup;
/*
- * SERIAL sequences are those having an internal dependency on one of the
+ * SERIAL sequences are those having an auto dependency on one of the
* table's columns (we don't care *which* column, exactly).
*/
depRel = heap_open(DependRelationId, AccessShareLock);
Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
Relation seqRel;
- /* skip dependencies other than internal dependencies on columns */
+ /* skip dependencies other than auto dependencies on columns */
if (depForm->refobjsubid == 0 ||
depForm->classid != RelationRelationId ||
depForm->objsubid != 0 ||
- depForm->deptype != DEPENDENCY_INTERNAL)
+ depForm->deptype != DEPENDENCY_AUTO)
continue;
/* Use relation_open just in case it's an index */
}
/* We don't need to close the sequence while we alter it. */
- ATExecChangeOwner(depForm->objid, newOwnerId, false);
+ ATExecChangeOwner(depForm->objid, newOwnerId, true);
/* Now we can close it. Keep the lock till end of transaction. */
relation_close(seqRel, NoLock);
Oid tablespaceId;
AclResult aclresult;
- /*
- * We do our own permission checking because we want to allow this on
- * indexes.
- */
- if (rel->rd_rel->relkind != RELKIND_RELATION &&
- rel->rd_rel->relkind != RELKIND_INDEX)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table or index",
- RelationGetRelationName(rel))));
-
- /* Permissions checks */
- if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
- if (!allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
-
/* Check that the tablespace exists */
tablespaceId = get_tablespace_oid(tablespacename);
if (!OidIsValid(tablespaceId))
}
/*
- * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
- * rewriting to be done, so we just want to copy the data as fast as possible.
+ * ALTER TABLE/INDEX SET (...) or RESET (...)
*/
static void
-ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
+ATExecSetRelOptions(Relation rel, List *defList, bool isReset)
{
- Relation rel;
- Oid oldTableSpace;
- Oid reltoastrelid;
- Oid reltoastidxid;
- RelFileNode newrnode;
- SMgrRelation dstrel;
+ Oid relid;
+ Relation pgclass;
+ HeapTuple tuple;
+ HeapTuple newtuple;
+ Datum datum;
+ bool isnull;
+ Datum newOptions;
+ Datum repl_val[Natts_pg_class];
+ char repl_null[Natts_pg_class];
+ char repl_repl[Natts_pg_class];
+
+ if (defList == NIL)
+ return; /* nothing to do */
+
+ pgclass = heap_open(RelationRelationId, RowExclusiveLock);
+
+ /* Get the old reloptions */
+ relid = RelationGetRelid(rel);
+ tuple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(relid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for relation %u", relid);
+
+ datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions, &isnull);
+
+ /* Generate new proposed reloptions (text array) */
+ newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
+ defList, false, isReset);
+
+ /* Validate */
+ switch (rel->rd_rel->relkind)
+ {
+ case RELKIND_RELATION:
+ case RELKIND_TOASTVALUE:
+ (void) heap_reloptions(rel->rd_rel->relkind, newOptions, true);
+ break;
+ case RELKIND_INDEX:
+ (void) index_reloptions(rel->rd_am->amoptions, newOptions, true);
+ break;
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table, index, or TOAST table",
+ RelationGetRelationName(rel))));
+ break;
+ }
+
+ /*
+ * All we need do here is update the pg_class row; the new options will be
+ * propagated into relcaches during post-commit cache inval.
+ */
+ memset(repl_val, 0, sizeof(repl_val));
+ memset(repl_null, ' ', sizeof(repl_null));
+ memset(repl_repl, ' ', sizeof(repl_repl));
+
+ if (newOptions != (Datum) 0)
+ repl_val[Anum_pg_class_reloptions - 1] = newOptions;
+ else
+ repl_null[Anum_pg_class_reloptions - 1] = 'n';
+
+ repl_repl[Anum_pg_class_reloptions - 1] = 'r';
+
+ newtuple = heap_modifytuple(tuple, RelationGetDescr(pgclass),
+ repl_val, repl_null, repl_repl);
+
+ simple_heap_update(pgclass, &newtuple->t_self, newtuple);
+
+ CatalogUpdateIndexes(pgclass, newtuple);
+
+ heap_freetuple(newtuple);
+
+ ReleaseSysCache(tuple);
+
+ heap_close(pgclass, RowExclusiveLock);
+}
+
+/*
+ * Execute ALTER TABLE SET TABLESPACE for cases where there is no tuple
+ * rewriting to be done, so we just want to copy the data as fast as possible.
+ */
+static void
+ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
+{
+ Relation rel;
+ Oid oldTableSpace;
+ Oid reltoastrelid;
+ Oid reltoastidxid;
+ RelFileNode newrnode;
+ SMgrRelation dstrel;
Relation pg_class;
HeapTuple tuple;
Form_pg_class rd_rel;
- rel = relation_open(tableOid, NoLock);
+ /*
+ * Need lock here in case we are recursing to toast table or index
+ */
+ rel = relation_open(tableOid, AccessExclusiveLock);
/*
* We can never allow moving of shared or nailed-in-cache relations,
errmsg("cannot move system relation \"%s\"",
RelationGetRelationName(rel))));
+ /* Can't move a non-shared relation into pg_global */
+ if (newTableSpace == GLOBALTABLESPACE_OID)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("only shared relations can be placed in pg_global tablespace")));
+
/*
* Don't allow moving temp tables of other backends ... their local buffer
* manager is not going to cope.
/* XLOG stuff */
if (use_wal)
- {
- xl_heap_newpage xlrec;
- XLogRecPtr recptr;
- XLogRecData rdata[2];
-
- /* NO ELOG(ERROR) from here till newpage op is logged */
- START_CRIT_SECTION();
-
- xlrec.node = dst->smgr_rnode;
- xlrec.blkno = blkno;
-
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapNewpage;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = (char *) page;
- rdata[1].len = BLCKSZ;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
-
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
-
- END_CRIT_SECTION();
- }
+ log_newpage(&dst->smgr_rnode, blkno, page);
/*
* Now write the page. We say isTemp = true even if it's not a temp
* rel, because there's no need for smgr to schedule an fsync for this
* write; we'll do it ourselves below.
*/
- smgrwrite(dst, blkno, buf, true);
+ smgrextend(dst, blkno, buf, true);
}
/*
*/
static void
ATExecEnableDisableTrigger(Relation rel, char *trigname,
- bool enable, bool skip_system)
+ char fires_when, bool skip_system)
{
- EnableDisableTrigger(rel, trigname, enable, skip_system);
+ EnableDisableTrigger(rel, trigname, fires_when, skip_system);
}
/*
- * ALTER TABLE CREATE TOAST TABLE
+ * ALTER TABLE ENABLE/DISABLE RULE
*
- * Note: this is also invoked from outside this module; in such cases we
- * expect the caller to have verified that the relation is a table and we
- * have all the right permissions. Callers expect this function
- * to end with CommandCounterIncrement if it makes any changes.
+ * We just pass this off to rewriteDefine.c.
*/
-void
-AlterTableCreateToastTable(Oid relOid, bool silent)
+static void
+ATExecEnableDisableRule(Relation rel, char *trigname,
+ char fires_when)
{
- Relation rel;
- HeapTuple reltup;
- TupleDesc tupdesc;
- bool shared_relation;
- Relation class_rel;
- Oid toast_relid;
- Oid toast_idxid;
- char toast_relname[NAMEDATALEN];
- char toast_idxname[NAMEDATALEN];
- IndexInfo *indexInfo;
- Oid classObjectId[2];
- ObjectAddress baseobject,
- toastobject;
+ EnableDisableRule(rel, trigname, fires_when);
+}
+
+/*
+ * ALTER TABLE INHERIT
+ *
+ * Add a parent to the child's parents. This verifies that all the columns and
+ * check constraints of the parent appear in the child and that they have the
+ * same data types and expressions.
+ */
+static void
+ATExecAddInherit(Relation child_rel, RangeVar *parent)
+{
+ Relation parent_rel,
+ catalogRelation;
+ SysScanDesc scan;
+ ScanKeyData key;
+ HeapTuple inheritsTuple;
+ int32 inhseqno;
+ List *children;
/*
- * Grab an exclusive lock on the target table, which we will NOT release
- * until end of transaction. (This is probably redundant in all present
- * uses...)
+ * AccessShareLock on the parent is what's obtained during normal CREATE
+ * TABLE ... INHERITS ..., so should be enough here.
*/
- rel = heap_open(relOid, AccessExclusiveLock);
+ parent_rel = heap_openrv(parent, AccessShareLock);
/*
- * Toast table is shared if and only if its parent is.
- *
- * We cannot allow toasting a shared relation after initdb (because
- * there's no way to mark it toasted in other databases' pg_class).
- * Unfortunately we can't distinguish initdb from a manually started
- * standalone backend (toasting happens after the bootstrap phase, so
- * checking IsBootstrapProcessingMode() won't work). However, we can at
- * least prevent this mistake under normal multi-user operation.
- */
- shared_relation = rel->rd_rel->relisshared;
- if (shared_relation && IsUnderPostmaster)
+ * Must be owner of both parent and child -- child was checked by
+ * ATSimplePermissions call in ATPrepCmd
+ */
+ ATSimplePermissions(parent_rel, false);
+
+ /* Permanent rels cannot inherit from temporary ones */
+ if (!isTempNamespace(RelationGetNamespace(child_rel)) &&
+ isTempNamespace(RelationGetNamespace(parent_rel)))
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("shared tables cannot be toasted after initdb")));
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot inherit from temporary relation \"%s\"",
+ RelationGetRelationName(parent_rel))));
/*
- * Is it already toasted?
+ * Check for duplicates in the list of parents, and determine the highest
+ * inhseqno already present; we'll use the next one for the new parent.
+ * (Note: get RowExclusiveLock because we will write pg_inherits below.)
+ *
+ * Note: we do not reject the case where the child already inherits from
+ * the parent indirectly; CREATE TABLE doesn't reject comparable cases.
*/
- if (rel->rd_rel->reltoastrelid != InvalidOid)
+ catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+ ScanKeyInit(&key,
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(child_rel)));
+ scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
+ true, SnapshotNow, 1, &key);
+
+ /* inhseqno sequences start at 1 */
+ inhseqno = 0;
+ while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
{
- if (silent)
- {
- heap_close(rel, NoLock);
- return;
- }
+ Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("table \"%s\" already has a TOAST table",
- RelationGetRelationName(rel))));
+ if (inh->inhparent == RelationGetRelid(parent_rel))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("relation \"%s\" would be inherited from more than once",
+ RelationGetRelationName(parent_rel))));
+ if (inh->inhseqno > inhseqno)
+ inhseqno = inh->inhseqno;
}
+ systable_endscan(scan);
/*
- * Check to see whether the table actually needs a TOAST table.
+ * Prevent circularity by seeing if proposed parent inherits from child.
+ * (In particular, this disallows making a rel inherit from itself.)
+ *
+ * This is not completely bulletproof because of race conditions: in
+ * multi-level inheritance trees, someone else could concurrently be
+ * making another inheritance link that closes the loop but does not join
+ * either of the rels we have locked. Preventing that seems to require
+ * exclusive locks on the entire inheritance tree, which is a cure worse
+ * than the disease. find_all_inheritors() will cope with circularity
+ * anyway, so don't sweat it too much.
*/
- if (!needs_toast_table(rel))
- {
- if (silent)
- {
- heap_close(rel, NoLock);
- return;
- }
+ children = find_all_inheritors(RelationGetRelid(child_rel));
+ if (list_member_oid(children, RelationGetRelid(parent_rel)))
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("table \"%s\" does not need a TOAST table",
- RelationGetRelationName(rel))));
- }
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("circular inheritance not allowed"),
+ errdetail("\"%s\" is already a child of \"%s\".",
+ parent->relname,
+ RelationGetRelationName(child_rel))));
- /*
- * Create the toast table and its index
- */
- snprintf(toast_relname, sizeof(toast_relname),
- "pg_toast_%u", relOid);
- snprintf(toast_idxname, sizeof(toast_idxname),
- "pg_toast_%u_index", relOid);
-
- /* this is pretty painful... need a tuple descriptor */
- tupdesc = CreateTemplateTupleDesc(3, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1,
- "chunk_id",
- OIDOID,
- -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 2,
- "chunk_seq",
- INT4OID,
- -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 3,
- "chunk_data",
- BYTEAOID,
- -1, 0);
-
- /*
- * Ensure that the toast table doesn't itself get toasted, or we'll be
- * toast :-(. This is essential for chunk_data because type bytea is
- * toastable; hit the other two just to be sure.
- */
- tupdesc->attrs[0]->attstorage = 'p';
- tupdesc->attrs[1]->attstorage = 'p';
- tupdesc->attrs[2]->attstorage = 'p';
-
- /*
- * Note: the toast relation is placed in the regular pg_toast namespace
- * even if its master relation is a temp table. There cannot be any
- * naming collision, and the toast rel will be destroyed when its master
- * is, so there's no need to handle the toast rel as temp.
- */
- toast_relid = heap_create_with_catalog(toast_relname,
- PG_TOAST_NAMESPACE,
- rel->rd_rel->reltablespace,
- InvalidOid,
- rel->rd_rel->relowner,
- tupdesc,
- RELKIND_TOASTVALUE,
- shared_relation,
- true,
- 0,
- ONCOMMIT_NOOP,
- true);
-
- /* make the toast relation visible, else index creation will fail */
- CommandCounterIncrement();
+ /* If parent has OIDs then child must have OIDs */
+ if (parent_rel->rd_rel->relhasoids && !child_rel->rd_rel->relhasoids)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
+ RelationGetRelationName(child_rel),
+ RelationGetRelationName(parent_rel))));
+
+ /* Match up the columns and bump attinhcount as needed */
+ MergeAttributesIntoExisting(child_rel, parent_rel);
+
+ /* Match up the constraints and bump coninhcount as needed */
+ MergeConstraintsIntoExisting(child_rel, parent_rel);
/*
- * Create unique index on chunk_id, chunk_seq.
- *
- * NOTE: the normal TOAST access routines could actually function with a
- * single-column index on chunk_id only. However, the slice access
- * routines use both columns for faster access to an individual chunk. In
- * addition, we want it to be unique as a check against the possibility of
- * duplicate TOAST chunk OIDs. The index might also be a little more
- * efficient this way, since btree isn't all that happy with large numbers
- * of equal keys.
+ * OK, it looks valid. Make the catalog entries that show inheritance.
*/
+ StoreCatalogInheritance1(RelationGetRelid(child_rel),
+ RelationGetRelid(parent_rel),
+ inhseqno + 1,
+ catalogRelation);
+
+ /* Now we're done with pg_inherits */
+ heap_close(catalogRelation, RowExclusiveLock);
+
+ /* keep our lock on the parent relation until commit */
+ heap_close(parent_rel, NoLock);
+}
+
+/*
+ * Obtain the source-text form of the constraint expression for a check
+ * constraint, given its pg_constraint tuple
+ */
+static char *
+decompile_conbin(HeapTuple contup, TupleDesc tupdesc)
+{
+ Form_pg_constraint con;
+ bool isnull;
+ Datum attr;
+ Datum expr;
+
+ con = (Form_pg_constraint) GETSTRUCT(contup);
+ attr = heap_getattr(contup, Anum_pg_constraint_conbin, tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for constraint %u", HeapTupleGetOid(contup));
+
+ expr = DirectFunctionCall2(pg_get_expr, attr,
+ ObjectIdGetDatum(con->conrelid));
+ return TextDatumGetCString(expr);
+}
+
+/*
+ * Determine whether two check constraints are functionally equivalent
+ *
+ * The test we apply is to see whether they reverse-compile to the same
+ * source string. This insulates us from issues like whether attributes
+ * have the same physical column numbers in parent and child relations.
+ */
+static bool
+constraints_equivalent(HeapTuple a, HeapTuple b, TupleDesc tupleDesc)
+{
+ Form_pg_constraint acon = (Form_pg_constraint) GETSTRUCT(a);
+ Form_pg_constraint bcon = (Form_pg_constraint) GETSTRUCT(b);
+
+ if (acon->condeferrable != bcon->condeferrable ||
+ acon->condeferred != bcon->condeferred ||
+ strcmp(decompile_conbin(a, tupleDesc),
+ decompile_conbin(b, tupleDesc)) != 0)
+ return false;
+ else
+ return true;
+}
+
+/*
+ * Check columns in child table match up with columns in parent, and increment
+ * their attinhcount.
+ *
+ * Called by ATExecAddInherit
+ *
+ * Currently all parent columns must be found in child. Missing columns are an
+ * error. One day we might consider creating new columns like CREATE TABLE
+ * does. However, that is widely unpopular --- in the common use case of
+ * partitioned tables it's a foot-gun.
+ *
+ * The data type must match exactly. If the parent column is NOT NULL then
+ * the child must be as well. Defaults are not compared, however.
+ */
+static void
+MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
+{
+ Relation attrrel;
+ AttrNumber parent_attno;
+ int parent_natts;
+ TupleDesc tupleDesc;
+ TupleConstr *constr;
+ HeapTuple tuple;
+
+ attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
+
+ tupleDesc = RelationGetDescr(parent_rel);
+ parent_natts = tupleDesc->natts;
+ constr = tupleDesc->constr;
+
+ for (parent_attno = 1; parent_attno <= parent_natts; parent_attno++)
+ {
+ Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
+ char *attributeName = NameStr(attribute->attname);
+
+ /* Ignore dropped columns in the parent. */
+ if (attribute->attisdropped)
+ continue;
+
+ /* Find same column in child (matching on column name). */
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(child_rel),
+ attributeName);
+ if (HeapTupleIsValid(tuple))
+ {
+ /* Check they are same type and typmod */
+ Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
+
+ if (attribute->atttypid != childatt->atttypid ||
+ attribute->atttypmod != childatt->atttypmod)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("child table \"%s\" has different type for column \"%s\"",
+ RelationGetRelationName(child_rel),
+ attributeName)));
+
+ if (attribute->attnotnull && !childatt->attnotnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("column \"%s\" in child table must be marked NOT NULL",
+ attributeName)));
+
+ /*
+ * OK, bump the child column's inheritance count. (If we fail
+ * later on, this change will just roll back.)
+ */
+ childatt->attinhcount++;
+ simple_heap_update(attrrel, &tuple->t_self, tuple);
+ CatalogUpdateIndexes(attrrel, tuple);
+ heap_freetuple(tuple);
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("child table is missing column \"%s\"",
+ attributeName)));
+ }
+ }
+
+ heap_close(attrrel, RowExclusiveLock);
+}
+
+/*
+ * Check constraints in child table match up with constraints in parent,
+ * and increment their coninhcount.
+ *
+ * Called by ATExecAddInherit
+ *
+ * Currently all constraints in parent must be present in the child. One day we
+ * may consider adding new constraints like CREATE TABLE does. We may also want
+ * to allow an optional flag on parent table constraints indicating they are
+ * intended to ONLY apply to the master table, not to the children. That would
+ * make it possible to ensure no records are mistakenly inserted into the
+ * master in partitioned tables rather than the appropriate child.
+ *
+ * XXX This is O(N^2) which may be an issue with tables with hundreds of
+ * constraints. As long as tables have more like 10 constraints it shouldn't be
+ * a problem though. Even 100 constraints ought not be the end of the world.
+ */
+static void
+MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
+{
+ Relation catalog_relation;
+ TupleDesc tuple_desc;
+ SysScanDesc parent_scan;
+ ScanKeyData parent_key;
+ HeapTuple parent_tuple;
+
+ catalog_relation = heap_open(ConstraintRelationId, RowExclusiveLock);
+ tuple_desc = RelationGetDescr(catalog_relation);
+
+ /* Outer loop scans through the parent's constraint definitions */
+ ScanKeyInit(&parent_key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(parent_rel)));
+ parent_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &parent_key);
- indexInfo = makeNode(IndexInfo);
- indexInfo->ii_NumIndexAttrs = 2;
- indexInfo->ii_KeyAttrNumbers[0] = 1;
- indexInfo->ii_KeyAttrNumbers[1] = 2;
- indexInfo->ii_Expressions = NIL;
- indexInfo->ii_ExpressionsState = NIL;
- indexInfo->ii_Predicate = NIL;
- indexInfo->ii_PredicateState = NIL;
- indexInfo->ii_Unique = true;
+ while (HeapTupleIsValid(parent_tuple = systable_getnext(parent_scan)))
+ {
+ Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(parent_tuple);
+ SysScanDesc child_scan;
+ ScanKeyData child_key;
+ HeapTuple child_tuple;
+ bool found = false;
- classObjectId[0] = OID_BTREE_OPS_OID;
- classObjectId[1] = INT4_BTREE_OPS_OID;
+ if (parent_con->contype != CONSTRAINT_CHECK)
+ continue;
- toast_idxid = index_create(toast_relid, toast_idxname, InvalidOid,
- indexInfo,
- BTREE_AM_OID,
- rel->rd_rel->reltablespace,
- classObjectId,
- true, false, true, false);
+ /* Search for a child constraint matching this one */
+ ScanKeyInit(&child_key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(child_rel)));
+ child_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, &child_key);
+
+ while (HeapTupleIsValid(child_tuple = systable_getnext(child_scan)))
+ {
+ Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple);
+ HeapTuple child_copy;
+
+ if (child_con->contype != CONSTRAINT_CHECK)
+ continue;
+
+ if (strcmp(NameStr(parent_con->conname),
+ NameStr(child_con->conname)) != 0)
+ continue;
+
+ if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("child table \"%s\" has different definition for check constraint \"%s\"",
+ RelationGetRelationName(child_rel),
+ NameStr(parent_con->conname))));
+
+ /*
+ * OK, bump the child constraint's inheritance count. (If we fail
+ * later on, this change will just roll back.)
+ */
+ child_copy = heap_copytuple(child_tuple);
+ child_con = (Form_pg_constraint) GETSTRUCT(child_copy);
+ child_con->coninhcount++;
+ simple_heap_update(catalog_relation, &child_copy->t_self, child_copy);
+ CatalogUpdateIndexes(catalog_relation, child_copy);
+ heap_freetuple(child_copy);
+
+ found = true;
+ break;
+ }
+
+ systable_endscan(child_scan);
+
+ if (!found)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("child table is missing constraint \"%s\"",
+ NameStr(parent_con->conname))));
+ }
+
+ systable_endscan(parent_scan);
+ heap_close(catalog_relation, RowExclusiveLock);
+}
+
+/*
+ * ALTER TABLE NO INHERIT
+ *
+ * Drop a parent from the child's parents. This just adjusts the attinhcount
+ * and attislocal of the columns and removes the pg_inherit and pg_depend
+ * entries.
+ *
+ * If attinhcount goes to 0 then attislocal gets set to true. If it goes back
+ * up attislocal stays true, which means if a child is ever removed from a
+ * parent then its columns will never be automatically dropped which may
+ * surprise. But at least we'll never surprise by dropping columns someone
+ * isn't expecting to be dropped which would actually mean data loss.
+ *
+ * coninhcount and conislocal for inherited constraints are adjusted in
+ * exactly the same way.
+ */
+static void
+ATExecDropInherit(Relation rel, RangeVar *parent)
+{
+ Relation parent_rel;
+ Relation catalogRelation;
+ SysScanDesc scan;
+ ScanKeyData key[3];
+ HeapTuple inheritsTuple,
+ attributeTuple,
+ constraintTuple,
+ depTuple;
+ List *connames;
+ bool found = false;
/*
- * Update toast rel's pg_class entry to show that it has an index. The
- * index OID is stored into the reltoastidxid field for easy access by the
- * tuple toaster.
+ * AccessShareLock on the parent is probably enough, seeing that DROP
+ * TABLE doesn't lock parent tables at all. We need some lock since we'll
+ * be inspecting the parent's schema.
*/
- setRelhasindex(toast_relid, true, true, toast_idxid);
+ parent_rel = heap_openrv(parent, AccessShareLock);
/*
- * Store the toast table's OID in the parent relation's pg_class row
+ * We don't bother to check ownership of the parent table --- ownership of
+ * the child is presumed enough rights.
*/
- class_rel = heap_open(RelationRelationId, RowExclusiveLock);
-
- reltup = SearchSysCacheCopy(RELOID,
- ObjectIdGetDatum(relOid),
- 0, 0, 0);
- if (!HeapTupleIsValid(reltup))
- elog(ERROR, "cache lookup failed for relation %u", relOid);
- ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
+ /*
+ * Find and destroy the pg_inherits entry linking the two, or error out if
+ * there is none.
+ */
+ catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+ ScanKeyInit(&key[0],
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
+ true, SnapshotNow, 1, key);
- simple_heap_update(class_rel, &reltup->t_self, reltup);
+ while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+ {
+ Oid inhparent;
- /* Keep catalog indexes current */
- CatalogUpdateIndexes(class_rel, reltup);
+ inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
+ if (inhparent == RelationGetRelid(parent_rel))
+ {
+ simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
+ found = true;
+ break;
+ }
+ }
- heap_freetuple(reltup);
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
- heap_close(class_rel, RowExclusiveLock);
+ if (!found)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("relation \"%s\" is not a parent of relation \"%s\"",
+ RelationGetRelationName(parent_rel),
+ RelationGetRelationName(rel))));
/*
- * Register dependency from the toast table to the master, so that the
- * toast table will be deleted if the master is.
+ * Search through child columns looking for ones matching parent rel
*/
- baseobject.classId = RelationRelationId;
- baseobject.objectId = relOid;
- baseobject.objectSubId = 0;
- toastobject.classId = RelationRelationId;
- toastobject.objectId = toast_relid;
- toastobject.objectSubId = 0;
+ catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
+ ScanKeyInit(&key[0],
+ Anum_pg_attribute_attrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId,
+ true, SnapshotNow, 1, key);
+ while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
+ {
+ Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
+
+ /* Ignore if dropped or not inherited */
+ if (att->attisdropped)
+ continue;
+ if (att->attinhcount <= 0)
+ continue;
+
+ if (SearchSysCacheExistsAttName(RelationGetRelid(parent_rel),
+ NameStr(att->attname)))
+ {
+ /* Decrement inhcount and possibly set islocal to true */
+ HeapTuple copyTuple = heap_copytuple(attributeTuple);
+ Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple);
- recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
+ copy_att->attinhcount--;
+ if (copy_att->attinhcount == 0)
+ copy_att->attislocal = true;
+
+ simple_heap_update(catalogRelation, ©Tuple->t_self, copyTuple);
+ CatalogUpdateIndexes(catalogRelation, copyTuple);
+ heap_freetuple(copyTuple);
+ }
+ }
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
/*
- * Clean up and make changes visible
+ * Likewise, find inherited check constraints and disinherit them.
+ * To do this, we first need a list of the names of the parent's check
+ * constraints. (We cheat a bit by only checking for name matches,
+ * assuming that the expressions will match.)
*/
- heap_close(rel, NoLock);
+ catalogRelation = heap_open(ConstraintRelationId, RowExclusiveLock);
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(parent_rel)));
+ scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, key);
- CommandCounterIncrement();
-}
+ connames = NIL;
-/*
- * Check to see whether the table needs a TOAST table. It does only if
- * (1) there are any toastable attributes, and (2) the maximum length
- * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to
- * create a toast table for something like "f1 varchar(20)".)
- */
-static bool
-needs_toast_table(Relation rel)
-{
- int32 data_length = 0;
- bool maxlength_unknown = false;
- bool has_toastable_attrs = false;
- TupleDesc tupdesc;
- Form_pg_attribute *att;
- int32 tuple_length;
- int i;
+ while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
+ {
+ Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
+
+ if (con->contype == CONSTRAINT_CHECK)
+ connames = lappend(connames, pstrdup(NameStr(con->conname)));
+ }
- tupdesc = rel->rd_att;
- att = tupdesc->attrs;
+ systable_endscan(scan);
+
+ /* Now scan the child's constraints */
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
+ true, SnapshotNow, 1, key);
- for (i = 0; i < tupdesc->natts; i++)
+ while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
- if (att[i]->attisdropped)
+ Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
+ bool match;
+ ListCell *lc;
+
+ if (con->contype != CONSTRAINT_CHECK)
continue;
- data_length = att_align(data_length, att[i]->attalign);
- if (att[i]->attlen > 0)
+
+ match = false;
+ foreach (lc, connames)
{
- /* Fixed-length types are never toastable */
- data_length += att[i]->attlen;
+ if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0)
+ {
+ match = true;
+ break;
+ }
}
- else
- {
- int32 maxlen = type_maximum_size(att[i]->atttypid,
- att[i]->atttypmod);
- if (maxlen < 0)
- maxlength_unknown = true;
- else
- data_length += maxlen;
- if (att[i]->attstorage != 'p')
- has_toastable_attrs = true;
+ if (match)
+ {
+ /* Decrement inhcount and possibly set islocal to true */
+ HeapTuple copyTuple = heap_copytuple(constraintTuple);
+ Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
+ if (copy_con->coninhcount <= 0) /* shouldn't happen */
+ elog(ERROR, "relation %u has non-inherited constraint \"%s\"",
+ RelationGetRelid(rel), NameStr(copy_con->conname));
+
+ copy_con->coninhcount--;
+ if (copy_con->coninhcount == 0)
+ copy_con->conislocal = true;
+
+ simple_heap_update(catalogRelation, ©Tuple->t_self, copyTuple);
+ CatalogUpdateIndexes(catalogRelation, copyTuple);
+ heap_freetuple(copyTuple);
}
}
- if (!has_toastable_attrs)
- return false; /* nothing to toast? */
- if (maxlength_unknown)
- return true; /* any unlimited-length attrs? */
- tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
- BITMAPLEN(tupdesc->natts)) +
- MAXALIGN(data_length);
- return (tuple_length > TOAST_TUPLE_THRESHOLD);
+
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
+
+ /*
+ * Drop the dependency
+ *
+ * There's no convenient way to do this, so go trawling through pg_depend
+ */
+ catalogRelation = heap_open(DependRelationId, RowExclusiveLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_depend_classid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationRelationId));
+ ScanKeyInit(&key[1],
+ Anum_pg_depend_objid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ ScanKeyInit(&key[2],
+ Anum_pg_depend_objsubid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(0));
+
+ scan = systable_beginscan(catalogRelation, DependDependerIndexId, true,
+ SnapshotNow, 3, key);
+
+ while (HeapTupleIsValid(depTuple = systable_getnext(scan)))
+ {
+ Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple);
+
+ if (dep->refclassid == RelationRelationId &&
+ dep->refobjid == RelationGetRelid(parent_rel) &&
+ dep->refobjsubid == 0 &&
+ dep->deptype == DEPENDENCY_NORMAL)
+ simple_heap_delete(catalogRelation, &depTuple->t_self);
+ }
+
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
+
+ /* keep our lock on the parent relation until commit */
+ heap_close(parent_rel, NoLock);
}
Oid nspOid;
Relation classRel;
- rel = heap_openrv(relation, AccessExclusiveLock);
-
- /* heap_openrv allows TOAST, but we don't want to */
- if (rel->rd_rel->relkind == RELKIND_TOASTVALUE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a TOAST relation",
- RelationGetRelationName(rel))));
+ rel = relation_openrv(relation, AccessExclusiveLock);
relid = RelationGetRelid(rel);
oldNspOid = RelationGetNamespace(rel);
+ /* Can we change the schema of this tuple? */
+ switch (rel->rd_rel->relkind)
+ {
+ case RELKIND_RELATION:
+ case RELKIND_VIEW:
+ /* ok to change schema */
+ break;
+ case RELKIND_SEQUENCE:
+ {
+ /* if it's an owned sequence, disallow moving it by itself */
+ Oid tableId;
+ int32 colId;
+
+ if (sequenceIsOwned(relid, &tableId, &colId))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move an owned sequence into another schema"),
+ errdetail("Sequence \"%s\" is linked to table \"%s\".",
+ RelationGetRelationName(rel),
+ get_rel_name(tableId))));
+ }
+ break;
+ case RELKIND_COMPOSITE_TYPE:
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ RelationGetRelationName(rel)),
+ errhint("Use ALTER TYPE instead.")));
+ break;
+ case RELKIND_INDEX:
+ case RELKIND_TOASTVALUE:
+ /* FALL THRU */
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table, view, or sequence",
+ RelationGetRelationName(rel))));
+ }
+
/* get schema OID and check its permissions */
nspOid = LookupCreationNamespace(newschema);
AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
/* Fix the table's rowtype too */
- AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false);
+ AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false, false);
/* Fix other dependent stuff */
if (rel->rd_rel->relkind == RELKIND_RELATION)
HeapTuple tup;
/*
- * SERIAL sequences are those having an internal dependency on one of the
+ * SERIAL sequences are those having an auto dependency on one of the
* table's columns (we don't care *which* column, exactly).
*/
depRel = heap_open(DependRelationId, AccessShareLock);
Form_pg_depend depForm = (Form_pg_depend) GETSTRUCT(tup);
Relation seqRel;
- /* skip dependencies other than internal dependencies on columns */
+ /* skip dependencies other than auto dependencies on columns */
if (depForm->refobjsubid == 0 ||
depForm->classid != RelationRelationId ||
depForm->objsubid != 0 ||
- depForm->deptype != DEPENDENCY_INTERNAL)
+ depForm->deptype != DEPENDENCY_AUTO)
continue;
/* Use relation_open just in case it's an index */
* them to the new namespace, too.
*/
AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
- newNspOid, false);
+ newNspOid, false, false);
/* Now we can close it. Keep the lock till end of transaction. */
relation_close(seqRel, NoLock);