OSDN Git Service

Commit old versions into main branch again.
[pg-rex/syncrep.git] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.75 2002/06/20 20:29:27 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "catalog/catalog.h"
20 #include "catalog/catname.h"
21 #include "catalog/index.h"
22 #include "catalog/namespace.h"
23 #include "catalog/pg_opclass.h"
24 #include "catalog/pg_proc.h"
25 #include "commands/defrem.h"
26 #include "miscadmin.h"
27 #include "optimizer/clauses.h"
28 #include "optimizer/planmain.h"
29 #include "optimizer/prep.h"
30 #include "parser/parsetree.h"
31 #include "parser/parse_coerce.h"
32 #include "parser/parse_func.h"
33 #include "utils/acl.h"
34 #include "utils/builtins.h"
35 #include "utils/lsyscache.h"
36 #include "utils/syscache.h"
37
38
39 #define IsFuncIndex(ATTR_LIST) (((IndexElem*)lfirst(ATTR_LIST))->funcname != NIL)
40
41 /* non-export function prototypes */
42 static void CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid);
43 static void FuncIndexArgs(IndexInfo *indexInfo, Oid *classOidP,
44                           IndexElem *funcIndex,
45                           Oid relId,
46                           char *accessMethodName, Oid accessMethodId);
47 static void NormIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
48                            List *attList,
49                            Oid relId,
50                            char *accessMethodName, Oid accessMethodId);
51 static Oid GetAttrOpClass(IndexElem *attribute, Oid attrType,
52                            char *accessMethodName, Oid accessMethodId);
53 static Oid      GetDefaultOpClass(Oid attrType, Oid accessMethodId);
54
55 /*
56  * DefineIndex
57  *              Creates a new index.
58  *
59  * 'attributeList' is a list of IndexElem specifying either a functional
60  *              index or a list of attributes to index on.
61  * 'predicate' is the qual specified in the where clause.
62  * 'rangetable' is needed to interpret the predicate.
63  */
64 void
65 DefineIndex(RangeVar *heapRelation,
66                         char *indexRelationName,
67                         char *accessMethodName,
68                         List *attributeList,
69                         bool unique,
70                         bool primary,
71                         Expr *predicate,
72                         List *rangetable)
73 {
74         Oid                *classObjectId;
75         Oid                     accessMethodId;
76         Oid                     relationId;
77         Oid                     namespaceId;
78         Relation        rel;
79         HeapTuple       tuple;
80         Form_pg_am      accessMethodForm;
81         IndexInfo  *indexInfo;
82         int                     numberOfAttributes;
83         List       *cnfPred = NIL;
84
85         /*
86          * count attributes in index
87          */
88         numberOfAttributes = length(attributeList);
89         if (numberOfAttributes <= 0)
90                 elog(ERROR, "DefineIndex: must specify at least one attribute");
91         if (numberOfAttributes > INDEX_MAX_KEYS)
92                 elog(ERROR, "Cannot use more than %d attributes in an index",
93                          INDEX_MAX_KEYS);
94
95         /*
96          * Open heap relation, acquire a suitable lock on it, remember its OID
97          */
98         rel = heap_openrv(heapRelation, ShareLock);
99
100         /* Note: during bootstrap may see uncataloged relation */
101         if (rel->rd_rel->relkind != RELKIND_RELATION &&
102                 rel->rd_rel->relkind != RELKIND_UNCATALOGED)
103                 elog(ERROR, "DefineIndex: relation \"%s\" is not a table",
104                          heapRelation->relname);
105
106         relationId = RelationGetRelid(rel);
107         namespaceId = RelationGetNamespace(rel);
108
109         if (!IsBootstrapProcessingMode() &&
110                 IsSystemRelation(rel) &&
111                 !IndexesAreActive(relationId, false))
112                 elog(ERROR, "Existing indexes are inactive. REINDEX first");
113
114         heap_close(rel, NoLock);
115
116         /*
117          * Verify we (still) have CREATE rights in the rel's namespace.
118          * (Presumably we did when the rel was created, but maybe not anymore.)
119          * Skip check if bootstrapping, since permissions machinery may not
120          * be working yet; also, always allow if it's a temp table.
121          */
122         if (!IsBootstrapProcessingMode() && !isTempNamespace(namespaceId))
123         {
124                 AclResult       aclresult;
125
126                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
127                                                                                   ACL_CREATE);
128                 if (aclresult != ACLCHECK_OK)
129                         aclcheck_error(aclresult, get_namespace_name(namespaceId));
130         }
131
132         /*
133          * look up the access method, verify it can handle the requested
134          * features
135          */
136         tuple = SearchSysCache(AMNAME,
137                                                    PointerGetDatum(accessMethodName),
138                                                    0, 0, 0);
139         if (!HeapTupleIsValid(tuple))
140                 elog(ERROR, "DefineIndex: access method \"%s\" not found",
141                          accessMethodName);
142         accessMethodId = tuple->t_data->t_oid;
143         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
144
145         if (unique && !accessMethodForm->amcanunique)
146                 elog(ERROR, "DefineIndex: access method \"%s\" does not support UNIQUE indexes",
147                          accessMethodName);
148         if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
149                 elog(ERROR, "DefineIndex: access method \"%s\" does not support multi-column indexes",
150                          accessMethodName);
151
152         ReleaseSysCache(tuple);
153
154         /*
155          * Convert the partial-index predicate from parsetree form to an
156          * implicit-AND qual expression, for easier evaluation at runtime.
157          * While we are at it, we reduce it to a canonical (CNF or DNF) form
158          * to simplify the task of proving implications.
159          */
160         if (predicate != NULL && rangetable != NIL)
161         {
162                 cnfPred = canonicalize_qual((Expr *) copyObject(predicate), true);
163                 fix_opids((Node *) cnfPred);
164                 CheckPredicate(cnfPred, rangetable, relationId);
165         }
166
167         /*
168          * Prepare arguments for index_create, primarily an IndexInfo
169          * structure
170          */
171         indexInfo = makeNode(IndexInfo);
172         indexInfo->ii_Predicate = cnfPred;
173         indexInfo->ii_FuncOid = InvalidOid;
174         indexInfo->ii_Unique = unique;
175
176         if (IsFuncIndex(attributeList))
177         {
178                 IndexElem  *funcIndex = (IndexElem *) lfirst(attributeList);
179                 int                     nargs;
180
181                 /* Parser should have given us only one list item, but check */
182                 if (numberOfAttributes != 1)
183                         elog(ERROR, "Functional index can only have one attribute");
184
185                 nargs = length(funcIndex->args);
186                 if (nargs > INDEX_MAX_KEYS)
187                         elog(ERROR, "Index function can take at most %d arguments",
188                                  INDEX_MAX_KEYS);
189
190                 indexInfo->ii_NumIndexAttrs = 1;
191                 indexInfo->ii_NumKeyAttrs = nargs;
192
193                 classObjectId = (Oid *) palloc(sizeof(Oid));
194
195                 FuncIndexArgs(indexInfo, classObjectId, funcIndex,
196                                           relationId, accessMethodName, accessMethodId);
197         }
198         else
199         {
200                 indexInfo->ii_NumIndexAttrs = numberOfAttributes;
201                 indexInfo->ii_NumKeyAttrs = numberOfAttributes;
202
203                 classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
204
205                 NormIndexAttrs(indexInfo, classObjectId, attributeList,
206                                            relationId, accessMethodName, accessMethodId);
207         }
208
209         index_create(relationId, indexRelationName,
210                                  indexInfo, accessMethodId, classObjectId,
211                                  primary, allowSystemTableMods);
212
213         /*
214          * We update the relation's pg_class tuple even if it already has
215          * relhasindex = true.  This is needed to cause a shared-cache-inval
216          * message to be sent for the pg_class tuple, which will cause other
217          * backends to flush their relcache entries and in particular their
218          * cached lists of the indexes for this relation.
219          */
220         setRelhasindex(relationId, true, primary, InvalidOid);
221 }
222
223
224 /*
225  * CheckPredicate
226  *              Checks that the given list of partial-index predicates refer
227  *              (via the given range table) only to the given base relation oid.
228  *
229  * This used to also constrain the form of the predicate to forms that
230  * indxpath.c could do something with.  However, that seems overly
231  * restrictive.  One useful application of partial indexes is to apply
232  * a UNIQUE constraint across a subset of a table, and in that scenario
233  * any evaluatable predicate will work.  So accept any predicate here
234  * (except ones requiring a plan), and let indxpath.c fend for itself.
235  */
236
237 static void
238 CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid)
239 {
240         if (length(rangeTable) != 1 || getrelid(1, rangeTable) != baseRelOid)
241                 elog(ERROR,
242                  "Partial-index predicates may refer only to the base relation");
243
244         /*
245          * We don't currently support generation of an actual query plan for a
246          * predicate, only simple scalar expressions; hence these
247          * restrictions.
248          */
249         if (contain_subplans((Node *) predList))
250                 elog(ERROR, "Cannot use subselect in index predicate");
251         if (contain_agg_clause((Node *) predList))
252                 elog(ERROR, "Cannot use aggregate in index predicate");
253
254         /*
255          * A predicate using mutable functions is probably wrong, for the
256          * same reasons that we don't allow a functional index to use one.
257          */
258         if (contain_mutable_functions((Node *) predList))
259                 elog(ERROR, "Functions in index predicate must be marked isImmutable");
260 }
261
262
263 static void
264 FuncIndexArgs(IndexInfo *indexInfo,
265                           Oid *classOidP,
266                           IndexElem *funcIndex,
267                           Oid relId,
268                           char *accessMethodName,
269                           Oid accessMethodId)
270 {
271         Oid                     argTypes[FUNC_MAX_ARGS];
272         List       *arglist;
273         int                     nargs = 0;
274         int                     i;
275         FuncDetailCode fdresult;
276         Oid                     funcid;
277         Oid                     rettype;
278         bool            retset;
279         Oid                *true_typeids;
280
281         /*
282          * process the function arguments, which are a list of T_String
283          * (someday ought to allow more general expressions?)
284          *
285          * Note caller already checked that list is not too long.
286          */
287         MemSet(argTypes, 0, sizeof(argTypes));
288
289         foreach(arglist, funcIndex->args)
290         {
291                 char       *arg = strVal(lfirst(arglist));
292                 HeapTuple       tuple;
293                 Form_pg_attribute att;
294
295                 tuple = SearchSysCache(ATTNAME,
296                                                            ObjectIdGetDatum(relId),
297                                                            PointerGetDatum(arg),
298                                                            0, 0);
299                 if (!HeapTupleIsValid(tuple))
300                         elog(ERROR, "DefineIndex: attribute \"%s\" not found", arg);
301                 att = (Form_pg_attribute) GETSTRUCT(tuple);
302                 indexInfo->ii_KeyAttrNumbers[nargs] = att->attnum;
303                 argTypes[nargs] = att->atttypid;
304                 ReleaseSysCache(tuple);
305                 nargs++;
306         }
307
308         /*
309          * Lookup the function procedure to get its OID and result type.
310          *
311          * We rely on parse_func.c to find the correct function in the possible
312          * presence of binary-compatible types.  However, parse_func may do
313          * too much: it will accept a function that requires run-time coercion
314          * of input types, and the executor is not currently set up to support
315          * that.  So, check to make sure that the selected function has
316          * exact-match or binary-compatible input types.
317          */
318         fdresult = func_get_detail(funcIndex->funcname, funcIndex->args,
319                                                            nargs, argTypes,
320                                                            &funcid, &rettype, &retset,
321                                                            &true_typeids);
322         if (fdresult != FUNCDETAIL_NORMAL)
323         {
324                 if (fdresult == FUNCDETAIL_AGGREGATE)
325                         elog(ERROR, "DefineIndex: functional index may not use an aggregate function");
326                 else if (fdresult == FUNCDETAIL_COERCION)
327                         elog(ERROR, "DefineIndex: functional index must use a real function, not a type coercion"
328                                  "\n\tTry specifying the index opclass you want to use, instead");
329                 else
330                         func_error("DefineIndex", funcIndex->funcname, nargs, argTypes,
331                                            NULL);
332         }
333
334         if (retset)
335                 elog(ERROR, "DefineIndex: cannot index on a function returning a set");
336
337         for (i = 0; i < nargs; i++)
338         {
339                 if (!IsBinaryCompatible(argTypes[i], true_typeids[i]))
340                         func_error("DefineIndex", funcIndex->funcname, nargs, argTypes,
341                                            "Index function must be binary-compatible with table datatype");
342         }
343
344         /*
345          * Require that the function be marked immutable.  Using a mutable
346          * function for a functional index is highly questionable, since if
347          * you aren't going to get the same result for the same data every
348          * time, it's not clear what the index entries mean at all.
349          */
350         if (func_volatile(funcid) != PROVOLATILE_IMMUTABLE)
351                 elog(ERROR, "DefineIndex: index function must be marked isImmutable");
352
353         /* Process opclass, using func return type as default type */
354
355         classOidP[0] = GetAttrOpClass(funcIndex, rettype,
356                                                                   accessMethodName, accessMethodId);
357
358         /* OK, return results */
359
360         indexInfo->ii_FuncOid = funcid;
361         /* Need to do the fmgr function lookup now, too */
362         fmgr_info(funcid, &indexInfo->ii_FuncInfo);
363 }
364
365 static void
366 NormIndexAttrs(IndexInfo *indexInfo,
367                            Oid *classOidP,
368                            List *attList,       /* list of IndexElem's */
369                            Oid relId,
370                            char *accessMethodName,
371                            Oid accessMethodId)
372 {
373         List       *rest;
374         int                     attn = 0;
375
376         /*
377          * process attributeList
378          */
379         foreach(rest, attList)
380         {
381                 IndexElem  *attribute = (IndexElem *) lfirst(rest);
382                 HeapTuple       atttuple;
383                 Form_pg_attribute attform;
384
385                 if (attribute->name == NULL)
386                         elog(ERROR, "missing attribute for define index");
387
388                 atttuple = SearchSysCache(ATTNAME,
389                                                                   ObjectIdGetDatum(relId),
390                                                                   PointerGetDatum(attribute->name),
391                                                                   0, 0);
392                 if (!HeapTupleIsValid(atttuple))
393                         elog(ERROR, "DefineIndex: attribute \"%s\" not found",
394                                  attribute->name);
395                 attform = (Form_pg_attribute) GETSTRUCT(atttuple);
396
397                 indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
398
399                 classOidP[attn] = GetAttrOpClass(attribute, attform->atttypid,
400                                                                            accessMethodName, accessMethodId);
401
402                 ReleaseSysCache(atttuple);
403                 attn++;
404         }
405 }
406
407 static Oid
408 GetAttrOpClass(IndexElem *attribute, Oid attrType,
409                            char *accessMethodName, Oid accessMethodId)
410 {
411         char       *catalogname;
412         char       *schemaname = NULL;
413         char       *opcname = NULL;
414         HeapTuple       tuple;
415         Oid                     opClassId,
416                                 opInputType;
417
418         if (attribute->opclass == NIL)
419         {
420                 /* no operator class specified, so find the default */
421                 opClassId = GetDefaultOpClass(attrType, accessMethodId);
422                 if (!OidIsValid(opClassId))
423                         elog(ERROR, "data type %s has no default operator class for access method \"%s\""
424                                  "\n\tYou must specify an operator class for the index or define a"
425                                  "\n\tdefault operator class for the data type",
426                                  format_type_be(attrType), accessMethodName);
427                 return opClassId;
428         }
429
430         /*
431          * Specific opclass name given, so look up the opclass.
432          */
433
434         /* deconstruct the name list */
435         switch (length(attribute->opclass))
436         {
437                 case 1:
438                         opcname = strVal(lfirst(attribute->opclass));
439                         break;
440                 case 2:
441                         schemaname = strVal(lfirst(attribute->opclass));
442                         opcname = strVal(lsecond(attribute->opclass));
443                         break;
444                 case 3:
445                         catalogname = strVal(lfirst(attribute->opclass));
446                         schemaname = strVal(lsecond(attribute->opclass));
447                         opcname = strVal(lfirst(lnext(lnext(attribute->opclass))));
448                         /*
449                          * We check the catalog name and then ignore it.
450                          */
451                         if (strcmp(catalogname, DatabaseName) != 0)
452                                 elog(ERROR, "Cross-database references are not implemented");
453                         break;
454                 default:
455                         elog(ERROR, "Improper opclass name (too many dotted names): %s",
456                                  NameListToString(attribute->opclass));
457                         break;
458         }
459
460         if (schemaname)
461         {
462                 /* Look in specific schema only */
463                 Oid             namespaceId;
464
465                 namespaceId = GetSysCacheOid(NAMESPACENAME,
466                                                                          CStringGetDatum(schemaname),
467                                                                          0, 0, 0);
468                 if (!OidIsValid(namespaceId))
469                         elog(ERROR, "Namespace \"%s\" does not exist",
470                                  schemaname);
471                 tuple = SearchSysCache(CLAAMNAMENSP,
472                                                            ObjectIdGetDatum(accessMethodId),
473                                                            PointerGetDatum(opcname),
474                                                            ObjectIdGetDatum(namespaceId),
475                                                            0);
476         }
477         else
478         {
479                 /* Unqualified opclass name, so search the search path */
480                 opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
481                 if (!OidIsValid(opClassId))
482                         elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
483                                  opcname, accessMethodName);
484                 tuple = SearchSysCache(CLAOID,
485                                                            ObjectIdGetDatum(opClassId),
486                                                            0, 0, 0);
487         }
488
489         if (!HeapTupleIsValid(tuple))
490                 elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
491                          NameListToString(attribute->opclass), accessMethodName);
492
493         /*
494          * Verify that the index operator class accepts this
495          * datatype.  Note we will accept binary compatibility.
496          */
497         opClassId = tuple->t_data->t_oid;
498         opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
499
500         if (!IsBinaryCompatible(attrType, opInputType))
501                 elog(ERROR, "operator class \"%s\" does not accept data type %s",
502                          NameListToString(attribute->opclass), format_type_be(attrType));
503
504         ReleaseSysCache(tuple);
505
506         return opClassId;
507 }
508
509 static Oid
510 GetDefaultOpClass(Oid attrType, Oid accessMethodId)
511 {
512         OpclassCandidateList opclass;
513         int                     nexact = 0;
514         int                     ncompatible = 0;
515         Oid                     exactOid = InvalidOid;
516         Oid                     compatibleOid = InvalidOid;
517
518         /*
519          * We scan through all the opclasses available for the access method,
520          * looking for one that is marked default and matches the target type
521          * (either exactly or binary-compatibly, but prefer an exact match).
522          *
523          * We could find more than one binary-compatible match, in which case we
524          * require the user to specify which one he wants.      If we find more
525          * than one exact match, then someone put bogus entries in pg_opclass.
526          *
527          * The initial search is done by namespace.c so that we only consider
528          * opclasses visible in the current namespace search path.
529          */
530         for (opclass = OpclassGetCandidates(accessMethodId);
531                  opclass != NULL;
532                  opclass = opclass->next)
533         {
534                 if (opclass->opcdefault)
535                 {
536                         if (opclass->opcintype == attrType)
537                         {
538                                 nexact++;
539                                 exactOid = opclass->oid;
540                         }
541                         else if (IsBinaryCompatible(opclass->opcintype, attrType))
542                         {
543                                 ncompatible++;
544                                 compatibleOid = opclass->oid;
545                         }
546                 }
547         }
548
549         if (nexact == 1)
550                 return exactOid;
551         if (nexact != 0)
552                 elog(ERROR, "pg_opclass contains multiple default opclasses for data type %s",
553                          format_type_be(attrType));
554         if (ncompatible == 1)
555                 return compatibleOid;
556
557         return InvalidOid;
558 }
559
560 /*
561  * RemoveIndex
562  *              Deletes an index.
563  *
564  * Exceptions:
565  *              BadArg if name is invalid.
566  *              "ERROR" if index nonexistent.
567  *              ...
568  */
569 void
570 RemoveIndex(RangeVar *relation)
571 {
572         Oid                     indOid;
573         HeapTuple       tuple;
574
575         indOid = RangeVarGetRelid(relation, false);
576         tuple = SearchSysCache(RELOID,
577                                                    ObjectIdGetDatum(indOid),
578                                                    0, 0, 0);
579         if (!HeapTupleIsValid(tuple))
580                 elog(ERROR, "index \"%s\" does not exist", relation->relname);
581
582         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
583                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
584                          relation->relname, ((Form_pg_class) GETSTRUCT(tuple))->relkind);
585
586         ReleaseSysCache(tuple);
587
588         index_drop(indOid);
589 }
590
591 /*
592  * Reindex
593  *              Recreate an index.
594  *
595  * Exceptions:
596  *              "ERROR" if index nonexistent.
597  *              ...
598  */
599 void
600 ReindexIndex(RangeVar *indexRelation, bool force /* currently unused */ )
601 {
602         Oid                     indOid;
603         HeapTuple       tuple;
604         bool            overwrite = false;
605
606         /*
607          * REINDEX within a transaction block is dangerous, because if the
608          * transaction is later rolled back we have no way to undo truncation
609          * of the index's physical file.  Disallow it.
610          */
611         if (IsTransactionBlock())
612                 elog(ERROR, "REINDEX cannot run inside a BEGIN/END block");
613
614         indOid = RangeVarGetRelid(indexRelation, false);
615         tuple = SearchSysCache(RELOID,
616                                                    ObjectIdGetDatum(indOid),
617                                                    0, 0, 0);
618         if (!HeapTupleIsValid(tuple))
619                 elog(ERROR, "index \"%s\" does not exist", indexRelation->relname);
620
621         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
622                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
623                          indexRelation->relname,
624                          ((Form_pg_class) GETSTRUCT(tuple))->relkind);
625
626         if (IsSystemClass((Form_pg_class) GETSTRUCT(tuple)))
627         {
628                 if (!allowSystemTableMods)
629                         elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -O -P options",
630                                  indexRelation->relname);
631                 if (!IsIgnoringSystemIndexes())
632                         elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -P -O options",
633                                  indexRelation->relname);
634         }
635
636         ReleaseSysCache(tuple);
637
638         if (IsIgnoringSystemIndexes())
639                 overwrite = true;
640         if (!reindex_index(indOid, force, overwrite))
641                 elog(WARNING, "index \"%s\" wasn't reindexed", indexRelation->relname);
642 }
643
644 /*
645  * ReindexTable
646  *              Recreate indexes of a table.
647  *
648  * Exceptions:
649  *              "ERROR" if table nonexistent.
650  *              ...
651  */
652 void
653 ReindexTable(RangeVar *relation, bool force)
654 {
655         Oid                     heapOid;
656         HeapTuple       tuple;
657
658         /*
659          * REINDEX within a transaction block is dangerous, because if the
660          * transaction is later rolled back we have no way to undo truncation
661          * of the index's physical file.  Disallow it.
662          */
663         if (IsTransactionBlock())
664                 elog(ERROR, "REINDEX cannot run inside a BEGIN/END block");
665
666         heapOid = RangeVarGetRelid(relation, false);
667         tuple = SearchSysCache(RELOID,
668                                                    ObjectIdGetDatum(heapOid),
669                                                    0, 0, 0);
670         if (!HeapTupleIsValid(tuple))
671                 elog(ERROR, "table \"%s\" does not exist", relation->relname);
672
673         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION)
674                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
675                          relation->relname,
676                          ((Form_pg_class) GETSTRUCT(tuple))->relkind);
677
678         ReleaseSysCache(tuple);
679
680         if (!reindex_relation(heapOid, force))
681                 elog(WARNING, "table \"%s\" wasn't reindexed", relation->relname);
682 }
683
684 /*
685  * ReindexDatabase
686  *              Recreate indexes of a database.
687  */
688 void
689 ReindexDatabase(const char *dbname, bool force, bool all)
690 {
691         Relation        relationRelation;
692         HeapScanDesc scan;
693         HeapTuple       tuple;
694         MemoryContext private_context;
695         MemoryContext old;
696         int                     relcnt,
697                                 relalc,
698                                 i,
699                                 oncealc = 200;
700         Oid                *relids = (Oid *) NULL;
701
702         AssertArg(dbname);
703
704         if (strcmp(dbname, DatabaseName) != 0)
705                 elog(ERROR, "REINDEX DATABASE: Can be executed only on the currently open database.");
706
707         if (!(superuser() || is_dbadmin(MyDatabaseId)))
708                 elog(ERROR, "REINDEX DATABASE: Permission denied.");
709
710         if (!allowSystemTableMods)
711                 elog(ERROR, "must be called under standalone postgres with -O -P options");
712         if (!IsIgnoringSystemIndexes())
713                 elog(ERROR, "must be called under standalone postgres with -P -O options");
714
715         /*
716          * We cannot run inside a user transaction block; if we were inside a
717          * transaction, then our commit- and start-transaction-command calls
718          * would not have the intended effect!
719          */
720         if (IsTransactionBlock())
721                 elog(ERROR, "REINDEX DATABASE cannot run inside a BEGIN/END block");
722
723         /*
724          * Create a memory context that will survive forced transaction
725          * commits we do below.  Since it is a child of QueryContext, it will
726          * go away eventually even if we suffer an error; there's no need for
727          * special abort cleanup logic.
728          */
729         private_context = AllocSetContextCreate(QueryContext,
730                                                                                         "ReindexDatabase",
731                                                                                         ALLOCSET_DEFAULT_MINSIZE,
732                                                                                         ALLOCSET_DEFAULT_INITSIZE,
733                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
734
735         /*
736          * Scan pg_class to build a list of the relations we need to reindex.
737          */
738         relationRelation = heap_openr(RelationRelationName, AccessShareLock);
739         scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
740         relcnt = relalc = 0;
741         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
742         {
743                 if (!all)
744                 {
745                         if (!IsSystemClass((Form_pg_class) GETSTRUCT(tuple)))
746                                 continue;
747                 }
748                 if (((Form_pg_class) GETSTRUCT(tuple))->relkind == RELKIND_RELATION)
749                 {
750                         old = MemoryContextSwitchTo(private_context);
751                         if (relcnt == 0)
752                         {
753                                 relalc = oncealc;
754                                 relids = palloc(sizeof(Oid) * relalc);
755                         }
756                         else if (relcnt >= relalc)
757                         {
758                                 relalc *= 2;
759                                 relids = repalloc(relids, sizeof(Oid) * relalc);
760                         }
761                         MemoryContextSwitchTo(old);
762                         relids[relcnt] = tuple->t_data->t_oid;
763                         relcnt++;
764                 }
765         }
766         heap_endscan(scan);
767         heap_close(relationRelation, AccessShareLock);
768
769         /* Now reindex each rel in a separate transaction */
770         CommitTransactionCommand();
771         for (i = 0; i < relcnt; i++)
772         {
773                 StartTransactionCommand();
774                 if (reindex_relation(relids[i], force))
775                         elog(WARNING, "relation %u was reindexed", relids[i]);
776                 CommitTransactionCommand();
777         }
778         StartTransactionCommand();
779
780         MemoryContextDelete(private_context);
781 }