OSDN Git Service

Restructure system-catalog index updating logic. Instead of having
[pg-rex/syncrep.git] / src / backend / catalog / pg_operator.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_operator.c
4  *        routines to support manipulation of the pg_operator relation
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/catalog/pg_operator.c,v 1.75 2002/08/05 03:29:16 tgl Exp $
12  *
13  * NOTES
14  *        these routines moved here from commands/define.c and somewhat cleaned up.
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/heapam.h"
21 #include "catalog/catname.h"
22 #include "catalog/dependency.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_operator.h"
26 #include "catalog/pg_type.h"
27 #include "miscadmin.h"
28 #include "parser/parse_func.h"
29 #include "parser/parse_oper.h"
30 #include "utils/acl.h"
31 #include "utils/builtins.h"
32 #include "utils/lsyscache.h"
33 #include "utils/syscache.h"
34
35
36 static Oid OperatorGet(const char *operatorName,
37                                            Oid operatorNamespace,
38                                            Oid leftObjectId,
39                                            Oid rightObjectId,
40                                            bool *defined);
41
42 static Oid OperatorLookup(List *operatorName,
43                                                   Oid leftObjectId,
44                                                   Oid rightObjectId,
45                                                   bool *defined);
46
47 static Oid OperatorShellMake(const char *operatorName,
48                                                          Oid operatorNamespace,
49                                                          Oid leftTypeId,
50                                                          Oid rightTypeId);
51
52 static void OperatorUpd(Oid baseId, Oid commId, Oid negId);
53
54 static Oid get_other_operator(List *otherOp,
55                                                           Oid otherLeftTypeId, Oid otherRightTypeId,
56                                                           const char *operatorName, Oid operatorNamespace,
57                                                           Oid leftTypeId, Oid rightTypeId,
58                                                           bool isCommutator);
59
60 static void makeOperatorDependencies(HeapTuple tuple, Oid pg_operator_relid);
61
62
63 /*
64  * Check whether a proposed operator name is legal
65  *
66  * This had better match the behavior of parser/scan.l!
67  *
68  * We need this because the parser is not smart enough to check that
69  * the arguments of CREATE OPERATOR's COMMUTATOR, NEGATOR, etc clauses
70  * are operator names rather than some other lexical entity.
71  */
72 static bool
73 validOperatorName(const char *name)
74 {
75         size_t          len = strlen(name);
76
77         /* Can't be empty or too long */
78         if (len == 0 || len >= NAMEDATALEN)
79                 return false;
80
81         /* Can't contain any invalid characters */
82         /* Test string here should match op_chars in scan.l */
83         if (strspn(name, "~!@#^&|`?$+-*/%<>=") != len)
84                 return false;
85
86         /* Can't contain slash-star or dash-dash (comment starts) */
87         if (strstr(name, "/*") || strstr(name, "--"))
88                 return false;
89
90         /*
91          * For SQL92 compatibility, '+' and '-' cannot be the last char of a
92          * multi-char operator unless the operator contains chars that are not
93          * in SQL92 operators. The idea is to lex '=-' as two operators, but
94          * not to forbid operator names like '?-' that could not be sequences
95          * of SQL92 operators.
96          */
97         if (len > 1 &&
98                 (name[len - 1] == '+' ||
99                  name[len - 1] == '-'))
100         {
101                 int                     ic;
102
103                 for (ic = len - 2; ic >= 0; ic--)
104                 {
105                         if (strchr("~!@#^&|`?$%", name[ic]))
106                                 break;
107                 }
108                 if (ic < 0)
109                         return false;           /* nope, not valid */
110         }
111
112         /* != isn't valid either, because parser will convert it to <> */
113         if (strcmp(name, "!=") == 0)
114                 return false;
115
116         return true;
117 }
118
119
120 /*
121  * OperatorGet
122  *
123  *              finds an operator given an exact specification (name, namespace,
124  *              left and right type IDs).
125  *
126  *              *defined is set TRUE if defined (not a shell)
127  */
128 static Oid
129 OperatorGet(const char *operatorName,
130                         Oid operatorNamespace,
131                         Oid leftObjectId,
132                         Oid rightObjectId,
133                         bool *defined)
134 {
135         HeapTuple       tup;
136         Oid                     operatorObjectId;
137
138         tup = SearchSysCache(OPERNAMENSP,
139                                                  PointerGetDatum(operatorName),
140                                                  ObjectIdGetDatum(leftObjectId),
141                                                  ObjectIdGetDatum(rightObjectId),
142                                                  ObjectIdGetDatum(operatorNamespace));
143         if (HeapTupleIsValid(tup))
144         {
145                 RegProcedure oprcode = ((Form_pg_operator) GETSTRUCT(tup))->oprcode;
146
147                 operatorObjectId = HeapTupleGetOid(tup);
148                 *defined = RegProcedureIsValid(oprcode);
149                 ReleaseSysCache(tup);
150         }
151         else
152         {
153                 operatorObjectId = InvalidOid;
154                 *defined = false;
155         }
156
157         return operatorObjectId;
158 }
159
160 /*
161  * OperatorLookup
162  *
163  *              looks up an operator given a possibly-qualified name and
164  *              left and right type IDs.
165  *
166  *              *defined is set TRUE if defined (not a shell)
167  */
168 static Oid
169 OperatorLookup(List *operatorName,
170                            Oid leftObjectId,
171                            Oid rightObjectId,
172                            bool *defined)
173 {
174         Oid                     operatorObjectId;
175         RegProcedure oprcode;
176
177         operatorObjectId = LookupOperName(operatorName, leftObjectId,
178                                                                           rightObjectId);
179         if (!OidIsValid(operatorObjectId))
180         {
181                 *defined = false;
182                 return InvalidOid;
183         }
184
185         oprcode = get_opcode(operatorObjectId);
186         *defined = RegProcedureIsValid(oprcode);
187
188         return operatorObjectId;
189 }
190
191
192 /*
193  * OperatorShellMake
194  *              Make a "shell" entry for a not-yet-existing operator.
195  */
196 static Oid
197 OperatorShellMake(const char *operatorName,
198                                   Oid operatorNamespace,
199                                   Oid leftTypeId,
200                                   Oid rightTypeId)
201 {
202         Relation        pg_operator_desc;
203         Oid                     operatorObjectId;
204         int                     i;
205         HeapTuple       tup;
206         Datum           values[Natts_pg_operator];
207         char            nulls[Natts_pg_operator];
208         NameData        oname;
209         TupleDesc       tupDesc;
210
211         /*
212          * validate operator name
213          */
214         if (!validOperatorName(operatorName))
215                 elog(ERROR, "\"%s\" is not a valid operator name", operatorName);
216
217         /*
218          * initialize our *nulls and *values arrays
219          */
220         for (i = 0; i < Natts_pg_operator; ++i)
221         {
222                 nulls[i] = ' ';
223                 values[i] = (Datum) NULL;               /* redundant, but safe */
224         }
225
226         /*
227          * initialize values[] with the operator name and input data types.
228          * Note that oprcode is set to InvalidOid, indicating it's a shell.
229          */
230         i = 0;
231         namestrcpy(&oname, operatorName);
232         values[i++] = NameGetDatum(&oname);                             /* oprname */
233         values[i++] = ObjectIdGetDatum(operatorNamespace);      /* oprnamespace */
234         values[i++] = Int32GetDatum(GetUserId());               /* oprowner */
235         values[i++] = CharGetDatum(leftTypeId ? (rightTypeId ? 'b' : 'r') : 'l');       /* oprkind */
236         values[i++] = BoolGetDatum(false);                              /* oprcanhash */
237         values[i++] = ObjectIdGetDatum(leftTypeId);             /* oprleft */
238         values[i++] = ObjectIdGetDatum(rightTypeId);    /* oprright */
239         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprresult */
240         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprcom */
241         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprnegate */
242         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprlsortop */
243         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprrsortop */
244         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprltcmpop */
245         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprgtcmpop */
246         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprcode */
247         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprrest */
248         values[i++] = ObjectIdGetDatum(InvalidOid);             /* oprjoin */
249
250         /*
251          * open pg_operator
252          */
253         pg_operator_desc = heap_openr(OperatorRelationName, RowExclusiveLock);
254         tupDesc = pg_operator_desc->rd_att;
255
256         /*
257          * create a new operator tuple
258          */
259         tup = heap_formtuple(tupDesc, values, nulls);
260
261         /*
262          * insert our "shell" operator tuple
263          */
264         operatorObjectId = simple_heap_insert(pg_operator_desc, tup);
265
266         CatalogUpdateIndexes(pg_operator_desc, tup);
267
268         /* Add dependencies for the entry */
269         makeOperatorDependencies(tup, RelationGetRelid(pg_operator_desc));
270
271         heap_freetuple(tup);
272
273         /*
274          * close the operator relation and return the oid.
275          */
276         heap_close(pg_operator_desc, RowExclusiveLock);
277
278         return operatorObjectId;
279 }
280
281 /*
282  * OperatorCreate
283  *
284  * "X" indicates an optional argument (i.e. one that can be NULL or 0)
285  *              operatorName                    name for new operator
286  *              operatorNamespace               namespace for new operator
287  *              leftTypeId                              X left type ID
288  *              rightTypeId                             X right type ID
289  *              procedureName                   procedure for operator
290  *              commutatorName                  X commutator operator
291  *              negatorName                             X negator operator
292  *              restrictionName                 X restriction sel. procedure
293  *              joinName                                X join sel. procedure
294  *              canHash                                 hash join can be used with this operator
295  *              leftSortName                    X left sort operator (for merge join)
296  *              rightSortName                   X right sort operator (for merge join)
297  *              ltCompareName                   X L<R compare operator (for merge join)
298  *              gtCompareName                   X L>R compare operator (for merge join)
299  *
300  * This routine gets complicated because it allows the user to
301  * specify operators that do not exist.  For example, if operator
302  * "op" is being defined, the negator operator "negop" and the
303  * commutator "commop" can also be defined without specifying
304  * any information other than their names.      Since in order to
305  * add "op" to the PG_OPERATOR catalog, all the Oid's for these
306  * operators must be placed in the fields of "op", a forward
307  * declaration is done on the commutator and negator operators.
308  * This is called creating a shell, and its main effect is to
309  * create a tuple in the PG_OPERATOR catalog with minimal
310  * information about the operator (just its name and types).
311  * Forward declaration is used only for this purpose, it is
312  * not available to the user as it is for type definition.
313  *
314  * Algorithm:
315  *
316  * check if operator already defined
317  *        if so, but oprcode is null, save the Oid -- we are filling in a shell
318  *        otherwise error
319  * get the attribute types from relation descriptor for pg_operator
320  * assign values to the fields of the operator:
321  *       operatorName
322  *       owner id (simply the user id of the caller)
323  *       operator "kind" either "b" for binary or "l" for left unary
324  *       canHash boolean
325  *       leftTypeObjectId -- type must already be defined
326  *       rightTypeObjectId -- this is optional, enter ObjectId=0 if none specified
327  *       resultType -- defer this, since it must be determined from
328  *                                 the pg_procedure catalog
329  *       commutatorObjectId -- if this is NULL, enter ObjectId=0
330  *                                        else if this already exists, enter its ObjectId
331  *                                        else if this does not yet exist, and is not
332  *                                              the same as the main operatorName, then create
333  *                                              a shell and enter the new ObjectId
334  *                                        else if this does not exist but IS the same
335  *                                              name & types as the main operator, set the ObjectId=0.
336  *                                              (We are creating a self-commutating operator.)
337  *                                              The link will be fixed later by OperatorUpd.
338  *       negatorObjectId   -- same as for commutatorObjectId
339  *       leftSortObjectId  -- same as for commutatorObjectId
340  *       rightSortObjectId -- same as for commutatorObjectId
341  *       operatorProcedure -- must access the pg_procedure catalog to get the
342  *                                 ObjectId of the procedure that actually does the operator
343  *                                 actions this is required.  Do a lookup to find out the
344  *                                 return type of the procedure
345  *       restrictionProcedure -- must access the pg_procedure catalog to get
346  *                                 the ObjectId but this is optional
347  *       joinProcedure -- same as restrictionProcedure
348  * now either insert or replace the operator into the pg_operator catalog
349  * if the operator shell is being filled in
350  *       access the catalog in order to get a valid buffer
351  *       create a tuple using ModifyHeapTuple
352  *       get the t_self from the modified tuple and call RelationReplaceHeapTuple
353  * else if a new operator is being created
354  *       create a tuple using heap_formtuple
355  *       call simple_heap_insert
356  */
357 void
358 OperatorCreate(const char *operatorName,
359                            Oid operatorNamespace,
360                            Oid leftTypeId,
361                            Oid rightTypeId,
362                            List *procedureName,
363                            List *commutatorName,
364                            List *negatorName,
365                            List *restrictionName,
366                            List *joinName,
367                            bool canHash,
368                            List *leftSortName,
369                            List *rightSortName,
370                            List *ltCompareName,
371                            List *gtCompareName)
372 {
373         Relation        pg_operator_desc;
374         HeapTuple       tup;
375         char            nulls[Natts_pg_operator];
376         char            replaces[Natts_pg_operator];
377         Datum           values[Natts_pg_operator];
378         Oid                     operatorObjectId;
379         bool            operatorAlreadyDefined;
380         Oid                     procOid;
381         Oid                     operResultType;
382         Oid                     commutatorId,
383                                 negatorId,
384                                 leftSortId,
385                                 rightSortId,
386                                 ltCompareId,
387                                 gtCompareId,
388                                 restOid,
389                                 joinOid;
390         bool            selfCommutator = false;
391         Oid                     typeId[FUNC_MAX_ARGS];
392         int                     nargs;
393         NameData        oname;
394         TupleDesc       tupDesc;
395         int                     i;
396
397         /*
398          * Sanity checks
399          */
400         if (!validOperatorName(operatorName))
401                 elog(ERROR, "\"%s\" is not a valid operator name", operatorName);
402
403         if (!OidIsValid(leftTypeId) && !OidIsValid(rightTypeId))
404                 elog(ERROR, "at least one of leftarg or rightarg must be specified");
405
406         if (!(OidIsValid(leftTypeId) && OidIsValid(rightTypeId)))
407         {
408                 /* If it's not a binary op, these things mustn't be set: */
409                 if (commutatorName)
410                         elog(ERROR, "only binary operators can have commutators");
411                 if (joinName)
412                         elog(ERROR, "only binary operators can have join selectivity");
413                 if (canHash)
414                         elog(ERROR, "only binary operators can hash");
415                 if (leftSortName || rightSortName || ltCompareName || gtCompareName)
416                         elog(ERROR, "only binary operators can mergejoin");
417         }
418
419         operatorObjectId = OperatorGet(operatorName,
420                                                                    operatorNamespace,
421                                                                    leftTypeId,
422                                                                    rightTypeId,
423                                                                    &operatorAlreadyDefined);
424
425         if (operatorAlreadyDefined)
426                 elog(ERROR, "OperatorDef: operator \"%s\" already defined",
427                          operatorName);
428
429         /*
430          * At this point, if operatorObjectId is not InvalidOid then we are
431          * filling in a previously-created shell.
432          */
433
434         /*
435          * Look up registered procedures -- find the return type of
436          * procedureName to place in "result" field. Do this before shells are
437          * created so we don't have to worry about deleting them later.
438          */
439         MemSet(typeId, 0, FUNC_MAX_ARGS * sizeof(Oid));
440         if (!OidIsValid(leftTypeId))
441         {
442                 typeId[0] = rightTypeId;
443                 nargs = 1;
444         }
445         else if (!OidIsValid(rightTypeId))
446         {
447                 typeId[0] = leftTypeId;
448                 nargs = 1;
449         }
450         else
451         {
452                 typeId[0] = leftTypeId;
453                 typeId[1] = rightTypeId;
454                 nargs = 2;
455         }
456         procOid = LookupFuncName(procedureName, nargs, typeId);
457         if (!OidIsValid(procOid))
458                 func_error("OperatorDef", procedureName, nargs, typeId, NULL);
459         operResultType = get_func_rettype(procOid);
460
461         /*
462          * find restriction estimator
463          */
464         if (restrictionName)
465         {
466                 MemSet(typeId, 0, FUNC_MAX_ARGS * sizeof(Oid));
467                 typeId[0] = 0;                  /* Query (opaque type) */
468                 typeId[1] = OIDOID;             /* operator OID */
469                 typeId[2] = 0;                  /* args list (opaque type) */
470                 typeId[3] = INT4OID;    /* varRelid */
471
472                 restOid = LookupFuncName(restrictionName, 4, typeId);
473                 if (!OidIsValid(restOid))
474                         func_error("OperatorDef", restrictionName, 4, typeId, NULL);
475         }
476         else
477                 restOid = InvalidOid;
478
479         /*
480          * find join estimator
481          */
482         if (joinName)
483         {
484                 MemSet(typeId, 0, FUNC_MAX_ARGS * sizeof(Oid));
485                 typeId[0] = 0;                  /* Query (opaque type) */
486                 typeId[1] = OIDOID;             /* operator OID */
487                 typeId[2] = 0;                  /* args list (opaque type) */
488
489                 joinOid = LookupFuncName(joinName, 3, typeId);
490                 if (!OidIsValid(joinOid))
491                         func_error("OperatorDef", joinName, 3, typeId, NULL);
492         }
493         else
494                 joinOid = InvalidOid;
495
496         /*
497          * set up values in the operator tuple
498          */
499
500         for (i = 0; i < Natts_pg_operator; ++i)
501         {
502                 values[i] = (Datum) NULL;
503                 replaces[i] = 'r';
504                 nulls[i] = ' ';
505         }
506
507         i = 0;
508         namestrcpy(&oname, operatorName);
509         values[i++] = NameGetDatum(&oname);                     /* oprname */
510         values[i++] = ObjectIdGetDatum(operatorNamespace);      /* oprnamespace */
511         values[i++] = Int32GetDatum(GetUserId());               /* oprowner */
512         values[i++] = CharGetDatum(leftTypeId ? (rightTypeId ? 'b' : 'r') : 'l');       /* oprkind */
513         values[i++] = BoolGetDatum(canHash);                    /* oprcanhash */
514         values[i++] = ObjectIdGetDatum(leftTypeId);             /* oprleft */
515         values[i++] = ObjectIdGetDatum(rightTypeId);    /* oprright */
516         values[i++] = ObjectIdGetDatum(operResultType); /* oprresult */
517
518         /*
519          * Set up the other operators.  If they do not currently exist, create
520          * shells in order to get ObjectId's.
521          */
522
523         if (commutatorName)
524         {
525                 /* commutator has reversed arg types */
526                 commutatorId = get_other_operator(commutatorName,
527                                                                                   rightTypeId, leftTypeId,
528                                                                                   operatorName, operatorNamespace,
529                                                                                   leftTypeId, rightTypeId,
530                                                                                   true);
531                 /*
532                  * self-linkage to this operator; will fix below. Note
533                  * that only self-linkage for commutation makes sense.
534                  */
535                 if (!OidIsValid(commutatorId))
536                         selfCommutator = true;
537         }
538         else
539                 commutatorId = InvalidOid;
540         values[i++] = ObjectIdGetDatum(commutatorId);   /* oprcom */
541
542         if (negatorName)
543         {
544                 /* negator has same arg types */
545                 negatorId = get_other_operator(negatorName,
546                                                                            leftTypeId, rightTypeId,
547                                                                            operatorName, operatorNamespace,
548                                                                            leftTypeId, rightTypeId,
549                                                                            false);
550         }
551         else
552                 negatorId = InvalidOid;
553         values[i++] = ObjectIdGetDatum(negatorId);              /* oprnegate */
554
555         if (leftSortName)
556         {
557                 /* left sort op takes left-side data type */
558                 leftSortId = get_other_operator(leftSortName,
559                                                                            leftTypeId, leftTypeId,
560                                                                            operatorName, operatorNamespace,
561                                                                            leftTypeId, rightTypeId,
562                                                                            false);
563         }
564         else
565                 leftSortId = InvalidOid;
566         values[i++] = ObjectIdGetDatum(leftSortId);             /* oprlsortop */
567
568         if (rightSortName)
569         {
570                 /* right sort op takes right-side data type */
571                 rightSortId = get_other_operator(rightSortName,
572                                                                                  rightTypeId, rightTypeId,
573                                                                                  operatorName, operatorNamespace,
574                                                                                  leftTypeId, rightTypeId,
575                                                                                  false);
576         }
577         else
578                 rightSortId = InvalidOid;
579         values[i++] = ObjectIdGetDatum(rightSortId);    /* oprrsortop */
580
581         if (ltCompareName)
582         {
583                 /* comparator has same arg types */
584                 ltCompareId = get_other_operator(ltCompareName,
585                                                                                  leftTypeId, rightTypeId,
586                                                                                  operatorName, operatorNamespace,
587                                                                                  leftTypeId, rightTypeId,
588                                                                                  false);
589         }
590         else
591                 ltCompareId = InvalidOid;
592         values[i++] = ObjectIdGetDatum(ltCompareId);    /* oprltcmpop */
593
594         if (gtCompareName)
595         {
596                 /* comparator has same arg types */
597                 gtCompareId = get_other_operator(gtCompareName,
598                                                                                  leftTypeId, rightTypeId,
599                                                                                  operatorName, operatorNamespace,
600                                                                                  leftTypeId, rightTypeId,
601                                                                                  false);
602         }
603         else
604                 gtCompareId = InvalidOid;
605         values[i++] = ObjectIdGetDatum(gtCompareId);    /* oprgtcmpop */
606
607         values[i++] = ObjectIdGetDatum(procOid);                /* oprcode */
608         values[i++] = ObjectIdGetDatum(restOid);                /* oprrest */
609         values[i++] = ObjectIdGetDatum(joinOid);                /* oprjoin */
610
611         pg_operator_desc = heap_openr(OperatorRelationName, RowExclusiveLock);
612
613         /*
614          * If we are adding to an operator shell, update; else insert
615          */
616         if (operatorObjectId)
617         {
618                 tup = SearchSysCacheCopy(OPEROID,
619                                                                  ObjectIdGetDatum(operatorObjectId),
620                                                                  0, 0, 0);
621                 if (!HeapTupleIsValid(tup))
622                         elog(ERROR, "OperatorDef: operator %u not found",
623                                  operatorObjectId);
624
625                 tup = heap_modifytuple(tup,
626                                                            pg_operator_desc,
627                                                            values,
628                                                            nulls,
629                                                            replaces);
630
631                 simple_heap_update(pg_operator_desc, &tup->t_self, tup);
632         }
633         else
634         {
635                 tupDesc = pg_operator_desc->rd_att;
636                 tup = heap_formtuple(tupDesc, values, nulls);
637
638                 operatorObjectId = simple_heap_insert(pg_operator_desc, tup);
639         }
640
641         /* Must update the indexes in either case */
642         CatalogUpdateIndexes(pg_operator_desc, tup);
643
644         /* Add dependencies for the entry */
645         makeOperatorDependencies(tup, RelationGetRelid(pg_operator_desc));
646
647         heap_close(pg_operator_desc, RowExclusiveLock);
648
649         /*
650          * If a commutator and/or negator link is provided, update the other
651          * operator(s) to point at this one, if they don't already have a
652          * link. This supports an alternate style of operator definition
653          * wherein the user first defines one operator without giving negator
654          * or commutator, then defines the other operator of the pair with the
655          * proper commutator or negator attribute.      That style doesn't require
656          * creation of a shell, and it's the only style that worked right
657          * before Postgres version 6.5. This code also takes care of the
658          * situation where the new operator is its own commutator.
659          */
660         if (selfCommutator)
661                 commutatorId = operatorObjectId;
662
663         if (OidIsValid(commutatorId) || OidIsValid(negatorId))
664                 OperatorUpd(operatorObjectId, commutatorId, negatorId);
665 }
666
667 /*
668  * Try to lookup another operator (commutator, etc)
669  *
670  * If not found, check to see if it is exactly the operator we are trying
671  * to define; if so, return InvalidOid.  (Note that this case is only
672  * sensible for a commutator, so we error out otherwise.)  If it is not
673  * the same operator, create a shell operator.
674  */
675 static Oid
676 get_other_operator(List *otherOp, Oid otherLeftTypeId, Oid otherRightTypeId,
677                                    const char *operatorName, Oid operatorNamespace,
678                                    Oid leftTypeId, Oid rightTypeId, bool isCommutator)
679 {
680         Oid                     other_oid;
681         bool            otherDefined;
682         char       *otherName;
683         Oid                     otherNamespace;
684         AclResult       aclresult;
685
686         other_oid = OperatorLookup(otherOp,
687                                                            otherLeftTypeId,
688                                                            otherRightTypeId,
689                                                            &otherDefined);
690
691         if (OidIsValid(other_oid))
692         {
693                 /* other op already in catalogs */
694                 return other_oid;
695         }
696
697         otherNamespace = QualifiedNameGetCreationNamespace(otherOp,
698                                                                                                            &otherName);
699
700         if (strcmp(otherName, operatorName) == 0 &&
701                 otherNamespace == operatorNamespace &&
702                 otherLeftTypeId == leftTypeId &&
703                 otherRightTypeId == rightTypeId)
704         {
705                 /*
706                  * self-linkage to this operator; caller will fix later. Note
707                  * that only self-linkage for commutation makes sense.
708                  */
709                 if (!isCommutator)
710                         elog(ERROR, "operator cannot be its own negator or sort operator");
711                 return InvalidOid;
712         }
713
714         /* not in catalogs, different from operator, so make shell */
715
716         aclresult = pg_namespace_aclcheck(otherNamespace, GetUserId(),
717                                                                           ACL_CREATE);
718         if (aclresult != ACLCHECK_OK)
719                 aclcheck_error(aclresult, get_namespace_name(otherNamespace));
720
721         other_oid = OperatorShellMake(otherName,
722                                                                   otherNamespace,
723                                                                   otherLeftTypeId,
724                                                                   otherRightTypeId);
725         if (!OidIsValid(other_oid))
726                 elog(ERROR,
727                          "OperatorDef: can't create operator shell \"%s\"",
728                          NameListToString(otherOp));
729         return other_oid;
730 }
731
732 /*
733  * OperatorUpd
734  *
735  *      For a given operator, look up its negator and commutator operators.
736  *      If they are defined, but their negator and commutator fields
737  *      (respectively) are empty, then use the new operator for neg or comm.
738  *      This solves a problem for users who need to insert two new operators
739  *      which are the negator or commutator of each other.
740  */
741 static void
742 OperatorUpd(Oid baseId, Oid commId, Oid negId)
743 {
744         int                     i;
745         Relation        pg_operator_desc;
746         HeapTuple       tup;
747         char            nulls[Natts_pg_operator];
748         char            replaces[Natts_pg_operator];
749         Datum           values[Natts_pg_operator];
750
751         for (i = 0; i < Natts_pg_operator; ++i)
752         {
753                 values[i] = (Datum) 0;
754                 replaces[i] = ' ';
755                 nulls[i] = ' ';
756         }
757
758         /*
759          * check and update the commutator & negator, if necessary
760          *
761          * First make sure we can see them...
762          */
763         CommandCounterIncrement();
764
765         pg_operator_desc = heap_openr(OperatorRelationName, RowExclusiveLock);
766
767         tup = SearchSysCacheCopy(OPEROID,
768                                                          ObjectIdGetDatum(commId),
769                                                          0, 0, 0);
770
771         /*
772          * if the commutator and negator are the same operator, do one update.
773          * XXX this is probably useless code --- I doubt it ever makes sense
774          * for commutator and negator to be the same thing...
775          */
776         if (commId == negId)
777         {
778                 if (HeapTupleIsValid(tup))
779                 {
780                         Form_pg_operator t = (Form_pg_operator) GETSTRUCT(tup);
781
782                         if (!OidIsValid(t->oprcom) || !OidIsValid(t->oprnegate))
783                         {
784                                 if (!OidIsValid(t->oprnegate))
785                                 {
786                                         values[Anum_pg_operator_oprnegate - 1] = ObjectIdGetDatum(baseId);
787                                         replaces[Anum_pg_operator_oprnegate - 1] = 'r';
788                                 }
789
790                                 if (!OidIsValid(t->oprcom))
791                                 {
792                                         values[Anum_pg_operator_oprcom - 1] = ObjectIdGetDatum(baseId);
793                                         replaces[Anum_pg_operator_oprcom - 1] = 'r';
794                                 }
795
796                                 tup = heap_modifytuple(tup,
797                                                                            pg_operator_desc,
798                                                                            values,
799                                                                            nulls,
800                                                                            replaces);
801
802                                 simple_heap_update(pg_operator_desc, &tup->t_self, tup);
803
804                                 CatalogUpdateIndexes(pg_operator_desc, tup);
805                         }
806                 }
807
808                 heap_close(pg_operator_desc, RowExclusiveLock);
809
810                 return;
811         }
812
813         /* if commutator and negator are different, do two updates */
814
815         if (HeapTupleIsValid(tup) &&
816                 !(OidIsValid(((Form_pg_operator) GETSTRUCT(tup))->oprcom)))
817         {
818                 values[Anum_pg_operator_oprcom - 1] = ObjectIdGetDatum(baseId);
819                 replaces[Anum_pg_operator_oprcom - 1] = 'r';
820
821                 tup = heap_modifytuple(tup,
822                                                            pg_operator_desc,
823                                                            values,
824                                                            nulls,
825                                                            replaces);
826
827                 simple_heap_update(pg_operator_desc, &tup->t_self, tup);
828
829                 CatalogUpdateIndexes(pg_operator_desc, tup);
830
831                 values[Anum_pg_operator_oprcom - 1] = (Datum) NULL;
832                 replaces[Anum_pg_operator_oprcom - 1] = ' ';
833         }
834
835         /* check and update the negator, if necessary */
836
837         tup = SearchSysCacheCopy(OPEROID,
838                                                          ObjectIdGetDatum(negId),
839                                                          0, 0, 0);
840
841         if (HeapTupleIsValid(tup) &&
842                 !(OidIsValid(((Form_pg_operator) GETSTRUCT(tup))->oprnegate)))
843         {
844                 values[Anum_pg_operator_oprnegate - 1] = ObjectIdGetDatum(baseId);
845                 replaces[Anum_pg_operator_oprnegate - 1] = 'r';
846
847                 tup = heap_modifytuple(tup,
848                                                            pg_operator_desc,
849                                                            values,
850                                                            nulls,
851                                                            replaces);
852
853                 simple_heap_update(pg_operator_desc, &tup->t_self, tup);
854
855                 CatalogUpdateIndexes(pg_operator_desc, tup);
856         }
857
858         heap_close(pg_operator_desc, RowExclusiveLock);
859 }
860
861 /*
862  * Create dependencies for a new operator (either a freshly inserted
863  * complete operator, a new shell operator, or a just-updated shell).
864  *
865  * NB: the OidIsValid tests in this routine are necessary, in case
866  * the given operator is a shell.
867  */
868 static void
869 makeOperatorDependencies(HeapTuple tuple, Oid pg_operator_relid)
870 {
871         Form_pg_operator        oper = (Form_pg_operator) GETSTRUCT(tuple);
872         ObjectAddress   myself,
873                                         referenced;
874
875         myself.classId = pg_operator_relid;
876         myself.objectId = HeapTupleGetOid(tuple);
877         myself.objectSubId = 0;
878
879         /* In case we are updating a shell, delete any existing entries */
880         deleteDependencyRecordsFor(myself.classId, myself.objectId);
881
882         /* Dependency on namespace */
883         if (OidIsValid(oper->oprnamespace))
884         {
885                 referenced.classId = get_system_catalog_relid(NamespaceRelationName);
886                 referenced.objectId = oper->oprnamespace;
887                 referenced.objectSubId = 0;
888                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
889         }
890
891         /* Dependency on left type */
892         if (OidIsValid(oper->oprleft))
893         {
894                 referenced.classId = RelOid_pg_type;
895                 referenced.objectId = oper->oprleft;
896                 referenced.objectSubId = 0;
897                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
898         }
899
900         /* Dependency on right type */
901         if (OidIsValid(oper->oprright))
902         {
903                 referenced.classId = RelOid_pg_type;
904                 referenced.objectId = oper->oprright;
905                 referenced.objectSubId = 0;
906                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
907         }
908
909         /* Dependency on result type */
910         if (OidIsValid(oper->oprresult))
911         {
912                 referenced.classId = RelOid_pg_type;
913                 referenced.objectId = oper->oprresult;
914                 referenced.objectSubId = 0;
915                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
916         }
917
918         /*
919          * NOTE: we do not consider the operator to depend on the associated
920          * operators oprcom, oprnegate, oprlsortop, oprrsortop, oprltcmpop,
921          * oprgtcmpop.  We would not want to delete this operator if those
922          * go away, but only reset the link fields; which is not a function
923          * that the dependency code can presently handle.  (Something could
924          * perhaps be done with objectSubId though.)  For now, it's okay to
925          * let those links dangle if a referenced operator is removed.
926          */
927
928         /* Dependency on implementation function */
929         if (OidIsValid(oper->oprcode))
930         {
931                 referenced.classId = RelOid_pg_proc;
932                 referenced.objectId = oper->oprcode;
933                 referenced.objectSubId = 0;
934                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
935         }
936
937         /* Dependency on restriction selectivity function */
938         if (OidIsValid(oper->oprrest))
939         {
940                 referenced.classId = RelOid_pg_proc;
941                 referenced.objectId = oper->oprrest;
942                 referenced.objectSubId = 0;
943                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
944         }
945
946         /* Dependency on join selectivity function */
947         if (OidIsValid(oper->oprjoin))
948         {
949                 referenced.classId = RelOid_pg_proc;
950                 referenced.objectId = oper->oprjoin;
951                 referenced.objectSubId = 0;
952                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
953         }
954 }