OSDN Git Service

Support the syntax
[pg-rex/syncrep.git] / src / backend / commands / aggregatecmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * aggregatecmds.c
4  *
5  *        Routines for aggregate-manipulation commands
6  *
7  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $PostgreSQL: pgsql/src/backend/commands/aggregatecmds.c,v 1.34 2006/04/15 17:45:33 tgl Exp $
13  *
14  * DESCRIPTION
15  *        The "DefineFoo" routines take the parse tree and pick out the
16  *        appropriate arguments/flags, passing the results to the
17  *        corresponding "FooDefine" routines (in src/catalog) that do
18  *        the actual catalog-munging.  These routines also verify permission
19  *        of the user to execute the command.
20  *
21  *-------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24
25 #include "access/heapam.h"
26 #include "catalog/dependency.h"
27 #include "catalog/indexing.h"
28 #include "catalog/namespace.h"
29 #include "catalog/pg_aggregate.h"
30 #include "catalog/pg_proc.h"
31 #include "catalog/pg_type.h"
32 #include "commands/defrem.h"
33 #include "miscadmin.h"
34 #include "parser/parse_func.h"
35 #include "parser/parse_type.h"
36 #include "utils/acl.h"
37 #include "utils/builtins.h"
38 #include "utils/lsyscache.h"
39 #include "utils/syscache.h"
40
41
42 /*
43  *      DefineAggregate
44  *
45  * "oldstyle" signals the old (pre-8.2) style where the aggregate input type
46  * is specified by a BASETYPE element in the parameters.  Otherwise,
47  * "args" defines the input type(s).
48  */
49 void
50 DefineAggregate(List *name, List *args, bool oldstyle, List *parameters)
51 {
52         char       *aggName;
53         Oid                     aggNamespace;
54         AclResult       aclresult;
55         List       *transfuncName = NIL;
56         List       *finalfuncName = NIL;
57         List       *sortoperatorName = NIL;
58         TypeName   *baseType = NULL;
59         TypeName   *transType = NULL;
60         char       *initval = NULL;
61         Oid                     baseTypeId;
62         Oid                     transTypeId;
63         ListCell   *pl;
64
65         /* Convert list of names to a name and namespace */
66         aggNamespace = QualifiedNameGetCreationNamespace(name, &aggName);
67
68         /* Check we have creation rights in target namespace */
69         aclresult = pg_namespace_aclcheck(aggNamespace, GetUserId(), ACL_CREATE);
70         if (aclresult != ACLCHECK_OK)
71                 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
72                                            get_namespace_name(aggNamespace));
73
74         foreach(pl, parameters)
75         {
76                 DefElem    *defel = (DefElem *) lfirst(pl);
77
78                 /*
79                  * sfunc1, stype1, and initcond1 are accepted as obsolete spellings
80                  * for sfunc, stype, initcond.
81                  */
82                 if (pg_strcasecmp(defel->defname, "sfunc") == 0)
83                         transfuncName = defGetQualifiedName(defel);
84                 else if (pg_strcasecmp(defel->defname, "sfunc1") == 0)
85                         transfuncName = defGetQualifiedName(defel);
86                 else if (pg_strcasecmp(defel->defname, "finalfunc") == 0)
87                         finalfuncName = defGetQualifiedName(defel);
88                 else if (pg_strcasecmp(defel->defname, "sortop") == 0)
89                         sortoperatorName = defGetQualifiedName(defel);
90                 else if (pg_strcasecmp(defel->defname, "basetype") == 0)
91                         baseType = defGetTypeName(defel);
92                 else if (pg_strcasecmp(defel->defname, "stype") == 0)
93                         transType = defGetTypeName(defel);
94                 else if (pg_strcasecmp(defel->defname, "stype1") == 0)
95                         transType = defGetTypeName(defel);
96                 else if (pg_strcasecmp(defel->defname, "initcond") == 0)
97                         initval = defGetString(defel);
98                 else if (pg_strcasecmp(defel->defname, "initcond1") == 0)
99                         initval = defGetString(defel);
100                 else
101                         ereport(WARNING,
102                                         (errcode(ERRCODE_SYNTAX_ERROR),
103                                          errmsg("aggregate attribute \"%s\" not recognized",
104                                                         defel->defname)));
105         }
106
107         /*
108          * make sure we have our required definitions
109          */
110         if (transType == NULL)
111                 ereport(ERROR,
112                                 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
113                                  errmsg("aggregate stype must be specified")));
114         if (transfuncName == NIL)
115                 ereport(ERROR,
116                                 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
117                                  errmsg("aggregate sfunc must be specified")));
118
119         /*
120          * look up the aggregate's input datatype.
121          */
122         if (oldstyle)
123         {
124                 /*
125                  * Old style: use basetype parameter.  This supports only one input.
126                  *
127                  * Historically we allowed the command to look like basetype = 'ANY'
128                  * so we must do a case-insensitive comparison for the name ANY. Ugh.
129                  */
130                 if (baseType == NULL)
131                         ereport(ERROR,
132                                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
133                                          errmsg("aggregate input type must be specified")));
134
135                 if (pg_strcasecmp(TypeNameToString(baseType), "ANY") == 0)
136                         baseTypeId = ANYOID;
137                 else
138                         baseTypeId = typenameTypeId(NULL, baseType);
139         }
140         else
141         {
142                 /*
143                  * New style: args is a list of TypeNames.  For the moment, though,
144                  * we allow at most one.
145                  */
146                 if (baseType != NULL)
147                         ereport(ERROR,
148                                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
149                                          errmsg("basetype is redundant with aggregate input type specification")));
150
151                 if (args == NIL)
152                 {
153                         /* special case for agg(*) */
154                         baseTypeId = ANYOID;
155                 }
156                 else if (list_length(args) != 1)
157                 {
158                         /* temporarily reject > 1 arg */
159                         ereport(ERROR,
160                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
161                                          errmsg("aggregates can have only one input")));
162                         baseTypeId = InvalidOid;        /* keep compiler quiet */
163                 }
164                 else
165                 {
166                         baseTypeId = typenameTypeId(NULL, (TypeName *) linitial(args));
167                 }
168         }
169
170         /*
171          * look up the aggregate's transtype.
172          *
173          * transtype can't be a pseudo-type, since we need to be
174          * able to store values of the transtype.  However, we can allow
175          * polymorphic transtype in some cases (AggregateCreate will check).
176          */
177         transTypeId = typenameTypeId(NULL, transType);
178         if (get_typtype(transTypeId) == 'p' &&
179                 transTypeId != ANYARRAYOID &&
180                 transTypeId != ANYELEMENTOID)
181                 ereport(ERROR,
182                                 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
183                                  errmsg("aggregate transition data type cannot be %s",
184                                                 format_type_be(transTypeId))));
185
186         /*
187          * Most of the argument-checking is done inside of AggregateCreate
188          */
189         AggregateCreate(aggName,        /* aggregate name */
190                                         aggNamespace,           /* namespace */
191                                         baseTypeId, /* type of data being aggregated */
192                                         transfuncName,          /* step function name */
193                                         finalfuncName,          /* final function name */
194                                         sortoperatorName,       /* sort operator name */
195                                         transTypeId,    /* transition data type */
196                                         initval);       /* initial condition */
197 }
198
199
200 /*
201  * RemoveAggregate
202  *              Deletes an aggregate.
203  */
204 void
205 RemoveAggregate(RemoveFuncStmt *stmt)
206 {
207         List       *aggName = stmt->name;
208         List       *aggArgs = stmt->args;
209         Oid                     procOid;
210         HeapTuple       tup;
211         ObjectAddress object;
212
213         /* Look up function and make sure it's an aggregate */
214         procOid = LookupAggNameTypeNames(aggName, aggArgs, false);
215
216         /*
217          * Find the function tuple, do permissions and validity checks
218          */
219         tup = SearchSysCache(PROCOID,
220                                                  ObjectIdGetDatum(procOid),
221                                                  0, 0, 0);
222         if (!HeapTupleIsValid(tup)) /* should not happen */
223                 elog(ERROR, "cache lookup failed for function %u", procOid);
224
225         /* Permission check: must own agg or its namespace */
226         if (!pg_proc_ownercheck(procOid, GetUserId()) &&
227           !pg_namespace_ownercheck(((Form_pg_proc) GETSTRUCT(tup))->pronamespace,
228                                                            GetUserId()))
229                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC,
230                                            NameListToString(aggName));
231
232         ReleaseSysCache(tup);
233
234         /*
235          * Do the deletion
236          */
237         object.classId = ProcedureRelationId;
238         object.objectId = procOid;
239         object.objectSubId = 0;
240
241         performDeletion(&object, stmt->behavior);
242 }
243
244
245 void
246 RenameAggregate(List *name, List *args, const char *newname)
247 {
248         Oid                     procOid;
249         Oid                     namespaceOid;
250         HeapTuple       tup;
251         Form_pg_proc procForm;
252         Relation        rel;
253         AclResult       aclresult;
254
255         rel = heap_open(ProcedureRelationId, RowExclusiveLock);
256
257         /* Look up function and make sure it's an aggregate */
258         procOid = LookupAggNameTypeNames(name, args, false);
259
260         tup = SearchSysCacheCopy(PROCOID,
261                                                          ObjectIdGetDatum(procOid),
262                                                          0, 0, 0);
263         if (!HeapTupleIsValid(tup)) /* should not happen */
264                 elog(ERROR, "cache lookup failed for function %u", procOid);
265         procForm = (Form_pg_proc) GETSTRUCT(tup);
266
267         namespaceOid = procForm->pronamespace;
268
269         /* make sure the new name doesn't exist */
270         if (SearchSysCacheExists(PROCNAMEARGSNSP,
271                                                          CStringGetDatum(newname),
272                                                          PointerGetDatum(&procForm->proargtypes),
273                                                          ObjectIdGetDatum(namespaceOid),
274                                                          0))
275                 ereport(ERROR,
276                                 (errcode(ERRCODE_DUPLICATE_FUNCTION),
277                                  errmsg("function %s already exists in schema \"%s\"",
278                                                 funcname_signature_string(newname,
279                                                                                                   procForm->pronargs,
280                                                                                                   procForm->proargtypes.values),
281                                                 get_namespace_name(namespaceOid))));
282
283         /* must be owner */
284         if (!pg_proc_ownercheck(procOid, GetUserId()))
285                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC,
286                                            NameListToString(name));
287
288         /* must have CREATE privilege on namespace */
289         aclresult = pg_namespace_aclcheck(namespaceOid, GetUserId(), ACL_CREATE);
290         if (aclresult != ACLCHECK_OK)
291                 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
292                                            get_namespace_name(namespaceOid));
293
294         /* rename */
295         namestrcpy(&(((Form_pg_proc) GETSTRUCT(tup))->proname), newname);
296         simple_heap_update(rel, &tup->t_self, tup);
297         CatalogUpdateIndexes(rel, tup);
298
299         heap_close(rel, NoLock);
300         heap_freetuple(tup);
301 }
302
303 /*
304  * Change aggregate owner
305  */
306 void
307 AlterAggregateOwner(List *name, List *args, Oid newOwnerId)
308 {
309         Oid                     procOid;
310         HeapTuple       tup;
311         Form_pg_proc procForm;
312         Relation        rel;
313         AclResult       aclresult;
314
315         rel = heap_open(ProcedureRelationId, RowExclusiveLock);
316
317         /* Look up function and make sure it's an aggregate */
318         procOid = LookupAggNameTypeNames(name, args, false);
319
320         tup = SearchSysCacheCopy(PROCOID,
321                                                          ObjectIdGetDatum(procOid),
322                                                          0, 0, 0);
323         if (!HeapTupleIsValid(tup)) /* should not happen */
324                 elog(ERROR, "cache lookup failed for function %u", procOid);
325         procForm = (Form_pg_proc) GETSTRUCT(tup);
326
327         /*
328          * If the new owner is the same as the existing owner, consider the
329          * command to have succeeded.  This is for dump restoration purposes.
330          */
331         if (procForm->proowner != newOwnerId)
332         {
333                 /* Superusers can always do it */
334                 if (!superuser())
335                 {
336                         /* Otherwise, must be owner of the existing object */
337                         if (!pg_proc_ownercheck(procOid, GetUserId()))
338                                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC,
339                                                            NameListToString(name));
340
341                         /* Must be able to become new owner */
342                         check_is_member_of_role(GetUserId(), newOwnerId);
343
344                         /* New owner must have CREATE privilege on namespace */
345                         aclresult = pg_namespace_aclcheck(procForm->pronamespace,
346                                                                                           newOwnerId,
347                                                                                           ACL_CREATE);
348                         if (aclresult != ACLCHECK_OK)
349                                 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
350                                                            get_namespace_name(procForm->pronamespace));
351                 }
352
353                 /*
354                  * Modify the owner --- okay to scribble on tup because it's a copy
355                  */
356                 procForm->proowner = newOwnerId;
357
358                 simple_heap_update(rel, &tup->t_self, tup);
359                 CatalogUpdateIndexes(rel, tup);
360         }
361
362         heap_close(rel, NoLock);
363         heap_freetuple(tup);
364 }