OSDN Git Service

Fix longstanding bug that kept functional indexes from working when you
[pg-rex/syncrep.git] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define, extend and remove index code.
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.22 2000/02/25 02:58:48 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/heapam.h"
20 #include "catalog/catname.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/pg_index.h"
24 #include "catalog/pg_opclass.h"
25 #include "catalog/pg_proc.h"
26 #include "catalog/pg_type.h"
27 #include "catalog/pg_database.h"
28 #include "catalog/pg_shadow.h"
29 #include "commands/defrem.h"
30 #include "optimizer/clauses.h"
31 #include "optimizer/planmain.h"
32 #include "optimizer/prep.h"
33 #include "parser/parsetree.h"
34 #include "parser/parse_func.h"
35 #include "utils/builtins.h"
36 #include "utils/syscache.h"
37 #include "miscadmin.h"  /* ReindexDatabase() */
38 #include "utils/portal.h" /* ReindexDatabase() */
39 #include "catalog/catalog.h" /* ReindexDatabase() */
40
41 #define IsFuncIndex(ATTR_LIST) (((IndexElem*)lfirst(ATTR_LIST))->args != NIL)
42
43 /* non-export function prototypes */
44 static void CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid);
45 static void CheckPredExpr(Node *predicate, List *rangeTable, Oid baseRelOid);
46 static void CheckPredClause(Expr *predicate, List *rangeTable, Oid baseRelOid);
47 static void FuncIndexArgs(IndexElem *funcIndex, FuncIndexInfo *funcInfo,
48                                                   AttrNumber *attNumP, Oid *opOidP, Oid relId);
49 static void NormIndexAttrs(List *attList, AttrNumber *attNumP,
50                                                    Oid *opOidP, Oid relId);
51 static void ProcessAttrTypename(IndexElem *attribute,
52                                                                 Oid defType, int32 defTypmod);
53 static Oid      GetAttrOpClass(IndexElem *attribute, Oid attrType);
54 static char *GetDefaultOpClass(Oid atttypid);
55
56 /*
57  * DefineIndex
58  *              Creates a new index.
59  *
60  * 'attributeList' is a list of IndexElem specifying either a functional
61  *              index or a list of attributes to index on.
62  * 'parameterList' is a list of DefElem specified in the with clause.
63  * 'predicate' is the qual specified in the where clause.
64  * 'rangetable' is for the predicate
65  *
66  * Exceptions:
67  *              XXX
68  */
69 void
70 DefineIndex(char *heapRelationName,
71                         char *indexRelationName,
72                         char *accessMethodName,
73                         List *attributeList,
74                         List *parameterList,
75                         bool unique,
76                         bool primary,
77                         Expr *predicate,
78                         List *rangetable)
79 {
80         Oid                *classObjectId;
81         Oid                     accessMethodId;
82         Oid                     relationId;
83         int                     numberOfAttributes;
84         AttrNumber *attributeNumberA;
85         HeapTuple       tuple;
86         uint16          parameterCount = 0;
87         Datum      *parameterA = NULL;
88         FuncIndexInfo fInfo;
89         List       *cnfPred = NULL;
90         bool            lossy = FALSE;
91         List       *pl;
92
93         /*
94          * Handle attributes
95          */
96         numberOfAttributes = length(attributeList);
97         if (numberOfAttributes <= 0)
98                 elog(ERROR, "DefineIndex: must specify at least one attribute");
99         if (numberOfAttributes > INDEX_MAX_KEYS)
100                 elog(ERROR, "Cannot use more than %d attributes in an index",
101                          INDEX_MAX_KEYS);
102
103         /*
104          * compute heap relation id
105          */
106         if ((relationId = RelnameFindRelid(heapRelationName)) == InvalidOid)
107         {
108                 elog(ERROR, "DefineIndex: %s relation not found",
109                          heapRelationName);
110         }
111
112         if (unique && strcmp(accessMethodName, "btree") != 0)
113                 elog(ERROR, "DefineIndex: unique indices are only available with the btree access method");
114
115         if (numberOfAttributes > 1 && strcmp(accessMethodName, "btree") != 0)
116                 elog(ERROR, "DefineIndex: multi-column indices are only available with the btree access method");
117
118         /*
119          * compute access method id
120          */
121         tuple = SearchSysCacheTuple(AMNAME,
122                                                                 PointerGetDatum(accessMethodName),
123                                                                 0, 0, 0);
124         if (!HeapTupleIsValid(tuple))
125         {
126                 elog(ERROR, "DefineIndex: %s access method not found",
127                          accessMethodName);
128         }
129         accessMethodId = tuple->t_data->t_oid;
130
131         /*
132          * WITH clause reinstated to handle lossy indices. -- JMH, 7/22/96
133          */
134         foreach(pl, parameterList)
135         {
136                 DefElem *param = (DefElem *) lfirst(pl);
137
138                 if (!strcasecmp(param->defname, "islossy"))
139                         lossy = TRUE;
140                 else
141                         elog(NOTICE, "Unrecognized index attribute '%s' ignored",
142                                  param->defname);
143         }
144
145         /*
146          * Convert the partial-index predicate from parsetree form to plan
147          * form, so it can be readily evaluated during index creation. Note:
148          * "predicate" comes in as a list containing (1) the predicate itself
149          * (a where_clause), and (2) a corresponding range table.
150          *
151          * [(1) is 'predicate' and (2) is 'rangetable' now. - ay 10/94]
152          */
153         if (predicate != NULL && rangetable != NIL)
154         {
155                 cnfPred = cnfify((Expr *) copyObject(predicate), true);
156                 fix_opids((Node *) cnfPred);
157                 CheckPredicate(cnfPred, rangetable, relationId);
158         }
159
160         if (!IsBootstrapProcessingMode() && !IndexesAreActive(relationId, false))
161                 elog(ERROR, "existent indexes are inactive. REINDEX first");
162         if (IsFuncIndex(attributeList))
163         {
164                 IndexElem  *funcIndex = lfirst(attributeList);
165                 int                     nargs;
166
167                 nargs = length(funcIndex->args);
168                 if (nargs > INDEX_MAX_KEYS)
169                         elog(ERROR, "Index function can take at most %d arguments",
170                                  INDEX_MAX_KEYS);
171
172                 FIsetnArgs(&fInfo, nargs);
173
174                 namestrcpy(&fInfo.funcName, funcIndex->name);
175
176                 attributeNumberA = (AttrNumber *) palloc(nargs *
177                                                                                                  sizeof attributeNumberA[0]);
178
179                 classObjectId = (Oid *) palloc(sizeof(Oid));
180
181                 FuncIndexArgs(funcIndex, &fInfo, attributeNumberA,
182                                           classObjectId, relationId);
183
184                 index_create(heapRelationName,
185                                          indexRelationName,
186                                          &fInfo, NULL, accessMethodId,
187                                          numberOfAttributes, attributeNumberA,
188                                          classObjectId, parameterCount, parameterA,
189                                          (Node *) cnfPred,
190                                          lossy, unique, primary);
191         }
192         else
193         {
194                 attributeNumberA = (AttrNumber *) palloc(numberOfAttributes *
195                                                                                                  sizeof attributeNumberA[0]);
196
197                 classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
198
199                 NormIndexAttrs(attributeList, attributeNumberA,
200                                            classObjectId, relationId);
201
202                 index_create(heapRelationName, indexRelationName, NULL,
203                                          attributeList,
204                                          accessMethodId, numberOfAttributes, attributeNumberA,
205                                          classObjectId, parameterCount, parameterA,
206                                          (Node *) cnfPred,
207                                          lossy, unique, primary);
208         }
209
210         setRelhasindexInplace(relationId, true, false);
211 }
212
213
214 /*
215  * ExtendIndex
216  *              Extends a partial index.
217  *
218  * Exceptions:
219  *              XXX
220  */
221 void
222 ExtendIndex(char *indexRelationName, Expr *predicate, List *rangetable)
223 {
224         Oid                *classObjectId;
225         Oid                     accessMethodId;
226         Oid                     indexId,
227                                 relationId;
228         Oid                     indproc;
229         int                     numberOfAttributes;
230         AttrNumber *attributeNumberA;
231         HeapTuple       tuple;
232         FuncIndexInfo fInfo;
233         FuncIndexInfo *funcInfo = NULL;
234         Form_pg_index index;
235         Node       *oldPred = NULL;
236         List       *cnfPred = NULL;
237         PredInfo   *predInfo;
238         Relation        heapRelation;
239         Relation        indexRelation;
240         int                     i;
241
242         /*
243          * compute index relation id and access method id
244          */
245         tuple = SearchSysCacheTuple(RELNAME,
246                                                                 PointerGetDatum(indexRelationName),
247                                                                 0, 0, 0);
248         if (!HeapTupleIsValid(tuple))
249         {
250                 elog(ERROR, "ExtendIndex: %s index not found",
251                          indexRelationName);
252         }
253         indexId = tuple->t_data->t_oid;
254         accessMethodId = ((Form_pg_class) GETSTRUCT(tuple))->relam;
255
256         /*
257          * find pg_index tuple
258          */
259         tuple = SearchSysCacheTuple(INDEXRELID,
260                                                                 ObjectIdGetDatum(indexId),
261                                                                 0, 0, 0);
262         if (!HeapTupleIsValid(tuple))
263         {
264                 elog(ERROR, "ExtendIndex: %s is not an index",
265                          indexRelationName);
266         }
267
268         /*
269          * Extract info from the pg_index tuple
270          */
271         index = (Form_pg_index) GETSTRUCT(tuple);
272         Assert(index->indexrelid == indexId);
273         relationId = index->indrelid;
274         indproc = index->indproc;
275
276         for (i = 0; i < INDEX_MAX_KEYS; i++)
277         {
278                 if (index->indkey[i] == InvalidAttrNumber)
279                         break;
280         }
281         numberOfAttributes = i;
282
283         if (VARSIZE(&index->indpred) != 0)
284         {
285                 char       *predString;
286
287                 predString = fmgr(F_TEXTOUT, &index->indpred);
288                 oldPred = stringToNode(predString);
289                 pfree(predString);
290         }
291         if (oldPred == NULL)
292                 elog(ERROR, "ExtendIndex: %s is not a partial index",
293                          indexRelationName);
294
295         /*
296          * Convert the extension predicate from parsetree form to plan form,
297          * so it can be readily evaluated during index creation. Note:
298          * "predicate" comes in as a list containing (1) the predicate itself
299          * (a where_clause), and (2) a corresponding range table.
300          */
301         if (rangetable != NIL)
302         {
303                 cnfPred = cnfify((Expr *) copyObject(predicate), true);
304                 fix_opids((Node *) cnfPred);
305                 CheckPredicate(cnfPred, rangetable, relationId);
306         }
307
308         /* make predInfo list to pass to index_build */
309         predInfo = (PredInfo *) palloc(sizeof(PredInfo));
310         predInfo->pred = (Node *) cnfPred;
311         predInfo->oldPred = oldPred;
312
313         attributeNumberA = (AttrNumber *) palloc(numberOfAttributes *
314                                                                                          sizeof attributeNumberA[0]);
315         classObjectId = (Oid *) palloc(numberOfAttributes * sizeof classObjectId[0]);
316
317
318         for (i = 0; i < numberOfAttributes; i++)
319         {
320                 attributeNumberA[i] = index->indkey[i];
321                 classObjectId[i] = index->indclass[i];
322         }
323
324         if (indproc != InvalidOid)
325         {
326                 funcInfo = &fInfo;
327                 FIsetnArgs(funcInfo, numberOfAttributes);
328
329                 tuple = SearchSysCacheTuple(PROCOID,
330                                                                         ObjectIdGetDatum(indproc),
331                                                                         0, 0, 0);
332                 if (!HeapTupleIsValid(tuple))
333                         elog(ERROR, "ExtendIndex: index procedure not found");
334
335                 namecpy(&(funcInfo->funcName),
336                                 &(((Form_pg_proc) GETSTRUCT(tuple))->proname));
337
338                 FIsetProcOid(funcInfo, tuple->t_data->t_oid);
339         }
340
341         heapRelation = heap_open(relationId, ShareLock);
342         indexRelation = index_open(indexId);
343
344         InitIndexStrategy(numberOfAttributes, indexRelation, accessMethodId);
345
346         index_build(heapRelation, indexRelation, numberOfAttributes,
347                                 attributeNumberA, 0, NULL, funcInfo, predInfo);
348
349         /* heap and index rels are closed as a side-effect of index_build */
350 }
351
352
353 /*
354  * CheckPredicate
355  *              Checks that the given list of partial-index predicates refer
356  *              (via the given range table) only to the given base relation oid,
357  *              and that they're in a form the planner can handle, i.e.,
358  *              boolean combinations of "ATTR OP CONST" (yes, for now, the ATTR
359  *              has to be on the left).
360  */
361
362 static void
363 CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid)
364 {
365         List       *item;
366
367         foreach(item, predList)
368                 CheckPredExpr(lfirst(item), rangeTable, baseRelOid);
369 }
370
371 static void
372 CheckPredExpr(Node *predicate, List *rangeTable, Oid baseRelOid)
373 {
374         List       *clauses = NIL,
375                            *clause;
376
377         if (is_opclause(predicate))
378         {
379                 CheckPredClause((Expr *) predicate, rangeTable, baseRelOid);
380                 return;
381         }
382         else if (or_clause(predicate) || and_clause(predicate))
383                 clauses = ((Expr *) predicate)->args;
384         else
385                 elog(ERROR, "Unsupported partial-index predicate expression type");
386
387         foreach(clause, clauses)
388                 CheckPredExpr(lfirst(clause), rangeTable, baseRelOid);
389 }
390
391 static void
392 CheckPredClause(Expr *predicate, List *rangeTable, Oid baseRelOid)
393 {
394         Var                *pred_var;
395         Const      *pred_const;
396
397         pred_var = (Var *) get_leftop(predicate);
398         pred_const = (Const *) get_rightop(predicate);
399
400         if (!IsA(predicate->oper, Oper) ||
401                 !IsA(pred_var, Var) ||
402                 !IsA(pred_const, Const))
403                 elog(ERROR, "Unsupported partial-index predicate clause type");
404
405         if (getrelid(pred_var->varno, rangeTable) != baseRelOid)
406                 elog(ERROR,
407                  "Partial-index predicates may refer only to the base relation");
408 }
409
410
411 static void
412 FuncIndexArgs(IndexElem *funcIndex,
413                           FuncIndexInfo *funcInfo,
414                           AttrNumber *attNumP,
415                           Oid *opOidP,
416                           Oid relId)
417 {
418         List       *rest;
419         HeapTuple       tuple;
420         Oid                     retType;
421         int                     argn = 0;
422
423         /*
424          * process the function arguments, which are a list of T_String
425          * (someday ought to allow more general expressions?)
426          */
427         MemSet(funcInfo->arglist, 0, FUNC_MAX_ARGS * sizeof(Oid));
428
429         foreach(rest, funcIndex->args)
430         {
431                 char       *arg = strVal(lfirst(rest));
432                 Form_pg_attribute att;
433
434                 tuple = SearchSysCacheTuple(ATTNAME,
435                                                                         ObjectIdGetDatum(relId),
436                                                                         PointerGetDatum(arg), 0, 0);
437
438                 if (!HeapTupleIsValid(tuple))
439                         elog(ERROR, "DefineIndex: attribute \"%s\" not found", arg);
440                 att = (Form_pg_attribute) GETSTRUCT(tuple);
441                 *attNumP++ = att->attnum;
442                 funcInfo->arglist[argn++] = att->atttypid;
443         }
444
445         /* ----------------
446          * Lookup the function procedure to get its OID and result type.
447          * ----------------
448          */
449         tuple = SearchSysCacheTuple(PROCNAME,
450                                                                 PointerGetDatum(FIgetname(funcInfo)),
451                                                                 Int32GetDatum(FIgetnArgs(funcInfo)),
452                                                                 PointerGetDatum(FIgetArglist(funcInfo)),
453                                                                 0);
454
455         if (!HeapTupleIsValid(tuple))
456         {
457                 func_error("DefineIndex", FIgetname(funcInfo),
458                                    FIgetnArgs(funcInfo), FIgetArglist(funcInfo), NULL);
459         }
460
461         FIsetProcOid(funcInfo, tuple->t_data->t_oid);
462         retType = ((Form_pg_proc) GETSTRUCT(tuple))->prorettype;
463
464         /* Process type and opclass, using func return type as default */
465
466         ProcessAttrTypename(funcIndex, retType, -1);
467
468         *opOidP = GetAttrOpClass(funcIndex, retType);
469 }
470
471 static void
472 NormIndexAttrs(List *attList,   /* list of IndexElem's */
473                            AttrNumber *attNumP,
474                            Oid *classOidP,
475                            Oid relId)
476 {
477         List       *rest;
478
479         /*
480          * process attributeList
481          */
482         foreach(rest, attList)
483         {
484                 IndexElem  *attribute = lfirst(rest);
485                 HeapTuple       atttuple;
486                 Form_pg_attribute attform;
487
488                 if (attribute->name == NULL)
489                         elog(ERROR, "missing attribute for define index");
490
491                 atttuple = SearchSysCacheTupleCopy(ATTNAME,
492                                                                                    ObjectIdGetDatum(relId),
493                                                                                    PointerGetDatum(attribute->name),
494                                                                                    0, 0);
495                 if (!HeapTupleIsValid(atttuple))
496                         elog(ERROR, "DefineIndex: attribute \"%s\" not found",
497                                  attribute->name);
498                 attform = (Form_pg_attribute) GETSTRUCT(atttuple);
499
500                 *attNumP++ = attform->attnum;
501
502                 ProcessAttrTypename(attribute, attform->atttypid, attform->atttypmod);
503
504                 *classOidP++ = GetAttrOpClass(attribute, attform->atttypid);
505
506                 heap_freetuple(atttuple);
507         }
508 }
509
510 static void
511 ProcessAttrTypename(IndexElem *attribute,
512                                         Oid defType, int32 defTypmod)
513 {
514         HeapTuple       tuple;
515
516         /* build a type node so we can set the proper alignment, etc. */
517         if (attribute->typename == NULL)
518         {
519                 tuple = SearchSysCacheTuple(TYPEOID,
520                                                                         ObjectIdGetDatum(defType),
521                                                                         0, 0, 0);
522                 if (!HeapTupleIsValid(tuple))
523                         elog(ERROR, "DefineIndex: type for attribute '%s' undefined",
524                                  attribute->name);
525
526                 attribute->typename = makeNode(TypeName);
527                 attribute->typename->name = nameout(&((Form_pg_type) GETSTRUCT(tuple))->typname);
528                 attribute->typename->typmod = defTypmod;
529         }
530 }
531
532 static Oid
533 GetAttrOpClass(IndexElem *attribute, Oid attrType)
534 {
535         HeapTuple       tuple;
536
537         if (attribute->class == NULL)
538         {
539                 /* no operator class specified, so find the default */
540                 attribute->class = GetDefaultOpClass(attrType);
541                 if (attribute->class == NULL)
542                         elog(ERROR, "Can't find a default operator class for type %u",
543                                  attrType);
544         }
545
546         tuple = SearchSysCacheTuple(CLANAME,
547                                                                 PointerGetDatum(attribute->class),
548                                                                 0, 0, 0);
549
550         if (!HeapTupleIsValid(tuple))
551                 elog(ERROR, "DefineIndex: %s opclass not found",
552                          attribute->class);
553
554         return tuple->t_data->t_oid;
555 }
556
557 static char *
558 GetDefaultOpClass(Oid atttypid)
559 {
560         HeapTuple       tuple;
561
562         tuple = SearchSysCacheTuple(CLADEFTYPE,
563                                                                 ObjectIdGetDatum(atttypid),
564                                                                 0, 0, 0);
565         if (!HeapTupleIsValid(tuple))
566                 return 0;
567
568         return nameout(&((Form_pg_opclass) GETSTRUCT(tuple))->opcname);
569 }
570
571 /*
572  * RemoveIndex
573  *              Deletes an index.
574  *
575  * Exceptions:
576  *              BadArg if name is invalid.
577  *              "WARN" if index nonexistent.
578  *              ...
579  */
580 void
581 RemoveIndex(char *name)
582 {
583         HeapTuple       tuple;
584
585         tuple = SearchSysCacheTuple(RELNAME,
586                                                                 PointerGetDatum(name),
587                                                                 0, 0, 0);
588
589         if (!HeapTupleIsValid(tuple))
590                 elog(ERROR, "index \"%s\" nonexistent", name);
591
592         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
593         {
594                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
595                          name,
596                          ((Form_pg_class) GETSTRUCT(tuple))->relkind);
597         }
598
599         index_drop(tuple->t_data->t_oid);
600 }
601
602 /*
603  * Reindex
604  *              Recreate an index.
605  *
606  * Exceptions:
607  *              "ERROR" if index nonexistent.
608  *              ...
609  */
610 void
611 ReindexIndex(const char *name, bool force /* currently unused */)
612 {
613         HeapTuple       tuple;
614
615         tuple = SearchSysCacheTuple(RELNAME,
616                                                                 PointerGetDatum(name),
617                                                                 0, 0, 0);
618
619         if (!HeapTupleIsValid(tuple))
620                 elog(ERROR, "index \"%s\" nonexistent", name);
621
622         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
623         {
624                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
625                          name,
626                          ((Form_pg_class) GETSTRUCT(tuple))->relkind);
627         }
628
629         reindex_index(tuple->t_data->t_oid, force);
630 }
631
632 /*
633  * ReindexTable
634  *              Recreate indexes of a table.
635  *
636  * Exceptions:
637  *              "ERROR" if table nonexistent.
638  *              ...
639  */
640 void
641 ReindexTable(const char *name, bool force)
642 {
643         HeapTuple       tuple;
644
645         tuple = SearchSysCacheTuple(RELNAME,
646                                                                 PointerGetDatum(name),
647                                                                 0, 0, 0);
648
649         if (!HeapTupleIsValid(tuple))
650                 elog(ERROR, "table \"%s\" nonexistent", name);
651
652         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION)
653         {
654                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
655                          name,
656                          ((Form_pg_class) GETSTRUCT(tuple))->relkind);
657         }
658
659         reindex_relation(tuple->t_data->t_oid, force);
660 }
661
662 /*
663  * ReindexDatabase
664  *              Recreate indexes of a database.
665  *
666  * Exceptions:
667  *              "ERROR" if table nonexistent.
668  *              ...
669  */
670 extern Oid MyDatabaseId;
671 void
672 ReindexDatabase(const char *dbname, bool force, bool all)
673 {
674         Relation        relation, relationRelation;
675         HeapTuple       usertuple, dbtuple, tuple;
676         HeapScanDesc    scan;
677         int4            user_id, db_owner;
678         bool            superuser;
679         Oid             db_id;
680         char            *username;
681         ScanKeyData     scankey;
682         PortalVariableMemory    pmem;
683         MemoryContext   old;
684         int             relcnt, relalc, i, oncealc = 200;
685         Oid             *relids = (Oid *) NULL;
686
687         AssertArg(dbname);
688
689         username = GetPgUserName();
690         usertuple = SearchSysCacheTuple(SHADOWNAME, PointerGetDatum(username),
691                                 0, 0, 0);
692         if (!HeapTupleIsValid(usertuple))
693                 elog(ERROR, "Current user '%s' is invalid.", username);
694         user_id = ((Form_pg_shadow) GETSTRUCT(usertuple))->usesysid;
695         superuser = ((Form_pg_shadow) GETSTRUCT(usertuple))->usesuper;
696
697         relation = heap_openr(DatabaseRelationName, AccessShareLock);
698         ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
699                         F_NAMEEQ, NameGetDatum(dbname));
700         scan = heap_beginscan(relation, 0, SnapshotNow, 1, &scankey);
701         dbtuple = heap_getnext(scan, 0);
702         if (!HeapTupleIsValid(dbtuple))
703                 elog(ERROR, "Database '%s' doesn't exist", dbname);
704         db_id = dbtuple->t_data->t_oid;
705         db_owner = ((Form_pg_database) GETSTRUCT(dbtuple))->datdba;
706         heap_endscan(scan);
707         if (user_id != db_owner && !superuser)
708                 elog(ERROR, "REINDEX DATABASE: Permission denied.");
709
710         if (db_id != MyDatabaseId)
711                 elog(ERROR, "REINDEX DATABASE: Can be executed only on the currently open database.");
712
713         heap_close(relation, NoLock);
714         /** reindex_database(db_id, force, !all); **/
715
716         CommonSpecialPortalOpen();
717         pmem = CommonSpecialPortalGetMemory();
718         relationRelation = heap_openr(RelationRelationName, AccessShareLock);
719         scan = heap_beginscan(relationRelation, false, SnapshotNow, 0, NULL);
720         relcnt = relalc = 0;
721         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
722         {
723                 if (!all)
724                 {
725                         if (!IsSystemRelationName(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname)))
726                                 continue;
727                         if (((Form_pg_class) GETSTRUCT(tuple))->relhasrules)
728                                 continue;
729                 }
730                 if (((Form_pg_class) GETSTRUCT(tuple))->relkind == RELKIND_RELATION)
731                 {
732                         old = MemoryContextSwitchTo((MemoryContext) pmem);
733                         if (relcnt == 0)
734                         {
735                                 relalc = oncealc;
736                                 relids = palloc(sizeof(Oid) * relalc);
737                         }
738                         else if (relcnt >= relalc)
739                         {
740                                 relalc *= 2;
741                                 relids = repalloc(relids, sizeof(Oid) * relalc);
742                         }
743                         MemoryContextSwitchTo(old);
744                         relids[relcnt] = tuple->t_data->t_oid;
745                         relcnt++;
746                 }
747         }
748         heap_endscan(scan);
749         heap_close(relationRelation, AccessShareLock);
750
751         CommitTransactionCommand();
752         for (i = 0; i < relcnt; i++)
753         {
754                 StartTransactionCommand();
755                 reindex_relation(relids[i], force);
756                 CommitTransactionCommand();
757         }
758         CommonSpecialPortalClose();
759         StartTransactionCommand();
760 }