OSDN Git Service

Standardize get_whatever_oid functions for object types with
[pg-rex/syncrep.git] / src / backend / commands / schemacmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * schemacmds.c
4  *        schema creation/manipulation commands
5  *
6  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/schemacmds.c,v 1.58 2010/08/05 14:45:01 rhaas Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "access/xact.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "catalog/pg_namespace.h"
24 #include "commands/dbcommands.h"
25 #include "commands/schemacmds.h"
26 #include "miscadmin.h"
27 #include "parser/parse_utilcmd.h"
28 #include "tcop/utility.h"
29 #include "utils/acl.h"
30 #include "utils/builtins.h"
31 #include "utils/lsyscache.h"
32 #include "utils/syscache.h"
33
34
35 static void AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId);
36
37 /*
38  * CREATE SCHEMA
39  */
40 void
41 CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString)
42 {
43         const char *schemaName = stmt->schemaname;
44         const char *authId = stmt->authid;
45         Oid                     namespaceId;
46         OverrideSearchPath *overridePath;
47         List       *parsetree_list;
48         ListCell   *parsetree_item;
49         Oid                     owner_uid;
50         Oid                     saved_uid;
51         int                     save_sec_context;
52         AclResult       aclresult;
53
54         GetUserIdAndSecContext(&saved_uid, &save_sec_context);
55
56         /*
57          * Who is supposed to own the new schema?
58          */
59         if (authId)
60                 owner_uid = get_role_oid(authId, false);
61         else
62                 owner_uid = saved_uid;
63
64         /*
65          * To create a schema, must have schema-create privilege on the current
66          * database and must be able to become the target role (this does not
67          * imply that the target role itself must have create-schema privilege).
68          * The latter provision guards against "giveaway" attacks.      Note that a
69          * superuser will always have both of these privileges a fortiori.
70          */
71         aclresult = pg_database_aclcheck(MyDatabaseId, saved_uid, ACL_CREATE);
72         if (aclresult != ACLCHECK_OK)
73                 aclcheck_error(aclresult, ACL_KIND_DATABASE,
74                                            get_database_name(MyDatabaseId));
75
76         check_is_member_of_role(saved_uid, owner_uid);
77
78         /* Additional check to protect reserved schema names */
79         if (!allowSystemTableMods && IsReservedName(schemaName))
80                 ereport(ERROR,
81                                 (errcode(ERRCODE_RESERVED_NAME),
82                                  errmsg("unacceptable schema name \"%s\"", schemaName),
83                    errdetail("The prefix \"pg_\" is reserved for system schemas.")));
84
85         /*
86          * If the requested authorization is different from the current user,
87          * temporarily set the current user so that the object(s) will be created
88          * with the correct ownership.
89          *
90          * (The setting will be restored at the end of this routine, or in case of
91          * error, transaction abort will clean things up.)
92          */
93         if (saved_uid != owner_uid)
94                 SetUserIdAndSecContext(owner_uid,
95                                                         save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
96
97         /* Create the schema's namespace */
98         namespaceId = NamespaceCreate(schemaName, owner_uid);
99
100         /* Advance cmd counter to make the namespace visible */
101         CommandCounterIncrement();
102
103         /*
104          * Temporarily make the new namespace be the front of the search path, as
105          * well as the default creation target namespace.  This will be undone at
106          * the end of this routine, or upon error.
107          */
108         overridePath = GetOverrideSearchPath(CurrentMemoryContext);
109         overridePath->schemas = lcons_oid(namespaceId, overridePath->schemas);
110         /* XXX should we clear overridePath->useTemp? */
111         PushOverrideSearchPath(overridePath);
112
113         /*
114          * Examine the list of commands embedded in the CREATE SCHEMA command, and
115          * reorganize them into a sequentially executable order with no forward
116          * references.  Note that the result is still a list of raw parsetrees ---
117          * we cannot, in general, run parse analysis on one statement until we
118          * have actually executed the prior ones.
119          */
120         parsetree_list = transformCreateSchemaStmt(stmt);
121
122         /*
123          * Execute each command contained in the CREATE SCHEMA.  Since the grammar
124          * allows only utility commands in CREATE SCHEMA, there is no need to pass
125          * them through parse_analyze() or the rewriter; we can just hand them
126          * straight to ProcessUtility.
127          */
128         foreach(parsetree_item, parsetree_list)
129         {
130                 Node       *stmt = (Node *) lfirst(parsetree_item);
131
132                 /* do this step */
133                 ProcessUtility(stmt,
134                                            queryString,
135                                            NULL,
136                                            false,       /* not top level */
137                                            None_Receiver,
138                                            NULL);
139                 /* make sure later steps can see the object created here */
140                 CommandCounterIncrement();
141         }
142
143         /* Reset search path to normal state */
144         PopOverrideSearchPath();
145
146         /* Reset current user and security context */
147         SetUserIdAndSecContext(saved_uid, save_sec_context);
148 }
149
150
151 /*
152  *      RemoveSchemas
153  *              Implements DROP SCHEMA.
154  */
155 void
156 RemoveSchemas(DropStmt *drop)
157 {
158         ObjectAddresses *objects;
159         ListCell   *cell;
160
161         /*
162          * First we identify all the schemas, then we delete them in a single
163          * performMultipleDeletions() call.  This is to avoid unwanted DROP
164          * RESTRICT errors if one of the schemas depends on another.
165          */
166         objects = new_object_addresses();
167
168         foreach(cell, drop->objects)
169         {
170                 List       *names = (List *) lfirst(cell);
171                 char       *namespaceName;
172                 Oid                     namespaceId;
173                 ObjectAddress object;
174
175                 if (list_length(names) != 1)
176                         ereport(ERROR,
177                                         (errcode(ERRCODE_SYNTAX_ERROR),
178                                          errmsg("schema name cannot be qualified")));
179                 namespaceName = strVal(linitial(names));
180
181                 namespaceId = get_namespace_oid(namespaceName, drop->missing_ok);
182
183                 if (!OidIsValid(namespaceId))
184                 {
185                         ereport(NOTICE,
186                                         (errmsg("schema \"%s\" does not exist, skipping",
187                                                         namespaceName)));
188                         continue;
189                 }
190
191                 /* Permission check */
192                 if (!pg_namespace_ownercheck(namespaceId, GetUserId()))
193                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_NAMESPACE,
194                                                    namespaceName);
195
196                 object.classId = NamespaceRelationId;
197                 object.objectId = namespaceId;
198                 object.objectSubId = 0;
199
200                 add_exact_object_address(&object, objects);
201         }
202
203         /*
204          * Do the deletions.  Objects contained in the schema(s) are removed by
205          * means of their dependency links to the schema.
206          */
207         performMultipleDeletions(objects, drop->behavior);
208
209         free_object_addresses(objects);
210 }
211
212
213 /*
214  * Guts of schema deletion.
215  */
216 void
217 RemoveSchemaById(Oid schemaOid)
218 {
219         Relation        relation;
220         HeapTuple       tup;
221
222         relation = heap_open(NamespaceRelationId, RowExclusiveLock);
223
224         tup = SearchSysCache1(NAMESPACEOID,
225                                                   ObjectIdGetDatum(schemaOid));
226         if (!HeapTupleIsValid(tup)) /* should not happen */
227                 elog(ERROR, "cache lookup failed for namespace %u", schemaOid);
228
229         simple_heap_delete(relation, &tup->t_self);
230
231         ReleaseSysCache(tup);
232
233         heap_close(relation, RowExclusiveLock);
234 }
235
236
237 /*
238  * Rename schema
239  */
240 void
241 RenameSchema(const char *oldname, const char *newname)
242 {
243         HeapTuple       tup;
244         Relation        rel;
245         AclResult       aclresult;
246
247         rel = heap_open(NamespaceRelationId, RowExclusiveLock);
248
249         tup = SearchSysCacheCopy1(NAMESPACENAME, CStringGetDatum(oldname));
250         if (!HeapTupleIsValid(tup))
251                 ereport(ERROR,
252                                 (errcode(ERRCODE_UNDEFINED_SCHEMA),
253                                  errmsg("schema \"%s\" does not exist", oldname)));
254
255         /* make sure the new name doesn't exist */
256         if (OidIsValid(get_namespace_oid(newname, true)))
257                 ereport(ERROR,
258                                 (errcode(ERRCODE_DUPLICATE_SCHEMA),
259                                  errmsg("schema \"%s\" already exists", newname)));
260
261         /* must be owner */
262         if (!pg_namespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
263                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_NAMESPACE,
264                                            oldname);
265
266         /* must have CREATE privilege on database */
267         aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
268         if (aclresult != ACLCHECK_OK)
269                 aclcheck_error(aclresult, ACL_KIND_DATABASE,
270                                            get_database_name(MyDatabaseId));
271
272         if (!allowSystemTableMods && IsReservedName(newname))
273                 ereport(ERROR,
274                                 (errcode(ERRCODE_RESERVED_NAME),
275                                  errmsg("unacceptable schema name \"%s\"", newname),
276                    errdetail("The prefix \"pg_\" is reserved for system schemas.")));
277
278         /* rename */
279         namestrcpy(&(((Form_pg_namespace) GETSTRUCT(tup))->nspname), newname);
280         simple_heap_update(rel, &tup->t_self, tup);
281         CatalogUpdateIndexes(rel, tup);
282
283         heap_close(rel, NoLock);
284         heap_freetuple(tup);
285 }
286
287 void
288 AlterSchemaOwner_oid(Oid oid, Oid newOwnerId)
289 {
290         HeapTuple       tup;
291         Relation        rel;
292
293         rel = heap_open(NamespaceRelationId, RowExclusiveLock);
294
295         tup = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(oid));
296         if (!HeapTupleIsValid(tup))
297                 elog(ERROR, "cache lookup failed for schema %u", oid);
298
299         AlterSchemaOwner_internal(tup, rel, newOwnerId);
300
301         ReleaseSysCache(tup);
302
303         heap_close(rel, RowExclusiveLock);
304 }
305
306
307 /*
308  * Change schema owner
309  */
310 void
311 AlterSchemaOwner(const char *name, Oid newOwnerId)
312 {
313         HeapTuple       tup;
314         Relation        rel;
315
316         rel = heap_open(NamespaceRelationId, RowExclusiveLock);
317
318         tup = SearchSysCache1(NAMESPACENAME, CStringGetDatum(name));
319         if (!HeapTupleIsValid(tup))
320                 ereport(ERROR,
321                                 (errcode(ERRCODE_UNDEFINED_SCHEMA),
322                                  errmsg("schema \"%s\" does not exist", name)));
323
324         AlterSchemaOwner_internal(tup, rel, newOwnerId);
325
326         ReleaseSysCache(tup);
327
328         heap_close(rel, RowExclusiveLock);
329 }
330
331 static void
332 AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId)
333 {
334         Form_pg_namespace nspForm;
335
336         Assert(tup->t_tableOid == NamespaceRelationId);
337         Assert(RelationGetRelid(rel) == NamespaceRelationId);
338
339         nspForm = (Form_pg_namespace) GETSTRUCT(tup);
340
341         /*
342          * If the new owner is the same as the existing owner, consider the
343          * command to have succeeded.  This is for dump restoration purposes.
344          */
345         if (nspForm->nspowner != newOwnerId)
346         {
347                 Datum           repl_val[Natts_pg_namespace];
348                 bool            repl_null[Natts_pg_namespace];
349                 bool            repl_repl[Natts_pg_namespace];
350                 Acl                *newAcl;
351                 Datum           aclDatum;
352                 bool            isNull;
353                 HeapTuple       newtuple;
354                 AclResult       aclresult;
355
356                 /* Otherwise, must be owner of the existing object */
357                 if (!pg_namespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
358                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_NAMESPACE,
359                                                    NameStr(nspForm->nspname));
360
361                 /* Must be able to become new owner */
362                 check_is_member_of_role(GetUserId(), newOwnerId);
363
364                 /*
365                  * must have create-schema rights
366                  *
367                  * NOTE: This is different from other alter-owner checks in that the
368                  * current user is checked for create privileges instead of the
369                  * destination owner.  This is consistent with the CREATE case for
370                  * schemas.  Because superusers will always have this right, we need
371                  * no special case for them.
372                  */
373                 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(),
374                                                                                  ACL_CREATE);
375                 if (aclresult != ACLCHECK_OK)
376                         aclcheck_error(aclresult, ACL_KIND_DATABASE,
377                                                    get_database_name(MyDatabaseId));
378
379                 memset(repl_null, false, sizeof(repl_null));
380                 memset(repl_repl, false, sizeof(repl_repl));
381
382                 repl_repl[Anum_pg_namespace_nspowner - 1] = true;
383                 repl_val[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(newOwnerId);
384
385                 /*
386                  * Determine the modified ACL for the new owner.  This is only
387                  * necessary when the ACL is non-null.
388                  */
389                 aclDatum = SysCacheGetAttr(NAMESPACENAME, tup,
390                                                                    Anum_pg_namespace_nspacl,
391                                                                    &isNull);
392                 if (!isNull)
393                 {
394                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
395                                                                  nspForm->nspowner, newOwnerId);
396                         repl_repl[Anum_pg_namespace_nspacl - 1] = true;
397                         repl_val[Anum_pg_namespace_nspacl - 1] = PointerGetDatum(newAcl);
398                 }
399
400                 newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
401
402                 simple_heap_update(rel, &newtuple->t_self, newtuple);
403                 CatalogUpdateIndexes(rel, newtuple);
404
405                 heap_freetuple(newtuple);
406
407                 /* Update owner dependency reference */
408                 changeDependencyOnOwner(NamespaceRelationId, HeapTupleGetOid(tup),
409                                                                 newOwnerId);
410         }
411
412 }