From 68ef051f5cf16f82a5368067a40ffba3c340b0d3 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 25 Apr 2011 16:55:11 -0400 Subject: [PATCH] Refactor broken CREATE TABLE IF NOT EXISTS support. Per bug #5988, reported by Marko Tiikkaja, and further analyzed by Tom Lane, the previous coding was broken in several respects: even if the target table already existed, a subsequent CREATE TABLE IF NOT EXISTS might try to add additional constraints or sequences-for-serial specified in the new CREATE TABLE statement. In passing, this also fixes a minor information leak: it's no longer possible to figure out whether a schema to which you don't have CREATE access contains a sequence named like "x_y_seq" by attempting to create a table in that schema called "x" with a serial column called "y". Some more refactoring of this code in the future might be warranted, but that will need to wait for a later major release. --- src/backend/bootstrap/bootparse.y | 3 +-- src/backend/catalog/heap.c | 19 +++---------------- src/backend/catalog/namespace.c | 29 +++++++++++++++++++++++++++++ src/backend/catalog/toasting.c | 3 +-- src/backend/commands/cluster.c | 3 +-- src/backend/commands/tablecmds.c | 29 ++++------------------------- src/backend/executor/execMain.c | 12 ++---------- src/backend/parser/parse_utilcmd.c | 32 ++++++++++++++++++++++++++++---- src/backend/tcop/utility.c | 18 ++---------------- src/include/catalog/heap.h | 3 +-- src/include/catalog/namespace.h | 1 + 11 files changed, 73 insertions(+), 79 deletions(-) diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index 75a137bbd7..a9d2428698 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -247,8 +247,7 @@ Boot_CreateStmt: ONCOMMIT_NOOP, (Datum) 0, false, - true, - false); + true); elog(DEBUG4, "relation created with oid %u", id); } do_end(); diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 4c08967703..71c9931834 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -973,8 +973,7 @@ heap_create_with_catalog(const char *relname, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, - bool allow_system_table_mods, - bool if_not_exists) + bool allow_system_table_mods) { Relation pg_class_desc; Relation new_rel_desc; @@ -994,26 +993,14 @@ heap_create_with_catalog(const char *relname, CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods); /* - * If the relation already exists, it's an error, unless the user - * specifies "IF NOT EXISTS". In that case, we just print a notice and do - * nothing further. + * This would fail later on anyway, if the relation already exists. But + * by catching it here we can emit a nicer error message. */ existing_relid = get_relname_relid(relname, relnamespace); if (existing_relid != InvalidOid) - { - if (if_not_exists) - { - ereport(NOTICE, - (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"%s\" already exists, skipping", - relname))); - heap_close(pg_class_desc, RowExclusiveLock); - return InvalidOid; - } ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE), errmsg("relation \"%s\" already exists", relname))); - } /* * Since we are going to create a rowtype as well, also check for diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index f8fd827693..d803d28a06 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -361,6 +361,35 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation) } /* + * RangeVarGetAndCheckCreationNamespace + * As RangeVarGetCreationNamespace, but with a permissions check. + */ +Oid +RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation) +{ + Oid namespaceId; + + namespaceId = RangeVarGetCreationNamespace(newRelation); + + /* + * Check we have permission to create there. Skip check if bootstrapping, + * since permissions machinery may not be working yet. + */ + if (!IsBootstrapProcessingMode()) + { + AclResult aclresult; + + aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), + ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_NAMESPACE, + get_namespace_name(namespaceId)); + } + + return namespaceId; +} + +/* * RelnameGetRelid * Try to resolve an unqualified relation name. * Returns OID if relation found in search path, else InvalidOid. diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c index 362d26d9d1..a8cf0dbe2f 100644 --- a/src/backend/catalog/toasting.c +++ b/src/backend/catalog/toasting.c @@ -227,8 +227,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio ONCOMMIT_NOOP, reloptions, false, - true, - false); + true); Assert(toast_relid != InvalidOid); /* make the toast relation visible, else heap_open will fail */ diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index ff228b7d53..191ef543cd 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -646,8 +646,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace) ONCOMMIT_NOOP, reloptions, false, - true, - false); + true); Assert(OIDNewHeap != InvalidOid); ReleaseSysCache(tuple); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 7660114ec2..60eecb1497 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -439,22 +439,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) errmsg("cannot create temporary table within security-restricted operation"))); /* - * Look up the namespace in which we are supposed to create the relation. - * Check we have permission to create there. Skip check if bootstrapping, - * since permissions machinery may not be working yet. + * Look up the namespace in which we are supposed to create the relation, + * and check we have permission to create there. */ - namespaceId = RangeVarGetCreationNamespace(stmt->relation); - - if (!IsBootstrapProcessingMode()) - { - AclResult aclresult; - - aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), - ACL_CREATE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_NAMESPACE, - get_namespace_name(namespaceId)); - } + namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation); /* * Select tablespace to use. If not specified, use default tablespace @@ -602,16 +590,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) stmt->oncommit, reloptions, true, - allowSystemTableMods, - stmt->if_not_exists); - - /* - * If heap_create_with_catalog returns InvalidOid, it means that the user - * specified "IF NOT EXISTS" and the relation already exists. In that - * case we do nothing further. - */ - if (relationId == InvalidOid) - return InvalidOid; + allowSystemTableMods); /* Store inheritance information for new rel. */ StoreCatalogInheritance(relationId, inheritOids); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 86ec987019..620efda838 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -2341,7 +2341,6 @@ OpenIntoRel(QueryDesc *queryDesc) Oid namespaceId; Oid tablespaceId; Datum reloptions; - AclResult aclresult; Oid intoRelationId; TupleDesc tupdesc; DR_intorel *myState; @@ -2378,13 +2377,7 @@ OpenIntoRel(QueryDesc *queryDesc) * Find namespace to create in, check its permissions */ intoName = into->rel->relname; - namespaceId = RangeVarGetCreationNamespace(into->rel); - - aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), - ACL_CREATE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_NAMESPACE, - get_namespace_name(namespaceId)); + namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel); /* * Select tablespace to use. If not specified, use default tablespace @@ -2444,8 +2437,7 @@ OpenIntoRel(QueryDesc *queryDesc) into->onCommit, reloptions, true, - allowSystemTableMods, - false); + allowSystemTableMods); Assert(intoRelationId != InvalidOid); FreeTupleDesc(tupdesc); diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 0078814905..5588cfac0b 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -148,6 +148,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) List *result; List *save_alist; ListCell *elements; + Oid namespaceid; /* * We must not scribble on the passed-in CreateStmt, so copy it. (This is @@ -156,6 +157,33 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) stmt = (CreateStmt *) copyObject(stmt); /* + * Look up the creation namespace. This also checks permissions on the + * target namespace, so that we throw any permissions error as early as + * possible. + */ + namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation); + + /* + * If the relation already exists and the user specified "IF NOT EXISTS", + * bail out with a NOTICE. + */ + if (stmt->if_not_exists) + { + Oid existing_relid; + + existing_relid = get_relname_relid(stmt->relation->relname, + namespaceid); + if (existing_relid != InvalidOid) + { + ereport(NOTICE, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("relation \"%s\" already exists, skipping", + stmt->relation->relname))); + return NIL; + } + } + + /* * If the target relation name isn't schema-qualified, make it so. This * prevents some corner cases in which added-on rewritten commands might * think they should apply to other relations that have the same name and @@ -164,11 +192,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) */ if (stmt->relation->schemaname == NULL && stmt->relation->relpersistence != RELPERSISTENCE_TEMP) - { - Oid namespaceid = RangeVarGetCreationNamespace(stmt->relation); - stmt->relation->schemaname = get_namespace_name(namespaceid); - } /* Set up pstate and CreateStmtContext */ pstate = make_parsestate(NULL); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index b1fd5ec04f..0559998c71 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -530,13 +530,6 @@ standard_ProcessUtility(Node *parsetree, InvalidOid); /* - * If "IF NOT EXISTS" was specified and the relation - * already exists, do nothing further. - */ - if (relOid == InvalidOid) - continue; - - /* * Let AlterTableCreateToastTable decide if this one * needs a secondary relation too. */ @@ -559,15 +552,8 @@ standard_ProcessUtility(Node *parsetree, relOid = DefineRelation((CreateStmt *) stmt, RELKIND_FOREIGN_TABLE, InvalidOid); - - /* - * Unless "IF NOT EXISTS" was specified and the - * relation already exists, create the - * pg_foreign_table entry. - */ - if (relOid != InvalidOid) - CreateForeignTable((CreateForeignTableStmt *) stmt, - relOid); + CreateForeignTable((CreateForeignTableStmt *) stmt, + relOid); } else { diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 463aff0358..c95e91303b 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -63,8 +63,7 @@ extern Oid heap_create_with_catalog(const char *relname, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, - bool allow_system_table_mods, - bool if_not_exists); + bool allow_system_table_mods); extern void heap_drop_with_catalog(Oid relid); diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index f59beee80d..53600969ad 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -49,6 +49,7 @@ typedef struct OverrideSearchPath extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); +extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); extern Oid RelnameGetRelid(const char *relname); extern bool RelationIsVisible(Oid relid); -- 2.11.0