OSDN Git Service

928d27ae16abf0a702085d0fd182ac24fc4d7a36
[pg-rex/syncrep.git] / src / backend / catalog / pg_aggregate.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_aggregate.c
4  *        routines to support manipulation of the pg_aggregate relation
5  *
6  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/catalog/pg_aggregate.c,v 1.67 2004/08/29 04:12:28 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "catalog/catname.h"
19 #include "catalog/dependency.h"
20 #include "catalog/indexing.h"
21 #include "catalog/namespace.h"
22 #include "catalog/pg_aggregate.h"
23 #include "catalog/pg_language.h"
24 #include "catalog/pg_proc.h"
25 #include "optimizer/cost.h"
26 #include "parser/parse_coerce.h"
27 #include "parser/parse_func.h"
28 #include "utils/builtins.h"
29 #include "utils/syscache.h"
30
31
32 static Oid lookup_agg_function(List *fnName, int nargs, Oid *input_types,
33                                         Oid *rettype);
34
35
36 /*
37  * AggregateCreate
38  */
39 void
40 AggregateCreate(const char *aggName,
41                                 Oid aggNamespace,
42                                 List *aggtransfnName,
43                                 List *aggfinalfnName,
44                                 Oid aggBaseType,
45                                 Oid aggTransType,
46                                 const char *agginitval)
47 {
48         Relation        aggdesc;
49         HeapTuple       tup;
50         char            nulls[Natts_pg_aggregate];
51         Datum           values[Natts_pg_aggregate];
52         Form_pg_proc proc;
53         Oid                     transfn;
54         Oid                     finalfn = InvalidOid;   /* can be omitted */
55         Oid                     rettype;
56         Oid                     finaltype;
57         Oid                     fnArgs[FUNC_MAX_ARGS];
58         int                     nargs_transfn;
59         Oid                     procOid;
60         TupleDesc       tupDesc;
61         int                     i;
62         ObjectAddress myself,
63                                 referenced;
64
65         /* sanity checks (caller should have caught these) */
66         if (!aggName)
67                 elog(ERROR, "no aggregate name supplied");
68
69         if (!aggtransfnName)
70                 elog(ERROR, "aggregate must have a transition function");
71
72         /*
73          * If transtype is polymorphic, basetype must be polymorphic also;
74          * else we will have no way to deduce the actual transtype.
75          */
76         if ((aggTransType == ANYARRAYOID || aggTransType == ANYELEMENTOID) &&
77                 !(aggBaseType == ANYARRAYOID || aggBaseType == ANYELEMENTOID))
78                 ereport(ERROR,
79                                 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
80                                  errmsg("cannot determine transition data type"),
81                                  errdetail("An aggregate using \"anyarray\" or \"anyelement\" as "
82                                  "transition type must have one of them as its base type.")));
83
84         /* handle transfn */
85         MemSet(fnArgs, 0, FUNC_MAX_ARGS * sizeof(Oid));
86         fnArgs[0] = aggTransType;
87         if (aggBaseType == ANYOID)
88                 nargs_transfn = 1;
89         else
90         {
91                 fnArgs[1] = aggBaseType;
92                 nargs_transfn = 2;
93         }
94         transfn = lookup_agg_function(aggtransfnName, nargs_transfn, fnArgs,
95                                                                   &rettype);
96
97         /*
98          * Return type of transfn (possibly after refinement by
99          * enforce_generic_type_consistency, if transtype isn't polymorphic)
100          * must exactly match declared transtype.
101          *
102          * In the non-polymorphic-transtype case, it might be okay to allow a
103          * rettype that's binary-coercible to transtype, but I'm not quite
104          * convinced that it's either safe or useful.  When transtype is
105          * polymorphic we *must* demand exact equality.
106          */
107         if (rettype != aggTransType)
108                 ereport(ERROR,
109                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
110                                  errmsg("return type of transition function %s is not %s",
111                                                 NameListToString(aggtransfnName),
112                                                 format_type_be(aggTransType))));
113
114         tup = SearchSysCache(PROCOID,
115                                                  ObjectIdGetDatum(transfn),
116                                                  0, 0, 0);
117         if (!HeapTupleIsValid(tup))
118                 elog(ERROR, "cache lookup failed for function %u", transfn);
119         proc = (Form_pg_proc) GETSTRUCT(tup);
120
121         /*
122          * If the transfn is strict and the initval is NULL, make sure input
123          * type and transtype are the same (or at least binary-compatible), so
124          * that it's OK to use the first input value as the initial
125          * transValue.
126          */
127         if (proc->proisstrict && agginitval == NULL)
128         {
129                 if (!IsBinaryCoercible(aggBaseType, aggTransType))
130                         ereport(ERROR,
131                                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
132                                          errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type")));
133         }
134         ReleaseSysCache(tup);
135
136         /* handle finalfn, if supplied */
137         if (aggfinalfnName)
138         {
139                 MemSet(fnArgs, 0, FUNC_MAX_ARGS * sizeof(Oid));
140                 fnArgs[0] = aggTransType;
141                 finalfn = lookup_agg_function(aggfinalfnName, 1, fnArgs,
142                                                                           &finaltype);
143         }
144         else
145         {
146                 /*
147                  * If no finalfn, aggregate result type is type of the state value
148                  */
149                 finaltype = aggTransType;
150         }
151         Assert(OidIsValid(finaltype));
152
153         /*
154          * If finaltype (i.e. aggregate return type) is polymorphic, basetype
155          * must be polymorphic also, else parser will fail to deduce result
156          * type.  (Note: given the previous test on transtype and basetype,
157          * this cannot happen, unless someone has snuck a finalfn definition
158          * into the catalogs that itself violates the rule against polymorphic
159          * result with no polymorphic input.)
160          */
161         if ((finaltype == ANYARRAYOID || finaltype == ANYELEMENTOID) &&
162                 !(aggBaseType == ANYARRAYOID || aggBaseType == ANYELEMENTOID))
163                 ereport(ERROR,
164                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
165                                  errmsg("cannot determine result data type"),
166                            errdetail("An aggregate returning \"anyarray\" or \"anyelement\" "
167                                                  "must have one of them as its base type.")));
168
169         /*
170          * Everything looks okay.  Try to create the pg_proc entry for the
171          * aggregate.  (This could fail if there's already a conflicting
172          * entry.)
173          */
174         MemSet(fnArgs, 0, FUNC_MAX_ARGS * sizeof(Oid));
175         fnArgs[0] = aggBaseType;
176
177         procOid = ProcedureCreate(aggName,
178                                                           aggNamespace,
179                                                           false,        /* no replacement */
180                                                           false,        /* doesn't return a set */
181                                                           finaltype,            /* returnType */
182                                                           INTERNALlanguageId,           /* languageObjectId */
183                                                           0,
184                                                           "aggregate_dummy",            /* placeholder proc */
185                                                           "-",          /* probin */
186                                                           true,         /* isAgg */
187                                                           false,        /* security invoker (currently not
188                                                                                  * definable for agg) */
189                                                           false,        /* isStrict (not needed for agg) */
190                                                           PROVOLATILE_IMMUTABLE,        /* volatility (not
191                                                                                                                  * needed for agg) */
192                                                           1,    /* parameterCount */
193                                                           fnArgs, /* parameterTypes */
194                                                           NULL); /* parameterNames */
195
196         /*
197          * Okay to create the pg_aggregate entry.
198          */
199
200         /* initialize nulls and values */
201         for (i = 0; i < Natts_pg_aggregate; i++)
202         {
203                 nulls[i] = ' ';
204                 values[i] = (Datum) NULL;
205         }
206         values[Anum_pg_aggregate_aggfnoid - 1] = ObjectIdGetDatum(procOid);
207         values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn);
208         values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
209         values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType);
210         if (agginitval)
211                 values[Anum_pg_aggregate_agginitval - 1] =
212                         DirectFunctionCall1(textin, CStringGetDatum(agginitval));
213         else
214                 nulls[Anum_pg_aggregate_agginitval - 1] = 'n';
215
216         aggdesc = heap_openr(AggregateRelationName, RowExclusiveLock);
217         tupDesc = aggdesc->rd_att;
218
219         tup = heap_formtuple(tupDesc, values, nulls);
220         simple_heap_insert(aggdesc, tup);
221
222         CatalogUpdateIndexes(aggdesc, tup);
223
224         heap_close(aggdesc, RowExclusiveLock);
225
226         /*
227          * Create dependencies for the aggregate (above and beyond those
228          * already made by ProcedureCreate).  Note: we don't need an explicit
229          * dependency on aggTransType since we depend on it indirectly through
230          * transfn.
231          */
232         myself.classId = RelOid_pg_proc;
233         myself.objectId = procOid;
234         myself.objectSubId = 0;
235
236         /* Depends on transition function */
237         referenced.classId = RelOid_pg_proc;
238         referenced.objectId = transfn;
239         referenced.objectSubId = 0;
240         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
241
242         /* Depends on final function, if any */
243         if (OidIsValid(finalfn))
244         {
245                 referenced.classId = RelOid_pg_proc;
246                 referenced.objectId = finalfn;
247                 referenced.objectSubId = 0;
248                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
249         }
250 }
251
252 /*
253  * lookup_agg_function -- common code for finding both transfn and finalfn
254  */
255 static Oid
256 lookup_agg_function(List *fnName,
257                                         int nargs,
258                                         Oid *input_types,
259                                         Oid *rettype)
260 {
261         Oid                     fnOid;
262         bool            retset;
263         Oid                *true_oid_array;
264         FuncDetailCode fdresult;
265
266         /*
267          * func_get_detail looks up the function in the catalogs, does
268          * disambiguation for polymorphic functions, handles inheritance, and
269          * returns the funcid and type and set or singleton status of the
270          * function's return value.  it also returns the true argument types
271          * to the function.
272          */
273         fdresult = func_get_detail(fnName, NIL, nargs, input_types,
274                                                            &fnOid, rettype, &retset,
275                                                            &true_oid_array);
276
277         /* only valid case is a normal function not returning a set */
278         if (fdresult != FUNCDETAIL_NORMAL || !OidIsValid(fnOid))
279                 ereport(ERROR,
280                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
281                                  errmsg("function %s does not exist",
282                                         func_signature_string(fnName, nargs, input_types))));
283         if (retset)
284                 ereport(ERROR,
285                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
286                                  errmsg("function %s returns a set",
287                                         func_signature_string(fnName, nargs, input_types))));
288
289         /*
290          * If the given type(s) are all polymorphic, there's nothing we can
291          * check.  Otherwise, enforce consistency, and possibly refine the
292          * result type.
293          */
294         if ((input_types[0] == ANYARRAYOID || input_types[0] == ANYELEMENTOID) &&
295                 (nargs == 1 ||
296          (input_types[1] == ANYARRAYOID || input_types[1] == ANYELEMENTOID)))
297         {
298                 /* nothing to check here */
299         }
300         else
301         {
302                 *rettype = enforce_generic_type_consistency(input_types,
303                                                                                                         true_oid_array,
304                                                                                                         nargs,
305                                                                                                         *rettype);
306         }
307
308         /*
309          * func_get_detail will find functions requiring run-time argument
310          * type coercion, but nodeAgg.c isn't prepared to deal with that
311          */
312         if (true_oid_array[0] != ANYARRAYOID &&
313                 true_oid_array[0] != ANYELEMENTOID &&
314                 !IsBinaryCoercible(input_types[0], true_oid_array[0]))
315                 ereport(ERROR,
316                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
317                                  errmsg("function %s requires run-time type coercion",
318                                  func_signature_string(fnName, nargs, true_oid_array))));
319
320         if (nargs == 2 &&
321                 true_oid_array[1] != ANYARRAYOID &&
322                 true_oid_array[1] != ANYELEMENTOID &&
323                 !IsBinaryCoercible(input_types[1], true_oid_array[1]))
324                 ereport(ERROR,
325                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
326                                  errmsg("function %s requires run-time type coercion",
327                                  func_signature_string(fnName, nargs, true_oid_array))));
328
329         return fnOid;
330 }